1use std::time::Duration;
7
8use alloy::primitives::U256;
9use async_trait::async_trait;
10use espresso_api::{error::AvailabilityError, v1::HotShotAvailabilityApi};
11use espresso_types::{
12 NamespaceId, NamespaceProofQueryData, NsProof, SeqTypes,
13 v0::sparse_mt::KeccakNode,
14 v0_3::RewardAmount as InternalRewardAmount,
15 v0_4::{
16 RewardAccountProofV2 as InternalRewardAccountProofV2,
17 RewardAccountQueryDataV2 as InternalRewardAccountQueryData, RewardAccountV2,
18 RewardMerkleProofV2 as InternalRewardMerkleProofV2,
19 },
20 v0_6::RewardClaimError,
21};
22use futures::{StreamExt as _, join, stream::BoxStream};
23use hotshot_contract_adapter::reward::RewardClaimInput as InternalRewardClaimInput;
24use hotshot_new_protocol::message::Certificate2;
25use hotshot_query_service::{
26 Header as HsHeader,
27 availability::{
28 AvailabilityDataSource, BlockId as HsBlockId, BlockQueryData, BlockSummaryQueryData,
29 LeafId as HsLeafId, LeafQueryData, Limits as HsLimits, PayloadQueryData,
30 QueryablePayload as _, TransactionQueryData, TransactionWithProofQueryData,
31 VidCommonQueryData,
32 },
33 node::NodeDataSource as _,
34 types::HeightIndexed as _,
35};
36use hotshot_types::{
37 data::{EpochNumber, VidShare},
38 vid::avidm::AvidMShare,
39};
40use jf_merkle_tree_compat::prelude::{
41 MerkleNode as InternalMerkleNode, MerkleProof as InternalMerkleProof,
42};
43use serde_json;
44use serialization_api::v2::{
45 self, RewardAccountProofV2, RewardAccountQueryDataV2, RewardBalance, RewardBalances,
46 RewardClaimInput, RewardMerkleProofV2, RewardMerkleTreeV2Data, merkle_node,
47 reward_merkle_proof_v2::ProofType,
48};
49use tagged_base64::TaggedBase64;
50
51use super::{
52 RewardMerkleTreeDataSource, RewardMerkleTreeV2Data as InternalRewardTreeData,
53 data_source::{
54 RequestResponseDataSource as _, StakeTableDataSource, StateCertDataSource,
55 StateCertFetchingDataSource,
56 },
57};
58
/// API state wrapper around a data source `D`.
///
/// The API trait implementations in this file are written against this type and
/// delegate their queries to the wrapped `data_source`.
#[derive(Clone)]
pub struct NodeApiStateImpl<D> {
    /// The wrapped data source that the trait methods query.
    data_source: D,
}
66
67impl<D> NodeApiStateImpl<D> {
68 pub fn new(data_source: D) -> Self {
69 Self { data_source }
70 }
71
72 fn convert_reward_account_proof_v2(
74 &self,
75 proof: &InternalRewardAccountProofV2,
76 ) -> anyhow::Result<RewardAccountProofV2> {
77 Ok(RewardAccountProofV2 {
78 account: format!("{:#x}", proof.account),
79 proof: Some(self.convert_reward_merkle_proof_v2(&proof.proof)?),
80 })
81 }
82
83 fn convert_reward_merkle_proof_v2(
85 &self,
86 proof: &InternalRewardMerkleProofV2,
87 ) -> anyhow::Result<RewardMerkleProofV2> {
88 let proof_type = match proof {
89 InternalRewardMerkleProofV2::Presence(p) => {
90 ProofType::Presence(self.convert_merkle_proof(p)?)
91 },
92 InternalRewardMerkleProofV2::Absence(p) => {
93 ProofType::Absence(self.convert_merkle_proof(p)?)
94 },
95 };
96
97 Ok(RewardMerkleProofV2 {
98 proof_type: Some(proof_type),
99 })
100 }
101
102 fn convert_merkle_proof(
104 &self,
105 proof: &InternalMerkleProof<InternalRewardAmount, RewardAccountV2, KeccakNode, 2>,
106 ) -> anyhow::Result<v2::MerkleProof> {
107 let proof_nodes: Result<Vec<v2::MerkleNode>, _> = proof
108 .proof
109 .iter()
110 .map(|node| self.convert_merkle_node(node))
111 .collect();
112
113 Ok(v2::MerkleProof {
114 pos: TaggedBase64::new("FIELD", proof.pos.0.as_slice())
115 .map_err(|e| anyhow::anyhow!("failed to encode proof pos: {}", e))?
116 .to_string(),
117 proof: proof_nodes?,
118 })
119 }
120
121 fn convert_merkle_node(
123 &self,
124 node: &InternalMerkleNode<InternalRewardAmount, RewardAccountV2, KeccakNode>,
125 ) -> anyhow::Result<v2::MerkleNode> {
126 let node_type = match node {
127 InternalMerkleNode::Empty => merkle_node::NodeType::Empty(v2::Empty {
128 dummy: Some(v2::EmptyData {}),
129 }),
130 InternalMerkleNode::Leaf { pos, elem, value } => {
131 merkle_node::NodeType::Leaf(v2::Leaf {
132 pos: TaggedBase64::new("FIELD", pos.0.as_slice())
133 .map_err(|e| anyhow::anyhow!("failed to encode leaf pos: {}", e))?
134 .to_string(),
135 elem: TaggedBase64::new("FIELD", &elem.0.to_le_bytes::<32>())
136 .map_err(|e| anyhow::anyhow!("failed to encode leaf elem: {}", e))?
137 .to_string(),
138 value: TaggedBase64::new("FIELD", &value.0)
139 .map_err(|e| anyhow::anyhow!("failed to encode leaf value: {}", e))?
140 .to_string(),
141 })
142 },
143 InternalMerkleNode::Branch { value, children } => {
144 let proto_children: Result<Vec<v2::MerkleNode>, _> = children
145 .iter()
146 .map(|child| self.convert_merkle_node(child))
147 .collect();
148
149 merkle_node::NodeType::Branch(v2::Branch {
150 value: TaggedBase64::new("FIELD", &value.0)
151 .map_err(|e| anyhow::anyhow!("failed to encode branch value: {}", e))?
152 .to_string(),
153 children: proto_children?,
154 })
155 },
156 InternalMerkleNode::ForgettenSubtree { value } => {
157 merkle_node::NodeType::ForgottenSubtree(v2::ForgottenSubtree {
158 value: TaggedBase64::new("FIELD", &value.0)
159 .map_err(|e| {
160 anyhow::anyhow!("failed to encode forgotten subtree value: {}", e)
161 })?
162 .to_string(),
163 })
164 },
165 };
166
167 Ok(v2::MerkleNode {
168 node_type: Some(node_type),
169 })
170 }
171}
172
173impl<D> serialization_api::ApiSerializations for NodeApiStateImpl<D>
178where
179 D: std::ops::Deref + Send + Sync + 'static,
180 D::Target: RewardMerkleTreeDataSource + Send + Sync,
181{
182 type Address = alloy::primitives::Address;
184
185 type RewardClaimInput = InternalRewardClaimInput;
187 type RewardBalance = U256;
188 type RewardAccountQueryData = InternalRewardAccountQueryData;
189 type RewardBalances = (Vec<(RewardAccountV2, InternalRewardAmount)>, u64); type RewardMerkleTreeData = InternalRewardTreeData;
191
192 type NamespaceProof = espresso_types::NamespaceProofQueryData;
194 type IncorrectEncodingProof = espresso_types::v0_3::AvidMIncorrectEncodingNsProof;
195
196 type StateCertificate = espresso_types::StateCertQueryDataV2<espresso_types::SeqTypes>;
198 type StakeTable = Vec<hotshot_types::PeerConfig<espresso_types::SeqTypes>>;
199
200 type PeerConfig = hotshot_types::PeerConfig<espresso_types::SeqTypes>;
202 type LightClientCert = hotshot_types::simple_certificate::LightClientStateUpdateCertificateV2<
203 espresso_types::SeqTypes,
204 >;
205 type NsProof = espresso_types::NsProof;
206
207 fn deserialize_address(&self, s: &str) -> anyhow::Result<Self::Address> {
208 s.parse()
209 .map_err(|_| anyhow::anyhow!("invalid ethereum address: {}", s))
210 }
211
212 fn serialize_reward_claim_input(
214 &self,
215 address: &str,
216 value: &Self::RewardClaimInput,
217 ) -> anyhow::Result<RewardClaimInput> {
218 let auth_data = serde_json::to_string(&value.auth_data)
220 .map_err(|e| anyhow::anyhow!("failed to serialize auth_data: {}", e))?
221 .trim_matches('"')
223 .to_string();
224
225 Ok(RewardClaimInput {
226 address: address.to_string(),
227 lifetime_rewards: format!("{:#x}", value.lifetime_rewards), auth_data,
229 })
230 }
231
232 fn serialize_reward_balance(
233 &self,
234 value: &Self::RewardBalance,
235 ) -> anyhow::Result<RewardBalance> {
236 Ok(RewardBalance {
237 amount: value.to_string(), })
239 }
240
241 fn serialize_reward_account_query_data(
242 &self,
243 value: &Self::RewardAccountQueryData,
244 ) -> anyhow::Result<RewardAccountQueryDataV2> {
245 let balance = value.balance.to_string();
247
248 let proof = Some(self.convert_reward_account_proof_v2(&value.proof)?);
250
251 Ok(RewardAccountQueryDataV2 { balance, proof })
252 }
253
254 fn serialize_reward_balances(
255 &self,
256 value: &Self::RewardBalances,
257 ) -> anyhow::Result<RewardBalances> {
258 let (amounts_vec, total) = value;
259
260 let amounts = amounts_vec
262 .iter()
263 .map(|(account, amount)| serialization_api::v2::RewardAmount {
264 address: format!("{:#x}", account.0),
265 amount: amount.0.to_string(), })
267 .collect();
268
269 Ok(RewardBalances {
270 amounts,
271 total: *total,
272 })
273 }
274
275 fn serialize_reward_merkle_tree_data(
276 &self,
277 value: &Self::RewardMerkleTreeData,
278 ) -> anyhow::Result<RewardMerkleTreeV2Data> {
279 let bytes = bincode::serialize(value)
280 .map_err(|e| anyhow::anyhow!("failed to serialize RewardMerkleTreeV2Data: {}", e))?;
281 Ok(RewardMerkleTreeV2Data { data: bytes })
282 }
283
284 fn serialize_namespace_proof(
287 &self,
288 value: &Self::NamespaceProof,
289 ) -> anyhow::Result<v2::NamespaceProofResponse> {
290 let transactions: Vec<v2::Transaction> = value
292 .transactions
293 .iter()
294 .map(|tx| -> anyhow::Result<v2::Transaction> {
295 let mut payload_bytes = Vec::new();
296 base64_bytes::serialize(
297 &tx.payload,
298 &mut serde_json::Serializer::new(&mut payload_bytes),
299 )
300 .map_err(|e| anyhow::anyhow!("failed to serialize payload: {}", e))?;
301 let payload_str = String::from_utf8(payload_bytes)?
303 .trim_matches('"')
304 .to_string();
305
306 Ok(v2::Transaction {
307 namespace: tx.namespace.0,
308 payload: payload_str,
309 })
310 })
311 .collect::<anyhow::Result<Vec<_>>>()?;
312
313 let proof = value
314 .proof
315 .as_ref()
316 .map(|p| self.serialize_ns_proof(p))
317 .transpose()?;
318
319 Ok(serialization_api::v2::NamespaceProofResponse {
320 transactions,
321 proof,
322 })
323 }
324
325 fn serialize_incorrect_encoding_proof(
326 &self,
327 value: &Self::IncorrectEncodingProof,
328 ) -> anyhow::Result<v2::IncorrectEncodingProofResponse> {
329 let proof_data = serde_json::to_string(&value.0)?;
331 Ok(serialization_api::v2::IncorrectEncodingProofResponse {
332 proof: Some(v2::AvidMIncorrectEncodingNsProof { proof_data }),
333 })
334 }
335
336 fn serialize_state_certificate(
339 &self,
340 value: &Self::StateCertificate,
341 ) -> anyhow::Result<v2::StateCertificateResponse> {
342 let certificate = self.serialize_light_client_cert(&value.0)?;
343
344 Ok(serialization_api::v2::StateCertificateResponse {
345 certificate: Some(certificate),
346 })
347 }
348
349 fn serialize_stake_table(
350 &self,
351 value: &Self::StakeTable,
352 ) -> anyhow::Result<v2::StakeTableResponse> {
353 let peers: Result<Vec<_>, _> = value
354 .iter()
355 .map(|peer| self.serialize_peer_config(peer))
356 .collect();
357
358 Ok(serialization_api::v2::StakeTableResponse { peers: peers? })
359 }
360
361 fn serialize_peer_config(&self, peer: &Self::PeerConfig) -> anyhow::Result<v2::PeerConfig> {
362 let stake_table_entry = v2::StakeTableEntry {
363 stake_key: Some(v2::BlsPublicKey {
364 key: peer.stake_table_entry.stake_key.to_string(),
365 }),
366 stake_amount: peer.stake_table_entry.stake_amount.to_string(),
367 };
368
369 let state_ver_key = v2::SchnorrPublicKey {
370 key: peer.state_ver_key.to_string(),
371 };
372
373 let connect_info = peer.connect_info.as_ref().map(|info| {
374 let p2p_addr = match &info.p2p_addr {
375 hotshot_types::addr::NetAddr::Inet(ip, port) => v2::NetAddr {
376 addr_type: Some(v2::net_addr::AddrType::Inet(v2::InetAddr {
377 host: match ip {
378 std::net::IpAddr::V4(_) => ip.to_string(),
379 std::net::IpAddr::V6(_) => format!("[{ip}]"),
380 },
381 port: *port as u32,
382 })),
383 },
384 hotshot_types::addr::NetAddr::Name(name, port) => v2::NetAddr {
385 addr_type: Some(v2::net_addr::AddrType::Name(v2::NameAddr {
386 name: name.to_string(),
387 port: *port as u32,
388 })),
389 },
390 };
391
392 v2::PeerConnectInfo {
393 x25519_key: info.x25519_key.to_string(),
394 p2p_addr: Some(p2p_addr),
395 }
396 });
397
398 Ok(v2::PeerConfig {
399 stake_table_entry: Some(stake_table_entry),
400 state_ver_key: Some(state_ver_key),
401 connect_info,
402 })
403 }
404
405 fn serialize_light_client_cert(
406 &self,
407 cert: &Self::LightClientCert,
408 ) -> anyhow::Result<v2::LightClientStateUpdateCertificateV2> {
409 let signatures: Result<Vec<_>, anyhow::Error> = cert
410 .signatures
411 .iter()
412 .map(
413 |(key, lcv3_sig, lcv2_sig)| -> anyhow::Result<v2::StateSignatureTuple> {
414 Ok(v2::StateSignatureTuple {
415 state_signature_key: Some(v2::SchnorrPublicKey {
416 key: key.to_string(),
417 }),
418 lcv3_signature: lcv3_sig.to_string(),
419 lcv2_signature: lcv2_sig.to_string(),
420 })
421 },
422 )
423 .collect();
424
425 Ok(v2::LightClientStateUpdateCertificateV2 {
426 epoch: cert.epoch.u64(),
427 light_client_state: cert.light_client_state.to_string(),
428 next_stake_table_state: cert.next_stake_table_state.to_string(),
429 signatures: signatures?,
430 auth_root: cert.auth_root.to_string(),
431 })
432 }
433
434 fn serialize_ns_proof(&self, proof: &Self::NsProof) -> anyhow::Result<v2::NsProof> {
435 let proof_version = match proof {
436 NsProof::V0(advz_proof) => {
437 let json = serde_json::json!({
439 "ns_index": advz_proof.ns_index,
440 "ns_payload": advz_proof.ns_payload,
441 "ns_proof": advz_proof.ns_proof,
442 });
443 v2::ns_proof::ProofVersion::V0(serde_json::from_value(json)?)
444 },
445 NsProof::V1(avidm_proof) => {
446 let mut ns_payload_bytes = Vec::new();
448 base64_bytes::serialize(
449 &avidm_proof.0.ns_payload,
450 &mut serde_json::Serializer::new(&mut ns_payload_bytes),
451 )
452 .map_err(|e| anyhow::anyhow!("failed to serialize ns_payload: {}", e))?;
453 let ns_payload_str = String::from_utf8(ns_payload_bytes)?
454 .trim_matches('"')
455 .to_string();
456
457 v2::ns_proof::ProofVersion::V1(v2::AvidMNsProof {
458 ns_index: avidm_proof.0.ns_index as u64,
459 ns_payload: ns_payload_str,
460 ns_proof: avidm_proof.0.ns_proof.to_string(),
461 })
462 },
463 NsProof::V1IncorrectEncoding(incorrect_proof) => {
464 v2::ns_proof::ProofVersion::V1IncorrectEncoding(v2::AvidMIncorrectEncodingNsProof {
466 proof_data: serde_json::to_string(&incorrect_proof.0)?,
467 })
468 },
469 NsProof::V2(gf2_proof) => {
470 let mut ns_payload_bytes = Vec::new();
472 base64_bytes::serialize(
473 &gf2_proof.0.ns_payload,
474 &mut serde_json::Serializer::new(&mut ns_payload_bytes),
475 )
476 .map_err(|e| anyhow::anyhow!("failed to serialize ns_payload: {}", e))?;
477 let ns_payload_str = String::from_utf8(ns_payload_bytes)?
478 .trim_matches('"')
479 .to_string();
480
481 v2::ns_proof::ProofVersion::V2(v2::AvidmGf2NsProof {
482 ns_index: gf2_proof.0.ns_index as u64,
483 ns_payload: ns_payload_str,
484 ns_proof: gf2_proof.0.ns_proof.to_string(),
485 })
486 },
487 };
488
489 Ok(v2::NsProof {
490 proof_version: Some(proof_version),
491 })
492 }
493}
494
495#[async_trait]
500impl<D> espresso_api::v2::RewardApi for NodeApiStateImpl<D>
501where
502 D: std::ops::Deref + Clone + Send + Sync + 'static,
503 D::Target: RewardMerkleTreeDataSource + Send + Sync,
504{
505 async fn get_reward_claim_input(
506 &self,
507 address: Self::Address,
508 ) -> anyhow::Result<Self::RewardClaimInput> {
509 let proof = self
511 .data_source
512 .load_latest_reward_account_proof_v2(address.into())
513 .await
514 .map_err(|err| {
515 anyhow::anyhow!(
516 "failed to load latest reward account {:?}: {}",
517 address,
518 err
519 )
520 })?;
521
522 proof.to_reward_claim_input().map_err(|err| match err {
524 RewardClaimError::ZeroRewardError => {
525 anyhow::anyhow!("zero reward balance for {:?}", address)
526 },
527 RewardClaimError::ProofConversionError(e) => {
528 anyhow::anyhow!("failed to create solidity proof for {:?}: {}", address, e)
529 },
530 })
531 }
532
533 async fn get_reward_balance(
534 &self,
535 address: Self::Address,
536 ) -> anyhow::Result<Self::RewardBalance> {
537 let proof = self
539 .data_source
540 .load_latest_reward_account_proof_v2(address.into())
541 .await
542 .map_err(|err| {
543 anyhow::anyhow!(
544 "failed to load latest reward account {:?}: {}",
545 address,
546 err
547 )
548 })?;
549
550 Ok(proof.balance)
552 }
553
554 async fn get_reward_account_proof(
555 &self,
556 address: Self::Address,
557 ) -> anyhow::Result<Self::RewardAccountQueryData> {
558 self.data_source
560 .load_latest_reward_account_proof_v2(address.into())
561 .await
562 .map_err(|err| {
563 anyhow::anyhow!(
564 "failed to load latest reward account proof for {:?}: {}",
565 address,
566 err
567 )
568 })
569 }
570
571 async fn get_reward_balances(
572 &self,
573 height: u64,
574 offset: u64,
575 limit: u64,
576 ) -> anyhow::Result<Self::RewardBalances> {
577 if limit > 10000 {
579 return Err(anyhow::anyhow!(
580 "limit {} exceeds maximum allowed value of 10000",
581 limit
582 ));
583 }
584
585 let tree_bytes = self.data_source.load_tree(height).await.map_err(|err| {
587 anyhow::anyhow!("failed to load reward tree at height {}: {}", height, err)
588 })?;
589
590 let tree_data: InternalRewardTreeData =
592 bincode::deserialize(&tree_bytes).map_err(|err| {
593 anyhow::anyhow!(
594 "failed to deserialize RewardMerkleTreeV2Data at height {}: {}",
595 height,
596 err
597 )
598 })?;
599
600 let offset_usize = offset as usize;
601 let limit_usize = limit as usize;
602 let total = tree_data.balances.len() as u64;
603
604 if offset_usize > tree_data.balances.len() {
606 return Err(anyhow::anyhow!("offset {} out of bounds", offset));
607 }
608
609 let end = std::cmp::min(offset_usize + limit_usize, tree_data.balances.len());
610 let slice = &tree_data.balances[offset_usize..end];
611
612 let reversed: Vec<_> = slice.iter().rev().copied().collect();
614 Ok((reversed, total))
615 }
616
617 async fn get_reward_merkle_tree_v2(
618 &self,
619 height: u64,
620 ) -> anyhow::Result<Self::RewardMerkleTreeData> {
621 let tree_bytes = self.data_source.load_tree(height).await.map_err(|err| {
623 anyhow::anyhow!("failed to load reward tree at height {}: {}", height, err)
624 })?;
625
626 bincode::deserialize(&tree_bytes).map_err(|err| {
628 anyhow::anyhow!(
629 "failed to deserialize RewardMerkleTreeV2Data at height {}: {}",
630 height,
631 err
632 )
633 })
634 }
635}
636
637#[async_trait]
642impl<D> espresso_api::v1::RewardApi for NodeApiStateImpl<D>
643where
644 D: RewardMerkleTreeDataSource,
645{
646 type RewardClaimInput = InternalRewardClaimInput;
647 type RewardBalance = InternalRewardAmount;
648 type RewardAccountQueryData = InternalRewardAccountQueryData;
649 type RewardAmounts = Vec<(alloy::primitives::Address, InternalRewardAmount)>;
650 type RewardMerkleTreeData = Vec<u8>;
651
652 async fn get_reward_claim_input(
653 &self,
654 block_height: u64,
655 address: String,
656 ) -> anyhow::Result<Self::RewardClaimInput> {
657 let addr: alloy::primitives::Address = address
659 .parse()
660 .map_err(|_| anyhow::anyhow!("invalid ethereum address: {}", address))?;
661
662 let proof = self
664 .data_source
665 .load_reward_account_proof_v2(block_height, addr.into())
666 .await
667 .map_err(|err| {
668 anyhow::anyhow!(
669 "failed to load reward account {} at height {}: {}",
670 address,
671 block_height,
672 err
673 )
674 })?;
675
676 let claim_input = proof.to_reward_claim_input().map_err(|err| match err {
678 RewardClaimError::ZeroRewardError => {
679 anyhow::anyhow!(
680 "zero reward balance for {} at height {}",
681 address,
682 block_height
683 )
684 },
685 RewardClaimError::ProofConversionError(e) => {
686 anyhow::anyhow!(
687 "failed to create solidity proof for {} at height {}: {}",
688 address,
689 block_height,
690 e
691 )
692 },
693 })?;
694
695 Ok(claim_input)
696 }
697
698 async fn get_reward_balance(
699 &self,
700 height: u64,
701 address: String,
702 ) -> anyhow::Result<Self::RewardBalance> {
703 let addr: alloy::primitives::Address = address
705 .parse()
706 .map_err(|_| anyhow::anyhow!("invalid ethereum address: {}", address))?;
707
708 let proof = self
710 .data_source
711 .load_reward_account_proof_v2(height, addr.into())
712 .await
713 .map_err(|err| {
714 anyhow::anyhow!(
715 "failed to load reward account {} at height {}: {}",
716 address,
717 height,
718 err
719 )
720 })?;
721
722 Ok(InternalRewardAmount(proof.balance))
723 }
724
725 async fn get_latest_reward_balance(
726 &self,
727 address: String,
728 ) -> anyhow::Result<Self::RewardBalance> {
729 let addr: alloy::primitives::Address = address
730 .parse()
731 .map_err(|_| anyhow::anyhow!("invalid ethereum address: {}", address))?;
732
733 let proof = self
734 .data_source
735 .load_latest_reward_account_proof_v2(addr.into())
736 .await
737 .map_err(|err| {
738 anyhow::anyhow!("failed to load latest reward account {}: {}", address, err)
739 })?;
740
741 Ok(InternalRewardAmount(proof.balance))
742 }
743
744 async fn get_reward_account_proof(
745 &self,
746 height: u64,
747 address: String,
748 ) -> anyhow::Result<Self::RewardAccountQueryData> {
749 let addr: alloy::primitives::Address = address
751 .parse()
752 .map_err(|_| anyhow::anyhow!("invalid ethereum address: {}", address))?;
753
754 let proof = self
756 .data_source
757 .load_reward_account_proof_v2(height, addr.into())
758 .await
759 .map_err(|err| {
760 anyhow::anyhow!(
761 "failed to load reward account {} at height {}: {}",
762 address,
763 height,
764 err
765 )
766 })?;
767
768 Ok(proof)
769 }
770
771 async fn get_latest_reward_account_proof(
772 &self,
773 address: String,
774 ) -> anyhow::Result<Self::RewardAccountQueryData> {
775 let addr: alloy::primitives::Address = address
777 .parse()
778 .map_err(|_| anyhow::anyhow!("invalid ethereum address: {}", address))?;
779
780 let proof = self
782 .data_source
783 .load_latest_reward_account_proof_v2(addr.into())
784 .await
785 .map_err(|err| {
786 anyhow::anyhow!("failed to load latest reward account {}: {}", address, err)
787 })?;
788
789 Ok(proof)
790 }
791
792 async fn get_reward_amounts(
793 &self,
794 height: u64,
795 offset: u64,
796 limit: u64,
797 ) -> anyhow::Result<Self::RewardAmounts> {
798 if limit > 10000 {
800 return Err(anyhow::anyhow!(
801 "limit {} exceeds maximum allowed value of 10000",
802 limit
803 ));
804 }
805
806 let tree_bytes = self.data_source.load_tree(height).await.map_err(|err| {
808 anyhow::anyhow!("failed to load reward tree at height {}: {}", height, err)
809 })?;
810
811 let tree_data: InternalRewardTreeData =
813 bincode::deserialize(&tree_bytes).map_err(|err| {
814 anyhow::anyhow!(
815 "failed to deserialize RewardMerkleTreeV2Data at height {}: {}",
816 height,
817 err
818 )
819 })?;
820
821 let offset_usize = offset as usize;
822 let limit_usize = limit as usize;
823
824 if offset_usize > tree_data.balances.len() {
826 return Err(anyhow::anyhow!("offset {} out of bounds", offset));
827 }
828
829 let end = std::cmp::min(offset_usize + limit_usize, tree_data.balances.len());
830 let slice = &tree_data.balances[offset_usize..end];
831
832 let result: Vec<(alloy::primitives::Address, InternalRewardAmount)> = slice
833 .iter()
834 .rev()
835 .map(|(account, amount)| (account.0, *amount))
836 .collect();
837
838 Ok(result)
839 }
840
841 async fn get_reward_merkle_tree_v2(
842 &self,
843 height: u64,
844 ) -> anyhow::Result<Self::RewardMerkleTreeData> {
845 self.data_source.load_tree(height).await.map_err(|err| {
846 anyhow::anyhow!("failed to load reward tree at height {}: {}", height, err)
847 })
848 }
849}
850
851#[async_trait]
856impl<D> espresso_api::v2::DataApi for NodeApiStateImpl<D>
857where
858 D: std::ops::Deref + Clone + Send + Sync + 'static,
859 D::Target: RewardMerkleTreeDataSource
860 + hotshot_query_service::availability::AvailabilityDataSource<espresso_types::SeqTypes>
861 + hotshot_query_service::node::NodeDataSource<espresso_types::SeqTypes>
862 + super::data_source::RequestResponseDataSource<espresso_types::SeqTypes>
863 + Sync
864 + Send,
865{
866 async fn get_namespace_proof(
867 &self,
868 namespace_id: u64,
869 block_height: u64,
870 ) -> anyhow::Result<Self::NamespaceProof> {
871 let ns_id = NamespaceId(namespace_id);
872 let block_id = HsBlockId::Number(block_height as usize);
873
874 let ds = &*self.data_source;
876 let timeout = std::time::Duration::from_millis(500);
877 let (block_fetch, vid_fetch) = join!(ds.get_block(block_id), ds.get_vid_common(block_id));
878 let (block_opt, vid_opt) = join!(
879 block_fetch.with_timeout(timeout),
880 vid_fetch.with_timeout(timeout)
881 );
882
883 let block = block_opt.ok_or_else(|| anyhow::anyhow!("block {} not found", block_height))?;
884 let vid_common = vid_opt.ok_or_else(|| {
885 anyhow::anyhow!("VID common data for block {} not found", block_height)
886 })?;
887
888 let ns_table = block.payload().ns_table();
890 let ns_index = ns_table.find_ns_id(&ns_id).ok_or_else(|| {
891 anyhow::anyhow!(
892 "namespace {} not present in block {}",
893 namespace_id,
894 block_height
895 )
896 })?;
897
898 let proof =
899 NsProof::new(block.payload(), &ns_index, vid_common.common()).ok_or_else(|| {
900 anyhow::anyhow!(
901 "failed to generate namespace proof for block {}",
902 block_height
903 )
904 })?;
905
906 let transactions = proof.export_all_txs(&ns_id);
907
908 Ok(espresso_types::NamespaceProofQueryData {
909 transactions,
910 proof: Some(proof),
911 })
912 }
913
914 async fn get_namespace_proof_range(
915 &self,
916 namespace_id: u64,
917 from: u64,
918 until: u64,
919 ) -> anyhow::Result<Vec<Self::NamespaceProof>> {
920 let ns_id = NamespaceId(namespace_id);
921
922 if until <= from {
924 return Err(anyhow::anyhow!(
925 "invalid range: until ({}) must be greater than from ({})",
926 until,
927 from
928 ));
929 }
930
931 let range_size = until - from;
932 const MAX_RANGE: u64 = 100; if range_size > MAX_RANGE {
934 return Err(anyhow::anyhow!(
935 "range too large: {} blocks (max {})",
936 range_size,
937 MAX_RANGE
938 ));
939 }
940
941 let (blocks_stream, vids_stream) = join!(
943 self.data_source
944 .get_block_range(from as usize..until as usize),
945 self.data_source
946 .get_vid_common_range(from as usize..until as usize)
947 );
948
949 let blocks: Vec<_> = blocks_stream
950 .then(|block| async move { block.resolve().await })
951 .collect()
952 .await;
953 let vids: Vec<_> = vids_stream
954 .then(|vid| async move { vid.resolve().await })
955 .collect()
956 .await;
957
958 if blocks.len() != vids.len() {
959 return Err(anyhow::anyhow!(
960 "mismatch between blocks and VID common data"
961 ));
962 }
963
964 let mut proofs = Vec::new();
966
967 for (block, vid) in blocks.into_iter().zip(vids) {
968 let ns_table = block.payload().ns_table();
969
970 if let Some(ns_index) = ns_table.find_ns_id(&ns_id) {
972 if let Some(proof) = NsProof::new(block.payload(), &ns_index, vid.common()) {
973 let transactions = proof.export_all_txs(&ns_id);
974 proofs.push(espresso_types::NamespaceProofQueryData {
975 transactions,
976 proof: Some(proof),
977 });
978 } else {
979 proofs.push(espresso_types::NamespaceProofQueryData {
981 transactions: vec![],
982 proof: None,
983 });
984 }
985 } else {
986 proofs.push(espresso_types::NamespaceProofQueryData {
988 transactions: vec![],
989 proof: None,
990 });
991 }
992 }
993
994 Ok(proofs)
995 }
996
997 async fn get_incorrect_encoding_proof(
998 &self,
999 namespace_id: u64,
1000 block_height: u64,
1001 ) -> anyhow::Result<Self::IncorrectEncodingProof> {
1002 let ns_id = NamespaceId(namespace_id);
1003 let block_id = HsBlockId::Number(block_height as usize);
1004
1005 let ds = &*self.data_source;
1006 let timeout = std::time::Duration::from_millis(500);
1007 let (block_fetch, vid_fetch) = join!(ds.get_block(block_id), ds.get_vid_common(block_id));
1008 let (block, vid_common) = join!(
1009 block_fetch.with_timeout(timeout),
1010 vid_fetch.with_timeout(timeout)
1011 );
1012
1013 let block = block.ok_or_else(|| anyhow::anyhow!("block {} not found", block_height))?;
1014 let vid_common = vid_common.ok_or_else(|| {
1015 anyhow::anyhow!("VID common data for block {} not found", block_height)
1016 })?;
1017
1018 let ns_table = block.payload().ns_table();
1019 let ns_index = ns_table.find_ns_id(&ns_id).ok_or_else(|| {
1020 anyhow::anyhow!(
1021 "namespace {} not present in block {}",
1022 namespace_id,
1023 block_height
1024 )
1025 })?;
1026
1027 if NsProof::new(block.payload(), &ns_index, vid_common.common()).is_some() {
1028 return Err(anyhow::anyhow!(
1029 "block {} was correctly encoded",
1030 block_height
1031 ));
1032 }
1033
1034 let vid_shares_future = ds
1036 .request_vid_shares(block_height, vid_common.clone(), Duration::from_secs(40))
1037 .await;
1038 let mut vid_shares = vid_shares_future
1039 .await
1040 .map_err(|e| anyhow::anyhow!("failed to fetch VID shares: {e:#}"))?;
1041
1042 if let Ok(local_share) = ds.vid_share(block_height as usize).await {
1043 vid_shares.push(local_share);
1044 }
1045
1046 let avidm_shares: Vec<AvidMShare> = vid_shares
1047 .into_iter()
1048 .filter_map(|s| {
1049 if let VidShare::V1(s) = s {
1050 Some(s)
1051 } else {
1052 None
1053 }
1054 })
1055 .collect();
1056
1057 match NsProof::v1_1_new_with_incorrect_encoding(
1058 &avidm_shares,
1059 ns_table,
1060 &ns_index,
1061 &vid_common.payload_hash(),
1062 vid_common.common(),
1063 ) {
1064 Some(NsProof::V1IncorrectEncoding(proof)) => Ok(proof),
1065 _ => Err(anyhow::anyhow!(
1066 "failed to generate incorrect encoding proof"
1067 )),
1068 }
1069 }
1070}
1071
1072#[async_trait]
1077impl<D> espresso_api::v2::ConsensusApi for NodeApiStateImpl<D>
1078where
1079 D: std::ops::Deref + Clone + Send + Sync + 'static,
1080 D::Target: RewardMerkleTreeDataSource
1081 + super::data_source::StateCertDataSource
1082 + super::data_source::StateCertFetchingDataSource<espresso_types::SeqTypes>
1083 + super::data_source::StakeTableDataSource<espresso_types::SeqTypes>
1084 + Send
1085 + Sync,
1086{
1087 async fn get_state_certificate(&self, epoch: u64) -> anyhow::Result<Self::StateCertificate> {
1088 let ds = &*self.data_source;
1089
1090 let state_cert = ds.get_state_cert_by_epoch(epoch).await?;
1092
1093 let cert = match state_cert {
1094 Some(cert) => cert,
1095 None => {
1096 const TIMEOUT: Duration = Duration::from_secs(40);
1098 let cert = ds.request_state_cert(epoch, TIMEOUT).await.map_err(|e| {
1099 anyhow::anyhow!("failed to fetch state cert for epoch {}: {}", epoch, e)
1100 })?;
1101
1102 ds.insert_state_cert(epoch, cert.clone()).await?;
1104
1105 cert
1106 },
1107 };
1108
1109 Ok(espresso_types::StateCertQueryDataV2(cert))
1110 }
1111
1112 async fn get_stake_table(&self, epoch: u64) -> anyhow::Result<Self::StakeTable> {
1113 let ds = &*self.data_source;
1114 ds.get_stake_table(Some(EpochNumber::new(epoch))).await
1115 }
1116}
1117
1118#[async_trait]
1123impl<D> espresso_api::v1::AvailabilityApi for NodeApiStateImpl<D>
1124where
1125 D: std::ops::Deref + Clone + Send + Sync + 'static,
1126 D::Target: RewardMerkleTreeDataSource
1127 + hotshot_query_service::availability::AvailabilityDataSource<espresso_types::SeqTypes>
1128 + hotshot_query_service::node::NodeDataSource<espresso_types::SeqTypes>
1129 + super::data_source::RequestResponseDataSource<espresso_types::SeqTypes>
1130 + super::data_source::StateCertDataSource
1131 + super::data_source::StateCertFetchingDataSource<espresso_types::SeqTypes>
1132 + Send
1133 + Sync,
1134{
1135 type NamespaceProofQueryData = espresso_types::NamespaceProofQueryData;
1136 type IncorrectEncodingProof = espresso_types::v0_3::AvidMIncorrectEncodingNsProof;
1137 type StateCertQueryDataV1 = espresso_types::StateCertQueryDataV1<espresso_types::SeqTypes>;
1138 type StateCertQueryDataV2 = espresso_types::StateCertQueryDataV2<espresso_types::SeqTypes>;
1139
1140 async fn get_namespace_proof(
1141 &self,
1142 block_id: espresso_api::v1::availability::BlockId,
1143 namespace: u32,
1144 ) -> anyhow::Result<Option<Self::NamespaceProofQueryData>> {
1145 let ns_id = NamespaceId::from(namespace);
1146
1147 let hs_block_id = match block_id {
1149 espresso_api::v1::availability::BlockId::Height(h) => HsBlockId::Number(h as usize),
1150 espresso_api::v1::availability::BlockId::Hash(h) => {
1151 let hash = h
1152 .parse()
1153 .map_err(|_| anyhow::anyhow!("invalid block hash: {}", h))?;
1154 HsBlockId::Hash(hash)
1155 },
1156 espresso_api::v1::availability::BlockId::PayloadHash(h) => {
1157 let payload_hash = h
1158 .parse()
1159 .map_err(|_| anyhow::anyhow!("invalid payload hash: {}", h))?;
1160 HsBlockId::PayloadHash(payload_hash)
1161 },
1162 };
1163
1164 let ds = &*self.data_source;
1166 let timeout = std::time::Duration::from_millis(500);
1167 let (block_fetch, vid_fetch) =
1168 join!(ds.get_block(hs_block_id), ds.get_vid_common(hs_block_id));
1169 let (block, vid_common) = join!(
1170 block_fetch.with_timeout(timeout),
1171 vid_fetch.with_timeout(timeout)
1172 );
1173
1174 let Some(block) = block else {
1175 return Ok(None);
1176 };
1177 let Some(vid_common) = vid_common else {
1178 return Ok(None);
1179 };
1180
1181 let ns_table = block.payload().ns_table();
1183 let Some(ns_index) = ns_table.find_ns_id(&ns_id) else {
1184 return Ok(None);
1185 };
1186
1187 let Some(proof) = NsProof::new(block.payload(), &ns_index, vid_common.common()) else {
1189 return Ok(Some(espresso_types::NamespaceProofQueryData {
1191 transactions: vec![],
1192 proof: None,
1193 }));
1194 };
1195
1196 let transactions = proof.export_all_txs(&ns_id);
1197
1198 Ok(Some(espresso_types::NamespaceProofQueryData {
1199 transactions,
1200 proof: Some(proof),
1201 }))
1202 }
1203
1204 async fn get_namespace_proof_range(
1205 &self,
1206 from: u64,
1207 until: u64,
1208 namespace: u32,
1209 ) -> anyhow::Result<Vec<Self::NamespaceProofQueryData>> {
1210 let ns_id = NamespaceId::from(namespace);
1211
1212 if until <= from {
1214 return Err(bad_request(format!(
1215 "invalid range: until ({}) must be greater than from ({})",
1216 until, from
1217 )));
1218 }
1219
1220 let range_size = until - from;
1221 const MAX_RANGE: u64 = 100;
1222 if range_size > MAX_RANGE {
1223 return Err(range_exceeded(format!(
1224 "range too large: {} blocks (max {})",
1225 range_size, MAX_RANGE
1226 )));
1227 }
1228
1229 let (blocks_stream, vids_stream) = join!(
1231 self.data_source
1232 .get_block_range(from as usize..until as usize),
1233 self.data_source
1234 .get_vid_common_range(from as usize..until as usize)
1235 );
1236
1237 let blocks: Vec<_> = blocks_stream
1238 .then(|block| async move { block.resolve().await })
1239 .collect()
1240 .await;
1241 let vids: Vec<_> = vids_stream
1242 .then(|vid| async move { vid.resolve().await })
1243 .collect()
1244 .await;
1245
1246 if blocks.len() != vids.len() {
1247 return Err(anyhow::anyhow!(
1248 "mismatch between blocks and VID common data"
1249 ));
1250 }
1251
1252 let mut proofs = Vec::new();
1254
1255 for (block, vid) in blocks.into_iter().zip(vids) {
1256 let ns_table = block.payload().ns_table();
1257
1258 if let Some(ns_index) = ns_table.find_ns_id(&ns_id) {
1260 if let Some(proof) = NsProof::new(block.payload(), &ns_index, vid.common()) {
1261 let transactions = proof.export_all_txs(&ns_id);
1262 proofs.push(espresso_types::NamespaceProofQueryData {
1263 transactions,
1264 proof: Some(proof),
1265 });
1266 } else {
1267 proofs.push(espresso_types::NamespaceProofQueryData {
1269 transactions: vec![],
1270 proof: None,
1271 });
1272 }
1273 } else {
1274 proofs.push(espresso_types::NamespaceProofQueryData {
1276 transactions: vec![],
1277 proof: None,
1278 });
1279 }
1280 }
1281
1282 Ok(proofs)
1283 }
1284
1285 async fn stream_namespace_proofs(
1286 &self,
1287 from: usize,
1288 namespace: u32,
1289 ) -> anyhow::Result<BoxStream<'static, Self::NamespaceProofQueryData>> {
1290 let ns_id = NamespaceId::from(namespace);
1291 let ds = self.data_source.clone();
1292 let blocks = (*ds).subscribe_blocks(from).await;
1293 let vids = (*ds).subscribe_vid_common(from).await;
1294
1295 let stream = blocks
1296 .zip(vids)
1297 .map(move |(block, vid)| {
1298 let ns_table = block.payload().ns_table();
1299 if let Some(ns_index) = ns_table.find_ns_id(&ns_id) {
1300 if let Some(proof) = NsProof::new(block.payload(), &ns_index, vid.common()) {
1301 let transactions = proof.export_all_txs(&ns_id);
1302 NamespaceProofQueryData {
1303 transactions,
1304 proof: Some(proof),
1305 }
1306 } else {
1307 NamespaceProofQueryData {
1308 transactions: vec![],
1309 proof: None,
1310 }
1311 }
1312 } else {
1313 NamespaceProofQueryData {
1314 transactions: vec![],
1315 proof: None,
1316 }
1317 }
1318 })
1319 .boxed();
1320
1321 Ok(stream)
1322 }
1323
1324 async fn get_incorrect_encoding_proof(
1325 &self,
1326 block_id: espresso_api::v1::availability::BlockId,
1327 namespace: u32,
1328 ) -> anyhow::Result<Self::IncorrectEncodingProof> {
1329 let ns_id = NamespaceId::from(namespace);
1330
1331 let hs_block_id = match block_id {
1332 espresso_api::v1::availability::BlockId::Height(h) => HsBlockId::Number(h as usize),
1333 espresso_api::v1::availability::BlockId::Hash(h) => {
1334 let hash = h
1335 .parse()
1336 .map_err(|_| anyhow::anyhow!("invalid block hash: {}", h))?;
1337 HsBlockId::Hash(hash)
1338 },
1339 espresso_api::v1::availability::BlockId::PayloadHash(h) => {
1340 let payload_hash = h
1341 .parse()
1342 .map_err(|_| anyhow::anyhow!("invalid payload hash: {}", h))?;
1343 HsBlockId::PayloadHash(payload_hash)
1344 },
1345 };
1346
1347 let ds = &*self.data_source;
1348 let timeout = std::time::Duration::from_millis(500);
1349 let (block_fetch, vid_fetch) =
1350 join!(ds.get_block(hs_block_id), ds.get_vid_common(hs_block_id));
1351 let (block, vid_common) = join!(
1352 block_fetch.with_timeout(timeout),
1353 vid_fetch.with_timeout(timeout)
1354 );
1355
1356 let block = block.ok_or_else(|| anyhow::anyhow!("block not found"))?;
1357 let vid_common = vid_common.ok_or_else(|| anyhow::anyhow!("VID common data not found"))?;
1358
1359 let ns_table = block.payload().ns_table();
1360 let ns_index = ns_table
1361 .find_ns_id(&ns_id)
1362 .ok_or_else(|| anyhow::anyhow!("namespace {} not present in block", namespace))?;
1363
1364 if NsProof::new(block.payload(), &ns_index, vid_common.common()).is_some() {
1365 return Err(anyhow::anyhow!("block was correctly encoded"));
1366 }
1367
1368 let vid_shares_future = ds
1370 .request_vid_shares(block.height(), vid_common.clone(), Duration::from_secs(40))
1371 .await;
1372 let mut vid_shares = vid_shares_future
1373 .await
1374 .map_err(|e| anyhow::anyhow!("failed to fetch VID shares: {e:#}"))?;
1375
1376 if let Ok(local_share) = ds.vid_share(block.height() as usize).await {
1377 vid_shares.push(local_share);
1378 }
1379
1380 let avidm_shares: Vec<AvidMShare> = vid_shares
1381 .into_iter()
1382 .filter_map(|s| {
1383 if let VidShare::V1(s) = s {
1384 Some(s)
1385 } else {
1386 None
1387 }
1388 })
1389 .collect();
1390
1391 match NsProof::v1_1_new_with_incorrect_encoding(
1392 &avidm_shares,
1393 ns_table,
1394 &ns_index,
1395 &vid_common.payload_hash(),
1396 vid_common.common(),
1397 ) {
1398 Some(NsProof::V1IncorrectEncoding(proof)) => Ok(proof),
1399 _ => Err(anyhow::anyhow!(
1400 "failed to generate incorrect encoding proof"
1401 )),
1402 }
1403 }
1404
1405 async fn get_state_cert(&self, epoch: u64) -> anyhow::Result<Self::StateCertQueryDataV1> {
1406 let state_cert = self.data_source.get_state_cert_by_epoch(epoch).await?;
1408
1409 let cert = match state_cert {
1410 Some(cert) => cert,
1411 None => {
1412 const TIMEOUT: Duration = Duration::from_secs(40);
1414 let cert = self
1415 .data_source
1416 .request_state_cert(epoch, TIMEOUT)
1417 .await
1418 .map_err(|e| {
1419 anyhow::anyhow!("failed to fetch state cert for epoch {}: {}", epoch, e)
1420 })?;
1421
1422 self.data_source
1424 .insert_state_cert(epoch, cert.clone())
1425 .await?;
1426
1427 cert
1428 },
1429 };
1430
1431 Ok(espresso_types::StateCertQueryDataV1::from(
1432 espresso_types::StateCertQueryDataV2(cert),
1433 ))
1434 }
1435
1436 async fn get_state_cert_v2(&self, epoch: u64) -> anyhow::Result<Self::StateCertQueryDataV2> {
1437 let state_cert = self.data_source.get_state_cert_by_epoch(epoch).await?;
1439
1440 let cert = match state_cert {
1441 Some(cert) => cert,
1442 None => {
1443 const TIMEOUT: Duration = Duration::from_secs(40);
1445 let cert = self
1446 .data_source
1447 .request_state_cert(epoch, TIMEOUT)
1448 .await
1449 .map_err(|e| {
1450 anyhow::anyhow!("failed to fetch state cert for epoch {}: {}", epoch, e)
1451 })?;
1452
1453 self.data_source
1455 .insert_state_cert(epoch, cert.clone())
1456 .await?;
1457
1458 cert
1459 },
1460 };
1461
1462 Ok(espresso_types::StateCertQueryDataV2(cert))
1463 }
1464}
1465
1466fn not_found(msg: impl Into<String>) -> anyhow::Error {
1471 AvailabilityError::NotFound(msg.into()).into()
1472}
1473
1474fn bad_request(msg: impl Into<String>) -> anyhow::Error {
1475 AvailabilityError::BadRequest(msg.into()).into()
1476}
1477
1478fn range_exceeded(msg: impl Into<String>) -> anyhow::Error {
1479 AvailabilityError::RangeExceeded(msg.into()).into()
1480}
1481
1482fn enforce_range(from: usize, until: usize, limit: usize) -> anyhow::Result<()> {
1483 if until.saturating_sub(from) > limit {
1484 return Err(range_exceeded(format!(
1485 "range {from}..{until} exceeds limit {limit}"
1486 )));
1487 }
1488 Ok(())
1489}
1490
1491#[async_trait]
1492impl<D> HotShotAvailabilityApi for NodeApiStateImpl<D>
1493where
1494 D: std::ops::Deref + Clone + Send + Sync + 'static,
1495 D::Target: AvailabilityDataSource<espresso_types::SeqTypes> + Send + Sync,
1496{
1497 type Leaf = LeafQueryData<espresso_types::SeqTypes>;
1498 type Block = BlockQueryData<espresso_types::SeqTypes>;
1499 type Header = HsHeader<espresso_types::SeqTypes>;
1500 type Payload = PayloadQueryData<espresso_types::SeqTypes>;
1501 type VidCommon = VidCommonQueryData<espresso_types::SeqTypes>;
1502 type Transaction = TransactionQueryData<espresso_types::SeqTypes>;
1503 type TransactionWithProof = TransactionWithProofQueryData<espresso_types::SeqTypes>;
1504 type BlockSummary = BlockSummaryQueryData<espresso_types::SeqTypes>;
1505 type Limits = HsLimits;
1506 type Cert2 = Certificate2<espresso_types::SeqTypes>;
1507
1508 async fn get_leaf(
1509 &self,
1510 id: espresso_api::v1::availability::LeafId,
1511 ) -> anyhow::Result<Self::Leaf> {
1512 let hs_id = match id {
1513 espresso_api::v1::availability::LeafId::Height(h) => HsLeafId::Number(h as usize),
1514 espresso_api::v1::availability::LeafId::Hash(h) => {
1515 HsLeafId::Hash(h.parse().map_err(|_| bad_request("invalid leaf hash"))?)
1516 },
1517 };
1518 let ds = &*self.data_source;
1519 ds.get_leaf(hs_id)
1520 .await
1521 .with_timeout(Duration::from_millis(500))
1522 .await
1523 .ok_or_else(|| not_found("leaf not found"))
1524 }
1525
1526 async fn get_leaf_range(&self, from: usize, until: usize) -> anyhow::Result<Vec<Self::Leaf>> {
1527 enforce_range(from, until, 500)?;
1528 let timeout = Duration::from_millis(500);
1529 let ds = &*self.data_source;
1530 let stream = ds.get_leaf_range(from..until).await;
1531 let mut results = Vec::new();
1532 futures::pin_mut!(stream);
1533 let mut i = from;
1534 while let Some(fetch) = stream.next().await {
1535 let item = fetch
1536 .with_timeout(timeout)
1537 .await
1538 .ok_or_else(|| not_found(format!("leaf {} not found", i)))?;
1539 results.push(item);
1540 i += 1;
1541 }
1542 Ok(results)
1543 }
1544
1545 async fn get_header(
1546 &self,
1547 id: espresso_api::v1::availability::BlockId,
1548 ) -> anyhow::Result<Self::Header> {
1549 let hs_id = block_id_to_hs(id)?;
1550 let ds = &*self.data_source;
1551 ds.get_header(hs_id)
1552 .await
1553 .with_timeout(Duration::from_millis(500))
1554 .await
1555 .ok_or_else(|| not_found(format!("header not found for {}", hs_id)))
1556 }
1557
1558 async fn get_header_range(
1559 &self,
1560 from: usize,
1561 until: usize,
1562 ) -> anyhow::Result<Vec<Self::Header>> {
1563 enforce_range(from, until, 100)?;
1564 let timeout = Duration::from_millis(500);
1565 let ds = &*self.data_source;
1566 let stream = ds.get_header_range(from..until).await;
1567 let mut results = Vec::new();
1568 futures::pin_mut!(stream);
1569 let mut i = from;
1570 while let Some(fetch) = stream.next().await {
1571 let item = fetch
1572 .with_timeout(timeout)
1573 .await
1574 .ok_or_else(|| not_found(format!("header {} not found", i)))?;
1575 results.push(item);
1576 i += 1;
1577 }
1578 Ok(results)
1579 }
1580
1581 async fn get_block(
1582 &self,
1583 id: espresso_api::v1::availability::BlockId,
1584 ) -> anyhow::Result<Self::Block> {
1585 let hs_id = block_id_to_hs(id)?;
1586 let ds = &*self.data_source;
1587 ds.get_block(hs_id)
1588 .await
1589 .with_timeout(Duration::from_millis(500))
1590 .await
1591 .ok_or_else(|| not_found(format!("block not found for {}", hs_id)))
1592 }
1593
1594 async fn get_block_range(&self, from: usize, until: usize) -> anyhow::Result<Vec<Self::Block>> {
1595 enforce_range(from, until, 100)?;
1596 let timeout = Duration::from_millis(500);
1597 let ds = &*self.data_source;
1598 let stream = ds.get_block_range(from..until).await;
1599 let mut results = Vec::new();
1600 futures::pin_mut!(stream);
1601 let mut i = from;
1602 while let Some(fetch) = stream.next().await {
1603 let item = fetch
1604 .with_timeout(timeout)
1605 .await
1606 .ok_or_else(|| not_found(format!("block {} not found", i)))?;
1607 results.push(item);
1608 i += 1;
1609 }
1610 Ok(results)
1611 }
1612
1613 async fn get_payload(
1614 &self,
1615 id: espresso_api::v1::availability::PayloadId,
1616 ) -> anyhow::Result<Self::Payload> {
1617 let hs_id = payload_id_to_hs(id)?;
1618 let ds = &*self.data_source;
1619 ds.get_payload(hs_id)
1620 .await
1621 .with_timeout(Duration::from_millis(500))
1622 .await
1623 .ok_or_else(|| not_found(format!("payload not found for {}", hs_id)))
1624 }
1625
1626 async fn get_payload_range(
1627 &self,
1628 from: usize,
1629 until: usize,
1630 ) -> anyhow::Result<Vec<Self::Payload>> {
1631 enforce_range(from, until, 100)?;
1632 let timeout = Duration::from_millis(500);
1633 let ds = &*self.data_source;
1634 let stream = ds.get_payload_range(from..until).await;
1635 let mut results = Vec::new();
1636 futures::pin_mut!(stream);
1637 let mut i = from;
1638 while let Some(fetch) = stream.next().await {
1639 let item = fetch
1640 .with_timeout(timeout)
1641 .await
1642 .ok_or_else(|| not_found(format!("payload {} not found", i)))?;
1643 results.push(item);
1644 i += 1;
1645 }
1646 Ok(results)
1647 }
1648
1649 async fn get_vid_common(
1650 &self,
1651 id: espresso_api::v1::availability::BlockId,
1652 ) -> anyhow::Result<Self::VidCommon> {
1653 let hs_id = block_id_to_hs(id)?;
1654 let ds = &*self.data_source;
1655 ds.get_vid_common(hs_id)
1656 .await
1657 .with_timeout(Duration::from_millis(500))
1658 .await
1659 .ok_or_else(|| not_found(format!("VID common not found for {}", hs_id)))
1660 }
1661
1662 async fn get_vid_common_range(
1663 &self,
1664 from: usize,
1665 until: usize,
1666 ) -> anyhow::Result<Vec<Self::VidCommon>> {
1667 enforce_range(from, until, 500)?;
1668 let timeout = Duration::from_millis(500);
1669 let ds = &*self.data_source;
1670 let stream = ds.get_vid_common_range(from..until).await;
1671 let mut results = Vec::new();
1672 futures::pin_mut!(stream);
1673 let mut i = from;
1674 while let Some(fetch) = stream.next().await {
1675 let item = fetch
1676 .with_timeout(timeout)
1677 .await
1678 .ok_or_else(|| not_found(format!("VID common {} not found", i)))?;
1679 results.push(item);
1680 i += 1;
1681 }
1682 Ok(results)
1683 }
1684
1685 async fn get_transaction_by_position(
1686 &self,
1687 height: u64,
1688 index: u64,
1689 ) -> anyhow::Result<Self::Transaction> {
1690 let ds = &*self.data_source;
1691 let block = ds
1692 .get_block(HsBlockId::Number(height as usize))
1693 .await
1694 .with_timeout(Duration::from_millis(500))
1695 .await
1696 .ok_or_else(|| not_found(format!("block {} not found", height)))?;
1697
1698 let idx = block
1699 .payload()
1700 .nth(block.metadata(), index as usize)
1701 .ok_or_else(|| {
1702 not_found(format!(
1703 "transaction index {} out of bounds in block {}",
1704 index, height
1705 ))
1706 })?;
1707 let tx = block
1708 .transaction(&idx)
1709 .ok_or_else(|| not_found(format!("transaction not found at index {}", index)))?;
1710 TransactionQueryData::new(tx, &block, &idx, index)
1711 .ok_or_else(|| anyhow::anyhow!("failed to build transaction query data"))
1712 }
1713
1714 async fn get_transaction_by_hash(&self, hash: String) -> anyhow::Result<Self::Transaction> {
1715 let ds = &*self.data_source;
1716 let tx_hash: hotshot_query_service::availability::TransactionHash<
1717 espresso_types::SeqTypes,
1718 > = hash
1719 .parse()
1720 .map_err(|_| bad_request(format!("invalid transaction hash: {}", hash)))?;
1721 let bwt = ds
1722 .get_block_containing_transaction(tx_hash)
1723 .await
1724 .with_timeout(Duration::from_millis(500))
1725 .await
1726 .ok_or_else(|| not_found("transaction not found"))?;
1727 Ok(bwt.transaction)
1728 }
1729
1730 async fn get_transaction_proof_by_position(
1731 &self,
1732 height: u64,
1733 index: u64,
1734 ) -> anyhow::Result<Self::TransactionWithProof> {
1735 let ds = &*self.data_source;
1736 let timeout = Duration::from_millis(500);
1737
1738 let (block_fetch, vid_fetch) = futures::join!(
1739 ds.get_block(HsBlockId::Number(height as usize)),
1740 ds.get_vid_common(HsBlockId::Number(height as usize))
1741 );
1742 let (block, vid) = futures::join!(
1743 block_fetch.with_timeout(timeout),
1744 vid_fetch.with_timeout(timeout)
1745 );
1746
1747 let block = block.ok_or_else(|| not_found(format!("block {} not found", height)))?;
1748 let vid =
1749 vid.ok_or_else(|| not_found(format!("VID common not found for block {}", height)))?;
1750
1751 let idx = block
1752 .payload()
1753 .nth(block.metadata(), index as usize)
1754 .ok_or_else(|| {
1755 not_found(format!(
1756 "transaction index {} out of bounds in block {}",
1757 index, height
1758 ))
1759 })?;
1760 let tx = block
1761 .transaction(&idx)
1762 .ok_or_else(|| not_found(format!("transaction not found at index {}", index)))?;
1763 let tx_data = TransactionQueryData::new(tx, &block, &idx, index)
1764 .ok_or_else(|| anyhow::anyhow!("failed to build transaction query data"))?;
1765 let proof = block
1766 .transaction_proof(&vid, &idx)
1767 .ok_or_else(|| anyhow::anyhow!("failed to build transaction proof"))?;
1768 Ok(TransactionWithProofQueryData::new(tx_data, proof))
1769 }
1770
1771 async fn get_transaction_proof_by_hash(
1772 &self,
1773 hash: String,
1774 ) -> anyhow::Result<Self::TransactionWithProof> {
1775 let ds = &*self.data_source;
1776 let timeout = Duration::from_millis(500);
1777
1778 let tx_hash: hotshot_query_service::availability::TransactionHash<
1779 espresso_types::SeqTypes,
1780 > = hash
1781 .parse()
1782 .map_err(|_| bad_request(format!("invalid transaction hash: {}", hash)))?;
1783 let bwt = ds
1784 .get_block_containing_transaction(tx_hash)
1785 .await
1786 .with_timeout(timeout)
1787 .await
1788 .ok_or_else(|| not_found("transaction not found"))?;
1789
1790 let vid = ds
1791 .get_vid_common(HsBlockId::Number(bwt.block.height() as usize))
1792 .await
1793 .with_timeout(timeout)
1794 .await
1795 .ok_or_else(|| {
1796 not_found(format!(
1797 "VID common not found for block {}",
1798 bwt.block.height()
1799 ))
1800 })?;
1801
1802 let proof = bwt
1803 .block
1804 .transaction_proof(&vid, &bwt.index)
1805 .ok_or_else(|| anyhow::anyhow!("failed to build transaction proof"))?;
1806 Ok(TransactionWithProofQueryData::new(bwt.transaction, proof))
1807 }
1808
1809 async fn get_block_summary(&self, height: usize) -> anyhow::Result<Self::BlockSummary> {
1810 let ds = &*self.data_source;
1811 let block = ds
1812 .get_block(HsBlockId::Number(height))
1813 .await
1814 .with_timeout(Duration::from_millis(500))
1815 .await
1816 .ok_or_else(|| not_found(format!("block {} not found", height)))?;
1817 Ok(BlockSummaryQueryData::from(block))
1818 }
1819
1820 async fn get_block_summary_range(
1821 &self,
1822 from: usize,
1823 until: usize,
1824 ) -> anyhow::Result<Vec<Self::BlockSummary>> {
1825 enforce_range(from, until, 100)?;
1826 let timeout = Duration::from_millis(500);
1827 let ds = &*self.data_source;
1828 let stream = ds.get_block_range(from..until).await;
1829 let mut results = Vec::new();
1830 futures::pin_mut!(stream);
1831 let mut i = from;
1832 while let Some(fetch) = stream.next().await {
1833 let block = fetch
1834 .with_timeout(timeout)
1835 .await
1836 .ok_or_else(|| not_found(format!("block {} not found", i)))?;
1837 results.push(BlockSummaryQueryData::from(block));
1838 i += 1;
1839 }
1840 Ok(results)
1841 }
1842
1843 async fn get_limits(&self) -> anyhow::Result<Self::Limits> {
1844 Ok(HsLimits {
1845 small_object_range_limit: 500,
1846 large_object_range_limit: 100,
1847 })
1848 }
1849
1850 async fn get_cert2(&self, height: u64) -> anyhow::Result<Option<Self::Cert2>> {
1851 self.data_source
1852 .get_cert2(height)
1853 .await
1854 .map_err(|e| anyhow::anyhow!("{}", e))
1855 }
1856
1857 async fn stream_leaves(&self, from: usize) -> anyhow::Result<BoxStream<'static, Self::Leaf>> {
1858 let ds = self.data_source.clone();
1859 Ok((*ds).subscribe_leaves(from).await.boxed())
1860 }
1861
1862 async fn stream_headers(
1863 &self,
1864 from: usize,
1865 ) -> anyhow::Result<BoxStream<'static, Self::Header>> {
1866 let ds = self.data_source.clone();
1867 Ok((*ds).subscribe_headers(from).await.boxed())
1868 }
1869
1870 async fn stream_blocks(&self, from: usize) -> anyhow::Result<BoxStream<'static, Self::Block>> {
1871 let ds = self.data_source.clone();
1872 Ok((*ds).subscribe_blocks(from).await.boxed())
1873 }
1874
1875 async fn stream_payloads(
1876 &self,
1877 from: usize,
1878 ) -> anyhow::Result<BoxStream<'static, Self::Payload>> {
1879 let ds = self.data_source.clone();
1880 Ok((*ds).subscribe_payloads(from).await.boxed())
1881 }
1882
1883 async fn stream_vid_common(
1884 &self,
1885 from: usize,
1886 ) -> anyhow::Result<BoxStream<'static, Self::VidCommon>> {
1887 let ds = self.data_source.clone();
1888 Ok((*ds).subscribe_vid_common(from).await.boxed())
1889 }
1890
1891 async fn stream_transactions(
1892 &self,
1893 from: usize,
1894 namespace: Option<u32>,
1895 ) -> anyhow::Result<BoxStream<'static, Self::Transaction>> {
1896 let ds = self.data_source.clone();
1897 let stream = (*ds)
1898 .subscribe_blocks(from)
1899 .await
1900 .flat_map(move |block| {
1901 let ns_filter = namespace.map(NamespaceId::from);
1902 let txs: Vec<Self::Transaction> = block
1903 .enumerate()
1904 .enumerate()
1905 .filter_map(|(position_in_block, (tx_index, _tx))| {
1906 let tx = block.transaction(&tx_index)?;
1907 if let Some(ns) = ns_filter
1908 && tx.namespace() != ns
1909 {
1910 return None;
1911 }
1912 TransactionQueryData::new(tx, &block, &tx_index, position_in_block as u64)
1913 })
1914 .collect();
1915 futures::stream::iter(txs)
1916 })
1917 .boxed();
1918 Ok(stream)
1919 }
1920}
1921
1922fn block_id_to_hs(
1923 id: espresso_api::v1::availability::BlockId,
1924) -> anyhow::Result<HsBlockId<SeqTypes>> {
1925 match id {
1926 espresso_api::v1::availability::BlockId::Height(h) => Ok(HsBlockId::Number(h as usize)),
1927 espresso_api::v1::availability::BlockId::Hash(h) => {
1928 let hash = h
1929 .parse()
1930 .map_err(|_| bad_request(format!("invalid block hash: {}", h)))?;
1931 Ok(HsBlockId::Hash(hash))
1932 },
1933 espresso_api::v1::availability::BlockId::PayloadHash(h) => {
1934 let payload_hash = h
1935 .parse()
1936 .map_err(|_| bad_request(format!("invalid payload hash: {}", h)))?;
1937 Ok(HsBlockId::PayloadHash(payload_hash))
1938 },
1939 }
1940}
1941
1942fn payload_id_to_hs(
1943 id: espresso_api::v1::availability::PayloadId,
1944) -> anyhow::Result<HsBlockId<SeqTypes>> {
1945 match id {
1946 espresso_api::v1::availability::PayloadId::Height(h) => Ok(HsBlockId::Number(h as usize)),
1947 espresso_api::v1::availability::PayloadId::Hash(h) => {
1948 let payload_hash = h
1949 .parse()
1950 .map_err(|_| bad_request(format!("invalid payload hash: {}", h)))?;
1951 Ok(HsBlockId::PayloadHash(payload_hash))
1952 },
1953 espresso_api::v1::availability::PayloadId::BlockHash(h) => {
1954 let hash = h
1955 .parse()
1956 .map_err(|_| bad_request(format!("invalid block hash: {}", h)))?;
1957 Ok(HsBlockId::Hash(hash))
1958 },
1959 }
1960}