1use std::{collections::HashMap, time::Duration};
2
3use alloy::primitives::Address;
4use anyhow::Context;
5use async_trait::async_trait;
6use committable::Commitment;
7use espresso_types::{
8 Certificate2, FeeAccount, FeeAccountProof, FeeMerkleTree, Leaf2, NodeState, PubKey,
9 Transaction,
10 config::PublicNetworkConfig,
11 v0::traits::{PersistenceOptions, SequencerPersistence},
12 v0_3::{
13 AuthenticatedValidator, ChainConfig, RegisteredValidator, RewardAccountProofV1,
14 RewardAccountQueryDataV1, RewardAccountV1, RewardAmount, RewardMerkleTreeV1,
15 StakeTableEvent,
16 },
17 v0_4::{RewardAccountProofV2, RewardAccountQueryDataV2, RewardAccountV2, RewardMerkleTreeV2},
18};
19use futures::future::{BoxFuture, Future};
20use hotshot::types::BLSPubKey;
21use hotshot_query_service::{
22 availability::{AvailabilityDataSource, BlockQueryData, LeafQueryData, VidCommonQueryData},
23 data_source::{UpdateDataSource, VersionedDataSource},
24 fetching::provider::AnyProvider,
25 node::NodeDataSource,
26 status::StatusDataSource,
27};
28use hotshot_types::{
29 PeerConfig,
30 data::{EpochNumber, VidShare, ViewNumber},
31 light_client::LCV3StateSignatureRequestBody,
32 simple_certificate::LightClientStateUpdateCertificateV2,
33 traits::{network::ConnectedNetwork, node_implementation::NodeType},
34};
35use indexmap::IndexMap;
36use light_client::{state::LightClientOptions, storage::LightClientSqliteOptions};
37use serde::{Deserialize, Serialize};
38use tide_disco::Url;
39
40use super::{
41 AccountQueryData, BlocksFrontier, fs,
42 options::{Options, Query},
43 sql,
44};
45use crate::{
46 SeqTypes, U256,
47 api::{ApiState, LightClientProvider},
48 persistence,
49 state_cert::StateCertFetchError,
50};
51
52pub trait DataSourceOptions: PersistenceOptions {
53 type DataSource: SequencerDataSource<Options = Self>;
54
55 fn enable_query_module(&self, opt: Options, query: Query) -> Options;
56}
57
58impl DataSourceOptions for persistence::sql::Options {
59 type DataSource = sql::DataSource;
60
61 fn enable_query_module(&self, opt: Options, query: Query) -> Options {
62 opt.query_sql(query, self.clone())
63 }
64}
65
66impl DataSourceOptions for persistence::fs::Options {
67 type DataSource = fs::DataSource;
68
69 fn enable_query_module(&self, opt: Options, query: Query) -> Options {
70 opt.query_fs(query, self.clone())
71 }
72}
73
74#[async_trait]
79pub trait SequencerDataSource:
80 AvailabilityDataSource<SeqTypes>
81 + NodeDataSource<SeqTypes>
82 + StatusDataSource
83 + UpdateDataSource<SeqTypes>
84 + VersionedDataSource
85 + Sized
86{
87 type Options: DataSourceOptions<DataSource = Self>;
88
89 async fn create(opt: Self::Options, provider: Provider, reset: bool) -> anyhow::Result<Self>;
91}
92
93pub type Provider = AnyProvider<SeqTypes>;
95
96pub(super) async fn provider<N, P>(
98 peers: impl IntoIterator<Item = Url>,
99 state: &ApiState<N, P>,
100 opt: LightClientOptions,
101 db_opt: LightClientSqliteOptions,
102) -> anyhow::Result<Provider>
103where
104 N: ConnectedNetwork<PubKey>,
105 P: SequencerPersistence,
106{
107 Ok(Provider::default()
108 .with_provider(LightClientProvider::new(peers, state.clone(), opt, db_opt).await?))
109}
110
111pub(crate) trait SubmitDataSource<N: ConnectedNetwork<PubKey>, P: SequencerPersistence> {
112 fn submit(&self, tx: Transaction) -> impl Send + Future<Output = anyhow::Result<()>>;
113}
114
115pub(crate) trait HotShotConfigDataSource {
116 fn get_config(&self) -> impl Send + Future<Output = PublicNetworkConfig>;
117}
118
119#[async_trait]
120pub(crate) trait StateSignatureDataSource<N: ConnectedNetwork<PubKey>> {
121 async fn get_state_signature(&self, height: u64) -> Option<LCV3StateSignatureRequestBody>;
122}
123
124pub(crate) trait NodeStateDataSource {
125 fn node_state(&self) -> impl Send + Future<Output = NodeState>;
126}
127
128pub(crate) trait TokenDataSource<T: NodeType> {
129 fn get_initial_supply_l1(&self) -> impl Send + Future<Output = anyhow::Result<U256>>;
130 fn get_total_supply_l1(&self) -> impl Send + Future<Output = anyhow::Result<U256>>;
131 fn get_decided_header(&self) -> impl Send + Future<Output = espresso_types::Header>;
132}
133
134#[derive(Serialize, Deserialize)]
135#[serde(bound = "T: NodeType")]
136pub struct StakeTableWithEpochNumber<T: NodeType> {
137 pub epoch: Option<EpochNumber>,
138 pub stake_table: Vec<PeerConfig<T>>,
139}
140
141pub(crate) trait StakeTableDataSource<T: NodeType> {
142 fn get_stake_table(
144 &self,
145 epoch: Option<EpochNumber>,
146 ) -> impl Send + Future<Output = anyhow::Result<Vec<PeerConfig<T>>>>;
147
148 fn get_stake_table_current(
150 &self,
151 ) -> impl Send + Future<Output = anyhow::Result<StakeTableWithEpochNumber<T>>>;
152
153 fn get_da_stake_table(
155 &self,
156 epoch: Option<EpochNumber>,
157 ) -> impl Send + Future<Output = anyhow::Result<Vec<PeerConfig<T>>>>;
158
159 fn get_da_stake_table_current(
161 &self,
162 ) -> impl Send + Future<Output = anyhow::Result<StakeTableWithEpochNumber<T>>>;
163
164 fn get_validators(
166 &self,
167 epoch: EpochNumber,
168 ) -> impl Send + Future<Output = anyhow::Result<IndexMap<Address, AuthenticatedValidator<BLSPubKey>>>>;
169
170 fn get_block_reward(
171 &self,
172 epoch: Option<EpochNumber>,
173 ) -> impl Send + Future<Output = anyhow::Result<Option<RewardAmount>>>;
174 fn current_proposal_participation(
176 &self,
177 ) -> impl Send + Future<Output = HashMap<BLSPubKey, f64>>;
178
179 fn proposal_participation(
181 &self,
182 epoch: EpochNumber,
183 ) -> impl Send + Future<Output = HashMap<BLSPubKey, f64>>;
184 fn current_vote_participation(&self) -> impl Send + Future<Output = HashMap<BLSPubKey, f64>>;
186
187 fn vote_participation(
189 &self,
190 epoch: EpochNumber,
191 ) -> impl Send + Future<Output = HashMap<BLSPubKey, f64>>;
192
193 fn get_all_validators(
194 &self,
195 epoch: EpochNumber,
196 offset: u64,
197 limit: u64,
198 ) -> impl Send + Future<Output = anyhow::Result<Vec<RegisteredValidator<PubKey>>>>;
199
200 fn stake_table_events(
202 &self,
203 from_l1_block: u64,
204 to_l1_block: u64,
205 ) -> impl Send + Future<Output = anyhow::Result<Vec<StakeTableEvent>>>;
206}
207
208#[async_trait]
210pub(crate) trait StateCertDataSource {
211 async fn get_state_cert_by_epoch(
212 &self,
213 epoch: u64,
214 ) -> anyhow::Result<Option<LightClientStateUpdateCertificateV2<SeqTypes>>>;
215
216 async fn insert_state_cert(
217 &self,
218 epoch: u64,
219 cert: LightClientStateUpdateCertificateV2<SeqTypes>,
220 ) -> anyhow::Result<()>;
221}
222
223pub(crate) trait CatchupDataSource: Sync {
224 fn get_account(
231 &self,
232 instance: &NodeState,
233 height: u64,
234 view: ViewNumber,
235 account: FeeAccount,
236 ) -> impl Send + Future<Output = anyhow::Result<AccountQueryData>> {
237 async move {
238 let tree = self
239 .get_accounts(instance, height, view, &[account])
240 .await?;
241 let (proof, balance) = FeeAccountProof::prove(&tree, account.into()).context(
242 format!("account {account} not available for height {height}, view {view}"),
243 )?;
244 Ok(AccountQueryData { balance, proof })
245 }
246 }
247
248 fn get_accounts(
255 &self,
256 instance: &NodeState,
257 height: u64,
258 view: ViewNumber,
259 accounts: &[FeeAccount],
260 ) -> impl Send + Future<Output = anyhow::Result<FeeMerkleTree>>;
261
262 fn get_frontier(
269 &self,
270 instance: &NodeState,
271 height: u64,
272 view: ViewNumber,
273 ) -> impl Send + Future<Output = anyhow::Result<BlocksFrontier>>;
274
275 fn get_chain_config(
276 &self,
277 commitment: Commitment<ChainConfig>,
278 ) -> impl Send + Future<Output = anyhow::Result<ChainConfig>>;
279
280 fn get_leaf_chain(
281 &self,
282 height: u64,
283 ) -> impl Send + Future<Output = anyhow::Result<Vec<Leaf2>>>;
284
285 fn get_cert2(
289 &self,
290 _height: u64,
291 ) -> impl Send + Future<Output = anyhow::Result<Option<Certificate2<SeqTypes>>>> {
292 async { Ok(None) }
293 }
294
295 fn get_reward_account_v2(
302 &self,
303 instance: &NodeState,
304 height: u64,
305 view: ViewNumber,
306 account: RewardAccountV2,
307 ) -> impl Send + Future<Output = anyhow::Result<RewardAccountQueryDataV2>> {
308 async move {
309 let tree = self
310 .get_reward_accounts_v2(instance, height, view, &[account])
311 .await?;
312 let (proof, balance) = RewardAccountProofV2::prove(&tree, account.into()).context(
313 format!("reward account {account} not available for height {height}, view {view}"),
314 )?;
315 Ok(RewardAccountQueryDataV2 { balance, proof })
316 }
317 }
318
319 fn get_reward_accounts_v2(
320 &self,
321 instance: &NodeState,
322 height: u64,
323 view: ViewNumber,
324 accounts: &[RewardAccountV2],
325 ) -> impl Send + Future<Output = anyhow::Result<RewardMerkleTreeV2>>;
326
327 fn get_reward_account_v1(
328 &self,
329 instance: &NodeState,
330 height: u64,
331 view: ViewNumber,
332 account: RewardAccountV1,
333 ) -> impl Send + Future<Output = anyhow::Result<RewardAccountQueryDataV1>> {
334 async move {
335 let tree = self
336 .get_reward_accounts_v1(instance, height, view, &[account])
337 .await?;
338 let (proof, balance) = RewardAccountProofV1::prove(&tree, account.into()).context(
339 format!("reward account {account} not available for height {height}, view {view}"),
340 )?;
341 Ok(RewardAccountQueryDataV1 { balance, proof })
342 }
343 }
344
345 fn get_reward_accounts_v1(
346 &self,
347 instance: &NodeState,
348 height: u64,
349 view: ViewNumber,
350 accounts: &[RewardAccountV1],
351 ) -> impl Send + Future<Output = anyhow::Result<RewardMerkleTreeV1>>;
352
353 fn get_reward_merkle_tree_v2(
354 &self,
355 height: u64,
356 view: ViewNumber,
357 ) -> impl Send + Future<Output = anyhow::Result<Vec<u8>>>;
358
359 fn get_state_cert(
360 &self,
361 epoch: u64,
362 ) -> impl Send + Future<Output = anyhow::Result<LightClientStateUpdateCertificateV2<SeqTypes>>>;
363}
364
365pub trait RequestResponseDataSource<Types: NodeType> {
366 fn request_vid_shares(
367 &self,
368 block_number: u64,
369 vid_common_data: VidCommonQueryData<Types>,
370 duration: Duration,
371 ) -> impl Future<Output = BoxFuture<'static, anyhow::Result<Vec<VidShare>>>> + Send;
372}
373
374#[async_trait]
375pub trait StateCertFetchingDataSource<Types: NodeType> {
376 async fn request_state_cert(
377 &self,
378 epoch: u64,
379 timeout: Duration,
380 ) -> Result<LightClientStateUpdateCertificateV2<Types>, StateCertFetchError>;
381}
382
383#[derive(Serialize, Deserialize, Clone, Debug)]
385pub struct TableSize {
386 pub table_name: String,
387 pub row_count: i64,
388 pub total_size_bytes: Option<i64>,
389}
390
391pub(crate) trait DatabaseMetadataSource {
395 fn get_table_sizes(&self) -> impl Send + Future<Output = anyhow::Result<Vec<TableSize>>>;
397}
398
399use std::sync::Arc;
407
408#[async_trait]
409impl<D> StateCertDataSource for Arc<D>
410where
411 D: StateCertDataSource + Sync + Send,
412{
413 async fn get_state_cert_by_epoch(
414 &self,
415 epoch: u64,
416 ) -> anyhow::Result<Option<LightClientStateUpdateCertificateV2<SeqTypes>>> {
417 (*self).get_state_cert_by_epoch(epoch).await
418 }
419
420 async fn insert_state_cert(
421 &self,
422 epoch: u64,
423 cert: LightClientStateUpdateCertificateV2<SeqTypes>,
424 ) -> anyhow::Result<()> {
425 (*self).insert_state_cert(epoch, cert).await
426 }
427}
428
429impl<Types, D> RequestResponseDataSource<Types> for Arc<D>
430where
431 Types: NodeType,
432 D: RequestResponseDataSource<Types> + Send + Sync,
433{
434 async fn request_vid_shares(
435 &self,
436 block_number: u64,
437 vid_common_data: VidCommonQueryData<Types>,
438 timeout_duration: Duration,
439 ) -> BoxFuture<'static, anyhow::Result<Vec<VidShare>>> {
440 self.as_ref()
441 .request_vid_shares(block_number, vid_common_data, timeout_duration)
442 .await
443 }
444}
445
446#[async_trait]
447impl<Types, D> StateCertFetchingDataSource<Types> for Arc<D>
448where
449 Types: NodeType,
450 D: StateCertFetchingDataSource<Types> + Sync + Send,
451{
452 async fn request_state_cert(
453 &self,
454 epoch: u64,
455 timeout: Duration,
456 ) -> Result<LightClientStateUpdateCertificateV2<Types>, StateCertFetchError> {
457 (*self).request_state_cert(epoch, timeout).await
458 }
459}
460
461#[async_trait]
462impl<T, D> StakeTableDataSource<T> for Arc<D>
463where
464 T: NodeType,
465 D: StakeTableDataSource<T> + Sync + Send,
466{
467 fn get_stake_table(
468 &self,
469 epoch: Option<EpochNumber>,
470 ) -> impl Send + Future<Output = anyhow::Result<Vec<PeerConfig<T>>>> {
471 let this = self.clone();
472 async move { (*this).get_stake_table(epoch).await }
473 }
474
475 fn get_stake_table_current(
476 &self,
477 ) -> impl Send + Future<Output = anyhow::Result<StakeTableWithEpochNumber<T>>> {
478 let this = self.clone();
479 async move { (*this).get_stake_table_current().await }
480 }
481
482 fn get_da_stake_table(
483 &self,
484 epoch: Option<EpochNumber>,
485 ) -> impl Send + Future<Output = anyhow::Result<Vec<PeerConfig<T>>>> {
486 let this = self.clone();
487 async move { (*this).get_da_stake_table(epoch).await }
488 }
489
490 fn get_da_stake_table_current(
491 &self,
492 ) -> impl Send + Future<Output = anyhow::Result<StakeTableWithEpochNumber<T>>> {
493 let this = self.clone();
494 async move { (*this).get_da_stake_table_current().await }
495 }
496
497 fn get_validators(
498 &self,
499 epoch: EpochNumber,
500 ) -> impl Send + Future<Output = anyhow::Result<IndexMap<Address, AuthenticatedValidator<BLSPubKey>>>>
501 {
502 let this = self.clone();
503 async move { (*this).get_validators(epoch).await }
504 }
505
506 fn get_block_reward(
507 &self,
508 epoch: Option<EpochNumber>,
509 ) -> impl Send + Future<Output = anyhow::Result<Option<RewardAmount>>> {
510 let this = self.clone();
511 async move { (*this).get_block_reward(epoch).await }
512 }
513
514 fn current_proposal_participation(
515 &self,
516 ) -> impl Send + Future<Output = HashMap<BLSPubKey, f64>> {
517 let this = self.clone();
518 async move { (*this).current_proposal_participation().await }
519 }
520
521 fn proposal_participation(
522 &self,
523 epoch: EpochNumber,
524 ) -> impl Send + Future<Output = HashMap<BLSPubKey, f64>> {
525 let this = self.clone();
526 async move { (*this).proposal_participation(epoch).await }
527 }
528
529 fn current_vote_participation(&self) -> impl Send + Future<Output = HashMap<BLSPubKey, f64>> {
530 let this = self.clone();
531 async move { (*this).current_vote_participation().await }
532 }
533
534 fn vote_participation(
535 &self,
536 epoch: EpochNumber,
537 ) -> impl Send + Future<Output = HashMap<BLSPubKey, f64>> {
538 let this = self.clone();
539 async move { (*this).vote_participation(epoch).await }
540 }
541
542 fn get_all_validators(
543 &self,
544 epoch: EpochNumber,
545 offset: u64,
546 limit: u64,
547 ) -> impl Send + Future<Output = anyhow::Result<Vec<RegisteredValidator<PubKey>>>> {
548 let this = self.clone();
549 async move { (*this).get_all_validators(epoch, offset, limit).await }
550 }
551
552 fn stake_table_events(
553 &self,
554 from_l1_block: u64,
555 to_l1_block: u64,
556 ) -> impl Send + Future<Output = anyhow::Result<Vec<StakeTableEvent>>> {
557 let this = self.clone();
558 async move { (*this).stake_table_events(from_l1_block, to_l1_block).await }
559 }
560}
561
562pub(crate) trait PruningDataSource {
567 fn get_oldest_block(
569 &self,
570 ) -> impl Send + Future<Output = anyhow::Result<Option<BlockQueryData<SeqTypes>>>>;
571
572 fn get_oldest_leaf(
574 &self,
575 ) -> impl Send + Future<Output = anyhow::Result<Option<LeafQueryData<SeqTypes>>>>;
576}
577
578#[cfg(any(test, feature = "testing"))]
579pub mod testing {
580 use super::{super::Options, *};
581
582 #[async_trait]
583 pub trait TestableSequencerDataSource: SequencerDataSource {
584 type Storage: Sync;
585
586 async fn create_storage() -> Self::Storage;
587 fn persistence_options(storage: &Self::Storage) -> Self::Options;
588 fn leaf_only_ds_options(
589 _storage: &Self::Storage,
590 _opt: Options,
591 ) -> anyhow::Result<Options> {
592 anyhow::bail!("not supported")
593 }
594 fn options(storage: &Self::Storage, opt: Options) -> Options;
595 }
596}