Skip to main content

espresso_node/api/
data_source.rs

1use std::{collections::HashMap, time::Duration};
2
3use alloy::primitives::Address;
4use anyhow::Context;
5use async_trait::async_trait;
6use committable::Commitment;
7use espresso_types::{
8    Certificate2, FeeAccount, FeeAccountProof, FeeMerkleTree, Leaf2, NodeState, PubKey,
9    Transaction,
10    config::PublicNetworkConfig,
11    v0::traits::{PersistenceOptions, SequencerPersistence},
12    v0_3::{
13        AuthenticatedValidator, ChainConfig, RegisteredValidator, RewardAccountProofV1,
14        RewardAccountQueryDataV1, RewardAccountV1, RewardAmount, RewardMerkleTreeV1,
15        StakeTableEvent,
16    },
17    v0_4::{RewardAccountProofV2, RewardAccountQueryDataV2, RewardAccountV2, RewardMerkleTreeV2},
18};
19use futures::future::{BoxFuture, Future};
20use hotshot::types::BLSPubKey;
21use hotshot_query_service::{
22    availability::{AvailabilityDataSource, BlockQueryData, LeafQueryData, VidCommonQueryData},
23    data_source::{UpdateDataSource, VersionedDataSource},
24    fetching::provider::AnyProvider,
25    node::NodeDataSource,
26    status::StatusDataSource,
27};
28use hotshot_types::{
29    PeerConfig,
30    data::{EpochNumber, VidShare, ViewNumber},
31    light_client::LCV3StateSignatureRequestBody,
32    simple_certificate::LightClientStateUpdateCertificateV2,
33    traits::{network::ConnectedNetwork, node_implementation::NodeType},
34};
35use indexmap::IndexMap;
36use light_client::{state::LightClientOptions, storage::LightClientSqliteOptions};
37use serde::{Deserialize, Serialize};
38use tide_disco::Url;
39
40use super::{
41    AccountQueryData, BlocksFrontier, fs,
42    options::{Options, Query},
43    sql,
44};
45use crate::{
46    SeqTypes, U256,
47    api::{ApiState, LightClientProvider},
48    persistence,
49    state_cert::StateCertFetchError,
50};
51
52pub trait DataSourceOptions: PersistenceOptions {
53    type DataSource: SequencerDataSource<Options = Self>;
54
55    fn enable_query_module(&self, opt: Options, query: Query) -> Options;
56}
57
58impl DataSourceOptions for persistence::sql::Options {
59    type DataSource = sql::DataSource;
60
61    fn enable_query_module(&self, opt: Options, query: Query) -> Options {
62        opt.query_sql(query, self.clone())
63    }
64}
65
66impl DataSourceOptions for persistence::fs::Options {
67    type DataSource = fs::DataSource;
68
69    fn enable_query_module(&self, opt: Options, query: Query) -> Options {
70        opt.query_fs(query, self.clone())
71    }
72}
73
74/// A data source with sequencer-specific functionality.
75///
76/// This trait extends the generic [`AvailabilityDataSource`] with some additional data needed to
77/// provided sequencer-specific endpoints.
78#[async_trait]
79pub trait SequencerDataSource:
80    AvailabilityDataSource<SeqTypes>
81    + NodeDataSource<SeqTypes>
82    + StatusDataSource
83    + UpdateDataSource<SeqTypes>
84    + VersionedDataSource
85    + Sized
86{
87    type Options: DataSourceOptions<DataSource = Self>;
88
89    /// Instantiate a data source from command line options.
90    async fn create(opt: Self::Options, provider: Provider, reset: bool) -> anyhow::Result<Self>;
91}
92
93/// Provider for fetching missing data for the query service.
94pub type Provider = AnyProvider<SeqTypes>;
95
96/// Create a provider for fetching missing data from a list of peer query services.
97pub(super) async fn provider<N, P>(
98    peers: impl IntoIterator<Item = Url>,
99    state: &ApiState<N, P>,
100    opt: LightClientOptions,
101    db_opt: LightClientSqliteOptions,
102) -> anyhow::Result<Provider>
103where
104    N: ConnectedNetwork<PubKey>,
105    P: SequencerPersistence,
106{
107    Ok(Provider::default()
108        .with_provider(LightClientProvider::new(peers, state.clone(), opt, db_opt).await?))
109}
110
111pub(crate) trait SubmitDataSource<N: ConnectedNetwork<PubKey>, P: SequencerPersistence> {
112    fn submit(&self, tx: Transaction) -> impl Send + Future<Output = anyhow::Result<()>>;
113}
114
115pub(crate) trait HotShotConfigDataSource {
116    fn get_config(&self) -> impl Send + Future<Output = PublicNetworkConfig>;
117}
118
119#[async_trait]
120pub(crate) trait StateSignatureDataSource<N: ConnectedNetwork<PubKey>> {
121    async fn get_state_signature(&self, height: u64) -> Option<LCV3StateSignatureRequestBody>;
122}
123
124pub(crate) trait NodeStateDataSource {
125    fn node_state(&self) -> impl Send + Future<Output = NodeState>;
126}
127
128pub(crate) trait TokenDataSource<T: NodeType> {
129    fn get_initial_supply_l1(&self) -> impl Send + Future<Output = anyhow::Result<U256>>;
130    fn get_total_supply_l1(&self) -> impl Send + Future<Output = anyhow::Result<U256>>;
131    fn get_decided_header(&self) -> impl Send + Future<Output = espresso_types::Header>;
132}
133
134#[derive(Serialize, Deserialize)]
135#[serde(bound = "T: NodeType")]
136pub struct StakeTableWithEpochNumber<T: NodeType> {
137    pub epoch: Option<EpochNumber>,
138    pub stake_table: Vec<PeerConfig<T>>,
139}
140
141pub(crate) trait StakeTableDataSource<T: NodeType> {
142    /// Get the stake table for a given epoch
143    fn get_stake_table(
144        &self,
145        epoch: Option<EpochNumber>,
146    ) -> impl Send + Future<Output = anyhow::Result<Vec<PeerConfig<T>>>>;
147
148    /// Get the stake table for the current epoch if not provided
149    fn get_stake_table_current(
150        &self,
151    ) -> impl Send + Future<Output = anyhow::Result<StakeTableWithEpochNumber<T>>>;
152
153    /// Get the DA stake table for a given epoch
154    fn get_da_stake_table(
155        &self,
156        epoch: Option<EpochNumber>,
157    ) -> impl Send + Future<Output = anyhow::Result<Vec<PeerConfig<T>>>>;
158
159    /// Get the DA stake table for the current epoch if not provided
160    fn get_da_stake_table_current(
161        &self,
162    ) -> impl Send + Future<Output = anyhow::Result<StakeTableWithEpochNumber<T>>>;
163
164    /// Get all the validators
165    fn get_validators(
166        &self,
167        epoch: EpochNumber,
168    ) -> impl Send + Future<Output = anyhow::Result<IndexMap<Address, AuthenticatedValidator<BLSPubKey>>>>;
169
170    fn get_block_reward(
171        &self,
172        epoch: Option<EpochNumber>,
173    ) -> impl Send + Future<Output = anyhow::Result<Option<RewardAmount>>>;
174    /// Get the current proposal participation.
175    fn current_proposal_participation(
176        &self,
177    ) -> impl Send + Future<Output = HashMap<BLSPubKey, f64>>;
178
179    /// Get the proposal participation for a given epoch.
180    fn proposal_participation(
181        &self,
182        epoch: EpochNumber,
183    ) -> impl Send + Future<Output = HashMap<BLSPubKey, f64>>;
184    /// Get the current vote participation.
185    fn current_vote_participation(&self) -> impl Send + Future<Output = HashMap<BLSPubKey, f64>>;
186
187    /// Get the vote participation for a given epoch.
188    fn vote_participation(
189        &self,
190        epoch: EpochNumber,
191    ) -> impl Send + Future<Output = HashMap<BLSPubKey, f64>>;
192
193    fn get_all_validators(
194        &self,
195        epoch: EpochNumber,
196        offset: u64,
197        limit: u64,
198    ) -> impl Send + Future<Output = anyhow::Result<Vec<RegisteredValidator<PubKey>>>>;
199
200    /// Get stake table events from L1 blocks `from_l1_block..=to_l1_block`.
201    fn stake_table_events(
202        &self,
203        from_l1_block: u64,
204        to_l1_block: u64,
205    ) -> impl Send + Future<Output = anyhow::Result<Vec<StakeTableEvent>>>;
206}
207
208// Thin wrapper trait to access persistence methods from API handlers
209#[async_trait]
210pub(crate) trait StateCertDataSource {
211    async fn get_state_cert_by_epoch(
212        &self,
213        epoch: u64,
214    ) -> anyhow::Result<Option<LightClientStateUpdateCertificateV2<SeqTypes>>>;
215
216    async fn insert_state_cert(
217        &self,
218        epoch: u64,
219        cert: LightClientStateUpdateCertificateV2<SeqTypes>,
220    ) -> anyhow::Result<()>;
221}
222
223pub(crate) trait CatchupDataSource: Sync {
224    /// Get the state of the requested `account`.
225    ///
226    /// The state is fetched from a snapshot at the given height and view, which _must_ correspond!
227    /// `height` is provided to simplify lookups for backends where data is not indexed by view.
228    /// This function is intended to be used for catchup, so `view` should be no older than the last
229    /// decided view.
230    fn get_account(
231        &self,
232        instance: &NodeState,
233        height: u64,
234        view: ViewNumber,
235        account: FeeAccount,
236    ) -> impl Send + Future<Output = anyhow::Result<AccountQueryData>> {
237        async move {
238            let tree = self
239                .get_accounts(instance, height, view, &[account])
240                .await?;
241            let (proof, balance) = FeeAccountProof::prove(&tree, account.into()).context(
242                format!("account {account} not available for height {height}, view {view}"),
243            )?;
244            Ok(AccountQueryData { balance, proof })
245        }
246    }
247
248    /// Get the state of the requested `accounts`.
249    ///
250    /// The state is fetched from a snapshot at the given height and view, which _must_ correspond!
251    /// `height` is provided to simplify lookups for backends where data is not indexed by view.
252    /// This function is intended to be used for catchup, so `view` should be no older than the last
253    /// decided view.
254    fn get_accounts(
255        &self,
256        instance: &NodeState,
257        height: u64,
258        view: ViewNumber,
259        accounts: &[FeeAccount],
260    ) -> impl Send + Future<Output = anyhow::Result<FeeMerkleTree>>;
261
262    /// Get the blocks Merkle tree frontier.
263    ///
264    /// The state is fetched from a snapshot at the given height and view, which _must_ correspond!
265    /// `height` is provided to simplify lookups for backends where data is not indexed by view.
266    /// This function is intended to be used for catchup, so `view` should be no older than the last
267    /// decided view.
268    fn get_frontier(
269        &self,
270        instance: &NodeState,
271        height: u64,
272        view: ViewNumber,
273    ) -> impl Send + Future<Output = anyhow::Result<BlocksFrontier>>;
274
275    fn get_chain_config(
276        &self,
277        commitment: Commitment<ChainConfig>,
278    ) -> impl Send + Future<Output = anyhow::Result<ChainConfig>>;
279
280    fn get_leaf_chain(
281        &self,
282        height: u64,
283    ) -> impl Send + Future<Output = anyhow::Result<Vec<Leaf2>>>;
284
285    /// Load the earliest cert2 whose finalized block height is at or above `height`.
286    ///
287    /// Returns `None` when no cert2 height >= `height` is locally available
288    fn get_cert2(
289        &self,
290        _height: u64,
291    ) -> impl Send + Future<Output = anyhow::Result<Option<Certificate2<SeqTypes>>>> {
292        async { Ok(None) }
293    }
294
295    /// Get the state of the requested `account`.
296    ///
297    /// The state is fetched from a snapshot at the given height and view, which _must_ correspond!
298    /// `height` is provided to simplify lookups for backends where data is not indexed by view.
299    /// This function is intended to be used for catchup, so `view` should be no older than the last
300    /// decided view.
301    fn get_reward_account_v2(
302        &self,
303        instance: &NodeState,
304        height: u64,
305        view: ViewNumber,
306        account: RewardAccountV2,
307    ) -> impl Send + Future<Output = anyhow::Result<RewardAccountQueryDataV2>> {
308        async move {
309            let tree = self
310                .get_reward_accounts_v2(instance, height, view, &[account])
311                .await?;
312            let (proof, balance) = RewardAccountProofV2::prove(&tree, account.into()).context(
313                format!("reward account {account} not available for height {height}, view {view}"),
314            )?;
315            Ok(RewardAccountQueryDataV2 { balance, proof })
316        }
317    }
318
319    fn get_reward_accounts_v2(
320        &self,
321        instance: &NodeState,
322        height: u64,
323        view: ViewNumber,
324        accounts: &[RewardAccountV2],
325    ) -> impl Send + Future<Output = anyhow::Result<RewardMerkleTreeV2>>;
326
327    fn get_reward_account_v1(
328        &self,
329        instance: &NodeState,
330        height: u64,
331        view: ViewNumber,
332        account: RewardAccountV1,
333    ) -> impl Send + Future<Output = anyhow::Result<RewardAccountQueryDataV1>> {
334        async move {
335            let tree = self
336                .get_reward_accounts_v1(instance, height, view, &[account])
337                .await?;
338            let (proof, balance) = RewardAccountProofV1::prove(&tree, account.into()).context(
339                format!("reward account {account} not available for height {height}, view {view}"),
340            )?;
341            Ok(RewardAccountQueryDataV1 { balance, proof })
342        }
343    }
344
345    fn get_reward_accounts_v1(
346        &self,
347        instance: &NodeState,
348        height: u64,
349        view: ViewNumber,
350        accounts: &[RewardAccountV1],
351    ) -> impl Send + Future<Output = anyhow::Result<RewardMerkleTreeV1>>;
352
353    fn get_reward_merkle_tree_v2(
354        &self,
355        height: u64,
356        view: ViewNumber,
357    ) -> impl Send + Future<Output = anyhow::Result<Vec<u8>>>;
358
359    fn get_state_cert(
360        &self,
361        epoch: u64,
362    ) -> impl Send + Future<Output = anyhow::Result<LightClientStateUpdateCertificateV2<SeqTypes>>>;
363}
364
365pub trait RequestResponseDataSource<Types: NodeType> {
366    fn request_vid_shares(
367        &self,
368        block_number: u64,
369        vid_common_data: VidCommonQueryData<Types>,
370        duration: Duration,
371    ) -> impl Future<Output = BoxFuture<'static, anyhow::Result<Vec<VidShare>>>> + Send;
372}
373
374#[async_trait]
375pub trait StateCertFetchingDataSource<Types: NodeType> {
376    async fn request_state_cert(
377        &self,
378        epoch: u64,
379        timeout: Duration,
380    ) -> Result<LightClientStateUpdateCertificateV2<Types>, StateCertFetchError>;
381}
382
383/// Database table size information.
384#[derive(Serialize, Deserialize, Clone, Debug)]
385pub struct TableSize {
386    pub table_name: String,
387    pub row_count: i64,
388    pub total_size_bytes: Option<i64>,
389}
390
391/// Data source for database metadata and statistics.
392///
393/// This trait is only implemented by SQL-based storage backends (PostgreSQL and SQLite).
394pub(crate) trait DatabaseMetadataSource {
395    /// Get the sizes of all tables in the database.
396    fn get_table_sizes(&self) -> impl Send + Future<Output = anyhow::Result<Vec<TableSize>>>;
397}
398
399// ============================================================================
400// Arc delegation implementations
401// ============================================================================
402// These implementations allow Arc<T> to implement the data source traits
403// when T implements them, which is necessary for NodeApiStateImpl to work
404// with Arc-wrapped data sources.
405
406use std::sync::Arc;
407
408#[async_trait]
409impl<D> StateCertDataSource for Arc<D>
410where
411    D: StateCertDataSource + Sync + Send,
412{
413    async fn get_state_cert_by_epoch(
414        &self,
415        epoch: u64,
416    ) -> anyhow::Result<Option<LightClientStateUpdateCertificateV2<SeqTypes>>> {
417        (*self).get_state_cert_by_epoch(epoch).await
418    }
419
420    async fn insert_state_cert(
421        &self,
422        epoch: u64,
423        cert: LightClientStateUpdateCertificateV2<SeqTypes>,
424    ) -> anyhow::Result<()> {
425        (*self).insert_state_cert(epoch, cert).await
426    }
427}
428
429impl<Types, D> RequestResponseDataSource<Types> for Arc<D>
430where
431    Types: NodeType,
432    D: RequestResponseDataSource<Types> + Send + Sync,
433{
434    async fn request_vid_shares(
435        &self,
436        block_number: u64,
437        vid_common_data: VidCommonQueryData<Types>,
438        timeout_duration: Duration,
439    ) -> BoxFuture<'static, anyhow::Result<Vec<VidShare>>> {
440        self.as_ref()
441            .request_vid_shares(block_number, vid_common_data, timeout_duration)
442            .await
443    }
444}
445
446#[async_trait]
447impl<Types, D> StateCertFetchingDataSource<Types> for Arc<D>
448where
449    Types: NodeType,
450    D: StateCertFetchingDataSource<Types> + Sync + Send,
451{
452    async fn request_state_cert(
453        &self,
454        epoch: u64,
455        timeout: Duration,
456    ) -> Result<LightClientStateUpdateCertificateV2<Types>, StateCertFetchError> {
457        (*self).request_state_cert(epoch, timeout).await
458    }
459}
460
461#[async_trait]
462impl<T, D> StakeTableDataSource<T> for Arc<D>
463where
464    T: NodeType,
465    D: StakeTableDataSource<T> + Sync + Send,
466{
467    fn get_stake_table(
468        &self,
469        epoch: Option<EpochNumber>,
470    ) -> impl Send + Future<Output = anyhow::Result<Vec<PeerConfig<T>>>> {
471        let this = self.clone();
472        async move { (*this).get_stake_table(epoch).await }
473    }
474
475    fn get_stake_table_current(
476        &self,
477    ) -> impl Send + Future<Output = anyhow::Result<StakeTableWithEpochNumber<T>>> {
478        let this = self.clone();
479        async move { (*this).get_stake_table_current().await }
480    }
481
482    fn get_da_stake_table(
483        &self,
484        epoch: Option<EpochNumber>,
485    ) -> impl Send + Future<Output = anyhow::Result<Vec<PeerConfig<T>>>> {
486        let this = self.clone();
487        async move { (*this).get_da_stake_table(epoch).await }
488    }
489
490    fn get_da_stake_table_current(
491        &self,
492    ) -> impl Send + Future<Output = anyhow::Result<StakeTableWithEpochNumber<T>>> {
493        let this = self.clone();
494        async move { (*this).get_da_stake_table_current().await }
495    }
496
497    fn get_validators(
498        &self,
499        epoch: EpochNumber,
500    ) -> impl Send + Future<Output = anyhow::Result<IndexMap<Address, AuthenticatedValidator<BLSPubKey>>>>
501    {
502        let this = self.clone();
503        async move { (*this).get_validators(epoch).await }
504    }
505
506    fn get_block_reward(
507        &self,
508        epoch: Option<EpochNumber>,
509    ) -> impl Send + Future<Output = anyhow::Result<Option<RewardAmount>>> {
510        let this = self.clone();
511        async move { (*this).get_block_reward(epoch).await }
512    }
513
514    fn current_proposal_participation(
515        &self,
516    ) -> impl Send + Future<Output = HashMap<BLSPubKey, f64>> {
517        let this = self.clone();
518        async move { (*this).current_proposal_participation().await }
519    }
520
521    fn proposal_participation(
522        &self,
523        epoch: EpochNumber,
524    ) -> impl Send + Future<Output = HashMap<BLSPubKey, f64>> {
525        let this = self.clone();
526        async move { (*this).proposal_participation(epoch).await }
527    }
528
529    fn current_vote_participation(&self) -> impl Send + Future<Output = HashMap<BLSPubKey, f64>> {
530        let this = self.clone();
531        async move { (*this).current_vote_participation().await }
532    }
533
534    fn vote_participation(
535        &self,
536        epoch: EpochNumber,
537    ) -> impl Send + Future<Output = HashMap<BLSPubKey, f64>> {
538        let this = self.clone();
539        async move { (*this).vote_participation(epoch).await }
540    }
541
542    fn get_all_validators(
543        &self,
544        epoch: EpochNumber,
545        offset: u64,
546        limit: u64,
547    ) -> impl Send + Future<Output = anyhow::Result<Vec<RegisteredValidator<PubKey>>>> {
548        let this = self.clone();
549        async move { (*this).get_all_validators(epoch, offset, limit).await }
550    }
551
552    fn stake_table_events(
553        &self,
554        from_l1_block: u64,
555        to_l1_block: u64,
556    ) -> impl Send + Future<Output = anyhow::Result<Vec<StakeTableEvent>>> {
557        let this = self.clone();
558        async move { (*this).stake_table_events(from_l1_block, to_l1_block).await }
559    }
560}
561
562/// Data source for pruning state: the oldest retained block and leaf.
563///
564/// SQL backends return the actual oldest entry; the filesystem backend always returns `None`
565/// since it does not prune.
566pub(crate) trait PruningDataSource {
567    /// Get the oldest block in storage, or `None` if empty or unsupported.
568    fn get_oldest_block(
569        &self,
570    ) -> impl Send + Future<Output = anyhow::Result<Option<BlockQueryData<SeqTypes>>>>;
571
572    /// Get the oldest leaf in storage, or `None` if empty or unsupported.
573    fn get_oldest_leaf(
574        &self,
575    ) -> impl Send + Future<Output = anyhow::Result<Option<LeafQueryData<SeqTypes>>>>;
576}
577
/// Helpers for building data sources in tests; compiled only for tests or with the `testing`
/// feature.
#[cfg(any(test, feature = "testing"))]
pub mod testing {
    use super::{super::Options, *};

    /// A [`SequencerDataSource`] for which tests can create storage and derive options.
    #[async_trait]
    pub trait TestableSequencerDataSource: SequencerDataSource {
        /// Handle to the storage created by [`Self::create_storage`].
        type Storage: Sync;

        /// Create the storage used by this data source in a test.
        async fn create_storage() -> Self::Storage;

        /// Build the persistence options that refer to `storage`.
        fn persistence_options(storage: &Self::Storage) -> Self::Options;

        /// Build API options for a leaf-only data source over `storage`.
        ///
        /// The default implementation reports that this is not supported.
        fn leaf_only_ds_options(
            _storage: &Self::Storage,
            _opt: Options,
        ) -> anyhow::Result<Options> {
            Err(anyhow::anyhow!("not supported"))
        }

        /// Build API options from `opt` for a data source over `storage`.
        fn options(storage: &Self::Storage, opt: Options) -> Options;
    }
}
596}