Skip to main content

espresso_node/api/
state.rs

1//! RewardApi trait implementations for espresso-node
2//!
3//! This module provides implementations for both v1::RewardApi (internal types)
4//! and v2::RewardApi (proto types), backed by the same data source.
5
6use std::time::Duration;
7
8use alloy::primitives::U256;
9use async_trait::async_trait;
10use espresso_api::{error::AvailabilityError, v1::HotShotAvailabilityApi};
11use espresso_types::{
12    NamespaceId, NamespaceProofQueryData, NsProof, SeqTypes,
13    v0::sparse_mt::KeccakNode,
14    v0_3::RewardAmount as InternalRewardAmount,
15    v0_4::{
16        RewardAccountProofV2 as InternalRewardAccountProofV2,
17        RewardAccountQueryDataV2 as InternalRewardAccountQueryData, RewardAccountV2,
18        RewardMerkleProofV2 as InternalRewardMerkleProofV2,
19    },
20    v0_6::RewardClaimError,
21};
22use futures::{StreamExt as _, join, stream::BoxStream};
23use hotshot_contract_adapter::reward::RewardClaimInput as InternalRewardClaimInput;
24use hotshot_new_protocol::message::Certificate2;
25use hotshot_query_service::{
26    Header as HsHeader,
27    availability::{
28        AvailabilityDataSource, BlockId as HsBlockId, BlockQueryData, BlockSummaryQueryData,
29        LeafId as HsLeafId, LeafQueryData, Limits as HsLimits, PayloadQueryData,
30        QueryablePayload as _, TransactionQueryData, TransactionWithProofQueryData,
31        VidCommonQueryData,
32    },
33    node::NodeDataSource as _,
34    types::HeightIndexed as _,
35};
36use hotshot_types::{
37    data::{EpochNumber, VidShare},
38    vid::avidm::AvidMShare,
39};
40use jf_merkle_tree_compat::prelude::{
41    MerkleNode as InternalMerkleNode, MerkleProof as InternalMerkleProof,
42};
43use serde_json;
44use serialization_api::v2::{
45    self, RewardAccountProofV2, RewardAccountQueryDataV2, RewardBalance, RewardBalances,
46    RewardClaimInput, RewardMerkleProofV2, RewardMerkleTreeV2Data, merkle_node,
47    reward_merkle_proof_v2::ProofType,
48};
49use tagged_base64::TaggedBase64;
50
51use super::{
52    RewardMerkleTreeDataSource, RewardMerkleTreeV2Data as InternalRewardTreeData,
53    data_source::{
54        RequestResponseDataSource as _, StakeTableDataSource, StateCertDataSource,
55        StateCertFetchingDataSource,
56    },
57};
58
59/// Node API state implementation
60///
61/// This struct implements both v1::RewardApi (internal types) and v2::RewardApi (proto types).
62#[derive(Clone)]
63pub struct NodeApiStateImpl<D> {
64    data_source: D,
65}
66
67impl<D> NodeApiStateImpl<D> {
68    pub fn new(data_source: D) -> Self {
69        Self { data_source }
70    }
71
72    /// Convert RewardAccountProofV2 to proto
73    fn convert_reward_account_proof_v2(
74        &self,
75        proof: &InternalRewardAccountProofV2,
76    ) -> anyhow::Result<RewardAccountProofV2> {
77        Ok(RewardAccountProofV2 {
78            account: format!("{:#x}", proof.account),
79            proof: Some(self.convert_reward_merkle_proof_v2(&proof.proof)?),
80        })
81    }
82
83    /// Convert RewardMerkleProofV2 enum to proto
84    fn convert_reward_merkle_proof_v2(
85        &self,
86        proof: &InternalRewardMerkleProofV2,
87    ) -> anyhow::Result<RewardMerkleProofV2> {
88        let proof_type = match proof {
89            InternalRewardMerkleProofV2::Presence(p) => {
90                ProofType::Presence(self.convert_merkle_proof(p)?)
91            },
92            InternalRewardMerkleProofV2::Absence(p) => {
93                ProofType::Absence(self.convert_merkle_proof(p)?)
94            },
95        };
96
97        Ok(RewardMerkleProofV2 {
98            proof_type: Some(proof_type),
99        })
100    }
101
102    /// Convert MerkleProof to proto
103    fn convert_merkle_proof(
104        &self,
105        proof: &InternalMerkleProof<InternalRewardAmount, RewardAccountV2, KeccakNode, 2>,
106    ) -> anyhow::Result<v2::MerkleProof> {
107        let proof_nodes: Result<Vec<v2::MerkleNode>, _> = proof
108            .proof
109            .iter()
110            .map(|node| self.convert_merkle_node(node))
111            .collect();
112
113        Ok(v2::MerkleProof {
114            pos: TaggedBase64::new("FIELD", proof.pos.0.as_slice())
115                .map_err(|e| anyhow::anyhow!("failed to encode proof pos: {}", e))?
116                .to_string(),
117            proof: proof_nodes?,
118        })
119    }
120
121    /// Convert MerkleNode to proto (recursive)
122    fn convert_merkle_node(
123        &self,
124        node: &InternalMerkleNode<InternalRewardAmount, RewardAccountV2, KeccakNode>,
125    ) -> anyhow::Result<v2::MerkleNode> {
126        let node_type = match node {
127            InternalMerkleNode::Empty => merkle_node::NodeType::Empty(v2::Empty {
128                dummy: Some(v2::EmptyData {}),
129            }),
130            InternalMerkleNode::Leaf { pos, elem, value } => {
131                merkle_node::NodeType::Leaf(v2::Leaf {
132                    pos: TaggedBase64::new("FIELD", pos.0.as_slice())
133                        .map_err(|e| anyhow::anyhow!("failed to encode leaf pos: {}", e))?
134                        .to_string(),
135                    elem: TaggedBase64::new("FIELD", &elem.0.to_le_bytes::<32>())
136                        .map_err(|e| anyhow::anyhow!("failed to encode leaf elem: {}", e))?
137                        .to_string(),
138                    value: TaggedBase64::new("FIELD", &value.0)
139                        .map_err(|e| anyhow::anyhow!("failed to encode leaf value: {}", e))?
140                        .to_string(),
141                })
142            },
143            InternalMerkleNode::Branch { value, children } => {
144                let proto_children: Result<Vec<v2::MerkleNode>, _> = children
145                    .iter()
146                    .map(|child| self.convert_merkle_node(child))
147                    .collect();
148
149                merkle_node::NodeType::Branch(v2::Branch {
150                    value: TaggedBase64::new("FIELD", &value.0)
151                        .map_err(|e| anyhow::anyhow!("failed to encode branch value: {}", e))?
152                        .to_string(),
153                    children: proto_children?,
154                })
155            },
156            InternalMerkleNode::ForgettenSubtree { value } => {
157                merkle_node::NodeType::ForgottenSubtree(v2::ForgottenSubtree {
158                    value: TaggedBase64::new("FIELD", &value.0)
159                        .map_err(|e| {
160                            anyhow::anyhow!("failed to encode forgotten subtree value: {}", e)
161                        })?
162                        .to_string(),
163                })
164            },
165        };
166
167        Ok(v2::MerkleNode {
168            node_type: Some(node_type),
169        })
170    }
171}
172
173// ============================================================================
174// ApiSerializations implementation (conversion layer)
175// ============================================================================
176
177impl<D> serialization_api::ApiSerializations for NodeApiStateImpl<D>
178where
179    D: std::ops::Deref + Send + Sync + 'static,
180    D::Target: RewardMerkleTreeDataSource + Send + Sync,
181{
182    // Request types
183    type Address = alloy::primitives::Address;
184
185    // Response types (internal types)
186    type RewardClaimInput = InternalRewardClaimInput;
187    type RewardBalance = U256;
188    type RewardAccountQueryData = InternalRewardAccountQueryData;
189    type RewardBalances = (Vec<(RewardAccountV2, InternalRewardAmount)>, u64); // (amounts, total)
190    type RewardMerkleTreeData = InternalRewardTreeData;
191
192    // Data API types
193    type NamespaceProof = espresso_types::NamespaceProofQueryData;
194    type IncorrectEncodingProof = espresso_types::v0_3::AvidMIncorrectEncodingNsProof;
195
196    // Consensus API types
197    type StateCertificate = espresso_types::StateCertQueryDataV2<espresso_types::SeqTypes>;
198    type StakeTable = Vec<hotshot_types::PeerConfig<espresso_types::SeqTypes>>;
199
200    // Helper conversion types
201    type PeerConfig = hotshot_types::PeerConfig<espresso_types::SeqTypes>;
202    type LightClientCert = hotshot_types::simple_certificate::LightClientStateUpdateCertificateV2<
203        espresso_types::SeqTypes,
204    >;
205    type NsProof = espresso_types::NsProof;
206
207    fn deserialize_address(&self, s: &str) -> anyhow::Result<Self::Address> {
208        s.parse()
209            .map_err(|_| anyhow::anyhow!("invalid ethereum address: {}", s))
210    }
211
212    // Serialize internal types → proto types
213    fn serialize_reward_claim_input(
214        &self,
215        address: &str,
216        value: &Self::RewardClaimInput,
217    ) -> anyhow::Result<RewardClaimInput> {
218        // Serialize auth_data directly - it serializes to a hex string via serde
219        let auth_data = serde_json::to_string(&value.auth_data)
220            .map_err(|e| anyhow::anyhow!("failed to serialize auth_data: {}", e))?
221            // Remove quotes added by JSON string serialization
222            .trim_matches('"')
223            .to_string();
224
225        Ok(RewardClaimInput {
226            address: address.to_string(),
227            lifetime_rewards: format!("{:#x}", value.lifetime_rewards), // Hex for contract
228            auth_data,
229        })
230    }
231
232    fn serialize_reward_balance(
233        &self,
234        value: &Self::RewardBalance,
235    ) -> anyhow::Result<RewardBalance> {
236        Ok(RewardBalance {
237            amount: value.to_string(), // Decimal string
238        })
239    }
240
241    fn serialize_reward_account_query_data(
242        &self,
243        value: &Self::RewardAccountQueryData,
244    ) -> anyhow::Result<RewardAccountQueryDataV2> {
245        // Convert balance to decimal string
246        let balance = value.balance.to_string();
247
248        // Convert the proof
249        let proof = Some(self.convert_reward_account_proof_v2(&value.proof)?);
250
251        Ok(RewardAccountQueryDataV2 { balance, proof })
252    }
253
254    fn serialize_reward_balances(
255        &self,
256        value: &Self::RewardBalances,
257    ) -> anyhow::Result<RewardBalances> {
258        let (amounts_vec, total) = value;
259
260        // Convert each account/amount pair to proto format
261        let amounts = amounts_vec
262            .iter()
263            .map(|(account, amount)| serialization_api::v2::RewardAmount {
264                address: format!("{:#x}", account.0),
265                amount: amount.0.to_string(), // Decimal string
266            })
267            .collect();
268
269        Ok(RewardBalances {
270            amounts,
271            total: *total,
272        })
273    }
274
275    fn serialize_reward_merkle_tree_data(
276        &self,
277        value: &Self::RewardMerkleTreeData,
278    ) -> anyhow::Result<RewardMerkleTreeV2Data> {
279        let bytes = bincode::serialize(value)
280            .map_err(|e| anyhow::anyhow!("failed to serialize RewardMerkleTreeV2Data: {}", e))?;
281        Ok(RewardMerkleTreeV2Data { data: bytes })
282    }
283
284    // Data API serialization methods
285
286    fn serialize_namespace_proof(
287        &self,
288        value: &Self::NamespaceProof,
289    ) -> anyhow::Result<v2::NamespaceProofResponse> {
290        // Serialize each transaction field explicitly using base64_bytes
291        let transactions: Vec<v2::Transaction> = value
292            .transactions
293            .iter()
294            .map(|tx| -> anyhow::Result<v2::Transaction> {
295                let mut payload_bytes = Vec::new();
296                base64_bytes::serialize(
297                    &tx.payload,
298                    &mut serde_json::Serializer::new(&mut payload_bytes),
299                )
300                .map_err(|e| anyhow::anyhow!("failed to serialize payload: {}", e))?;
301                // Convert to string and remove quotes added by JSON serializer
302                let payload_str = String::from_utf8(payload_bytes)?
303                    .trim_matches('"')
304                    .to_string();
305
306                Ok(v2::Transaction {
307                    namespace: tx.namespace.0,
308                    payload: payload_str,
309                })
310            })
311            .collect::<anyhow::Result<Vec<_>>>()?;
312
313        let proof = value
314            .proof
315            .as_ref()
316            .map(|p| self.serialize_ns_proof(p))
317            .transpose()?;
318
319        Ok(serialization_api::v2::NamespaceProofResponse {
320            transactions,
321            proof,
322        })
323    }
324
325    fn serialize_incorrect_encoding_proof(
326        &self,
327        value: &Self::IncorrectEncodingProof,
328    ) -> anyhow::Result<v2::IncorrectEncodingProofResponse> {
329        // Serialize the VID proof to JSON string
330        let proof_data = serde_json::to_string(&value.0)?;
331        Ok(serialization_api::v2::IncorrectEncodingProofResponse {
332            proof: Some(v2::AvidMIncorrectEncodingNsProof { proof_data }),
333        })
334    }
335
336    // Consensus API serialization methods
337
338    fn serialize_state_certificate(
339        &self,
340        value: &Self::StateCertificate,
341    ) -> anyhow::Result<v2::StateCertificateResponse> {
342        let certificate = self.serialize_light_client_cert(&value.0)?;
343
344        Ok(serialization_api::v2::StateCertificateResponse {
345            certificate: Some(certificate),
346        })
347    }
348
349    fn serialize_stake_table(
350        &self,
351        value: &Self::StakeTable,
352    ) -> anyhow::Result<v2::StakeTableResponse> {
353        let peers: Result<Vec<_>, _> = value
354            .iter()
355            .map(|peer| self.serialize_peer_config(peer))
356            .collect();
357
358        Ok(serialization_api::v2::StakeTableResponse { peers: peers? })
359    }
360
361    fn serialize_peer_config(&self, peer: &Self::PeerConfig) -> anyhow::Result<v2::PeerConfig> {
362        let stake_table_entry = v2::StakeTableEntry {
363            stake_key: Some(v2::BlsPublicKey {
364                key: peer.stake_table_entry.stake_key.to_string(),
365            }),
366            stake_amount: peer.stake_table_entry.stake_amount.to_string(),
367        };
368
369        let state_ver_key = v2::SchnorrPublicKey {
370            key: peer.state_ver_key.to_string(),
371        };
372
373        let connect_info = peer.connect_info.as_ref().map(|info| {
374            let p2p_addr = match &info.p2p_addr {
375                hotshot_types::addr::NetAddr::Inet(ip, port) => v2::NetAddr {
376                    addr_type: Some(v2::net_addr::AddrType::Inet(v2::InetAddr {
377                        host: match ip {
378                            std::net::IpAddr::V4(_) => ip.to_string(),
379                            std::net::IpAddr::V6(_) => format!("[{ip}]"),
380                        },
381                        port: *port as u32,
382                    })),
383                },
384                hotshot_types::addr::NetAddr::Name(name, port) => v2::NetAddr {
385                    addr_type: Some(v2::net_addr::AddrType::Name(v2::NameAddr {
386                        name: name.to_string(),
387                        port: *port as u32,
388                    })),
389                },
390            };
391
392            v2::PeerConnectInfo {
393                x25519_key: info.x25519_key.to_string(),
394                p2p_addr: Some(p2p_addr),
395            }
396        });
397
398        Ok(v2::PeerConfig {
399            stake_table_entry: Some(stake_table_entry),
400            state_ver_key: Some(state_ver_key),
401            connect_info,
402        })
403    }
404
405    fn serialize_light_client_cert(
406        &self,
407        cert: &Self::LightClientCert,
408    ) -> anyhow::Result<v2::LightClientStateUpdateCertificateV2> {
409        let signatures: Result<Vec<_>, anyhow::Error> = cert
410            .signatures
411            .iter()
412            .map(
413                |(key, lcv3_sig, lcv2_sig)| -> anyhow::Result<v2::StateSignatureTuple> {
414                    Ok(v2::StateSignatureTuple {
415                        state_signature_key: Some(v2::SchnorrPublicKey {
416                            key: key.to_string(),
417                        }),
418                        lcv3_signature: lcv3_sig.to_string(),
419                        lcv2_signature: lcv2_sig.to_string(),
420                    })
421                },
422            )
423            .collect();
424
425        Ok(v2::LightClientStateUpdateCertificateV2 {
426            epoch: cert.epoch.u64(),
427            light_client_state: cert.light_client_state.to_string(),
428            next_stake_table_state: cert.next_stake_table_state.to_string(),
429            signatures: signatures?,
430            auth_root: cert.auth_root.to_string(),
431        })
432    }
433
434    fn serialize_ns_proof(&self, proof: &Self::NsProof) -> anyhow::Result<v2::NsProof> {
435        let proof_version = match proof {
436            NsProof::V0(advz_proof) => {
437                // Serialize the inner fields directly
438                let json = serde_json::json!({
439                    "ns_index": advz_proof.ns_index,
440                    "ns_payload": advz_proof.ns_payload,
441                    "ns_proof": advz_proof.ns_proof,
442                });
443                v2::ns_proof::ProofVersion::V0(serde_json::from_value(json)?)
444            },
445            NsProof::V1(avidm_proof) => {
446                // Serialize ns_payload using base64_bytes
447                let mut ns_payload_bytes = Vec::new();
448                base64_bytes::serialize(
449                    &avidm_proof.0.ns_payload,
450                    &mut serde_json::Serializer::new(&mut ns_payload_bytes),
451                )
452                .map_err(|e| anyhow::anyhow!("failed to serialize ns_payload: {}", e))?;
453                let ns_payload_str = String::from_utf8(ns_payload_bytes)?
454                    .trim_matches('"')
455                    .to_string();
456
457                v2::ns_proof::ProofVersion::V1(v2::AvidMNsProof {
458                    ns_index: avidm_proof.0.ns_index as u64,
459                    ns_payload: ns_payload_str,
460                    ns_proof: avidm_proof.0.ns_proof.to_string(),
461                })
462            },
463            NsProof::V1IncorrectEncoding(incorrect_proof) => {
464                // Serialize the whole proof to JSON string
465                v2::ns_proof::ProofVersion::V1IncorrectEncoding(v2::AvidMIncorrectEncodingNsProof {
466                    proof_data: serde_json::to_string(&incorrect_proof.0)?,
467                })
468            },
469            NsProof::V2(gf2_proof) => {
470                // Serialize ns_payload using base64_bytes
471                let mut ns_payload_bytes = Vec::new();
472                base64_bytes::serialize(
473                    &gf2_proof.0.ns_payload,
474                    &mut serde_json::Serializer::new(&mut ns_payload_bytes),
475                )
476                .map_err(|e| anyhow::anyhow!("failed to serialize ns_payload: {}", e))?;
477                let ns_payload_str = String::from_utf8(ns_payload_bytes)?
478                    .trim_matches('"')
479                    .to_string();
480
481                v2::ns_proof::ProofVersion::V2(v2::AvidmGf2NsProof {
482                    ns_index: gf2_proof.0.ns_index as u64,
483                    ns_payload: ns_payload_str,
484                    ns_proof: gf2_proof.0.ns_proof.to_string(),
485                })
486            },
487        };
488
489        Ok(v2::NsProof {
490            proof_version: Some(proof_version),
491        })
492    }
493}
494
495// ============================================================================
496// RewardApiV2 implementation (business logic)
497// ============================================================================
498
499#[async_trait]
500impl<D> espresso_api::v2::RewardApi for NodeApiStateImpl<D>
501where
502    D: std::ops::Deref + Clone + Send + Sync + 'static,
503    D::Target: RewardMerkleTreeDataSource + Send + Sync,
504{
505    async fn get_reward_claim_input(
506        &self,
507        address: Self::Address,
508    ) -> anyhow::Result<Self::RewardClaimInput> {
509        // Load the latest reward account proof from the data source
510        let proof = self
511            .data_source
512            .load_latest_reward_account_proof_v2(address.into())
513            .await
514            .map_err(|err| {
515                anyhow::anyhow!(
516                    "failed to load latest reward account {:?}: {}",
517                    address,
518                    err
519                )
520            })?;
521
522        // Convert the proof to reward claim input and return internal type
523        proof.to_reward_claim_input().map_err(|err| match err {
524            RewardClaimError::ZeroRewardError => {
525                anyhow::anyhow!("zero reward balance for {:?}", address)
526            },
527            RewardClaimError::ProofConversionError(e) => {
528                anyhow::anyhow!("failed to create solidity proof for {:?}: {}", address, e)
529            },
530        })
531    }
532
533    async fn get_reward_balance(
534        &self,
535        address: Self::Address,
536    ) -> anyhow::Result<Self::RewardBalance> {
537        // Load the latest reward account proof from the data source
538        let proof = self
539            .data_source
540            .load_latest_reward_account_proof_v2(address.into())
541            .await
542            .map_err(|err| {
543                anyhow::anyhow!(
544                    "failed to load latest reward account {:?}: {}",
545                    address,
546                    err
547                )
548            })?;
549
550        // Return internal balance type
551        Ok(proof.balance)
552    }
553
554    async fn get_reward_account_proof(
555        &self,
556        address: Self::Address,
557    ) -> anyhow::Result<Self::RewardAccountQueryData> {
558        // Load the latest reward account proof from the data source and return internal type
559        self.data_source
560            .load_latest_reward_account_proof_v2(address.into())
561            .await
562            .map_err(|err| {
563                anyhow::anyhow!(
564                    "failed to load latest reward account proof for {:?}: {}",
565                    address,
566                    err
567                )
568            })
569    }
570
571    async fn get_reward_balances(
572        &self,
573        height: u64,
574        offset: u64,
575        limit: u64,
576    ) -> anyhow::Result<Self::RewardBalances> {
577        // Validate limit (from reward.toml: limit <= 10000)
578        if limit > 10000 {
579            return Err(anyhow::anyhow!(
580                "limit {} exceeds maximum allowed value of 10000",
581                limit
582            ));
583        }
584
585        // Load the merkle tree at the given height
586        let tree_bytes = self.data_source.load_tree(height).await.map_err(|err| {
587            anyhow::anyhow!("failed to load reward tree at height {}: {}", height, err)
588        })?;
589
590        // Deserialize the tree into internal format
591        let tree_data: InternalRewardTreeData =
592            bincode::deserialize(&tree_bytes).map_err(|err| {
593                anyhow::anyhow!(
594                    "failed to deserialize RewardMerkleTreeV2Data at height {}: {}",
595                    height,
596                    err
597                )
598            })?;
599
600        let offset_usize = offset as usize;
601        let limit_usize = limit as usize;
602        let total = tree_data.balances.len() as u64;
603
604        // Validate offset is within bounds
605        if offset_usize > tree_data.balances.len() {
606            return Err(anyhow::anyhow!("offset {} out of bounds", offset));
607        }
608
609        let end = std::cmp::min(offset_usize + limit_usize, tree_data.balances.len());
610        let slice = &tree_data.balances[offset_usize..end];
611
612        // Reverse order (matching Tide implementation) and return internal type with total
613        let reversed: Vec<_> = slice.iter().rev().copied().collect();
614        Ok((reversed, total))
615    }
616
617    async fn get_reward_merkle_tree_v2(
618        &self,
619        height: u64,
620    ) -> anyhow::Result<Self::RewardMerkleTreeData> {
621        // Load the raw merkle tree bytes
622        let tree_bytes = self.data_source.load_tree(height).await.map_err(|err| {
623            anyhow::anyhow!("failed to load reward tree at height {}: {}", height, err)
624        })?;
625
626        // Deserialize and return internal type
627        bincode::deserialize(&tree_bytes).map_err(|err| {
628            anyhow::anyhow!(
629                "failed to deserialize RewardMerkleTreeV2Data at height {}: {}",
630                height,
631                err
632            )
633        })
634    }
635}
636
637// ============================================================================
638// RewardApiV1 implementation (internal types, no proto conversion)
639// ============================================================================
640
641#[async_trait]
642impl<D> espresso_api::v1::RewardApi for NodeApiStateImpl<D>
643where
644    D: RewardMerkleTreeDataSource,
645{
646    type RewardClaimInput = InternalRewardClaimInput;
647    type RewardBalance = InternalRewardAmount;
648    type RewardAccountQueryData = InternalRewardAccountQueryData;
649    type RewardAmounts = Vec<(alloy::primitives::Address, InternalRewardAmount)>;
650    type RewardMerkleTreeData = Vec<u8>;
651
652    async fn get_reward_claim_input(
653        &self,
654        block_height: u64,
655        address: String,
656    ) -> anyhow::Result<Self::RewardClaimInput> {
657        // Parse the Ethereum address
658        let addr: alloy::primitives::Address = address
659            .parse()
660            .map_err(|_| anyhow::anyhow!("invalid ethereum address: {}", address))?;
661
662        // Load the reward account proof from the data source
663        let proof = self
664            .data_source
665            .load_reward_account_proof_v2(block_height, addr.into())
666            .await
667            .map_err(|err| {
668                anyhow::anyhow!(
669                    "failed to load reward account {} at height {}: {}",
670                    address,
671                    block_height,
672                    err
673                )
674            })?;
675
676        // Convert the proof to reward claim input (internal type)
677        let claim_input = proof.to_reward_claim_input().map_err(|err| match err {
678            RewardClaimError::ZeroRewardError => {
679                anyhow::anyhow!(
680                    "zero reward balance for {} at height {}",
681                    address,
682                    block_height
683                )
684            },
685            RewardClaimError::ProofConversionError(e) => {
686                anyhow::anyhow!(
687                    "failed to create solidity proof for {} at height {}: {}",
688                    address,
689                    block_height,
690                    e
691                )
692            },
693        })?;
694
695        Ok(claim_input)
696    }
697
698    async fn get_reward_balance(
699        &self,
700        height: u64,
701        address: String,
702    ) -> anyhow::Result<Self::RewardBalance> {
703        // Parse the Ethereum address
704        let addr: alloy::primitives::Address = address
705            .parse()
706            .map_err(|_| anyhow::anyhow!("invalid ethereum address: {}", address))?;
707
708        // Load the reward account proof from the data source
709        let proof = self
710            .data_source
711            .load_reward_account_proof_v2(height, addr.into())
712            .await
713            .map_err(|err| {
714                anyhow::anyhow!(
715                    "failed to load reward account {} at height {}: {}",
716                    address,
717                    height,
718                    err
719                )
720            })?;
721
722        Ok(InternalRewardAmount(proof.balance))
723    }
724
725    async fn get_latest_reward_balance(
726        &self,
727        address: String,
728    ) -> anyhow::Result<Self::RewardBalance> {
729        let addr: alloy::primitives::Address = address
730            .parse()
731            .map_err(|_| anyhow::anyhow!("invalid ethereum address: {}", address))?;
732
733        let proof = self
734            .data_source
735            .load_latest_reward_account_proof_v2(addr.into())
736            .await
737            .map_err(|err| {
738                anyhow::anyhow!("failed to load latest reward account {}: {}", address, err)
739            })?;
740
741        Ok(InternalRewardAmount(proof.balance))
742    }
743
744    async fn get_reward_account_proof(
745        &self,
746        height: u64,
747        address: String,
748    ) -> anyhow::Result<Self::RewardAccountQueryData> {
749        // Parse the Ethereum address
750        let addr: alloy::primitives::Address = address
751            .parse()
752            .map_err(|_| anyhow::anyhow!("invalid ethereum address: {}", address))?;
753
754        // Load and return the reward account proof directly (internal type)
755        let proof = self
756            .data_source
757            .load_reward_account_proof_v2(height, addr.into())
758            .await
759            .map_err(|err| {
760                anyhow::anyhow!(
761                    "failed to load reward account {} at height {}: {}",
762                    address,
763                    height,
764                    err
765                )
766            })?;
767
768        Ok(proof)
769    }
770
771    async fn get_latest_reward_account_proof(
772        &self,
773        address: String,
774    ) -> anyhow::Result<Self::RewardAccountQueryData> {
775        // Parse the Ethereum address
776        let addr: alloy::primitives::Address = address
777            .parse()
778            .map_err(|_| anyhow::anyhow!("invalid ethereum address: {}", address))?;
779
780        // Load and return the latest reward account proof directly (internal type)
781        let proof = self
782            .data_source
783            .load_latest_reward_account_proof_v2(addr.into())
784            .await
785            .map_err(|err| {
786                anyhow::anyhow!("failed to load latest reward account {}: {}", address, err)
787            })?;
788
789        Ok(proof)
790    }
791
792    async fn get_reward_amounts(
793        &self,
794        height: u64,
795        offset: u64,
796        limit: u64,
797    ) -> anyhow::Result<Self::RewardAmounts> {
798        // Validate limit (from reward.toml: limit <= 10000)
799        if limit > 10000 {
800            return Err(anyhow::anyhow!(
801                "limit {} exceeds maximum allowed value of 10000",
802                limit
803            ));
804        }
805
806        // Load the merkle tree at the given height
807        let tree_bytes = self.data_source.load_tree(height).await.map_err(|err| {
808            anyhow::anyhow!("failed to load reward tree at height {}: {}", height, err)
809        })?;
810
811        // Deserialize the tree into internal format
812        let tree_data: InternalRewardTreeData =
813            bincode::deserialize(&tree_bytes).map_err(|err| {
814                anyhow::anyhow!(
815                    "failed to deserialize RewardMerkleTreeV2Data at height {}: {}",
816                    height,
817                    err
818                )
819            })?;
820
821        let offset_usize = offset as usize;
822        let limit_usize = limit as usize;
823
824        // Validate offset is within bounds
825        if offset_usize > tree_data.balances.len() {
826            return Err(anyhow::anyhow!("offset {} out of bounds", offset));
827        }
828
829        let end = std::cmp::min(offset_usize + limit_usize, tree_data.balances.len());
830        let slice = &tree_data.balances[offset_usize..end];
831
832        let result: Vec<(alloy::primitives::Address, InternalRewardAmount)> = slice
833            .iter()
834            .rev()
835            .map(|(account, amount)| (account.0, *amount))
836            .collect();
837
838        Ok(result)
839    }
840
841    async fn get_reward_merkle_tree_v2(
842        &self,
843        height: u64,
844    ) -> anyhow::Result<Self::RewardMerkleTreeData> {
845        self.data_source.load_tree(height).await.map_err(|err| {
846            anyhow::anyhow!("failed to load reward tree at height {}: {}", height, err)
847        })
848    }
849}
850
851// ============================================================================
852// v2::DataApi implementation
853// ============================================================================
854
855#[async_trait]
856impl<D> espresso_api::v2::DataApi for NodeApiStateImpl<D>
857where
858    D: std::ops::Deref + Clone + Send + Sync + 'static,
859    D::Target: RewardMerkleTreeDataSource
860        + hotshot_query_service::availability::AvailabilityDataSource<espresso_types::SeqTypes>
861        + hotshot_query_service::node::NodeDataSource<espresso_types::SeqTypes>
862        + super::data_source::RequestResponseDataSource<espresso_types::SeqTypes>
863        + Sync
864        + Send,
865{
866    async fn get_namespace_proof(
867        &self,
868        namespace_id: u64,
869        block_height: u64,
870    ) -> anyhow::Result<Self::NamespaceProof> {
871        let ns_id = NamespaceId(namespace_id);
872        let block_id = HsBlockId::Number(block_height as usize);
873
874        // Fetch block and VID common data concurrently
875        let ds = &*self.data_source;
876        let timeout = std::time::Duration::from_millis(500);
877        let (block_fetch, vid_fetch) = join!(ds.get_block(block_id), ds.get_vid_common(block_id));
878        let (block_opt, vid_opt) = join!(
879            block_fetch.with_timeout(timeout),
880            vid_fetch.with_timeout(timeout)
881        );
882
883        let block = block_opt.ok_or_else(|| anyhow::anyhow!("block {} not found", block_height))?;
884        let vid_common = vid_opt.ok_or_else(|| {
885            anyhow::anyhow!("VID common data for block {} not found", block_height)
886        })?;
887
888        // Generate namespace proof
889        let ns_table = block.payload().ns_table();
890        let ns_index = ns_table.find_ns_id(&ns_id).ok_or_else(|| {
891            anyhow::anyhow!(
892                "namespace {} not present in block {}",
893                namespace_id,
894                block_height
895            )
896        })?;
897
898        let proof =
899            NsProof::new(block.payload(), &ns_index, vid_common.common()).ok_or_else(|| {
900                anyhow::anyhow!(
901                    "failed to generate namespace proof for block {}",
902                    block_height
903                )
904            })?;
905
906        let transactions = proof.export_all_txs(&ns_id);
907
908        Ok(espresso_types::NamespaceProofQueryData {
909            transactions,
910            proof: Some(proof),
911        })
912    }
913
914    async fn get_namespace_proof_range(
915        &self,
916        namespace_id: u64,
917        from: u64,
918        until: u64,
919    ) -> anyhow::Result<Vec<Self::NamespaceProof>> {
920        let ns_id = NamespaceId(namespace_id);
921
922        // Validate range
923        if until <= from {
924            return Err(anyhow::anyhow!(
925                "invalid range: until ({}) must be greater than from ({})",
926                until,
927                from
928            ));
929        }
930
931        let range_size = until - from;
932        const MAX_RANGE: u64 = 100; // Match limit from availability.toml
933        if range_size > MAX_RANGE {
934            return Err(anyhow::anyhow!(
935                "range too large: {} blocks (max {})",
936                range_size,
937                MAX_RANGE
938            ));
939        }
940
941        // Fetch blocks and VID common data for the range
942        let (blocks_stream, vids_stream) = join!(
943            self.data_source
944                .get_block_range(from as usize..until as usize),
945            self.data_source
946                .get_vid_common_range(from as usize..until as usize)
947        );
948
949        let blocks: Vec<_> = blocks_stream
950            .then(|block| async move { block.resolve().await })
951            .collect()
952            .await;
953        let vids: Vec<_> = vids_stream
954            .then(|vid| async move { vid.resolve().await })
955            .collect()
956            .await;
957
958        if blocks.len() != vids.len() {
959            return Err(anyhow::anyhow!(
960                "mismatch between blocks and VID common data"
961            ));
962        }
963
964        // Generate proofs for each block
965        let mut proofs = Vec::new();
966
967        for (block, vid) in blocks.into_iter().zip(vids) {
968            let ns_table = block.payload().ns_table();
969
970            // Check if namespace exists in this block
971            if let Some(ns_index) = ns_table.find_ns_id(&ns_id) {
972                if let Some(proof) = NsProof::new(block.payload(), &ns_index, vid.common()) {
973                    let transactions = proof.export_all_txs(&ns_id);
974                    proofs.push(espresso_types::NamespaceProofQueryData {
975                        transactions,
976                        proof: Some(proof),
977                    });
978                } else {
979                    // Failed to generate proof - return empty result for this block
980                    proofs.push(espresso_types::NamespaceProofQueryData {
981                        transactions: vec![],
982                        proof: None,
983                    });
984                }
985            } else {
986                // Namespace not present in this block
987                proofs.push(espresso_types::NamespaceProofQueryData {
988                    transactions: vec![],
989                    proof: None,
990                });
991            }
992        }
993
994        Ok(proofs)
995    }
996
997    async fn get_incorrect_encoding_proof(
998        &self,
999        namespace_id: u64,
1000        block_height: u64,
1001    ) -> anyhow::Result<Self::IncorrectEncodingProof> {
1002        let ns_id = NamespaceId(namespace_id);
1003        let block_id = HsBlockId::Number(block_height as usize);
1004
1005        let ds = &*self.data_source;
1006        let timeout = std::time::Duration::from_millis(500);
1007        let (block_fetch, vid_fetch) = join!(ds.get_block(block_id), ds.get_vid_common(block_id));
1008        let (block, vid_common) = join!(
1009            block_fetch.with_timeout(timeout),
1010            vid_fetch.with_timeout(timeout)
1011        );
1012
1013        let block = block.ok_or_else(|| anyhow::anyhow!("block {} not found", block_height))?;
1014        let vid_common = vid_common.ok_or_else(|| {
1015            anyhow::anyhow!("VID common data for block {} not found", block_height)
1016        })?;
1017
1018        let ns_table = block.payload().ns_table();
1019        let ns_index = ns_table.find_ns_id(&ns_id).ok_or_else(|| {
1020            anyhow::anyhow!(
1021                "namespace {} not present in block {}",
1022                namespace_id,
1023                block_height
1024            )
1025        })?;
1026
1027        if NsProof::new(block.payload(), &ns_index, vid_common.common()).is_some() {
1028            return Err(anyhow::anyhow!(
1029                "block {} was correctly encoded",
1030                block_height
1031            ));
1032        }
1033
1034        // Block has incorrect encoding — fetch VID shares to construct the proof.
1035        let vid_shares_future = ds
1036            .request_vid_shares(block_height, vid_common.clone(), Duration::from_secs(40))
1037            .await;
1038        let mut vid_shares = vid_shares_future
1039            .await
1040            .map_err(|e| anyhow::anyhow!("failed to fetch VID shares: {e:#}"))?;
1041
1042        if let Ok(local_share) = ds.vid_share(block_height as usize).await {
1043            vid_shares.push(local_share);
1044        }
1045
1046        let avidm_shares: Vec<AvidMShare> = vid_shares
1047            .into_iter()
1048            .filter_map(|s| {
1049                if let VidShare::V1(s) = s {
1050                    Some(s)
1051                } else {
1052                    None
1053                }
1054            })
1055            .collect();
1056
1057        match NsProof::v1_1_new_with_incorrect_encoding(
1058            &avidm_shares,
1059            ns_table,
1060            &ns_index,
1061            &vid_common.payload_hash(),
1062            vid_common.common(),
1063        ) {
1064            Some(NsProof::V1IncorrectEncoding(proof)) => Ok(proof),
1065            _ => Err(anyhow::anyhow!(
1066                "failed to generate incorrect encoding proof"
1067            )),
1068        }
1069    }
1070}
1071
1072// ============================================================================
1073// v2::ConsensusApi implementation
1074// ============================================================================
1075
1076#[async_trait]
1077impl<D> espresso_api::v2::ConsensusApi for NodeApiStateImpl<D>
1078where
1079    D: std::ops::Deref + Clone + Send + Sync + 'static,
1080    D::Target: RewardMerkleTreeDataSource
1081        + super::data_source::StateCertDataSource
1082        + super::data_source::StateCertFetchingDataSource<espresso_types::SeqTypes>
1083        + super::data_source::StakeTableDataSource<espresso_types::SeqTypes>
1084        + Send
1085        + Sync,
1086{
1087    async fn get_state_certificate(&self, epoch: u64) -> anyhow::Result<Self::StateCertificate> {
1088        let ds = &*self.data_source;
1089
1090        // Try to get from local storage first
1091        let state_cert = ds.get_state_cert_by_epoch(epoch).await?;
1092
1093        let cert = match state_cert {
1094            Some(cert) => cert,
1095            None => {
1096                // Not found locally, try to fetch from peers
1097                const TIMEOUT: Duration = Duration::from_secs(40);
1098                let cert = ds.request_state_cert(epoch, TIMEOUT).await.map_err(|e| {
1099                    anyhow::anyhow!("failed to fetch state cert for epoch {}: {}", epoch, e)
1100                })?;
1101
1102                // Store the fetched certificate
1103                ds.insert_state_cert(epoch, cert.clone()).await?;
1104
1105                cert
1106            },
1107        };
1108
1109        Ok(espresso_types::StateCertQueryDataV2(cert))
1110    }
1111
1112    async fn get_stake_table(&self, epoch: u64) -> anyhow::Result<Self::StakeTable> {
1113        let ds = &*self.data_source;
1114        ds.get_stake_table(Some(EpochNumber::new(epoch))).await
1115    }
1116}
1117
1118// ============================================================================
1119// v1::AvailabilityApi implementation
1120// ============================================================================
1121
1122#[async_trait]
1123impl<D> espresso_api::v1::AvailabilityApi for NodeApiStateImpl<D>
1124where
1125    D: std::ops::Deref + Clone + Send + Sync + 'static,
1126    D::Target: RewardMerkleTreeDataSource
1127        + hotshot_query_service::availability::AvailabilityDataSource<espresso_types::SeqTypes>
1128        + hotshot_query_service::node::NodeDataSource<espresso_types::SeqTypes>
1129        + super::data_source::RequestResponseDataSource<espresso_types::SeqTypes>
1130        + super::data_source::StateCertDataSource
1131        + super::data_source::StateCertFetchingDataSource<espresso_types::SeqTypes>
1132        + Send
1133        + Sync,
1134{
1135    type NamespaceProofQueryData = espresso_types::NamespaceProofQueryData;
1136    type IncorrectEncodingProof = espresso_types::v0_3::AvidMIncorrectEncodingNsProof;
1137    type StateCertQueryDataV1 = espresso_types::StateCertQueryDataV1<espresso_types::SeqTypes>;
1138    type StateCertQueryDataV2 = espresso_types::StateCertQueryDataV2<espresso_types::SeqTypes>;
1139
1140    async fn get_namespace_proof(
1141        &self,
1142        block_id: espresso_api::v1::availability::BlockId,
1143        namespace: u32,
1144    ) -> anyhow::Result<Option<Self::NamespaceProofQueryData>> {
1145        let ns_id = NamespaceId::from(namespace);
1146
1147        // Convert v1 BlockId to hotshot BlockId
1148        let hs_block_id = match block_id {
1149            espresso_api::v1::availability::BlockId::Height(h) => HsBlockId::Number(h as usize),
1150            espresso_api::v1::availability::BlockId::Hash(h) => {
1151                let hash = h
1152                    .parse()
1153                    .map_err(|_| anyhow::anyhow!("invalid block hash: {}", h))?;
1154                HsBlockId::Hash(hash)
1155            },
1156            espresso_api::v1::availability::BlockId::PayloadHash(h) => {
1157                let payload_hash = h
1158                    .parse()
1159                    .map_err(|_| anyhow::anyhow!("invalid payload hash: {}", h))?;
1160                HsBlockId::PayloadHash(payload_hash)
1161            },
1162        };
1163
1164        // Fetch block and VID common data
1165        let ds = &*self.data_source;
1166        let timeout = std::time::Duration::from_millis(500);
1167        let (block_fetch, vid_fetch) =
1168            join!(ds.get_block(hs_block_id), ds.get_vid_common(hs_block_id));
1169        let (block, vid_common) = join!(
1170            block_fetch.with_timeout(timeout),
1171            vid_fetch.with_timeout(timeout)
1172        );
1173
1174        let Some(block) = block else {
1175            return Ok(None);
1176        };
1177        let Some(vid_common) = vid_common else {
1178            return Ok(None);
1179        };
1180
1181        // Check if namespace is present
1182        let ns_table = block.payload().ns_table();
1183        let Some(ns_index) = ns_table.find_ns_id(&ns_id) else {
1184            return Ok(None);
1185        };
1186
1187        // Generate namespace proof
1188        let Some(proof) = NsProof::new(block.payload(), &ns_index, vid_common.common()) else {
1189            // Failed to generate proof - namespace exists but proof generation failed
1190            return Ok(Some(espresso_types::NamespaceProofQueryData {
1191                transactions: vec![],
1192                proof: None,
1193            }));
1194        };
1195
1196        let transactions = proof.export_all_txs(&ns_id);
1197
1198        Ok(Some(espresso_types::NamespaceProofQueryData {
1199            transactions,
1200            proof: Some(proof),
1201        }))
1202    }
1203
1204    async fn get_namespace_proof_range(
1205        &self,
1206        from: u64,
1207        until: u64,
1208        namespace: u32,
1209    ) -> anyhow::Result<Vec<Self::NamespaceProofQueryData>> {
1210        let ns_id = NamespaceId::from(namespace);
1211
1212        // Validate range
1213        if until <= from {
1214            return Err(bad_request(format!(
1215                "invalid range: until ({}) must be greater than from ({})",
1216                until, from
1217            )));
1218        }
1219
1220        let range_size = until - from;
1221        const MAX_RANGE: u64 = 100;
1222        if range_size > MAX_RANGE {
1223            return Err(range_exceeded(format!(
1224                "range too large: {} blocks (max {})",
1225                range_size, MAX_RANGE
1226            )));
1227        }
1228
1229        // Fetch blocks and VID common data for the range
1230        let (blocks_stream, vids_stream) = join!(
1231            self.data_source
1232                .get_block_range(from as usize..until as usize),
1233            self.data_source
1234                .get_vid_common_range(from as usize..until as usize)
1235        );
1236
1237        let blocks: Vec<_> = blocks_stream
1238            .then(|block| async move { block.resolve().await })
1239            .collect()
1240            .await;
1241        let vids: Vec<_> = vids_stream
1242            .then(|vid| async move { vid.resolve().await })
1243            .collect()
1244            .await;
1245
1246        if blocks.len() != vids.len() {
1247            return Err(anyhow::anyhow!(
1248                "mismatch between blocks and VID common data"
1249            ));
1250        }
1251
1252        // Generate proofs for each block
1253        let mut proofs = Vec::new();
1254
1255        for (block, vid) in blocks.into_iter().zip(vids) {
1256            let ns_table = block.payload().ns_table();
1257
1258            // Check if namespace exists in this block
1259            if let Some(ns_index) = ns_table.find_ns_id(&ns_id) {
1260                if let Some(proof) = NsProof::new(block.payload(), &ns_index, vid.common()) {
1261                    let transactions = proof.export_all_txs(&ns_id);
1262                    proofs.push(espresso_types::NamespaceProofQueryData {
1263                        transactions,
1264                        proof: Some(proof),
1265                    });
1266                } else {
1267                    // Failed to generate proof - return empty result for this block
1268                    proofs.push(espresso_types::NamespaceProofQueryData {
1269                        transactions: vec![],
1270                        proof: None,
1271                    });
1272                }
1273            } else {
1274                // Namespace not present in this block
1275                proofs.push(espresso_types::NamespaceProofQueryData {
1276                    transactions: vec![],
1277                    proof: None,
1278                });
1279            }
1280        }
1281
1282        Ok(proofs)
1283    }
1284
1285    async fn stream_namespace_proofs(
1286        &self,
1287        from: usize,
1288        namespace: u32,
1289    ) -> anyhow::Result<BoxStream<'static, Self::NamespaceProofQueryData>> {
1290        let ns_id = NamespaceId::from(namespace);
1291        let ds = self.data_source.clone();
1292        let blocks = (*ds).subscribe_blocks(from).await;
1293        let vids = (*ds).subscribe_vid_common(from).await;
1294
1295        let stream = blocks
1296            .zip(vids)
1297            .map(move |(block, vid)| {
1298                let ns_table = block.payload().ns_table();
1299                if let Some(ns_index) = ns_table.find_ns_id(&ns_id) {
1300                    if let Some(proof) = NsProof::new(block.payload(), &ns_index, vid.common()) {
1301                        let transactions = proof.export_all_txs(&ns_id);
1302                        NamespaceProofQueryData {
1303                            transactions,
1304                            proof: Some(proof),
1305                        }
1306                    } else {
1307                        NamespaceProofQueryData {
1308                            transactions: vec![],
1309                            proof: None,
1310                        }
1311                    }
1312                } else {
1313                    NamespaceProofQueryData {
1314                        transactions: vec![],
1315                        proof: None,
1316                    }
1317                }
1318            })
1319            .boxed();
1320
1321        Ok(stream)
1322    }
1323
1324    async fn get_incorrect_encoding_proof(
1325        &self,
1326        block_id: espresso_api::v1::availability::BlockId,
1327        namespace: u32,
1328    ) -> anyhow::Result<Self::IncorrectEncodingProof> {
1329        let ns_id = NamespaceId::from(namespace);
1330
1331        let hs_block_id = match block_id {
1332            espresso_api::v1::availability::BlockId::Height(h) => HsBlockId::Number(h as usize),
1333            espresso_api::v1::availability::BlockId::Hash(h) => {
1334                let hash = h
1335                    .parse()
1336                    .map_err(|_| anyhow::anyhow!("invalid block hash: {}", h))?;
1337                HsBlockId::Hash(hash)
1338            },
1339            espresso_api::v1::availability::BlockId::PayloadHash(h) => {
1340                let payload_hash = h
1341                    .parse()
1342                    .map_err(|_| anyhow::anyhow!("invalid payload hash: {}", h))?;
1343                HsBlockId::PayloadHash(payload_hash)
1344            },
1345        };
1346
1347        let ds = &*self.data_source;
1348        let timeout = std::time::Duration::from_millis(500);
1349        let (block_fetch, vid_fetch) =
1350            join!(ds.get_block(hs_block_id), ds.get_vid_common(hs_block_id));
1351        let (block, vid_common) = join!(
1352            block_fetch.with_timeout(timeout),
1353            vid_fetch.with_timeout(timeout)
1354        );
1355
1356        let block = block.ok_or_else(|| anyhow::anyhow!("block not found"))?;
1357        let vid_common = vid_common.ok_or_else(|| anyhow::anyhow!("VID common data not found"))?;
1358
1359        let ns_table = block.payload().ns_table();
1360        let ns_index = ns_table
1361            .find_ns_id(&ns_id)
1362            .ok_or_else(|| anyhow::anyhow!("namespace {} not present in block", namespace))?;
1363
1364        if NsProof::new(block.payload(), &ns_index, vid_common.common()).is_some() {
1365            return Err(anyhow::anyhow!("block was correctly encoded"));
1366        }
1367
1368        // Block has incorrect encoding — fetch VID shares to construct the proof.
1369        let vid_shares_future = ds
1370            .request_vid_shares(block.height(), vid_common.clone(), Duration::from_secs(40))
1371            .await;
1372        let mut vid_shares = vid_shares_future
1373            .await
1374            .map_err(|e| anyhow::anyhow!("failed to fetch VID shares: {e:#}"))?;
1375
1376        if let Ok(local_share) = ds.vid_share(block.height() as usize).await {
1377            vid_shares.push(local_share);
1378        }
1379
1380        let avidm_shares: Vec<AvidMShare> = vid_shares
1381            .into_iter()
1382            .filter_map(|s| {
1383                if let VidShare::V1(s) = s {
1384                    Some(s)
1385                } else {
1386                    None
1387                }
1388            })
1389            .collect();
1390
1391        match NsProof::v1_1_new_with_incorrect_encoding(
1392            &avidm_shares,
1393            ns_table,
1394            &ns_index,
1395            &vid_common.payload_hash(),
1396            vid_common.common(),
1397        ) {
1398            Some(NsProof::V1IncorrectEncoding(proof)) => Ok(proof),
1399            _ => Err(anyhow::anyhow!(
1400                "failed to generate incorrect encoding proof"
1401            )),
1402        }
1403    }
1404
1405    async fn get_state_cert(&self, epoch: u64) -> anyhow::Result<Self::StateCertQueryDataV1> {
1406        // Try to get from local storage first
1407        let state_cert = self.data_source.get_state_cert_by_epoch(epoch).await?;
1408
1409        let cert = match state_cert {
1410            Some(cert) => cert,
1411            None => {
1412                // Not found locally, try to fetch from peers
1413                const TIMEOUT: Duration = Duration::from_secs(40);
1414                let cert = self
1415                    .data_source
1416                    .request_state_cert(epoch, TIMEOUT)
1417                    .await
1418                    .map_err(|e| {
1419                        anyhow::anyhow!("failed to fetch state cert for epoch {}: {}", epoch, e)
1420                    })?;
1421
1422                // Store the fetched certificate
1423                self.data_source
1424                    .insert_state_cert(epoch, cert.clone())
1425                    .await?;
1426
1427                cert
1428            },
1429        };
1430
1431        Ok(espresso_types::StateCertQueryDataV1::from(
1432            espresso_types::StateCertQueryDataV2(cert),
1433        ))
1434    }
1435
1436    async fn get_state_cert_v2(&self, epoch: u64) -> anyhow::Result<Self::StateCertQueryDataV2> {
1437        // Try to get from local storage first
1438        let state_cert = self.data_source.get_state_cert_by_epoch(epoch).await?;
1439
1440        let cert = match state_cert {
1441            Some(cert) => cert,
1442            None => {
1443                // Not found locally, try to fetch from peers
1444                const TIMEOUT: Duration = Duration::from_secs(40);
1445                let cert = self
1446                    .data_source
1447                    .request_state_cert(epoch, TIMEOUT)
1448                    .await
1449                    .map_err(|e| {
1450                        anyhow::anyhow!("failed to fetch state cert for epoch {}: {}", epoch, e)
1451                    })?;
1452
1453                // Store the fetched certificate
1454                self.data_source
1455                    .insert_state_cert(epoch, cert.clone())
1456                    .await?;
1457
1458                cert
1459            },
1460        };
1461
1462        Ok(espresso_types::StateCertQueryDataV2(cert))
1463    }
1464}
1465
1466// ============================================================================
1467// v1::HotShotAvailabilityApi implementation
1468// ============================================================================
1469
1470fn not_found(msg: impl Into<String>) -> anyhow::Error {
1471    AvailabilityError::NotFound(msg.into()).into()
1472}
1473
1474fn bad_request(msg: impl Into<String>) -> anyhow::Error {
1475    AvailabilityError::BadRequest(msg.into()).into()
1476}
1477
1478fn range_exceeded(msg: impl Into<String>) -> anyhow::Error {
1479    AvailabilityError::RangeExceeded(msg.into()).into()
1480}
1481
1482fn enforce_range(from: usize, until: usize, limit: usize) -> anyhow::Result<()> {
1483    if until.saturating_sub(from) > limit {
1484        return Err(range_exceeded(format!(
1485            "range {from}..{until} exceeds limit {limit}"
1486        )));
1487    }
1488    Ok(())
1489}
1490
1491#[async_trait]
1492impl<D> HotShotAvailabilityApi for NodeApiStateImpl<D>
1493where
1494    D: std::ops::Deref + Clone + Send + Sync + 'static,
1495    D::Target: AvailabilityDataSource<espresso_types::SeqTypes> + Send + Sync,
1496{
1497    type Leaf = LeafQueryData<espresso_types::SeqTypes>;
1498    type Block = BlockQueryData<espresso_types::SeqTypes>;
1499    type Header = HsHeader<espresso_types::SeqTypes>;
1500    type Payload = PayloadQueryData<espresso_types::SeqTypes>;
1501    type VidCommon = VidCommonQueryData<espresso_types::SeqTypes>;
1502    type Transaction = TransactionQueryData<espresso_types::SeqTypes>;
1503    type TransactionWithProof = TransactionWithProofQueryData<espresso_types::SeqTypes>;
1504    type BlockSummary = BlockSummaryQueryData<espresso_types::SeqTypes>;
1505    type Limits = HsLimits;
1506    type Cert2 = Certificate2<espresso_types::SeqTypes>;
1507
1508    async fn get_leaf(
1509        &self,
1510        id: espresso_api::v1::availability::LeafId,
1511    ) -> anyhow::Result<Self::Leaf> {
1512        let hs_id = match id {
1513            espresso_api::v1::availability::LeafId::Height(h) => HsLeafId::Number(h as usize),
1514            espresso_api::v1::availability::LeafId::Hash(h) => {
1515                HsLeafId::Hash(h.parse().map_err(|_| bad_request("invalid leaf hash"))?)
1516            },
1517        };
1518        let ds = &*self.data_source;
1519        ds.get_leaf(hs_id)
1520            .await
1521            .with_timeout(Duration::from_millis(500))
1522            .await
1523            .ok_or_else(|| not_found("leaf not found"))
1524    }
1525
1526    async fn get_leaf_range(&self, from: usize, until: usize) -> anyhow::Result<Vec<Self::Leaf>> {
1527        enforce_range(from, until, 500)?;
1528        let timeout = Duration::from_millis(500);
1529        let ds = &*self.data_source;
1530        let stream = ds.get_leaf_range(from..until).await;
1531        let mut results = Vec::new();
1532        futures::pin_mut!(stream);
1533        let mut i = from;
1534        while let Some(fetch) = stream.next().await {
1535            let item = fetch
1536                .with_timeout(timeout)
1537                .await
1538                .ok_or_else(|| not_found(format!("leaf {} not found", i)))?;
1539            results.push(item);
1540            i += 1;
1541        }
1542        Ok(results)
1543    }
1544
1545    async fn get_header(
1546        &self,
1547        id: espresso_api::v1::availability::BlockId,
1548    ) -> anyhow::Result<Self::Header> {
1549        let hs_id = block_id_to_hs(id)?;
1550        let ds = &*self.data_source;
1551        ds.get_header(hs_id)
1552            .await
1553            .with_timeout(Duration::from_millis(500))
1554            .await
1555            .ok_or_else(|| not_found(format!("header not found for {}", hs_id)))
1556    }
1557
1558    async fn get_header_range(
1559        &self,
1560        from: usize,
1561        until: usize,
1562    ) -> anyhow::Result<Vec<Self::Header>> {
1563        enforce_range(from, until, 100)?;
1564        let timeout = Duration::from_millis(500);
1565        let ds = &*self.data_source;
1566        let stream = ds.get_header_range(from..until).await;
1567        let mut results = Vec::new();
1568        futures::pin_mut!(stream);
1569        let mut i = from;
1570        while let Some(fetch) = stream.next().await {
1571            let item = fetch
1572                .with_timeout(timeout)
1573                .await
1574                .ok_or_else(|| not_found(format!("header {} not found", i)))?;
1575            results.push(item);
1576            i += 1;
1577        }
1578        Ok(results)
1579    }
1580
1581    async fn get_block(
1582        &self,
1583        id: espresso_api::v1::availability::BlockId,
1584    ) -> anyhow::Result<Self::Block> {
1585        let hs_id = block_id_to_hs(id)?;
1586        let ds = &*self.data_source;
1587        ds.get_block(hs_id)
1588            .await
1589            .with_timeout(Duration::from_millis(500))
1590            .await
1591            .ok_or_else(|| not_found(format!("block not found for {}", hs_id)))
1592    }
1593
1594    async fn get_block_range(&self, from: usize, until: usize) -> anyhow::Result<Vec<Self::Block>> {
1595        enforce_range(from, until, 100)?;
1596        let timeout = Duration::from_millis(500);
1597        let ds = &*self.data_source;
1598        let stream = ds.get_block_range(from..until).await;
1599        let mut results = Vec::new();
1600        futures::pin_mut!(stream);
1601        let mut i = from;
1602        while let Some(fetch) = stream.next().await {
1603            let item = fetch
1604                .with_timeout(timeout)
1605                .await
1606                .ok_or_else(|| not_found(format!("block {} not found", i)))?;
1607            results.push(item);
1608            i += 1;
1609        }
1610        Ok(results)
1611    }
1612
1613    async fn get_payload(
1614        &self,
1615        id: espresso_api::v1::availability::PayloadId,
1616    ) -> anyhow::Result<Self::Payload> {
1617        let hs_id = payload_id_to_hs(id)?;
1618        let ds = &*self.data_source;
1619        ds.get_payload(hs_id)
1620            .await
1621            .with_timeout(Duration::from_millis(500))
1622            .await
1623            .ok_or_else(|| not_found(format!("payload not found for {}", hs_id)))
1624    }
1625
1626    async fn get_payload_range(
1627        &self,
1628        from: usize,
1629        until: usize,
1630    ) -> anyhow::Result<Vec<Self::Payload>> {
1631        enforce_range(from, until, 100)?;
1632        let timeout = Duration::from_millis(500);
1633        let ds = &*self.data_source;
1634        let stream = ds.get_payload_range(from..until).await;
1635        let mut results = Vec::new();
1636        futures::pin_mut!(stream);
1637        let mut i = from;
1638        while let Some(fetch) = stream.next().await {
1639            let item = fetch
1640                .with_timeout(timeout)
1641                .await
1642                .ok_or_else(|| not_found(format!("payload {} not found", i)))?;
1643            results.push(item);
1644            i += 1;
1645        }
1646        Ok(results)
1647    }
1648
1649    async fn get_vid_common(
1650        &self,
1651        id: espresso_api::v1::availability::BlockId,
1652    ) -> anyhow::Result<Self::VidCommon> {
1653        let hs_id = block_id_to_hs(id)?;
1654        let ds = &*self.data_source;
1655        ds.get_vid_common(hs_id)
1656            .await
1657            .with_timeout(Duration::from_millis(500))
1658            .await
1659            .ok_or_else(|| not_found(format!("VID common not found for {}", hs_id)))
1660    }
1661
1662    async fn get_vid_common_range(
1663        &self,
1664        from: usize,
1665        until: usize,
1666    ) -> anyhow::Result<Vec<Self::VidCommon>> {
1667        enforce_range(from, until, 500)?;
1668        let timeout = Duration::from_millis(500);
1669        let ds = &*self.data_source;
1670        let stream = ds.get_vid_common_range(from..until).await;
1671        let mut results = Vec::new();
1672        futures::pin_mut!(stream);
1673        let mut i = from;
1674        while let Some(fetch) = stream.next().await {
1675            let item = fetch
1676                .with_timeout(timeout)
1677                .await
1678                .ok_or_else(|| not_found(format!("VID common {} not found", i)))?;
1679            results.push(item);
1680            i += 1;
1681        }
1682        Ok(results)
1683    }
1684
1685    async fn get_transaction_by_position(
1686        &self,
1687        height: u64,
1688        index: u64,
1689    ) -> anyhow::Result<Self::Transaction> {
1690        let ds = &*self.data_source;
1691        let block = ds
1692            .get_block(HsBlockId::Number(height as usize))
1693            .await
1694            .with_timeout(Duration::from_millis(500))
1695            .await
1696            .ok_or_else(|| not_found(format!("block {} not found", height)))?;
1697
1698        let idx = block
1699            .payload()
1700            .nth(block.metadata(), index as usize)
1701            .ok_or_else(|| {
1702                not_found(format!(
1703                    "transaction index {} out of bounds in block {}",
1704                    index, height
1705                ))
1706            })?;
1707        let tx = block
1708            .transaction(&idx)
1709            .ok_or_else(|| not_found(format!("transaction not found at index {}", index)))?;
1710        TransactionQueryData::new(tx, &block, &idx, index)
1711            .ok_or_else(|| anyhow::anyhow!("failed to build transaction query data"))
1712    }
1713
1714    async fn get_transaction_by_hash(&self, hash: String) -> anyhow::Result<Self::Transaction> {
1715        let ds = &*self.data_source;
1716        let tx_hash: hotshot_query_service::availability::TransactionHash<
1717            espresso_types::SeqTypes,
1718        > = hash
1719            .parse()
1720            .map_err(|_| bad_request(format!("invalid transaction hash: {}", hash)))?;
1721        let bwt = ds
1722            .get_block_containing_transaction(tx_hash)
1723            .await
1724            .with_timeout(Duration::from_millis(500))
1725            .await
1726            .ok_or_else(|| not_found("transaction not found"))?;
1727        Ok(bwt.transaction)
1728    }
1729
1730    async fn get_transaction_proof_by_position(
1731        &self,
1732        height: u64,
1733        index: u64,
1734    ) -> anyhow::Result<Self::TransactionWithProof> {
1735        let ds = &*self.data_source;
1736        let timeout = Duration::from_millis(500);
1737
1738        let (block_fetch, vid_fetch) = futures::join!(
1739            ds.get_block(HsBlockId::Number(height as usize)),
1740            ds.get_vid_common(HsBlockId::Number(height as usize))
1741        );
1742        let (block, vid) = futures::join!(
1743            block_fetch.with_timeout(timeout),
1744            vid_fetch.with_timeout(timeout)
1745        );
1746
1747        let block = block.ok_or_else(|| not_found(format!("block {} not found", height)))?;
1748        let vid =
1749            vid.ok_or_else(|| not_found(format!("VID common not found for block {}", height)))?;
1750
1751        let idx = block
1752            .payload()
1753            .nth(block.metadata(), index as usize)
1754            .ok_or_else(|| {
1755                not_found(format!(
1756                    "transaction index {} out of bounds in block {}",
1757                    index, height
1758                ))
1759            })?;
1760        let tx = block
1761            .transaction(&idx)
1762            .ok_or_else(|| not_found(format!("transaction not found at index {}", index)))?;
1763        let tx_data = TransactionQueryData::new(tx, &block, &idx, index)
1764            .ok_or_else(|| anyhow::anyhow!("failed to build transaction query data"))?;
1765        let proof = block
1766            .transaction_proof(&vid, &idx)
1767            .ok_or_else(|| anyhow::anyhow!("failed to build transaction proof"))?;
1768        Ok(TransactionWithProofQueryData::new(tx_data, proof))
1769    }
1770
1771    async fn get_transaction_proof_by_hash(
1772        &self,
1773        hash: String,
1774    ) -> anyhow::Result<Self::TransactionWithProof> {
1775        let ds = &*self.data_source;
1776        let timeout = Duration::from_millis(500);
1777
1778        let tx_hash: hotshot_query_service::availability::TransactionHash<
1779            espresso_types::SeqTypes,
1780        > = hash
1781            .parse()
1782            .map_err(|_| bad_request(format!("invalid transaction hash: {}", hash)))?;
1783        let bwt = ds
1784            .get_block_containing_transaction(tx_hash)
1785            .await
1786            .with_timeout(timeout)
1787            .await
1788            .ok_or_else(|| not_found("transaction not found"))?;
1789
1790        let vid = ds
1791            .get_vid_common(HsBlockId::Number(bwt.block.height() as usize))
1792            .await
1793            .with_timeout(timeout)
1794            .await
1795            .ok_or_else(|| {
1796                not_found(format!(
1797                    "VID common not found for block {}",
1798                    bwt.block.height()
1799                ))
1800            })?;
1801
1802        let proof = bwt
1803            .block
1804            .transaction_proof(&vid, &bwt.index)
1805            .ok_or_else(|| anyhow::anyhow!("failed to build transaction proof"))?;
1806        Ok(TransactionWithProofQueryData::new(bwt.transaction, proof))
1807    }
1808
1809    async fn get_block_summary(&self, height: usize) -> anyhow::Result<Self::BlockSummary> {
1810        let ds = &*self.data_source;
1811        let block = ds
1812            .get_block(HsBlockId::Number(height))
1813            .await
1814            .with_timeout(Duration::from_millis(500))
1815            .await
1816            .ok_or_else(|| not_found(format!("block {} not found", height)))?;
1817        Ok(BlockSummaryQueryData::from(block))
1818    }
1819
1820    async fn get_block_summary_range(
1821        &self,
1822        from: usize,
1823        until: usize,
1824    ) -> anyhow::Result<Vec<Self::BlockSummary>> {
1825        enforce_range(from, until, 100)?;
1826        let timeout = Duration::from_millis(500);
1827        let ds = &*self.data_source;
1828        let stream = ds.get_block_range(from..until).await;
1829        let mut results = Vec::new();
1830        futures::pin_mut!(stream);
1831        let mut i = from;
1832        while let Some(fetch) = stream.next().await {
1833            let block = fetch
1834                .with_timeout(timeout)
1835                .await
1836                .ok_or_else(|| not_found(format!("block {} not found", i)))?;
1837            results.push(BlockSummaryQueryData::from(block));
1838            i += 1;
1839        }
1840        Ok(results)
1841    }
1842
1843    async fn get_limits(&self) -> anyhow::Result<Self::Limits> {
1844        Ok(HsLimits {
1845            small_object_range_limit: 500,
1846            large_object_range_limit: 100,
1847        })
1848    }
1849
1850    async fn get_cert2(&self, height: u64) -> anyhow::Result<Option<Self::Cert2>> {
1851        self.data_source
1852            .get_cert2(height)
1853            .await
1854            .map_err(|e| anyhow::anyhow!("{}", e))
1855    }
1856
1857    async fn stream_leaves(&self, from: usize) -> anyhow::Result<BoxStream<'static, Self::Leaf>> {
1858        let ds = self.data_source.clone();
1859        Ok((*ds).subscribe_leaves(from).await.boxed())
1860    }
1861
1862    async fn stream_headers(
1863        &self,
1864        from: usize,
1865    ) -> anyhow::Result<BoxStream<'static, Self::Header>> {
1866        let ds = self.data_source.clone();
1867        Ok((*ds).subscribe_headers(from).await.boxed())
1868    }
1869
1870    async fn stream_blocks(&self, from: usize) -> anyhow::Result<BoxStream<'static, Self::Block>> {
1871        let ds = self.data_source.clone();
1872        Ok((*ds).subscribe_blocks(from).await.boxed())
1873    }
1874
1875    async fn stream_payloads(
1876        &self,
1877        from: usize,
1878    ) -> anyhow::Result<BoxStream<'static, Self::Payload>> {
1879        let ds = self.data_source.clone();
1880        Ok((*ds).subscribe_payloads(from).await.boxed())
1881    }
1882
1883    async fn stream_vid_common(
1884        &self,
1885        from: usize,
1886    ) -> anyhow::Result<BoxStream<'static, Self::VidCommon>> {
1887        let ds = self.data_source.clone();
1888        Ok((*ds).subscribe_vid_common(from).await.boxed())
1889    }
1890
1891    async fn stream_transactions(
1892        &self,
1893        from: usize,
1894        namespace: Option<u32>,
1895    ) -> anyhow::Result<BoxStream<'static, Self::Transaction>> {
1896        let ds = self.data_source.clone();
1897        let stream = (*ds)
1898            .subscribe_blocks(from)
1899            .await
1900            .flat_map(move |block| {
1901                let ns_filter = namespace.map(NamespaceId::from);
1902                let txs: Vec<Self::Transaction> = block
1903                    .enumerate()
1904                    .enumerate()
1905                    .filter_map(|(position_in_block, (tx_index, _tx))| {
1906                        let tx = block.transaction(&tx_index)?;
1907                        if let Some(ns) = ns_filter
1908                            && tx.namespace() != ns
1909                        {
1910                            return None;
1911                        }
1912                        TransactionQueryData::new(tx, &block, &tx_index, position_in_block as u64)
1913                    })
1914                    .collect();
1915                futures::stream::iter(txs)
1916            })
1917            .boxed();
1918        Ok(stream)
1919    }
1920}
1921
1922fn block_id_to_hs(
1923    id: espresso_api::v1::availability::BlockId,
1924) -> anyhow::Result<HsBlockId<SeqTypes>> {
1925    match id {
1926        espresso_api::v1::availability::BlockId::Height(h) => Ok(HsBlockId::Number(h as usize)),
1927        espresso_api::v1::availability::BlockId::Hash(h) => {
1928            let hash = h
1929                .parse()
1930                .map_err(|_| bad_request(format!("invalid block hash: {}", h)))?;
1931            Ok(HsBlockId::Hash(hash))
1932        },
1933        espresso_api::v1::availability::BlockId::PayloadHash(h) => {
1934            let payload_hash = h
1935                .parse()
1936                .map_err(|_| bad_request(format!("invalid payload hash: {}", h)))?;
1937            Ok(HsBlockId::PayloadHash(payload_hash))
1938        },
1939    }
1940}
1941
1942fn payload_id_to_hs(
1943    id: espresso_api::v1::availability::PayloadId,
1944) -> anyhow::Result<HsBlockId<SeqTypes>> {
1945    match id {
1946        espresso_api::v1::availability::PayloadId::Height(h) => Ok(HsBlockId::Number(h as usize)),
1947        espresso_api::v1::availability::PayloadId::Hash(h) => {
1948            let payload_hash = h
1949                .parse()
1950                .map_err(|_| bad_request(format!("invalid payload hash: {}", h)))?;
1951            Ok(HsBlockId::PayloadHash(payload_hash))
1952        },
1953        espresso_api::v1::availability::PayloadId::BlockHash(h) => {
1954            let hash = h
1955                .parse()
1956                .map_err(|_| bad_request(format!("invalid block hash: {}", h)))?;
1957            Ok(HsBlockId::Hash(hash))
1958        },
1959    }
1960}