espresso_node/
api.rs

1use std::{collections::HashMap, pin::Pin, sync::Arc, time::Duration};
2
3use alloy::primitives::U256;
4use anyhow::{Context, bail, ensure};
5use async_lock::RwLock;
6use async_once_cell::Lazy;
7use async_trait::async_trait;
8use committable::Commitment;
9use data_source::{
10    CatchupDataSource, RequestResponseDataSource, StakeTableDataSource, StakeTableWithEpochNumber,
11    StateCertDataSource, StateCertFetchingDataSource, SubmitDataSource,
12};
13use derivative::Derivative;
14use espresso_types::{
15    AccountQueryData, AuthenticatedValidatorMap, BlockMerkleTree, FeeAccount, FeeMerkleTree, Leaf2,
16    NodeState, PubKey, Transaction,
17    config::PublicNetworkConfig,
18    retain_accounts,
19    traits::EventsPersistenceRead,
20    v0::traits::{SequencerPersistence, StateCatchup},
21    v0_3::{
22        ChainConfig, RegisteredValidator, RewardAccountQueryDataV1, RewardAccountV1, RewardAmount,
23        RewardMerkleTreeV1, StakeTableEvent,
24    },
25    v0_4::{
26        PermittedRewardMerkleTreeV2, RewardAccountQueryDataV2, RewardAccountV2, RewardMerkleTreeV2,
27    },
28};
29use futures::{
30    future::{BoxFuture, Future, FutureExt},
31    stream::BoxStream,
32};
33use hotshot_contract_adapter::sol_types::EspToken;
34use hotshot_events_service::events_source::{
35    EventFilterSet, EventsSource, EventsStreamer, StartupInfo,
36};
37use hotshot_query_service::{availability::VidCommonQueryData, data_source::ExtensibleDataSource};
38use hotshot_types::{
39    PeerConfig,
40    data::{EpochNumber, VidCommitment, VidCommon, VidShare, ViewNumber},
41    event::{Event, LegacyEvent},
42    light_client::LCV3StateSignatureRequestBody,
43    network::NetworkConfig,
44    simple_certificate::LightClientStateUpdateCertificateV2,
45    traits::{election::Membership, network::ConnectedNetwork},
46    utils::epoch_from_block_number,
47    vid::avidm::{AvidMScheme, init_avidm_param},
48    vote::HasViewNumber,
49};
50use itertools::Itertools;
51use jf_merkle_tree_compat::MerkleTreeScheme;
52use moka::future::Cache;
53use rand::Rng;
54use request_response::RequestType;
55use serde::{Deserialize, Serialize};
56use tokio::time::timeout;
57use vbs::version::Version;
58
59use self::data_source::{HotShotConfigDataSource, NodeStateDataSource, StateSignatureDataSource};
60use crate::{
61    SeqTypes, SequencerApiVersion, SequencerContext,
62    api::data_source::TokenDataSource,
63    catchup::{
64        CatchupStorage, add_fee_accounts_to_state, add_v1_reward_accounts_to_state,
65        add_v2_reward_accounts_to_state,
66    },
67    context::Consensus,
68    request_response::{
69        data_source::{retain_v1_reward_accounts, retain_v2_reward_accounts},
70        request::{Request, Response},
71    },
72    state_cert::{StateCertFetchError, validate_state_cert},
73    state_signature::StateSigner,
74};
75
76pub mod data_source;
77pub mod endpoints;
78pub mod fs;
79pub mod light_client;
80pub mod options;
81pub mod sql;
82pub mod unlock_schedule;
83mod update;
84
85pub use options::Options;
86
/// Merkle membership proof against the block Merkle tree (the "blocks frontier").
pub type BlocksFrontier = <BlockMerkleTree as MerkleTreeScheme>::MembershipProof;

// A shared, pinned, lazily-initialized value driven by a boxed future. Used to
// defer consensus initialization until the first endpoint that needs it.
type BoxLazy<T> = Pin<Arc<Lazy<T, BoxFuture<'static, T>>>>;
90
/// Shared state backing the sequencer API endpoints: a lazily-initialized
/// handle to the consensus context plus small caches for values that are
/// expensive to recompute.
#[derive(Derivative)]
#[derivative(Clone(bound = ""), Debug(bound = ""))]
struct ApiState<N: ConnectedNetwork<PubKey>, P: SequencerPersistence> {
    // The consensus state is initialized lazily so we can start the API (and healthcheck endpoints)
    // before consensus has started. Any endpoint that uses consensus state will wait for
    // initialization to finish, but endpoints that do not require a consensus handle can proceed
    // without waiting.
    #[derivative(Debug = "ignore")]
    sequencer_context: BoxLazy<SequencerContext<N, P>>,

    // we cache `token_supply` for up to an hour, to avoid repeatedly querying the contract for information that rarely changes
    token_supply: Cache<(), U256>,
}
104
105impl<N: ConnectedNetwork<PubKey>, P: SequencerPersistence> ApiState<N, P> {
106    fn new(context_init: impl Future<Output = SequencerContext<N, P>> + Send + 'static) -> Self {
107        Self {
108            sequencer_context: Arc::pin(Lazy::from_future(context_init.boxed())),
109            token_supply: Cache::builder()
110                .max_capacity(1)
111                .time_to_live(Duration::from_secs(3600))
112                .build(),
113        }
114    }
115
116    async fn state_signer(&self) -> Arc<RwLock<StateSigner<SequencerApiVersion>>> {
117        self.sequencer_context
118            .as_ref()
119            .get()
120            .await
121            .get_ref()
122            .state_signer()
123    }
124
125    async fn event_streamer(&self) -> Arc<RwLock<EventsStreamer<SeqTypes>>> {
126        self.sequencer_context
127            .as_ref()
128            .get()
129            .await
130            .get_ref()
131            .event_streamer()
132    }
133
134    async fn consensus(&self) -> Arc<RwLock<Consensus<N, P>>> {
135        self.sequencer_context
136            .as_ref()
137            .get()
138            .await
139            .get_ref()
140            .consensus()
141    }
142
143    async fn network_config(&self) -> NetworkConfig<SeqTypes> {
144        self.sequencer_context
145            .as_ref()
146            .get()
147            .await
148            .get_ref()
149            .network_config()
150    }
151}
152
/// Extends a query-service data source `D` with the sequencer-specific [`ApiState`].
type StorageState<N, P, D> = ExtensibleDataSource<D, ApiState<N, P>>;
154
155#[async_trait]
156impl<N: ConnectedNetwork<PubKey>, P: SequencerPersistence> EventsSource<SeqTypes>
157    for ApiState<N, P>
158{
159    type EventStream = BoxStream<'static, Arc<Event<SeqTypes>>>;
160    type LegacyEventStream = BoxStream<'static, Arc<LegacyEvent<SeqTypes>>>;
161
162    async fn get_event_stream(
163        &self,
164        _filter: Option<EventFilterSet<SeqTypes>>,
165    ) -> Self::EventStream {
166        self.event_streamer()
167            .await
168            .read()
169            .await
170            .get_event_stream(None)
171            .await
172    }
173
174    async fn get_legacy_event_stream(
175        &self,
176        _filter: Option<EventFilterSet<SeqTypes>>,
177    ) -> Self::LegacyEventStream {
178        self.event_streamer()
179            .await
180            .read()
181            .await
182            .get_legacy_event_stream(None)
183            .await
184    }
185
186    async fn get_startup_info(&self) -> StartupInfo<SeqTypes> {
187        self.event_streamer()
188            .await
189            .read()
190            .await
191            .get_startup_info()
192            .await
193    }
194}
195
196impl<N: ConnectedNetwork<PubKey>, D: Send + Sync, P: SequencerPersistence> TokenDataSource<SeqTypes>
197    for StorageState<N, P, D>
198{
199    async fn get_initial_supply_l1(&self) -> anyhow::Result<U256> {
200        self.as_ref().get_initial_supply_l1().await
201    }
202
203    async fn get_total_supply_l1(&self) -> anyhow::Result<U256> {
204        self.as_ref().get_total_supply_l1().await
205    }
206
207    async fn get_decided_header(&self) -> espresso_types::Header {
208        self.as_ref().get_decided_header().await
209    }
210}
211
212impl<N: ConnectedNetwork<PubKey>, D: Send + Sync, P: SequencerPersistence> SubmitDataSource<N, P>
213    for StorageState<N, P, D>
214{
215    async fn submit(&self, tx: Transaction) -> anyhow::Result<()> {
216        self.as_ref().submit(tx).await
217    }
218}
219
220impl<N: ConnectedNetwork<PubKey>, D: Sync, P: SequencerPersistence> StakeTableDataSource<SeqTypes>
221    for StorageState<N, P, D>
222{
223    /// Get the stake table for a given epoch
224    async fn get_stake_table(
225        &self,
226        epoch: Option<EpochNumber>,
227    ) -> anyhow::Result<Vec<PeerConfig<SeqTypes>>> {
228        self.as_ref().get_stake_table(epoch).await
229    }
230
231    /// Get the stake table for the current epoch if not provided
232    async fn get_stake_table_current(&self) -> anyhow::Result<StakeTableWithEpochNumber<SeqTypes>> {
233        self.as_ref().get_stake_table_current().await
234    }
235
236    /// Get the DA stake table for a given epoch
237    async fn get_da_stake_table(
238        &self,
239        epoch: Option<EpochNumber>,
240    ) -> anyhow::Result<Vec<PeerConfig<SeqTypes>>> {
241        self.as_ref().get_da_stake_table(epoch).await
242    }
243
244    /// Get the DA stake table for the current epoch if not provided
245    async fn get_da_stake_table_current(
246        &self,
247    ) -> anyhow::Result<StakeTableWithEpochNumber<SeqTypes>> {
248        self.as_ref().get_da_stake_table_current().await
249    }
250
251    /// Get all the validators
252    async fn get_validators(
253        &self,
254        epoch: EpochNumber,
255    ) -> anyhow::Result<AuthenticatedValidatorMap> {
256        self.as_ref().get_validators(epoch).await
257    }
258
259    async fn get_block_reward(
260        &self,
261        epoch: Option<EpochNumber>,
262    ) -> anyhow::Result<Option<RewardAmount>> {
263        self.as_ref().get_block_reward(epoch).await
264    }
265    /// Get all the validator participation for the current epoch
266    async fn current_proposal_participation(&self) -> HashMap<PubKey, f64> {
267        self.as_ref().current_proposal_participation().await
268    }
269    /// Get all the validator participation for the previous epoch
270    async fn proposal_participation(&self, epoch: EpochNumber) -> HashMap<PubKey, f64> {
271        self.as_ref().proposal_participation(epoch).await
272    }
273    /// Get all the vote participation for the current epoch
274    async fn current_vote_participation(&self) -> HashMap<PubKey, f64> {
275        self.as_ref().current_vote_participation().await
276    }
277    /// Get all the vote participation for a given epoch
278    async fn vote_participation(&self, epoch: EpochNumber) -> HashMap<PubKey, f64> {
279        self.as_ref().vote_participation(epoch).await
280    }
281
282    async fn get_all_validators(
283        &self,
284        epoch: EpochNumber,
285        offset: u64,
286        limit: u64,
287    ) -> anyhow::Result<Vec<RegisteredValidator<PubKey>>> {
288        self.as_ref().get_all_validators(epoch, offset, limit).await
289    }
290
291    async fn stake_table_events(
292        &self,
293        from_l1_block: u64,
294        to_l1_block: u64,
295    ) -> anyhow::Result<Vec<StakeTableEvent>> {
296        self.as_ref()
297            .stake_table_events(from_l1_block, to_l1_block)
298            .await
299    }
300}
301
302impl<N: ConnectedNetwork<PubKey>, P: SequencerPersistence> TokenDataSource<SeqTypes>
303    for ApiState<N, P>
304{
305    async fn get_initial_supply_l1(&self) -> anyhow::Result<U256> {
306        let node_state = self.sequencer_context.as_ref().get().await.node_state();
307        let fetcher = node_state
308            .coordinator
309            .membership()
310            .read()
311            .await
312            .fetcher()
313            .clone();
314        let cached = *fetcher.initial_supply.read().await;
315        match cached {
316            Some(supply) => Ok(supply),
317            None => Ok(fetcher.fetch_and_update_initial_supply().await?),
318        }
319    }
320
321    async fn get_total_supply_l1(&self) -> anyhow::Result<U256> {
322        match self.token_supply.get(&()).await {
323            Some(supply) => Ok(supply),
324            None => {
325                let node_state = self.sequencer_context.as_ref().get().await.node_state();
326                let token_contract_address = node_state.token_contract_address().await?;
327
328                let provider = node_state.l1_client.provider;
329
330                let token = EspToken::new(token_contract_address, provider.clone());
331
332                let supply = token
333                    .totalSupply()
334                    .call()
335                    .await
336                    .context("Failed to retrieve totalSupply from the contract")?;
337
338                self.token_supply.insert((), supply).await;
339
340                Ok(supply)
341            },
342        }
343    }
344
345    async fn get_decided_header(&self) -> espresso_types::Header {
346        self.consensus()
347            .await
348            .read()
349            .await
350            .decided_leaf()
351            .await
352            .block_header()
353            .clone()
354    }
355}
356
impl<N: ConnectedNetwork<PubKey>, P: SequencerPersistence> StakeTableDataSource<SeqTypes>
    for ApiState<N, P>
{
    /// Get the stake table for a given epoch
    ///
    /// Fails if `epoch` is more than one epoch ahead of the current epoch,
    /// since the stake table is only determined up to `current + 1`.
    async fn get_stake_table(
        &self,
        epoch: Option<EpochNumber>,
    ) -> anyhow::Result<Vec<PeerConfig<SeqTypes>>> {
        // Latest epoch we can serve: one past the current epoch, if epochs have
        // started (`cur_epoch` is `None` before the first epoch).
        let highest_epoch = self
            .consensus()
            .await
            .read()
            .await
            .cur_epoch()
            .await
            .map(|e| e + 1);
        // `Option`'s derived ordering puts `None` below any `Some`, so a request
        // for the current epoch (`None`) always passes this check.
        if epoch > highest_epoch {
            return Err(anyhow::anyhow!(
                "requested stake table for epoch {epoch:?} is beyond the current epoch + 1 \
                 {highest_epoch:?}"
            ));
        }
        let mem = self
            .consensus()
            .await
            .read()
            .await
            .membership_coordinator
            .stake_table_for_epoch(epoch)
            .await?;

        Ok(mem.stake_table().await.0)
    }

    /// Get the stake table for the current epoch and return it along with the epoch number
    async fn get_stake_table_current(&self) -> anyhow::Result<StakeTableWithEpochNumber<SeqTypes>> {
        let epoch = self.consensus().await.read().await.cur_epoch().await;

        Ok(StakeTableWithEpochNumber {
            epoch,
            stake_table: self.get_stake_table(epoch).await?,
        })
    }

    /// Get the DA stake table for a given epoch
    async fn get_da_stake_table(
        &self,
        epoch: Option<EpochNumber>,
    ) -> anyhow::Result<Vec<PeerConfig<SeqTypes>>> {
        // Read directly from the membership; unlike `get_stake_table`, no
        // upper-bound check on the requested epoch is performed here.
        Ok(self
            .consensus()
            .await
            .read()
            .await
            .membership_coordinator
            .membership()
            .read()
            .await
            .da_stake_table(epoch)
            .0)
    }

    /// Get the DA stake table for the current epoch and return it along with the epoch number
    async fn get_da_stake_table_current(
        &self,
    ) -> anyhow::Result<StakeTableWithEpochNumber<SeqTypes>> {
        let epoch = self.consensus().await.read().await.cur_epoch().await;

        Ok(StakeTableWithEpochNumber {
            epoch,
            stake_table: self.get_da_stake_table(epoch).await?,
        })
    }

    /// Get the block reward: the fixed reward when no epoch is given, or the
    /// reward configured for the specified epoch.
    async fn get_block_reward(
        &self,
        epoch: Option<EpochNumber>,
    ) -> anyhow::Result<Option<RewardAmount>> {
        let coordinator = self
            .consensus()
            .await
            .read()
            .await
            .membership_coordinator
            .clone();

        let membership = coordinator.membership().read().await;
        let block_reward = match epoch {
            None => membership.fixed_block_reward(),
            Some(e) => membership.epoch_block_reward(e),
        };

        Ok(block_reward)
    }

    /// Get the whole validators map
    async fn get_validators(
        &self,
        epoch: EpochNumber,
    ) -> anyhow::Result<AuthenticatedValidatorMap> {
        // Resolve the membership for the requested epoch first; this fails if
        // the epoch's membership is not (yet) known.
        let mem = self
            .consensus()
            .await
            .read()
            .await
            .membership_coordinator
            .membership_for_epoch(Some(epoch))
            .await
            .context("membership not found")?;

        let membership = mem.coordinator.membership().read().await;
        membership.active_validators(&epoch)
    }

    /// Get the current proposal participation.
    async fn current_proposal_participation(&self) -> HashMap<PubKey, f64> {
        self.consensus()
            .await
            .read()
            .await
            .consensus()
            .read()
            .await
            .current_proposal_participation()
    }

    /// Get the proposal participation for a given epoch.
    async fn proposal_participation(&self, epoch: EpochNumber) -> HashMap<PubKey, f64> {
        self.consensus()
            .await
            .read()
            .await
            .consensus()
            .read()
            .await
            .proposal_participation(epoch)
    }

    /// Get the current vote participation.
    async fn current_vote_participation(&self) -> HashMap<PubKey, f64> {
        self.consensus()
            .await
            .read()
            .await
            .consensus()
            .read()
            .await
            .current_vote_participation()
    }

    /// Get the vote participation for a given epoch.
    async fn vote_participation(&self, epoch: EpochNumber) -> HashMap<PubKey, f64> {
        self.consensus()
            .await
            .read()
            .await
            .consensus()
            .read()
            .await
            .vote_participation(Some(epoch))
    }

    /// Load all registered validators for an epoch from persistent storage,
    /// paginated by `offset` and `limit`.
    async fn get_all_validators(
        &self,
        epoch: EpochNumber,
        offset: u64,
        limit: u64,
    ) -> anyhow::Result<Vec<RegisteredValidator<PubKey>>> {
        let handle = self.consensus().await;
        let handle_read = handle.read().await;
        let storage = handle_read.storage();
        storage.load_all_validators(epoch, offset, limit).await
    }

    /// Load stake table events for an L1 block range from persistent storage.
    /// Fails unless the whole range is available (`EventsPersistenceRead::Complete`).
    async fn stake_table_events(
        &self,
        from_l1_block: u64,
        to_l1_block: u64,
    ) -> anyhow::Result<Vec<StakeTableEvent>> {
        let handle = self.consensus().await;
        let handle_read = handle.read().await;
        let storage = handle_read.storage();
        let (status, events) = storage.load_events(from_l1_block, to_l1_block).await?;
        ensure!(
            status == Some(EventsPersistenceRead::Complete),
            "some events in range [{from_l1_block}, {to_l1_block}] are not available ({status:?})"
        );
        // Drop the per-event metadata, keeping only the events themselves.
        Ok(events.into_iter().map(|(_, event)| event).collect())
    }
}
547
548impl<N: ConnectedNetwork<PubKey>, D: Sync, P: SequencerPersistence>
549    RequestResponseDataSource<SeqTypes> for StorageState<N, P, D>
550{
551    async fn request_vid_shares(
552        &self,
553        block_number: u64,
554        vid_common_data: VidCommonQueryData<SeqTypes>,
555        timeout_duration: Duration,
556    ) -> BoxFuture<'static, anyhow::Result<Vec<VidShare>>> {
557        self.as_ref()
558            .request_vid_shares(block_number, vid_common_data, timeout_duration)
559            .await
560    }
561}
562
563#[async_trait]
564impl<N: ConnectedNetwork<PubKey>, D: Sync, P: SequencerPersistence>
565    StateCertFetchingDataSource<SeqTypes> for StorageState<N, P, D>
566{
567    async fn request_state_cert(
568        &self,
569        epoch: u64,
570        timeout: Duration,
571    ) -> Result<LightClientStateUpdateCertificateV2<SeqTypes>, StateCertFetchError> {
572        self.as_ref().request_state_cert(epoch, timeout).await
573    }
574}
575
impl<N: ConnectedNetwork<PubKey>, P: SequencerPersistence> RequestResponseDataSource<SeqTypes>
    for ApiState<N, P>
{
    /// Broadcast a request for VID shares for `block_number` to other nodes,
    /// verifying and collecting responses until `duration` elapses.
    ///
    /// The timeout firing is the *success* path: it returns whatever verified
    /// shares were collected. An error from the inner protocol is propagated.
    async fn request_vid_shares(
        &self,
        block_number: u64,
        vid_common_data: VidCommonQueryData<SeqTypes>,
        duration: Duration,
    ) -> BoxFuture<'static, anyhow::Result<Vec<VidShare>>> {
        // Get a handle to the request response protocol
        let request_response_protocol = self
            .sequencer_context
            .as_ref()
            .get()
            .await
            .request_response_protocol
            .clone();

        async move {
            // Get the total VID weight based on the VID common data
            let total_weight = match vid_common_data.common() {
                VidCommon::V0(_) => {
                    // TODO: This needs to be done via the stake table
                    return Err(anyhow::anyhow!(
                        "V0 total weight calculation not supported yet"
                    ));
                },
                VidCommon::V1(v1) => v1.total_weights,
                VidCommon::V2(v2) => v2.param.total_weights,
            };

            // Create the AvidM parameters from the total weight
            let avidm_param = init_avidm_param(total_weight)
                .with_context(|| "failed to initialize avidm param")?;

            // Get the payload hash for verification
            let VidCommitment::V1(local_payload_hash) = vid_common_data.payload_hash() else {
                bail!("V0 share verification not supported yet");
            };

            // Create a random request id
            let request_id = rand::thread_rng().r#gen();

            // Request and verify the shares from all other nodes, timing out after `duration` seconds
            let received_shares = Arc::new(parking_lot::Mutex::new(Vec::new()));
            let received_shares_clone = received_shares.clone();
            let request_result: anyhow::Result<_, _> = timeout(
                duration,
                request_response_protocol.request_indefinitely::<_, _, _>(
                    Request::VidShare(block_number, request_id),
                    RequestType::Broadcast,
                    move |_request, response| {
                        let avidm_param = avidm_param.clone();
                        let received_shares = received_shares_clone.clone();
                        async move {
                            // Make sure the response was a V1 share
                            let Response::VidShare(VidShare::V1(received_share)) = response else {
                                bail!("V0 share verification not supported yet");
                            };

                            // Verify the share
                            let Ok(Ok(_)) = AvidMScheme::verify_share(
                                &avidm_param,
                                &local_payload_hash,
                                &received_share,
                            ) else {
                                bail!("share verification failed");
                            };

                            // Add the share to the list of received shares
                            received_shares.lock().push(received_share);

                            // Deliberately return an error so `request_indefinitely` keeps
                            // polling for more responses; collection only stops when the
                            // outer timeout fires.
                            bail!("waiting for more shares");

                            #[allow(unreachable_code)]
                            Ok(())
                        }
                    },
                ),
            )
            .await;

            // If the request timed out, return the shares we have collected so far
            match request_result {
                Err(_) => {
                    // If it timed out, this was successful. Return the shares we have collected so far
                    Ok(received_shares
                        .lock()
                        .clone()
                        .into_iter()
                        .map(VidShare::V1)
                        .collect())
                },

                // If it was an error from the inner request, return that error
                Ok(Err(e)) => Err(e).with_context(|| "failed to request vid shares"),

                // If it was successful, this was unexpected: the callback always returns
                // an error, so the inner future should never resolve with `Ok`.
                Ok(Ok(_)) => bail!("this should not be possible"),
            }
        }
        .boxed()
    }
}
680
#[async_trait]
impl<N: ConnectedNetwork<PubKey>, P: SequencerPersistence> StateCertFetchingDataSource<SeqTypes>
    for ApiState<N, P>
{
    /// Fetch and validate a light client state certificate for `epoch`,
    /// catching up the stake table first if it is not yet available locally.
    async fn request_state_cert(
        &self,
        epoch: u64,
        timeout: Duration,
    ) -> Result<LightClientStateUpdateCertificateV2<SeqTypes>, StateCertFetchError> {
        tracing::info!("fetching state certificate for epoch={epoch}");
        let consensus = self.consensus().await;
        let consensus_read = consensus.read().await;

        let current_epoch = consensus_read.cur_epoch().await;

        // The highest epoch we can have a state certificate for is current_epoch + 1.
        // Check if the requested epoch is beyond the highest possible epoch.
        let highest_epoch = current_epoch.map(|e| e.u64() + 1);

        if Some(epoch) > highest_epoch {
            return Err(StateCertFetchError::Other(anyhow::anyhow!(
                "requested state certificate for epoch {epoch} is beyond the highest possible \
                 epoch {highest_epoch:?}"
            )));
        }

        // Get the stake table for validation
        let coordinator = consensus_read.membership_coordinator.clone();
        if let Err(err) = coordinator
            .stake_table_for_epoch(Some(EpochNumber::new(epoch)))
            .await
        {
            tracing::warn!(
                "Failed to get membership for epoch {epoch}: {err:#}. Waiting for catchup"
            );

            coordinator
                .wait_for_catchup(EpochNumber::new(epoch))
                .await
                .map_err(|e| {
                    StateCertFetchError::Other(
                        anyhow::Error::new(e)
                            .context(format!("failed to catch up for stake table epoch={epoch}")),
                    )
                })?;
        }

        // After catchup (if it was needed), retry the stake table lookup.
        let membership = coordinator
            .stake_table_for_epoch(Some(EpochNumber::new(epoch)))
            .await
            .map_err(|e| {
                StateCertFetchError::Other(
                    anyhow::Error::new(e)
                        .context(format!("failed to get stake table for epoch={epoch}")),
                )
            })?;

        let stake_table = membership.stake_table().await;

        // Release the consensus read lock before the potentially slow network fetch.
        drop(consensus_read);
        drop(consensus);

        let state_catchup = self
            .sequencer_context
            .as_ref()
            .get()
            .await
            .node_state()
            .state_catchup
            .clone();

        // Bound the fetch by the caller-supplied timeout.
        let result = tokio::time::timeout(timeout, state_catchup.fetch_state_cert(epoch)).await;

        match result {
            Err(_) => Err(StateCertFetchError::FetchError(anyhow::anyhow!(
                "timeout while fetching state cert for epoch {epoch}"
            ))),
            Ok(Ok(cert)) => {
                // Validation errors should be mapped to ValidationError
                validate_state_cert(&cert, &stake_table).map_err(|e| {
                    StateCertFetchError::ValidationError(e.context(format!(
                        "state certificate validation failed for epoch={epoch}"
                    )))
                })?;

                tracing::info!("fetched and validated state certificate for epoch {epoch}");
                Ok(cert)
            },
            Ok(Err(e)) => Err(StateCertFetchError::FetchError(
                e.context(format!("failed to fetch state cert for epoch {epoch}")),
            )),
        }
    }
}
775
776// Thin wrapper implementations that delegate to persistence
777#[async_trait]
778impl<N: ConnectedNetwork<PubKey>, D: Sync, P: SequencerPersistence> StateCertDataSource
779    for StorageState<N, P, D>
780{
781    async fn get_state_cert_by_epoch(
782        &self,
783        epoch: u64,
784    ) -> anyhow::Result<Option<LightClientStateUpdateCertificateV2<SeqTypes>>> {
785        self.as_ref().get_state_cert_by_epoch(epoch).await
786    }
787
788    async fn insert_state_cert(
789        &self,
790        epoch: u64,
791        cert: LightClientStateUpdateCertificateV2<SeqTypes>,
792    ) -> anyhow::Result<()> {
793        self.as_ref().insert_state_cert(epoch, cert).await
794    }
795}
796
797#[async_trait]
798impl<N: ConnectedNetwork<PubKey>, P: SequencerPersistence> StateCertDataSource for ApiState<N, P> {
799    async fn get_state_cert_by_epoch(
800        &self,
801        epoch: u64,
802    ) -> anyhow::Result<Option<LightClientStateUpdateCertificateV2<SeqTypes>>> {
803        let consensus = self.consensus().await;
804        let consensus_lock = consensus.read().await;
805        let persistence = consensus_lock.storage();
806
807        persistence.get_state_cert_by_epoch(epoch).await
808    }
809
810    async fn insert_state_cert(
811        &self,
812        epoch: u64,
813        cert: LightClientStateUpdateCertificateV2<SeqTypes>,
814    ) -> anyhow::Result<()> {
815        let consensus = self.consensus().await;
816        let consensus_lock = consensus.read().await;
817        let persistence = consensus_lock.storage();
818
819        persistence.insert_state_cert(epoch, cert).await
820    }
821}
822
823impl<N: ConnectedNetwork<PubKey>, P: SequencerPersistence> SubmitDataSource<N, P>
824    for ApiState<N, P>
825{
826    async fn submit(&self, tx: Transaction) -> anyhow::Result<()> {
827        let handle = self.consensus().await;
828
829        let consensus_read_lock = handle.read().await;
830
831        // Fetch full chain config from the validated state, if present.
832        // This is necessary because we support chain config upgrades,
833        // so the updated chain config is found in the validated state.
834        let cf = consensus_read_lock
835            .decided_state()
836            .await
837            .chain_config
838            .resolve();
839
840        // Use the chain config from the validated state if available,
841        // otherwise, use the node state's chain config
842        // The node state's chain config is the node's base version chain config
843        let cf = match cf {
844            Some(cf) => cf,
845            None => self.node_state().await.chain_config,
846        };
847
848        let max_block_size: u64 = cf.max_block_size.into();
849        let txn_size = tx.payload().len() as u64;
850
851        // reject transaction bigger than block size
852        if txn_size > max_block_size {
853            bail!("transaction size ({txn_size}) is greater than max_block_size ({max_block_size})")
854        }
855
856        consensus_read_lock.submit_transaction(tx).await?;
857        Ok(())
858    }
859}
860
861impl<N, P, D> NodeStateDataSource for StorageState<N, P, D>
862where
863    N: ConnectedNetwork<PubKey>,
864    P: SequencerPersistence,
865    D: Sync,
866{
867    async fn node_state(&self) -> NodeState {
868        self.as_ref().node_state().await
869    }
870}
871
872impl<N: ConnectedNetwork<PubKey>, P: SequencerPersistence, D: CatchupStorage + Send + Sync>
873    data_source::DatabaseMetadataSource for StorageState<N, P, D>
874where
875    N: ConnectedNetwork<PubKey>,
876    P: SequencerPersistence,
877    D: data_source::DatabaseMetadataSource + Send + Sync,
878{
879    async fn get_table_sizes(&self) -> anyhow::Result<Vec<data_source::TableSize>> {
880        self.inner().get_table_sizes().await
881    }
882}
883
884impl<N: ConnectedNetwork<PubKey>, P: SequencerPersistence, D: CatchupStorage + Send + Sync>
885    CatchupDataSource for StorageState<N, P, D>
886{
887    #[tracing::instrument(skip(self, instance))]
888    async fn get_accounts(
889        &self,
890        instance: &NodeState,
891        height: u64,
892        view: ViewNumber,
893        accounts: &[FeeAccount],
894    ) -> anyhow::Result<FeeMerkleTree> {
895        // Check if we have the desired state in memory.
896        match self
897            .as_ref()
898            .get_accounts(instance, height, view, accounts)
899            .await
900        {
901            Ok(accounts) => return Ok(accounts),
902            Err(err) => {
903                tracing::info!("accounts not in memory, trying storage: {err:#}");
904            },
905        }
906
907        // Try storage.
908        let (tree, leaf) = self
909            .inner()
910            .get_accounts(instance, height, view, accounts)
911            .await
912            .context("accounts not in memory, and could not fetch from storage")?;
913        // If we successfully fetched accounts from storage, try to add them back into the in-memory
914        // state.
915
916        let consensus = self
917            .as_ref()
918            .consensus()
919            .await
920            .read()
921            .await
922            .consensus()
923            .clone();
924        if let Err(err) =
925            add_fee_accounts_to_state::<N, P>(&consensus, &view, accounts, &tree, leaf).await
926        {
927            tracing::warn!(?view, "cannot update fetched account state: {err:#}");
928        }
929        tracing::info!(?view, "updated with fetched account state");
930
931        Ok(tree)
932    }
933
934    #[tracing::instrument(skip(self, instance))]
935    async fn get_frontier(
936        &self,
937        instance: &NodeState,
938        height: u64,
939        view: ViewNumber,
940    ) -> anyhow::Result<BlocksFrontier> {
941        // Check if we have the desired state in memory.
942        match self.as_ref().get_frontier(instance, height, view).await {
943            Ok(frontier) => return Ok(frontier),
944            Err(err) => {
945                tracing::info!("frontier is not in memory, trying storage: {err:#}");
946            },
947        }
948
949        // Try storage.
950        self.inner().get_frontier(instance, height, view).await
951    }
952
953    async fn get_chain_config(
954        &self,
955        commitment: Commitment<ChainConfig>,
956    ) -> anyhow::Result<ChainConfig> {
957        // Check if we have the desired state in memory.
958        match self.as_ref().get_chain_config(commitment).await {
959            Ok(cf) => return Ok(cf),
960            Err(err) => {
961                tracing::info!("chain config is not in memory, trying storage: {err:#}");
962            },
963        }
964
965        // Try storage.
966        self.inner().get_chain_config(commitment).await
967    }
968    async fn get_leaf_chain(&self, height: u64) -> anyhow::Result<Vec<Leaf2>> {
969        // Check if we have the desired state in memory.
970        match self.as_ref().get_leaf_chain(height).await {
971            Ok(cf) => return Ok(cf),
972            Err(err) => {
973                tracing::info!("leaf chain is not in memory, trying storage: {err:#}");
974            },
975        }
976
977        // Try storage.
978        self.inner().get_leaf_chain(height).await
979    }
980
981    #[tracing::instrument(skip(self, instance))]
982    async fn get_reward_accounts_v2(
983        &self,
984        instance: &NodeState,
985        height: u64,
986        view: ViewNumber,
987        accounts: &[RewardAccountV2],
988    ) -> anyhow::Result<RewardMerkleTreeV2> {
989        // Check if we have the desired state in memory.
990        match self
991            .as_ref()
992            .get_reward_accounts_v2(instance, height, view, accounts)
993            .await
994        {
995            Ok(accounts) => return Ok(accounts),
996            Err(err) => {
997                tracing::info!("reward accounts not in memory, trying storage: {err:#}");
998            },
999        }
1000
1001        // Try storage.
1002        let (tree, leaf) = self
1003            .inner()
1004            .get_reward_accounts_v2(instance, height, view, accounts)
1005            .await
1006            .context("accounts not in memory, and could not fetch from storage")?;
1007
1008        // If we successfully fetched accounts from storage, try to add them back into the in-memory
1009        // state.
1010        let consensus = self
1011            .as_ref()
1012            .consensus()
1013            .await
1014            .read()
1015            .await
1016            .consensus()
1017            .clone();
1018        if let Err(err) =
1019            add_v2_reward_accounts_to_state::<N, P>(&consensus, &view, accounts, &tree, leaf).await
1020        {
1021            tracing::warn!(?view, "cannot update fetched account state: {err:#}");
1022        }
1023        tracing::info!(?view, "updated with fetched account state");
1024
1025        Ok(tree)
1026    }
1027
1028    #[tracing::instrument(skip(self, instance))]
1029    async fn get_reward_accounts_v1(
1030        &self,
1031        instance: &NodeState,
1032        height: u64,
1033        view: ViewNumber,
1034        accounts: &[RewardAccountV1],
1035    ) -> anyhow::Result<RewardMerkleTreeV1> {
1036        // Check if we have the desired state in memory.
1037        match self
1038            .as_ref()
1039            .get_reward_accounts_v1(instance, height, view, accounts)
1040            .await
1041        {
1042            Ok(accounts) => return Ok(accounts),
1043            Err(err) => {
1044                tracing::info!("reward accounts not in memory, trying storage: {err:#}");
1045            },
1046        }
1047
1048        // Try storage.
1049        let (tree, leaf) = self
1050            .inner()
1051            .get_reward_accounts_v1(instance, height, view, accounts)
1052            .await
1053            .context("accounts not in memory, and could not fetch from storage")?;
1054
1055        // If we successfully fetched accounts from storage, try to add them back into the in-memory
1056        // state.
1057        let consensus = self
1058            .as_ref()
1059            .consensus()
1060            .await
1061            .read()
1062            .await
1063            .consensus()
1064            .clone();
1065        if let Err(err) =
1066            add_v1_reward_accounts_to_state::<N, P>(&consensus, &view, accounts, &tree, leaf).await
1067        {
1068            tracing::warn!(?view, "cannot update fetched account state: {err:#}");
1069        }
1070        tracing::info!(?view, "updated with fetched account state");
1071
1072        Ok(tree)
1073    }
1074
1075    async fn get_reward_merkle_tree_v2(
1076        &self,
1077        height: u64,
1078        view: ViewNumber,
1079    ) -> anyhow::Result<Vec<u8>> {
1080        self.as_ref().get_reward_merkle_tree_v2(height, view).await
1081    }
1082
1083    #[tracing::instrument(skip(self))]
1084    async fn get_state_cert(
1085        &self,
1086        epoch: u64,
1087    ) -> anyhow::Result<LightClientStateUpdateCertificateV2<SeqTypes>> {
1088        let consensus = self.as_ref().consensus().await;
1089        let consensus_lock = consensus.read().await;
1090        let persistence = consensus_lock.storage();
1091
1092        persistence
1093            .get_state_cert_by_epoch(epoch)
1094            .await?
1095            .context(format!("state cert for epoch {epoch} not found"))
1096    }
1097}
1098
1099impl<N, P> NodeStateDataSource for ApiState<N, P>
1100where
1101    N: ConnectedNetwork<PubKey>,
1102    P: SequencerPersistence,
1103{
1104    async fn node_state(&self) -> NodeState {
1105        self.sequencer_context.as_ref().get().await.node_state()
1106    }
1107}
1108
1109impl<N: ConnectedNetwork<PubKey>, P: SequencerPersistence> CatchupDataSource for ApiState<N, P> {
1110    #[tracing::instrument(skip(self, _instance))]
1111    async fn get_accounts(
1112        &self,
1113        _instance: &NodeState,
1114        height: u64,
1115        view: ViewNumber,
1116        accounts: &[FeeAccount],
1117    ) -> anyhow::Result<FeeMerkleTree> {
1118        let state = self
1119            .consensus()
1120            .await
1121            .read()
1122            .await
1123            .state(view)
1124            .await
1125            .context(format!(
1126                "state not available for height {height}, view {view}"
1127            ))?;
1128        retain_accounts(&state.fee_merkle_tree, accounts.iter().copied())
1129    }
1130
1131    #[tracing::instrument(skip(self, _instance))]
1132    async fn get_frontier(
1133        &self,
1134        _instance: &NodeState,
1135        height: u64,
1136        view: ViewNumber,
1137    ) -> anyhow::Result<BlocksFrontier> {
1138        let state = self
1139            .consensus()
1140            .await
1141            .read()
1142            .await
1143            .state(view)
1144            .await
1145            .context(format!(
1146                "state not available for height {height}, view {view}"
1147            ))?;
1148        let tree = &state.block_merkle_tree;
1149        let frontier = tree.lookup(tree.num_leaves() - 1).expect_ok()?.1;
1150        Ok(frontier)
1151    }
1152
1153    async fn get_chain_config(
1154        &self,
1155        commitment: Commitment<ChainConfig>,
1156    ) -> anyhow::Result<ChainConfig> {
1157        let state = self.consensus().await.read().await.decided_state().await;
1158        let chain_config = state.chain_config;
1159
1160        if chain_config.commit() == commitment {
1161            chain_config.resolve().context("chain config found")
1162        } else {
1163            bail!("chain config not found")
1164        }
1165    }
1166
1167    async fn get_leaf_chain(&self, height: u64) -> anyhow::Result<Vec<Leaf2>> {
1168        let mut leaves = self
1169            .consensus()
1170            .await
1171            .read()
1172            .await
1173            .consensus()
1174            .read()
1175            .await
1176            .undecided_leaves();
1177        leaves.sort_by_key(|l| l.view_number());
1178        let (position, mut last_leaf) = leaves
1179            .iter()
1180            .find_position(|l| l.height() == height)
1181            .context(format!("leaf chain not available for {height}"))?;
1182        let mut chain = vec![last_leaf.clone()];
1183        for leaf in leaves.iter().skip(position + 1) {
1184            if leaf.justify_qc().view_number() == last_leaf.view_number() {
1185                chain.push(leaf.clone());
1186            } else {
1187                continue;
1188            }
1189            if leaf.view_number() == last_leaf.view_number() + 1 {
1190                // one away from decide
1191                last_leaf = leaf;
1192                break;
1193            }
1194            last_leaf = leaf;
1195        }
1196        // Make sure we got one more leaf to confirm the decide
1197        for leaf in leaves
1198            .iter()
1199            .skip_while(|l| l.view_number() <= last_leaf.view_number())
1200        {
1201            if leaf.justify_qc().view_number() == last_leaf.view_number() {
1202                chain.push(leaf.clone());
1203                return Ok(chain);
1204            }
1205        }
1206        bail!(format!("leaf chain not available for {height}"))
1207    }
1208
1209    #[tracing::instrument(skip(self, _instance))]
1210    async fn get_reward_accounts_v2(
1211        &self,
1212        _instance: &NodeState,
1213        height: u64,
1214        view: ViewNumber,
1215        accounts: &[RewardAccountV2],
1216    ) -> anyhow::Result<RewardMerkleTreeV2> {
1217        let state = self
1218            .consensus()
1219            .await
1220            .read()
1221            .await
1222            .state(view)
1223            .await
1224            .context(format!(
1225                "state not available for height {height}, view {view}"
1226            ))?;
1227
1228        retain_v2_reward_accounts(&state.reward_merkle_tree_v2, accounts.iter().copied())
1229    }
1230
1231    #[tracing::instrument(skip(self, _instance))]
1232    async fn get_reward_accounts_v1(
1233        &self,
1234        _instance: &NodeState,
1235        height: u64,
1236        view: ViewNumber,
1237        accounts: &[RewardAccountV1],
1238    ) -> anyhow::Result<RewardMerkleTreeV1> {
1239        let state = self
1240            .consensus()
1241            .await
1242            .read()
1243            .await
1244            .state(view)
1245            .await
1246            .context(format!(
1247                "state not available for height {height}, view {view}"
1248            ))?;
1249
1250        retain_v1_reward_accounts(&state.reward_merkle_tree_v1, accounts.iter().copied())
1251    }
1252
1253    async fn get_reward_merkle_tree_v2(
1254        &self,
1255        height: u64,
1256        view: ViewNumber,
1257    ) -> anyhow::Result<Vec<u8>> {
1258        let state = self
1259            .consensus()
1260            .await
1261            .read()
1262            .await
1263            .state(view)
1264            .await
1265            .context(format!(
1266                "state not available for height {height}, view {view}"
1267            ))?;
1268
1269        let merkle_tree_bytes = bincode::serialize(&TryInto::<RewardMerkleTreeV2Data>::try_into(
1270            &state.reward_merkle_tree_v2,
1271        )?)
1272        .context("Merkle tree serialization failed; this should never happen.")?;
1273
1274        Ok(merkle_tree_bytes)
1275    }
1276
1277    async fn get_state_cert(
1278        &self,
1279        epoch: u64,
1280    ) -> anyhow::Result<LightClientStateUpdateCertificateV2<SeqTypes>> {
1281        self.get_state_cert_by_epoch(epoch)
1282            .await?
1283            .context(format!("state cert not found for epoch {epoch}"))
1284    }
1285}
1286
1287impl<N: ConnectedNetwork<PubKey>, D: Sync, P: SequencerPersistence> HotShotConfigDataSource
1288    for StorageState<N, P, D>
1289{
1290    async fn get_config(&self) -> PublicNetworkConfig {
1291        self.as_ref().network_config().await.into()
1292    }
1293}
1294
1295impl<N: ConnectedNetwork<PubKey>, P: SequencerPersistence> HotShotConfigDataSource
1296    for ApiState<N, P>
1297{
1298    async fn get_config(&self) -> PublicNetworkConfig {
1299        self.network_config().await.into()
1300    }
1301}
1302
1303#[async_trait]
1304impl<N: ConnectedNetwork<PubKey>, D: Sync, P: SequencerPersistence> StateSignatureDataSource<N>
1305    for StorageState<N, P, D>
1306{
1307    async fn get_state_signature(&self, height: u64) -> Option<LCV3StateSignatureRequestBody> {
1308        self.as_ref().get_state_signature(height).await
1309    }
1310}
1311
1312#[async_trait]
1313impl<N: ConnectedNetwork<PubKey>, P: SequencerPersistence> StateSignatureDataSource<N>
1314    for ApiState<N, P>
1315{
1316    async fn get_state_signature(&self, height: u64) -> Option<LCV3StateSignatureRequestBody> {
1317        self.state_signer()
1318            .await
1319            .read()
1320            .await
1321            .get_state_signature(height)
1322            .await
1323    }
1324}
1325
#[derive(Serialize, Deserialize, Debug, Clone)]
/// Flat representation of a [`RewardMerkleTreeV2`] as a set of key-value pairs,
/// used to (de)serialize the tree to and from storage.
pub(crate) struct RewardMerkleTreeV2Data {
    // One (account, reward balance) pair per tree leaf.
    pub balances: Vec<(RewardAccountV2, RewardAmount)>,
}
1331
1332impl TryInto<RewardMerkleTreeV2Data> for &RewardMerkleTreeV2 {
1333    type Error = anyhow::Error;
1334    // Required method
1335    fn try_into(self) -> anyhow::Result<RewardMerkleTreeV2Data> {
1336        let num_leaves = self.num_leaves();
1337
1338        let balances: Vec<_> = self
1339            .iter()
1340            .map(|(account, balance)| (*account, *balance))
1341            .collect();
1342
1343        if balances.len() as u64 == num_leaves {
1344            Ok(RewardMerkleTreeV2Data { balances })
1345        } else {
1346            tracing::error!(
1347                "Attempted to serialize an incomplete RewardMerkleTreeV2. This is not a fatal \
1348                 error, but it should never happen and indicates that something may be seriously \
1349                 wrong. Balances length: {}, num_leaves: {}.",
1350                balances.len(),
1351                num_leaves
1352            );
1353            bail!(
1354                "Failed to convert RewardMerkleTreeV2 into key-value pairs. Some accounts are \
1355                 missing."
1356            );
1357        }
1358    }
1359}
1360
1361pub(crate) trait RewardMerkleTreeDataSource: Send + Sync + Clone + 'static {
1362    fn load_v1_reward_account_proof(
1363        &self,
1364        _height: u64,
1365        _account: RewardAccountV1,
1366    ) -> impl Send + Future<Output = anyhow::Result<RewardAccountQueryDataV1>>;
1367
1368    fn save_and_gc_reward_tree_v2(
1369        &self,
1370        node_state: &NodeState,
1371        height: u64,
1372        version: Version,
1373        merkle_tree: &RewardMerkleTreeV2,
1374    ) -> impl Send + Future<Output = anyhow::Result<()>> {
1375        async move {
1376            let serialization =
1377                bincode::serialize(&TryInto::<RewardMerkleTreeV2Data>::try_into(merkle_tree)?)
1378                    .context("Merkle tree serialization failed")?;
1379            self.persist_tree(height, serialization).await?;
1380
1381            // Skip garbage collection in tests
1382            if cfg!(any(test, feature = "testing")) {
1383                return Ok(());
1384            }
1385
1386            let finalized_hotshot_height = match node_state.finalized_hotshot_height().await {
1387                Ok(h) => h,
1388                Err(err) => {
1389                    tracing::warn!("failed to get finalized hotshot height: {err:#}");
1390                    return Ok(());
1391                },
1392            };
1393
1394            // Never garbage collect beyond the current block or the finalized L1 height.
1395            let mut gc_height = height.min(finalized_hotshot_height);
1396
1397            // For epoch reward versions, also retain the last 4 epochs
1398            if version >= versions::EPOCH_REWARD_VERSION
1399                && let Some(epoch_height) = node_state.epoch_height
1400            {
1401                let current_epoch = epoch_from_block_number(height, epoch_height);
1402                let gc_epoch = current_epoch.saturating_sub(4);
1403                gc_height = gc_height.min(gc_epoch * epoch_height);
1404            }
1405            if let Err(err) = self.garbage_collect(gc_height).await {
1406                tracing::info!(gc_height, "failed to garbage collect: {err:#}");
1407            }
1408
1409            Ok(())
1410        }
1411    }
1412
1413    fn persist_reward_proofs(
1414        &self,
1415        node_state: &NodeState,
1416        height: u64,
1417        version: Version,
1418    ) -> impl Send + Future<Output = anyhow::Result<()>>;
1419
1420    fn load_reward_merkle_tree_v2(
1421        &self,
1422        height: u64,
1423    ) -> impl Send + Future<Output = anyhow::Result<PermittedRewardMerkleTreeV2>> {
1424        async move {
1425            let tree_bytes = self.load_tree(height).await?;
1426
1427            let tree_data = bincode::deserialize::<RewardMerkleTreeV2Data>(&tree_bytes).context(
1428                "Failed to deserialize RewardMerkleTreeV2 for height {height} from storage; this \
1429                 should never happen.",
1430            )?;
1431
1432            PermittedRewardMerkleTreeV2::try_from_kv_set(tree_data.balances)
1433                .await
1434                .context("Failed to reconstruct reward merkle tree from storage")
1435        }
1436    }
1437
1438    fn load_reward_account_proof_v2(
1439        &self,
1440        _height: u64,
1441        _account: RewardAccountV2,
1442    ) -> impl Send + Future<Output = anyhow::Result<RewardAccountQueryDataV2>> {
1443        async {
1444            bail!("load_reward_account_proof_v2 is not supported for this data source");
1445        }
1446    }
1447
1448    fn load_latest_reward_account_proof_v2(
1449        &self,
1450        account: RewardAccountV2,
1451    ) -> impl Send + Future<Output = anyhow::Result<RewardAccountQueryDataV2>> {
1452        async move {
1453            let serialized_account = bincode::serialize(&account).context(
1454                "Failed to serialize RewardAccountV2 for lookup; this should never happen.",
1455            )?;
1456            let proof_bytes = self.load_latest_proof(serialized_account).await?;
1457
1458            bincode::deserialize::<RewardAccountQueryDataV2>(&proof_bytes).context(
1459                "Failed to deserialize RewardAccountQueryDataV2 for account {account} from \
1460                 storage; this should never happen.",
1461            )
1462        }
1463    }
1464
1465    fn persist_tree(
1466        &self,
1467        height: u64,
1468        merkle_tree: Vec<u8>,
1469    ) -> impl Send + Future<Output = anyhow::Result<()>>;
1470
1471    fn load_tree(&self, height: u64) -> impl Send + Future<Output = anyhow::Result<Vec<u8>>>;
1472
1473    fn persist_proofs(
1474        &self,
1475        height: u64,
1476        proofs: impl Iterator<Item = (Vec<u8>, Vec<u8>)> + Send,
1477    ) -> impl Send + Future<Output = anyhow::Result<()>>;
1478
1479    fn load_proof(
1480        &self,
1481        height: u64,
1482        account: Vec<u8>,
1483        epoch_height: u64,
1484    ) -> impl Send + Future<Output = anyhow::Result<Vec<u8>>>;
1485
1486    fn load_latest_proof(
1487        &self,
1488        account: Vec<u8>,
1489    ) -> impl Send + Future<Output = anyhow::Result<Vec<u8>>>;
1490
1491    fn proof_exists(&self, height: u64) -> impl Send + Future<Output = bool>;
1492
1493    /// garbage collects merkle tree data for blocks strictly older than `height`
1494    fn garbage_collect(&self, height: u64) -> impl Send + Future<Output = anyhow::Result<()>>;
1495}
1496
1497impl RewardMerkleTreeDataSource for hotshot_query_service::data_source::MetricsDataSource {
1498    fn load_v1_reward_account_proof(
1499        &self,
1500        _height: u64,
1501        _account: RewardAccountV1,
1502    ) -> impl Send + Future<Output = anyhow::Result<RewardAccountQueryDataV1>> {
1503        async {
1504            bail!("reward merklized state is not supported for this data source");
1505        }
1506    }
1507
1508    fn persist_reward_proofs(
1509        &self,
1510        _node_state: &NodeState,
1511        _height: u64,
1512        _version: Version,
1513    ) -> impl Send + Future<Output = anyhow::Result<()>> {
1514        async {
1515            bail!("reward merklized state is not supported for this data source");
1516        }
1517    }
1518
1519    fn persist_tree(
1520        &self,
1521        _height: u64,
1522        _merkle_tree: Vec<u8>,
1523    ) -> impl Send + Future<Output = anyhow::Result<()>> {
1524        async move {
1525            bail!("reward merklized state is not supported for this data source");
1526        }
1527    }
1528
1529    fn load_tree(&self, _height: u64) -> impl Send + Future<Output = anyhow::Result<Vec<u8>>> {
1530        async move {
1531            bail!("reward merklized state is not supported for this data source");
1532        }
1533    }
1534
1535    fn garbage_collect(&self, _height: u64) -> impl Send + Future<Output = anyhow::Result<()>> {
1536        async move {
1537            bail!("reward merklized state is not supported for this data source");
1538        }
1539    }
1540
1541    fn persist_proofs(
1542        &self,
1543        _height: u64,
1544        _proofs: impl Iterator<Item = (Vec<u8>, Vec<u8>)> + Send,
1545    ) -> impl Send + Future<Output = anyhow::Result<()>> {
1546        async move {
1547            bail!("reward merklized state is not supported for this data source");
1548        }
1549    }
1550
1551    fn load_proof(
1552        &self,
1553        _height: u64,
1554        _account: Vec<u8>,
1555        _epoch_height: u64,
1556    ) -> impl Send + Future<Output = anyhow::Result<Vec<u8>>> {
1557        async move {
1558            bail!("reward merklized state is not supported for this data source");
1559        }
1560    }
1561
1562    fn load_latest_proof(
1563        &self,
1564        _account: Vec<u8>,
1565    ) -> impl Send + Future<Output = anyhow::Result<Vec<u8>>> {
1566        async move {
1567            bail!("reward merklized state is not supported for this data source");
1568        }
1569    }
1570
1571    fn proof_exists(&self, _height: u64) -> impl Send + Future<Output = bool> {
1572        async move { false }
1573    }
1574}
1575
1576impl<T, S> RewardMerkleTreeDataSource
1577    for hotshot_query_service::data_source::ExtensibleDataSource<T, S>
1578where
1579    T: RewardMerkleTreeDataSource,
1580    S: Send + Sync + Clone + NodeStateDataSource + 'static,
1581{
1582    async fn load_v1_reward_account_proof(
1583        &self,
1584        height: u64,
1585        account: RewardAccountV1,
1586    ) -> anyhow::Result<RewardAccountQueryDataV1> {
1587        self.inner()
1588            .load_v1_reward_account_proof(height, account)
1589            .await
1590    }
1591
1592    async fn load_reward_account_proof_v2(
1593        &self,
1594        height: u64,
1595        account: RewardAccountV2,
1596    ) -> anyhow::Result<RewardAccountQueryDataV2> {
1597        let epoch_height = self
1598            .as_ref()
1599            .node_state()
1600            .await
1601            .epoch_height
1602            .context("epoch height not found")?;
1603        let serialized_account = bincode::serialize(&account)
1604            .context("Failed to serialize RewardAccountV2 for lookup; this should never happen.")?;
1605        let proof_bytes = self
1606            .inner()
1607            .load_proof(height, serialized_account, epoch_height)
1608            .await?;
1609
1610        bincode::deserialize::<RewardAccountQueryDataV2>(&proof_bytes)
1611            .context("Failed to deserialize RewardAccountQueryDataV2 from storage")
1612    }
1613
1614    fn persist_tree(
1615        &self,
1616        height: u64,
1617        merkle_tree: Vec<u8>,
1618    ) -> impl Send + Future<Output = anyhow::Result<()>> {
1619        async move { self.inner().persist_tree(height, merkle_tree).await }
1620    }
1621
1622    fn load_tree(&self, height: u64) -> impl Send + Future<Output = anyhow::Result<Vec<u8>>> {
1623        async move { self.inner().load_tree(height).await }
1624    }
1625
1626    fn garbage_collect(&self, height: u64) -> impl Send + Future<Output = anyhow::Result<()>> {
1627        async move { self.inner().garbage_collect(height).await }
1628    }
1629
1630    fn persist_proofs(
1631        &self,
1632        height: u64,
1633        proofs: impl Iterator<Item = (Vec<u8>, Vec<u8>)> + Send,
1634    ) -> impl Send + Future<Output = anyhow::Result<()>> {
1635        async move { self.inner().persist_proofs(height, proofs).await }
1636    }
1637
1638    fn load_proof(
1639        &self,
1640        height: u64,
1641        account: Vec<u8>,
1642        epoch_height: u64,
1643    ) -> impl Send + Future<Output = anyhow::Result<Vec<u8>>> {
1644        async move { self.inner().load_proof(height, account, epoch_height).await }
1645    }
1646
1647    fn load_latest_proof(
1648        &self,
1649        account: Vec<u8>,
1650    ) -> impl Send + Future<Output = anyhow::Result<Vec<u8>>> {
1651        async move { self.inner().load_latest_proof(account).await }
1652    }
1653
1654    fn proof_exists(&self, height: u64) -> impl Send + Future<Output = bool> {
1655        async move { self.inner().proof_exists(height).await }
1656    }
1657
1658    fn persist_reward_proofs(
1659        &self,
1660        node_state: &NodeState,
1661        height: u64,
1662        version: Version,
1663    ) -> impl Send + Future<Output = anyhow::Result<()>> {
1664        async move {
1665            self.inner()
1666                .persist_reward_proofs(node_state, height, version)
1667                .await
1668        }
1669    }
1670}
1671
1672#[cfg(any(test, feature = "testing"))]
1673pub mod test_helpers {
1674    use std::{cmp::max, time::Duration};
1675
1676    use alloy::{
1677        network::EthereumWallet,
1678        primitives::{Address, U256},
1679        providers::{ProviderBuilder, ext::AnvilApi},
1680    };
1681    use committable::Committable;
1682    use espresso_contract_deployer::{
1683        Contract, Contracts, DEFAULT_EXIT_ESCROW_PERIOD_SECONDS, builder::DeployerArgsBuilder,
1684        network_config::light_client_genesis_from_stake_table,
1685    };
1686    use espresso_types::{
1687        MOCK_SEQUENCER_VERSIONS, NamespaceId, ValidatedState,
1688        v0::traits::{NullEventConsumer, PersistenceOptions, StateCatchup},
1689    };
1690    use futures::{
1691        future::{FutureExt, join_all},
1692        stream::StreamExt,
1693    };
1694    use hotshot::types::{Event, EventType};
1695    use hotshot_contract_adapter::stake_table::StakeTableContractVersion;
1696    use hotshot_types::{
1697        event::LeafInfo, light_client::LCV3StateSignatureRequestBody, traits::metrics::NoMetrics,
1698    };
1699    use itertools::izip;
1700    use jf_merkle_tree_compat::{MerkleCommitment, MerkleTreeScheme};
1701    use staking_cli::demo::{DelegationConfig, StakingTransactions};
1702    use surf_disco::Client;
1703    use tempfile::TempDir;
1704    use test_utils::reserve_tcp_port;
1705    use tide_disco::{Api, App, Error, StatusCode, error::ServerError};
1706    use tokio::{spawn, task::JoinHandle, time::sleep};
1707    use url::Url;
1708    use vbs::version::{StaticVersion, StaticVersionType};
1709    use versions::{EPOCH_VERSION, Upgrade};
1710
1711    use super::*;
1712    use crate::{
1713        catchup::NullStateCatchup,
1714        network,
1715        persistence::no_storage,
1716        testing::{TestConfig, TestConfigBuilder, run_legacy_builder, wait_for_decide_on_handle},
1717    };
1718
    /// Stake table capacity used for test deployments.
    pub const STAKE_TABLE_CAPACITY_FOR_TEST: usize = 10;
1720
    /// A running in-memory sequencer network for tests: one server node (whose
    /// APIs are under test) plus its peer nodes.
    pub struct TestNetwork<P: PersistenceOptions, const NUM_NODES: usize> {
        // The node whose API surface the tests exercise.
        pub server: SequencerContext<network::Memory, P::Persistence>,
        // The remaining consensus nodes.
        pub peers: Vec<SequencerContext<network::Memory, P::Persistence>>,
        pub cfg: TestConfig<{ NUM_NODES }>,
        // todo (abdul): remove this when fs storage is removed
        pub temp_dir: Option<TempDir>,
        // Deployed contracts, if any — presumably populated only for stake-table
        // tests; confirm against the network setup code.
        pub contracts: Option<Contracts>,
    }
1729
    /// Fully-specified configuration for a [`TestNetwork`]: per-node state,
    /// persistence, and catchup, plus shared network and API options.
    pub struct TestNetworkConfig<const NUM_NODES: usize, P, C>
    where
        P: PersistenceOptions,
        C: StateCatchup + 'static,
    {
        // Per-node initial validated states.
        state: [ValidatedState; NUM_NODES],
        // Per-node persistence backends.
        persistence: [P; NUM_NODES],
        // Per-node state catchup providers.
        catchup: [C; NUM_NODES],
        network_config: TestConfig<{ NUM_NODES }>,
        api_config: Options,
        contracts: Option<Contracts>,
    }
1742
1743    impl<const NUM_NODES: usize, P, C> TestNetworkConfig<{ NUM_NODES }, P, C>
1744    where
1745        P: PersistenceOptions,
1746        C: StateCatchup + 'static,
1747    {
1748        pub fn states(&self) -> [ValidatedState; NUM_NODES] {
1749            self.state.clone()
1750        }
1751    }
1752
    /// Builder for [`TestNetworkConfig`]; fields default to `None` until set.
    #[derive(Clone)]
    pub struct TestNetworkConfigBuilder<const NUM_NODES: usize, P, C>
    where
        P: PersistenceOptions,
        C: StateCatchup + 'static,
    {
        state: [ValidatedState; NUM_NODES],
        persistence: Option<[P; NUM_NODES]>,
        catchup: Option<[C; NUM_NODES]>,
        api_config: Option<Options>,
        network_config: Option<TestConfig<{ NUM_NODES }>>,
        contracts: Option<Contracts>,
        // Overrides the ESP token supply minted at genesis, when set.
        initial_token_supply: Option<U256>,
    }
1767
1768    impl Default for TestNetworkConfigBuilder<5, no_storage::Options, NullStateCatchup> {
1769        fn default() -> Self {
1770            TestNetworkConfigBuilder {
1771                state: std::array::from_fn(|_| ValidatedState::default()),
1772                persistence: Some([no_storage::Options; 5]),
1773                catchup: Some(std::array::from_fn(|_| NullStateCatchup::default())),
1774                network_config: None,
1775                api_config: None,
1776                contracts: None,
1777                initial_token_supply: None,
1778            }
1779        }
1780    }
1781
    impl<const NUM_NODES: usize>
        TestNetworkConfigBuilder<{ NUM_NODES }, no_storage::Options, NullStateCatchup>
    {
        /// Starts a builder for `NUM_NODES` nodes with default validated
        /// state, no persistent storage, and null state catchup — like
        /// [`Default`], but for an arbitrary node count.
        pub fn with_num_nodes()
        -> TestNetworkConfigBuilder<{ NUM_NODES }, no_storage::Options, NullStateCatchup> {
            TestNetworkConfigBuilder {
                state: std::array::from_fn(|_| ValidatedState::default()),
                persistence: Some([no_storage::Options; { NUM_NODES }]),
                catchup: Some(std::array::from_fn(|_| NullStateCatchup::default())),
                network_config: None,
                api_config: None,
                contracts: None,
                initial_token_supply: None,
            }
        }
    }
1798
1799    impl<const NUM_NODES: usize, P, C> TestNetworkConfigBuilder<{ NUM_NODES }, P, C>
1800    where
1801        P: PersistenceOptions,
1802        C: StateCatchup + 'static,
1803    {
1804        pub fn states(mut self, state: [ValidatedState; NUM_NODES]) -> Self {
1805            self.state = state;
1806            self
1807        }
1808
1809        pub fn initial_token_supply(mut self, supply: U256) -> Self {
1810            self.initial_token_supply = Some(supply);
1811            self
1812        }
1813
1814        pub fn persistences<NP: PersistenceOptions>(
1815            self,
1816            persistence: [NP; NUM_NODES],
1817        ) -> TestNetworkConfigBuilder<{ NUM_NODES }, NP, C> {
1818            TestNetworkConfigBuilder {
1819                state: self.state,
1820                catchup: self.catchup,
1821                network_config: self.network_config,
1822                api_config: self.api_config,
1823                persistence: Some(persistence),
1824                contracts: self.contracts,
1825                initial_token_supply: self.initial_token_supply,
1826            }
1827        }
1828
1829        pub fn api_config(mut self, api_config: Options) -> Self {
1830            self.api_config = Some(api_config);
1831            self
1832        }
1833
1834        pub fn catchups<NC: StateCatchup + 'static>(
1835            self,
1836            catchup: [NC; NUM_NODES],
1837        ) -> TestNetworkConfigBuilder<{ NUM_NODES }, P, NC> {
1838            TestNetworkConfigBuilder {
1839                state: self.state,
1840                catchup: Some(catchup),
1841                network_config: self.network_config,
1842                api_config: self.api_config,
1843                persistence: self.persistence,
1844                contracts: self.contracts,
1845                initial_token_supply: self.initial_token_supply,
1846            }
1847        }
1848
1849        pub fn network_config(mut self, network_config: TestConfig<{ NUM_NODES }>) -> Self {
1850            self.network_config = Some(network_config);
1851            self
1852        }
1853
1854        pub fn contracts(mut self, contracts: Contracts) -> Self {
1855            self.contracts = Some(contracts);
1856            self
1857        }
1858
1859        /// Setup for POS testing. Deploys contracts and adds the
1860        /// stake table address to state. Must be called before `build()`.
1861        pub async fn pos_hook(
1862            self,
1863            delegation_config: DelegationConfig,
1864            stake_table_version: StakeTableContractVersion,
1865            upgrade: Upgrade,
1866        ) -> anyhow::Result<Self> {
1867            if upgrade.base < EPOCH_VERSION && upgrade.target < EPOCH_VERSION {
1868                panic!("given version does not require pos deployment");
1869            };
1870
1871            let network_config = self
1872                .network_config
1873                .as_ref()
1874                .expect("network_config is required");
1875
1876            let l1_url = network_config.l1_url();
1877            let signer = network_config.signer();
1878            let deployer = ProviderBuilder::new()
1879                .wallet(EthereumWallet::from(signer.clone()))
1880                .connect_http(l1_url.clone());
1881
1882            let blocks_per_epoch = network_config.hotshot_config().epoch_height;
1883            let epoch_start_block = network_config.hotshot_config().epoch_start_block;
1884            let (genesis_state, genesis_stake) = light_client_genesis_from_stake_table(
1885                &network_config.hotshot_config().hotshot_stake_table(),
1886                STAKE_TABLE_CAPACITY_FOR_TEST,
1887            )
1888            .unwrap();
1889
1890            let mut contracts = Contracts::new();
1891            let args = DeployerArgsBuilder::default()
1892                .deployer(deployer.clone())
1893                .rpc_url(l1_url.clone())
1894                .mock_light_client(true)
1895                .genesis_lc_state(genesis_state)
1896                .genesis_st_state(genesis_stake)
1897                .blocks_per_epoch(blocks_per_epoch)
1898                .epoch_start_block(epoch_start_block)
1899                .exit_escrow_period(U256::from(max(
1900                    blocks_per_epoch * 15 + 100,
1901                    DEFAULT_EXIT_ESCROW_PERIOD_SECONDS,
1902                )))
1903                .multisig_pauser(signer.address())
1904                .token_name("Espresso".to_string())
1905                .token_symbol("ESP".to_string())
1906                .initial_token_supply(self.initial_token_supply.unwrap_or(U256::from(100000u64)))
1907                .ops_timelock_delay(U256::from(0))
1908                .ops_timelock_admin(signer.address())
1909                .ops_timelock_proposers(vec![signer.address()])
1910                .ops_timelock_executors(vec![signer.address()])
1911                .safe_exit_timelock_delay(U256::from(10))
1912                .safe_exit_timelock_admin(signer.address())
1913                .safe_exit_timelock_proposers(vec![signer.address()])
1914                .safe_exit_timelock_executors(vec![signer.address()])
1915                .build()
1916                .unwrap();
1917
1918            match stake_table_version {
1919                StakeTableContractVersion::V1 => {
1920                    args.deploy_to_stake_table_v1(&mut contracts).await
1921                },
1922                StakeTableContractVersion::V2 => args.deploy_all(&mut contracts).await,
1923            }
1924            .context("failed to deploy contracts")?;
1925
1926            let stake_table_address = contracts
1927                .address(Contract::StakeTableProxy)
1928                .expect("StakeTableProxy address not found");
1929
1930            StakingTransactions::create(
1931                l1_url.clone(),
1932                &deployer,
1933                stake_table_address,
1934                network_config.staking_priv_keys(),
1935                None,
1936                delegation_config,
1937            )
1938            .await
1939            .expect("stake table setup failed")
1940            .apply_all()
1941            .await
1942            .expect("send all txns failed");
1943
1944            // enable interval mining with a 1s interval.
1945            // This ensures that blocks are finalized every second, even when there are no transactions.
1946            // It's useful for testing stake table updates,
1947            // which rely on the finalized L1 block number.
1948            if let Some(anvil) = network_config.anvil() {
1949                anvil
1950                    .anvil_set_interval_mining(1)
1951                    .await
1952                    .expect("interval mining");
1953            }
1954
1955            // Add stake table address to `ChainConfig` (held in state),
1956            // avoiding overwrite other values. Base fee is set to `0` to avoid
1957            // unnecessary catchup of `FeeState`.
1958            let state = self.state[0].clone();
1959            let chain_config = if let Some(cf) = state.chain_config.resolve() {
1960                ChainConfig {
1961                    base_fee: 0.into(),
1962                    stake_table_contract: Some(stake_table_address),
1963                    ..cf
1964                }
1965            } else {
1966                ChainConfig {
1967                    base_fee: 0.into(),
1968                    stake_table_contract: Some(stake_table_address),
1969                    ..Default::default()
1970                }
1971            };
1972
1973            let state = ValidatedState {
1974                chain_config: chain_config.into(),
1975                ..state
1976            };
1977            Ok(self
1978                .states(std::array::from_fn(|_| state.clone()))
1979                .contracts(contracts))
1980        }
1981
1982        pub fn build(self) -> TestNetworkConfig<{ NUM_NODES }, P, C> {
1983            TestNetworkConfig {
1984                state: self.state,
1985                persistence: self.persistence.unwrap(),
1986                catchup: self.catchup.unwrap(),
1987                network_config: self.network_config.unwrap(),
1988                api_config: self.api_config.unwrap(),
1989                contracts: self.contracts,
1990            }
1991        }
1992    }
1993
    impl<P: PersistenceOptions, const NUM_NODES: usize> TestNetwork<P, { NUM_NODES }> {
        /// Starts a full in-memory test network from `cfg`.
        ///
        /// Runs a legacy block builder, ensures a query module is configured
        /// (adding temporary file-system storage when none was given), starts
        /// all `NUM_NODES` nodes (node 0 additionally serves the API), hooks
        /// the builder up to node 0's event stream, and starts consensus on
        /// every node.
        pub async fn new<C: StateCatchup + 'static>(
            cfg: TestNetworkConfig<{ NUM_NODES }, P, C>,
            upgrade: versions::Upgrade,
        ) -> Self {
            let mut cfg = cfg;
            let mut builder_tasks = Vec::new();

            // Size the builder's blocks from node 0's chain config, when it
            // can be resolved locally.
            let chain_config = cfg.state[0].chain_config.resolve();
            if chain_config.is_none() {
                tracing::warn!("Chain config is not set, using default max_block_size");
            }
            let (task, builder_url) = run_legacy_builder::<{ NUM_NODES }>(
                cfg.network_config.builder_port(),
                chain_config.map(|c| *c.max_block_size),
            )
            .await;
            builder_tasks.push(task);
            cfg.network_config
                .set_builder_urls(vec1::vec1![builder_url.clone()]);

            // add default storage if none is provided as query module is now required
            let mut opt = cfg.api_config.clone();
            let temp_dir = if opt.storage_fs.is_none() && opt.storage_sql.is_none() {
                let temp_dir = tempfile::tempdir().unwrap();
                opt = opt.query_fs(
                    Default::default(),
                    crate::persistence::fs::Options::new(temp_dir.path().to_path_buf()),
                );
                Some(temp_dir)
            } else {
                None
            };

            // Initialize all nodes concurrently. Node 0 serves the API and
            // gets the real metrics/event consumer; the rest are plain peers
            // with no metrics and a null event consumer.
            let mut nodes = join_all(
                izip!(cfg.state, cfg.persistence, cfg.catchup)
                    .enumerate()
                    .map(|(i, (state, persistence, state_peers))| {
                        let opt = opt.clone();
                        let cfg = &cfg.network_config;
                        let upgrades_map = cfg.upgrades();
                        async move {
                            if i == 0 {
                                opt.serve(|metrics, consumer, storage| {
                                    let cfg = cfg.clone();
                                    async move {
                                        Ok(cfg
                                            .init_node(
                                                0,
                                                state,
                                                persistence,
                                                Some(state_peers),
                                                storage,
                                                &*metrics,
                                                STAKE_TABLE_CAPACITY_FOR_TEST,
                                                consumer,
                                                upgrade,
                                                upgrades_map,
                                            )
                                            .await)
                                    }
                                    .boxed()
                                })
                                .await
                                .unwrap()
                            } else {
                                cfg.init_node(
                                    i,
                                    state,
                                    persistence,
                                    Some(state_peers),
                                    None,
                                    &NoMetrics,
                                    STAKE_TABLE_CAPACITY_FOR_TEST,
                                    NullEventConsumer,
                                    upgrade,
                                    upgrades_map,
                                )
                                .await
                            }
                        }
                    }),
            )
            .await;

            let handle_0 = &nodes[0];

            // Hook the builder(s) up to the event stream from the first node
            for builder_task in builder_tasks {
                builder_task.start(Box::new(handle_0.event_stream().await));
            }

            for ctx in &nodes {
                ctx.start_consensus().await;
            }

            // Node 0 becomes the API server; the rest become peers.
            let server = nodes.remove(0);
            let peers = nodes;

            Self {
                server,
                peers,
                cfg: cfg.network_config,
                temp_dir,
                contracts: cfg.contracts,
            }
        }

        /// Shuts down consensus on every node: the server first, then peers.
        pub async fn stop_consensus(&mut self) {
            self.server.shutdown_consensus().await;

            for ctx in &mut self.peers {
                ctx.shutdown_consensus().await;
            }
        }
    }
2110
2111    /// Test the status API with custom options.
2112    ///
2113    /// The `opt` function can be used to modify the [`Options`] which are used to start the server.
2114    /// By default, the options are the minimal required to run this test (configuring a port and
2115    /// enabling the status API). `opt` may add additional functionality (e.g. adding a query module
2116    /// to test a different initialization path) but should not remove or modify the existing
2117    /// functionality (e.g. removing the status module or changing the port).
2118    pub async fn status_test_helper(opt: impl FnOnce(Options) -> Options) {
2119        let port = reserve_tcp_port().expect("OS should have ephemeral ports available");
2120        let url = format!("http://localhost:{port}").parse().unwrap();
2121        let client: Client<ServerError, StaticVersion<0, 1>> = Client::new(url);
2122
2123        let options = opt(Options::with_port(port));
2124        let network_config = TestConfigBuilder::default().build();
2125        let config = TestNetworkConfigBuilder::default()
2126            .api_config(options)
2127            .network_config(network_config)
2128            .build();
2129        let _network = TestNetwork::new(config, MOCK_SEQUENCER_VERSIONS).await;
2130        client.connect(None).await;
2131
2132        // The status API is well tested in the query service repo. Here we are just smoke testing
2133        // that we set it up correctly. Wait for a (non-genesis) block to be sequenced and then
2134        // check the success rate metrics.
2135        while client
2136            .get::<u64>("status/block-height")
2137            .send()
2138            .await
2139            .unwrap()
2140            <= 1
2141        {
2142            sleep(Duration::from_secs(1)).await;
2143        }
2144        let success_rate = client
2145            .get::<f64>("status/success-rate")
2146            .send()
2147            .await
2148            .unwrap();
2149        // If metrics are populating correctly, we should get a finite number. If not, we might get
2150        // NaN or infinity due to division by 0.
2151        assert!(success_rate.is_finite(), "{success_rate}");
2152        // We know at least some views have been successful, since we finalized a block.
2153        assert!(success_rate > 0.0, "{success_rate}");
2154    }
2155
2156    /// Test the submit API with custom options.
2157    ///
2158    /// The `opt` function can be used to modify the [`Options`] which are used to start the server.
2159    /// By default, the options are the minimal required to run this test (configuring a port and
2160    /// enabling the submit API). `opt` may add additional functionality (e.g. adding a query module
2161    /// to test a different initialization path) but should not remove or modify the existing
2162    /// functionality (e.g. removing the submit module or changing the port).
2163    pub async fn submit_test_helper(opt: impl FnOnce(Options) -> Options) {
2164        let txn = Transaction::new(NamespaceId::from(1_u32), vec![1, 2, 3, 4]);
2165
2166        let port = reserve_tcp_port().expect("OS should have ephemeral ports available");
2167
2168        let url = format!("http://localhost:{port}").parse().unwrap();
2169        let client: Client<ServerError, StaticVersion<0, 1>> = Client::new(url);
2170
2171        let options = opt(Options::with_port(port).submit(Default::default()));
2172        let network_config = TestConfigBuilder::default().build();
2173        let config = TestNetworkConfigBuilder::default()
2174            .api_config(options)
2175            .network_config(network_config)
2176            .build();
2177        let network = TestNetwork::new(config, MOCK_SEQUENCER_VERSIONS).await;
2178        let mut events = network.server.event_stream().await;
2179
2180        client.connect(None).await;
2181
2182        let hash = client
2183            .post("submit/submit")
2184            .body_json(&txn)
2185            .unwrap()
2186            .send()
2187            .await
2188            .unwrap();
2189        assert_eq!(txn.commit(), hash);
2190
2191        // Wait for a Decide event containing transaction matching the one we sent
2192        wait_for_decide_on_handle(&mut events, &txn).await;
2193    }
2194
2195    /// Test the state signature API.
2196    pub async fn state_signature_test_helper(opt: impl FnOnce(Options) -> Options) {
2197        let port = reserve_tcp_port().expect("OS should have ephemeral ports available");
2198
2199        let url = format!("http://localhost:{port}").parse().unwrap();
2200
2201        let client: Client<ServerError, StaticVersion<0, 1>> = Client::new(url);
2202
2203        let options = opt(Options::with_port(port));
2204        let network_config = TestConfigBuilder::default().build();
2205        let config = TestNetworkConfigBuilder::default()
2206            .api_config(options)
2207            .network_config(network_config)
2208            .build();
2209        let network = TestNetwork::new(config, MOCK_SEQUENCER_VERSIONS).await;
2210
2211        let mut height: u64;
2212        // Wait for block >=2 appears
2213        // It's waiting for an extra second to make sure that the signature is generated
2214        loop {
2215            height = network.server.decided_leaf().await.height();
2216            sleep(std::time::Duration::from_secs(1)).await;
2217            if height >= 2 {
2218                break;
2219            }
2220        }
2221        // we cannot verify the signature now, because we don't know the stake table
2222        client
2223            .get::<LCV3StateSignatureRequestBody>(&format!("state-signature/block/{height}"))
2224            .send()
2225            .await
2226            .unwrap();
2227    }
2228
    /// Test the catchup API with custom options.
    ///
    /// The `opt` function can be used to modify the [`Options`] which are used to start the server.
    /// By default, the options are the minimal required to run this test (configuring a port and
    /// enabling the catchup API). `opt` may add additional functionality (e.g. adding a query module
    /// to test a different initialization path) but should not remove or modify the existing
    /// functionality (e.g. removing the catchup module or changing the port).
    pub async fn catchup_test_helper(opt: impl FnOnce(Options) -> Options) {
        let port = reserve_tcp_port().expect("OS should have ephemeral ports available");
        let url = format!("http://localhost:{port}").parse().unwrap();
        let client: Client<ServerError, StaticVersion<0, 1>> = Client::new(url);

        let options = opt(Options::with_port(port));
        let network_config = TestConfigBuilder::default().build();
        let config = TestNetworkConfigBuilder::default()
            .api_config(options)
            .network_config(network_config)
            .build();
        let network = TestNetwork::new(config, MOCK_SEQUENCER_VERSIONS).await;
        client.connect(None).await;

        // Wait for a few blocks to be decided.
        let mut events = network.server.event_stream().await;
        loop {
            if let Event {
                event: EventType::Decide { leaf_chain, .. },
                ..
            } = events.next().await.unwrap()
                && leaf_chain
                    .iter()
                    .any(|LeafInfo { leaf, .. }| leaf.block_header().height() > 2)
            {
                break;
            }
        }

        // Stop consensus running on the node so we freeze the decided and undecided states.
        // We'll let it go out of scope here since it's a write lock.
        {
            network.server.shutdown_consensus().await;
        }

        // Undecided fee state: absent account.
        // Query one height/view past the decided leaf so the response must be
        // served from the undecided state.
        let leaf = network.server.decided_leaf().await;
        let height = leaf.height() + 1;
        let view = leaf.view_number() + 1;
        let res = client
            .get::<AccountQueryData>(&format!(
                "catchup/{height}/{}/account/{:x}",
                view.u64(),
                Address::default()
            ))
            .send()
            .await
            .unwrap();
        // The default address holds no fees: balance is zero and the returned
        // proof must verify to zero against that view's fee merkle tree root.
        assert_eq!(res.balance, U256::ZERO);
        assert_eq!(
            res.proof
                .verify(
                    &network
                        .server
                        .state(view)
                        .await
                        .unwrap()
                        .fee_merkle_tree
                        .commitment()
                )
                .unwrap(),
            U256::ZERO,
        );

        // Undecided block state.
        // The returned frontier must verify against the block merkle tree
        // commitment of the same undecided state.
        let res = client
            .get::<BlocksFrontier>(&format!("catchup/{height}/{}/blocks", view.u64()))
            .send()
            .await
            .unwrap();
        let root = &network
            .server
            .state(view)
            .await
            .unwrap()
            .block_merkle_tree
            .commitment();
        BlockMerkleTree::verify(root, root.size() - 1, res)
            .unwrap()
            .unwrap();
    }
2317
2318    pub async fn spawn_dishonest_peer_catchup_api() -> anyhow::Result<(Url, JoinHandle<()>)> {
2319        let toml = toml::from_str::<toml::Value>(include_str!("../api/catchup.toml")).unwrap();
2320        let mut api =
2321            Api::<(), hotshot_query_service::Error, SequencerApiVersion>::new(toml).unwrap();
2322
2323        api.get("account", |_req, _state: &()| {
2324            async move {
2325                Result::<AccountQueryData, _>::Err(hotshot_query_service::Error::catch_all(
2326                    StatusCode::BAD_REQUEST,
2327                    "no account found".to_string(),
2328                ))
2329            }
2330            .boxed()
2331        })?
2332        .get("blocks", |_req, _state| {
2333            async move {
2334                Result::<BlocksFrontier, _>::Err(hotshot_query_service::Error::catch_all(
2335                    StatusCode::BAD_REQUEST,
2336                    "no block found".to_string(),
2337                ))
2338            }
2339            .boxed()
2340        })?
2341        .get("chainconfig", |_req, _state| {
2342            async move {
2343                Result::<ChainConfig, _>::Ok(ChainConfig {
2344                    max_block_size: 300.into(),
2345                    base_fee: 1.into(),
2346                    fee_recipient: "0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48"
2347                        .parse()
2348                        .unwrap(),
2349                    ..Default::default()
2350                })
2351            }
2352            .boxed()
2353        })?
2354        .get("leafchain", |_req, _state| {
2355            async move {
2356                Result::<Vec<Leaf2>, _>::Err(hotshot_query_service::Error::catch_all(
2357                    StatusCode::BAD_REQUEST,
2358                    "No leafchain found".to_string(),
2359                ))
2360            }
2361            .boxed()
2362        })?;
2363
2364        let mut app = App::<_, hotshot_query_service::Error>::with_state(());
2365        app.with_version(env!("CARGO_PKG_VERSION").parse().unwrap());
2366
2367        app.register_module::<_, _>("catchup", api).unwrap();
2368
2369        let port = reserve_tcp_port().expect("OS should have ephemeral ports available");
2370        let url: Url = Url::parse(&format!("http://localhost:{port}")).unwrap();
2371
2372        let handle = spawn({
2373            let url = url.clone();
2374            async move {
2375                let _ = app.serve(url, SequencerApiVersion::instance()).await;
2376            }
2377        });
2378
2379        Ok((url, handle))
2380    }
2381}
2382
2383#[cfg(test)]
2384mod api_tests {
2385    use std::{fmt::Debug, marker::PhantomData};
2386
2387    use committable::Committable;
2388    use data_source::testing::TestableSequencerDataSource;
2389    use espresso_types::{
2390        Header, Leaf2, MOCK_SEQUENCER_VERSIONS, NamespaceId, NamespaceProofQueryData,
2391        ValidatedState,
2392        traits::{EventConsumer, PersistenceOptions},
2393    };
2394    use futures::{future, stream::StreamExt};
2395    use hotshot_example_types::node_types::TEST_VERSIONS;
2396    use hotshot_query_service::availability::{
2397        AvailabilityDataSource, BlockQueryData, VidCommonQueryData,
2398    };
2399    use hotshot_types::{
2400        data::{
2401            DaProposal2, EpochNumber, QuorumProposal2, QuorumProposalWrapper, VidCommitment,
2402            VidDisperseShare, ns_table::parse_ns_table, vid_disperse::AvidMDisperseShare,
2403        },
2404        event::LeafInfo,
2405        message::Proposal,
2406        simple_certificate::{CertificatePair, QuorumCertificate2},
2407        traits::{EncodeBytes, signature_key::SignatureKey},
2408        utils::EpochTransitionIndicator,
2409        vid::avidm::{AvidMScheme, init_avidm_param},
2410    };
2411    use surf_disco::Client;
2412    use test_helpers::{
2413        TestNetwork, TestNetworkConfigBuilder, catchup_test_helper, state_signature_test_helper,
2414        status_test_helper, submit_test_helper,
2415    };
2416    use test_utils::reserve_tcp_port;
2417    use tide_disco::error::ServerError;
2418    use vbs::version::StaticVersion;
2419
2420    use super::{update::ApiEventConsumer, *};
2421    use crate::{
2422        network,
2423        persistence::no_storage::NoStorage,
2424        testing::{TestConfigBuilder, wait_for_decide_on_handle},
2425    };
2426
    /// rstest template enumerating the testable data-source backends (SQL and
    /// file-system). Apply with `#[rstest_reuse::apply(...)]` to run a test
    /// once per backend.
    #[rstest_reuse::template]
    #[rstest::rstest]
    #[case(PhantomData::<crate::api::sql::DataSource>)]
    #[case(PhantomData::<crate::api::fs::DataSource>)]
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    pub fn testable_sequencer_data_source<D: TestableSequencerDataSource>(
        #[case] _d: PhantomData<D>,
    ) {
    }
2436
    /// Runs the submit smoke test against each query-module backend.
    #[rstest_reuse::apply(testable_sequencer_data_source)]
    pub(crate) async fn submit_test_with_query_module<D: TestableSequencerDataSource>(
        _d: PhantomData<D>,
    ) {
        let storage = D::create_storage().await;
        submit_test_helper(|opt| D::options(&storage, opt)).await
    }
2444
    /// Runs the status smoke test against each query-module backend.
    #[rstest_reuse::apply(testable_sequencer_data_source)]
    pub(crate) async fn status_test_with_query_module<D: TestableSequencerDataSource>(
        _d: PhantomData<D>,
    ) {
        let storage = D::create_storage().await;
        status_test_helper(|opt| D::options(&storage, opt)).await
    }
2452
    /// Runs the state-signature smoke test against each query-module backend.
    #[rstest_reuse::apply(testable_sequencer_data_source)]
    pub(crate) async fn state_signature_test_with_query_module<D: TestableSequencerDataSource>(
        _d: PhantomData<D>,
    ) {
        let storage = D::create_storage().await;
        state_signature_test_helper(|opt| D::options(&storage, opt)).await
    }
2460
    /// End-to-end test of the availability module's namespace query endpoints.
    ///
    /// Submits two transactions under one namespace, then fetches namespace
    /// proofs by block number, block hash, and payload hash; verifies each
    /// returned proof against the VID common data; and finally exercises the
    /// block-range form of the namespace endpoint.
    #[rstest_reuse::apply(testable_sequencer_data_source)]
    pub(crate) async fn test_namespace_query<D: TestableSequencerDataSource>(_d: PhantomData<D>) {
        // Arbitrary transaction, arbitrary namespace ID
        let ns_id = NamespaceId::from(42_u32);
        let txn = Transaction::new(ns_id, vec![1, 2, 3, 4]);

        // Start query service.
        let port = reserve_tcp_port().expect("OS should have ephemeral ports available");
        let storage = D::create_storage().await;
        let network_config = TestConfigBuilder::default().build();
        let config = TestNetworkConfigBuilder::default()
            .api_config(D::options(&storage, Options::with_port(port)).submit(Default::default()))
            .network_config(network_config)
            .build();
        let network = TestNetwork::new(config, MOCK_SEQUENCER_VERSIONS).await;
        let mut events = network.server.event_stream().await;

        // Connect client.
        let client: Client<ServerError, StaticVersion<0, 1>> =
            Client::new(format!("http://localhost:{port}").parse().unwrap());
        client.connect(None).await;

        // Submit the first transaction; the endpoint returns its commitment.
        let hash = client
            .post("submit/submit")
            .body_json(&txn)
            .unwrap()
            .send()
            .await
            .unwrap();
        assert_eq!(txn.commit(), hash);

        // Wait for a Decide event containing transaction matching the one we sent
        let block_height = wait_for_decide_on_handle(&mut events, &txn).await.0 as usize;
        tracing::info!(block_height, "transaction sequenced");

        // Submit a second transaction for range queries.
        let txn2 = Transaction::new(ns_id, vec![5, 6, 7, 8]);
        client
            .post::<Commitment<Transaction>>("submit/submit")
            .body_json(&txn2)
            .unwrap()
            .send()
            .await
            .unwrap();
        let block_height2 = wait_for_decide_on_handle(&mut events, &txn2).await.0 as usize;
        tracing::info!(block_height2, "transaction sequenced");

        // Wait for the query service to update to this block height.
        client
            .socket(&format!("availability/stream/blocks/{block_height2}"))
            .subscribe::<BlockQueryData<SeqTypes>>()
            .await
            .unwrap()
            .next()
            .await
            .unwrap()
            .unwrap();

        // Scan every block up to the first transaction's height, checking the
        // namespace endpoint against each alternative lookup path.
        let mut found_txn = false;
        let mut found_empty_block = false;
        for block_num in 0..=block_height {
            let header: Header = client
                .get(&format!("availability/header/{block_num}"))
                .send()
                .await
                .unwrap();
            let ns_query_res: NamespaceProofQueryData = client
                .get(&format!("availability/block/{block_num}/namespace/{ns_id}"))
                .send()
                .await
                .unwrap();

            // Check other means of querying the same proof.
            assert_eq!(
                ns_query_res,
                client
                    .get(&format!(
                        "availability/block/hash/{}/namespace/{ns_id}",
                        header.commit()
                    ))
                    .send()
                    .await
                    .unwrap()
            );
            assert_eq!(
                ns_query_res,
                client
                    .get(&format!(
                        "availability/block/payload-hash/{}/namespace/{ns_id}",
                        header.payload_commitment()
                    ))
                    .send()
                    .await
                    .unwrap()
            );

            // Verify namespace proof if present
            if let Some(ns_proof) = ns_query_res.proof {
                let vid_common: VidCommonQueryData<SeqTypes> = client
                    .get(&format!("availability/vid/common/{block_num}"))
                    .send()
                    .await
                    .unwrap();
                ns_proof
                    .verify(
                        header.ns_table(),
                        &header.payload_commitment(),
                        vid_common.common(),
                    )
                    .unwrap();
            } else {
                // Namespace proof should be present if ns_id exists in ns_table
                assert!(header.ns_table().find_ns_id(&ns_id).is_none());
                assert!(ns_query_res.transactions.is_empty());
            }

            found_empty_block = found_empty_block || ns_query_res.transactions.is_empty();

            for txn in ns_query_res.transactions {
                if txn.commit() == hash {
                    // Ensure that we validate an inclusion proof
                    found_txn = true;
                }
            }
        }
        assert!(found_txn);
        assert!(found_empty_block);

        // Test range query.
        let ns_proofs: Vec<NamespaceProofQueryData> = client
            .get(&format!(
                "availability/block/{block_height}/{}/namespace/{ns_id}",
                block_height2 + 1
            ))
            .send()
            .await
            .unwrap();
        // The range covers both transactions: the first and last proofs hold
        // exactly one transaction each, and any blocks in between are empty.
        assert_eq!(ns_proofs.len(), block_height2 + 1 - block_height);
        assert_eq!(&ns_proofs[0].transactions, std::slice::from_ref(&txn));
        assert_eq!(
            &ns_proofs[ns_proofs.len() - 1].transactions,
            std::slice::from_ref(&txn2)
        );
        for proof in &ns_proofs[1..ns_proofs.len() - 1] {
            assert_eq!(proof.transactions, &[]);
        }
    }
2608
2609    #[rstest_reuse::apply(testable_sequencer_data_source)]
2610    pub(crate) async fn catchup_test_with_query_module<D: TestableSequencerDataSource>(
2611        _d: PhantomData<D>,
2612    ) {
2613        let storage = D::create_storage().await;
2614        catchup_test_helper(|opt| D::options(&storage, opt)).await
2615    }
2616
    /// Regression test: a decide whose event-consumer callback fails must not
    /// lose data, and a later successful decide over a non-consecutive leaf
    /// chain must still archive all decided leaves and garbage-collect their
    /// per-view artifacts — while leaving the skipped view's data intact.
    #[rstest_reuse::apply(testable_sequencer_data_source)]
    pub async fn test_non_consecutive_decide_with_failing_event_consumer<D>(_d: PhantomData<D>)
    where
        D: TestableSequencerDataSource + Debug + 'static,
    {
        // Event consumer that always errors, simulating a failure in
        // downstream event processing.
        #[derive(Clone, Copy, Debug)]
        struct FailConsumer;

        #[async_trait]
        impl EventConsumer for FailConsumer {
            async fn handle_event(&self, _: &Event<SeqTypes>) -> anyhow::Result<()> {
                bail!("mock error injection");
            }
        }

        let (pubkey, privkey) = PubKey::generated_from_seed_indexed([0; 32], 1);

        let storage = D::create_storage().await;
        let persistence = D::persistence_options(&storage).create().await.unwrap();
        let data_source: Arc<StorageState<network::Memory, NoStorage, _>> =
            Arc::new(StorageState::new(
                D::create(D::persistence_options(&storage), Default::default(), false)
                    .await
                    .unwrap(),
                ApiState::new(future::pending()),
            ));

        // Create two non-consecutive leaf chains.
        let mut chain1 = vec![];

        let genesis = Leaf2::genesis(
            &Default::default(),
            &NodeState::mock(),
            TEST_VERSIONS.test.base,
        )
        .await;
        let payload = genesis.block_payload().unwrap();
        let payload_bytes_arc = payload.encode();

        // Disperse the genesis payload with a 2-share AVID-M setup so each
        // leaf below can carry VID data.
        let avidm_param = init_avidm_param(2).unwrap();
        let weights = vec![1u32; 2];

        let ns_table = parse_ns_table(payload.byte_len().as_usize(), &payload.ns_table().encode());
        let (payload_commitment, shares) =
            AvidMScheme::ns_disperse(&avidm_param, &weights, &payload_bytes_arc, ns_table).unwrap();

        let mut quorum_proposal = QuorumProposalWrapper::<SeqTypes> {
            proposal: QuorumProposal2::<SeqTypes> {
                block_header: genesis.block_header().clone(),
                view_number: ViewNumber::genesis(),
                justify_qc: QuorumCertificate2::genesis(
                    &ValidatedState::default(),
                    &NodeState::mock(),
                    MOCK_SEQUENCER_VERSIONS,
                )
                .await,
                upgrade_certificate: None,
                view_change_evidence: None,
                next_drb_result: None,
                next_epoch_justify_qc: None,
                epoch: None,
                state_cert: None,
            },
        };
        let mut qc = QuorumCertificate2::genesis(
            &ValidatedState::default(),
            &NodeState::mock(),
            MOCK_SEQUENCER_VERSIONS,
        )
        .await;

        // Build a chain of 5 leaves (views/heights 0..=4), persisting a quorum
        // proposal, a VID share, and a DA proposal for each one.
        let mut justify_qc = qc.clone();
        for i in 0..5 {
            *quorum_proposal.proposal.block_header.height_mut() = i;
            quorum_proposal.proposal.view_number = ViewNumber::new(i);
            quorum_proposal.proposal.justify_qc = justify_qc;
            let leaf = Leaf2::from_quorum_proposal(&quorum_proposal);
            qc.view_number = leaf.view_number();
            qc.data.leaf_commit = Committable::commit(&leaf);
            justify_qc = qc.clone();
            chain1.push((leaf.clone(), CertificatePair::non_epoch_change(qc.clone())));

            // Include a quorum proposal for each leaf.
            let quorum_proposal_signature =
                PubKey::sign(&privkey, &bincode::serialize(&quorum_proposal).unwrap())
                    .expect("Failed to sign quorum_proposal");
            persistence
                .append_quorum_proposal2(&Proposal {
                    data: quorum_proposal.clone(),
                    signature: quorum_proposal_signature,
                    _pd: Default::default(),
                })
                .await
                .unwrap();

            // Include VID information for each leaf.
            let share: VidDisperseShare<SeqTypes> = AvidMDisperseShare {
                view_number: leaf.view_number(),
                payload_commitment,
                share: shares[0].clone(),
                recipient_key: pubkey,
                epoch: Some(EpochNumber::new(0)),
                target_epoch: Some(EpochNumber::new(0)),
                common: avidm_param.clone(),
            }
            .into();

            persistence
                .append_vid(&share.to_proposal(&privkey).unwrap())
                .await
                .unwrap();

            // Include payload information for each leaf.
            let block_payload_signature =
                PubKey::sign(&privkey, &payload_bytes_arc).expect("Failed to sign block payload");
            let da_proposal_inner = DaProposal2::<SeqTypes> {
                encoded_transactions: payload_bytes_arc.clone(),
                metadata: payload.ns_table().clone(),
                view_number: leaf.view_number(),
                epoch: Some(EpochNumber::new(0)),
                epoch_transition_indicator: EpochTransitionIndicator::NotInTransition,
            };
            let da_proposal = Proposal {
                data: da_proposal_inner,
                signature: block_payload_signature,
                _pd: Default::default(),
            };
            persistence
                .append_da2(&da_proposal, VidCommitment::V1(payload_commitment))
                .await
                .unwrap();
        }
        // Split into two chains.
        let mut chain2 = chain1.split_off(2);
        // Make non-consecutive (i.e. we skip a leaf).
        chain2.remove(0);

        // Decide 2 leaves, but fail in event processing.
        let leaf_chain = chain1
            .iter()
            .map(|(leaf, qc)| (leaf_info(leaf.clone()), qc.clone()))
            .collect::<Vec<_>>();
        tracing::info!("decide with event handling failure");
        persistence
            .append_decided_leaves(
                ViewNumber::new(1),
                leaf_chain.iter().map(|(leaf, qc)| (leaf, qc.clone())),
                None,
                &FailConsumer,
            )
            .await
            .unwrap();

        // Now decide remaining leaves successfully. We should now process a decide event for all
        // the leaves.
        let consumer = ApiEventConsumer::from(data_source.clone());
        let leaf_chain = chain2
            .iter()
            .map(|(leaf, qc)| (leaf_info(leaf.clone()), qc.clone()))
            .collect::<Vec<_>>();
        tracing::info!("decide successfully");
        persistence
            .append_decided_leaves(
                ViewNumber::new(4),
                leaf_chain.iter().map(|(leaf, qc)| (leaf, qc.clone())),
                None,
                &consumer,
            )
            .await
            .unwrap();

        // Check that the leaves were moved to archive storage, along with payload and VID
        // information.
        for (leaf, cert) in chain1.iter().chain(&chain2) {
            tracing::info!(height = leaf.height(), "check archive");
            let qd = data_source.get_leaf(leaf.height() as usize).await.await;
            let stored_leaf: Leaf2 = qd.leaf().clone();
            let stored_qc = qd.qc().clone();
            assert_eq!(&stored_leaf, leaf);
            assert_eq!(&stored_qc, cert.qc());

            data_source
                .get_block(leaf.height() as usize)
                .await
                .try_resolve()
                .ok()
                .unwrap();
            data_source
                .get_vid_common(leaf.height() as usize)
                .await
                .try_resolve()
                .ok()
                .unwrap();

            // Check that all data has been garbage collected for the decided views.
            assert!(
                persistence
                    .load_da_proposal(leaf.view_number())
                    .await
                    .unwrap()
                    .is_none()
            );
            assert!(
                persistence
                    .load_vid_share(leaf.view_number())
                    .await
                    .unwrap()
                    .is_none()
            );
            assert!(
                persistence
                    .load_quorum_proposal(leaf.view_number())
                    .await
                    .is_err()
            );
        }

        // Check that data has _not_ been garbage collected for the missing view.
        assert!(
            persistence
                .load_da_proposal(ViewNumber::new(2))
                .await
                .unwrap()
                .is_some()
        );
        assert!(
            persistence
                .load_vid_share(ViewNumber::new(2))
                .await
                .unwrap()
                .is_some()
        );
        persistence
            .load_quorum_proposal(ViewNumber::new(2))
            .await
            .unwrap();
    }
2854
    /// A decided leaf whose payload and VID data are unavailable must still be
    /// processed: the leaf is stored and queryable, while the block and VID
    /// common fetches remain pending.
    #[rstest_reuse::apply(testable_sequencer_data_source)]
    pub async fn test_decide_missing_data<D>(_d: PhantomData<D>)
    where
        D: TestableSequencerDataSource + Debug + 'static,
    {
        use ark_serialize::CanonicalDeserialize;

        let storage = D::create_storage().await;
        let persistence = D::persistence_options(&storage).create().await.unwrap();
        let data_source: Arc<StorageState<network::Memory, NoStorage, _>> =
            Arc::new(StorageState::new(
                D::create(D::persistence_options(&storage), Default::default(), false)
                    .await
                    .unwrap(),
                ApiState::new(future::pending()),
            ));
        let consumer = ApiEventConsumer::from(data_source.clone());

        let mut qc = QuorumCertificate2::genesis(
            &ValidatedState::default(),
            &NodeState::mock(),
            MOCK_SEQUENCER_VERSIONS,
        )
        .await;
        let leaf = Leaf2::genesis(
            &ValidatedState::default(),
            &NodeState::mock(),
            TEST_VERSIONS.test.base,
        )
        .await;

        // Append the genesis leaf. We don't use this for the test, because the update function will
        // automatically fill in the missing data for genesis. We just append this to get into a
        // consistent state to then append the leaf from view 1, which will have missing data.
        tracing::info!(?leaf, ?qc, "decide genesis leaf");
        persistence
            .append_decided_leaves(
                leaf.view_number(),
                [(
                    &leaf_info(leaf.clone()),
                    CertificatePair::non_epoch_change(qc.clone()),
                )],
                None,
                &consumer,
            )
            .await
            .unwrap();

        // Create another leaf, with missing data. We have to use a different payload commitment,
        // otherwise the database will be able to combine the empty payload from the genesis block
        // with this header, and the payload will not actually be missing.
        let mut block_header = leaf.block_header().clone();
        *block_header.height_mut() += 1;
        *block_header.payload_commitment_mut() = VidCommitment::V1(
            CanonicalDeserialize::deserialize_uncompressed_unchecked([1u8; 32].as_slice()).unwrap(),
        );
        let qp = QuorumProposalWrapper {
            proposal: QuorumProposal2 {
                block_header,
                view_number: leaf.view_number() + 1,
                justify_qc: qc.clone(),
                upgrade_certificate: None,
                view_change_evidence: None,
                next_drb_result: None,
                next_epoch_justify_qc: None,
                epoch: None,
                state_cert: None,
            },
        };

        // Derive the view-1 leaf from the proposal and point the QC at it.
        let leaf = Leaf2::from_quorum_proposal(&qp);
        qc.view_number = leaf.view_number();
        qc.data.leaf_commit = Committable::commit(&leaf);

        // Decide a leaf without the corresponding payload or VID.
        tracing::info!(?leaf, ?qc, "append leaf 1");
        persistence
            .append_decided_leaves(
                leaf.view_number(),
                [(
                    &leaf_info(leaf.clone()),
                    CertificatePair::non_epoch_change(qc),
                )],
                None,
                &consumer,
            )
            .await
            .unwrap();

        // Check that we still processed the leaf.
        assert_eq!(leaf, data_source.get_leaf(1).await.await.leaf().clone());
        // The payload and VID common were never provided, so these stay pending.
        assert!(data_source.get_vid_common(1).await.is_pending());
        assert!(data_source.get_block(1).await.is_pending());
    }
2949
2950    fn leaf_info(leaf: Leaf2) -> LeafInfo<SeqTypes> {
2951        LeafInfo {
2952            leaf,
2953            vid_share: None,
2954            state: Default::default(),
2955            delta: None,
2956            state_cert: None,
2957        }
2958    }
2959}
2960
2961#[cfg(test)]
2962mod test {
2963    use std::{
2964        collections::{HashMap, HashSet},
2965        time::Duration,
2966    };
2967
2968    use ::light_client::{
2969        consensus::{
2970            header::HeaderProof,
2971            leaf::{LeafProof, LeafProofHint},
2972            payload::PayloadProof,
2973        },
2974        testing::{EpochChangeQuorum, LEGACY_VERSION},
2975    };
2976    use alloy::{
2977        eips::BlockId,
2978        network::EthereumWallet,
2979        primitives::{Address, U256},
2980        providers::ProviderBuilder,
2981    };
2982    use async_lock::Mutex;
2983    use committable::{Commitment, Committable};
2984    use espresso_contract_deployer::{
2985        Contract, Contracts, builder::DeployerArgsBuilder,
2986        network_config::light_client_genesis_from_stake_table, upgrade_stake_table_v2,
2987    };
2988    use espresso_types::{
2989        ADVZNamespaceProofQueryData, FeeAmount, Header, L1Client, L1ClientOptions,
2990        MOCK_SEQUENCER_VERSIONS, NamespaceId, NamespaceProofQueryData, NsProof,
2991        RegisteredValidatorMap, RewardDistributor, StakeTableState, StateCertQueryDataV1,
2992        StateCertQueryDataV2, ValidatedState, ValidatorLeaderCounts,
2993        config::PublicHotShotConfig,
2994        traits::{MembershipPersistence, NullEventConsumer, PersistenceOptions},
2995        v0_3::{COMMISSION_BASIS_POINTS, Fetcher, RewardAmount, RewardMerkleProofV1},
2996        v0_4::{RewardAccountV2, RewardMerkleProofV2},
2997        validators_from_l1_events,
2998    };
2999    use futures::{
3000        future::{self, join_all, try_join_all},
3001        stream::{StreamExt, TryStreamExt},
3002        try_join,
3003    };
3004    use hotshot::types::EventType;
3005    use hotshot_contract_adapter::{
3006        reward::RewardClaimInput,
3007        sol_types::{EspToken, StakeTableV2},
3008        stake_table::StakeTableContractVersion,
3009    };
3010    use hotshot_query_service::{
3011        availability::{
3012            BlockQueryData, BlockSummaryQueryData, LeafQueryData, TransactionQueryData,
3013            VidCommonQueryData,
3014        },
3015        data_source::{
3016            VersionedDataSource,
3017            sql::Config,
3018            storage::{SqlStorage, StorageConnectionType},
3019        },
3020        explorer::TransactionSummariesResponse,
3021        types::HeightIndexed,
3022    };
3023    use hotshot_types::{
3024        ValidatorConfig,
3025        data::EpochNumber,
3026        event::LeafInfo,
3027        traits::{block_contents::BlockHeader, election::Membership, metrics::NoMetrics},
3028        utils::epoch_from_block_number,
3029    };
3030    use jf_merkle_tree_compat::{
3031        MerkleTreeScheme,
3032        prelude::{MerkleProof, Sha3Node},
3033    };
3034    use pretty_assertions::assert_matches;
3035    use rand::seq::SliceRandom;
3036    use rstest::rstest;
3037    use staking_cli::{demo::DelegationConfig, fetch_commission, update_commission};
3038    use surf_disco::Client;
3039    use test_helpers::{
3040        TestNetwork, TestNetworkConfigBuilder, catchup_test_helper, state_signature_test_helper,
3041        status_test_helper, submit_test_helper,
3042    };
3043    use test_utils::reserve_tcp_port;
3044    use tide_disco::{
3045        Error, StatusCode, Url, app::AppHealth, error::ServerError, healthcheck::HealthStatus,
3046    };
3047    use tokio::time::sleep;
3048    use vbs::version::StaticVersion;
3049    use versions::{DRB_AND_HEADER_UPGRADE_VERSION, EPOCH_VERSION, FEE_VERSION, Upgrade, version};
3050
3051    use self::{
3052        data_source::testing::TestableSequencerDataSource, options::HotshotEvents,
3053        sql::DataSource as SqlDataSource,
3054    };
3055    use super::*;
3056
3057    async fn wait_until_block_height(
3058        client: &Client<ServerError, StaticVersion<0, 1>>,
3059        endpoint: &str,
3060        height: u64,
3061    ) {
3062        for _retry in 0.. {
3063            let bh = client
3064                .get::<u64>(endpoint)
3065                .send()
3066                .await
3067                .expect("block height not found");
3068
3069            if bh >= height {
3070                return;
3071            }
3072            sleep(Duration::from_secs(3)).await;
3073        }
3074    }
3075    use crate::{
3076        api::{
3077            options::Query,
3078            sql::{impl_testable_data_source::tmp_options, reconstruct_state},
3079            test_helpers::STAKE_TABLE_CAPACITY_FOR_TEST,
3080        },
3081        catchup::{NullStateCatchup, StatePeers},
3082        persistence,
3083        persistence::no_storage,
3084        testing::{TestConfig, TestConfigBuilder, wait_for_decide_on_handle, wait_for_epochs},
3085    };
3086
    // Trivial upgrade descriptors targeting protocol versions 0.3 and 0.4,
    // used by the upgrade tests below. NOTE(review): "POS" presumably refers
    // to the proof-of-stake upgrades — confirm against the versions module.
    const POS_V3: Upgrade = Upgrade::trivial(version(0, 3));
    const POS_V4: Upgrade = Upgrade::trivial(version(0, 4));
3089
3090    #[test_log::test(tokio::test(flavor = "multi_thread"))]
3091    async fn test_healthcheck() {
3092        let port = reserve_tcp_port().expect("OS should have ephemeral ports available");
3093        let url = format!("http://localhost:{port}").parse().unwrap();
3094        let client: Client<ServerError, StaticVersion<0, 1>> = Client::new(url);
3095        let options = Options::with_port(port);
3096        let network_config = TestConfigBuilder::default().build();
3097        let config = TestNetworkConfigBuilder::<5, _, NullStateCatchup>::default()
3098            .api_config(options)
3099            .network_config(network_config)
3100            .build();
3101        let _network = TestNetwork::new(config, MOCK_SEQUENCER_VERSIONS).await;
3102
3103        client.connect(None).await;
3104        let health = client.get::<AppHealth>("healthcheck").send().await.unwrap();
3105        assert_eq!(health.status, HealthStatus::Available);
3106    }
3107
3108    #[test_log::test(tokio::test(flavor = "multi_thread"))]
3109    async fn status_test_without_query_module() {
3110        status_test_helper(|opt| opt).await
3111    }
3112
3113    #[test_log::test(tokio::test(flavor = "multi_thread"))]
3114    async fn submit_test_without_query_module() {
3115        submit_test_helper(|opt| opt).await
3116    }
3117
3118    #[test_log::test(tokio::test(flavor = "multi_thread"))]
3119    async fn state_signature_test_without_query_module() {
3120        state_signature_test_helper(|opt| opt).await
3121    }
3122
3123    #[test_log::test(tokio::test(flavor = "multi_thread"))]
3124    async fn catchup_test_without_query_module() {
3125        catchup_test_helper(|opt| opt).await
3126    }
3127
    /// A leaf-only ("light weight") data source serves leaves, headers, VID
    /// common data, and Merkle state proofs, but rejects full block payload
    /// queries.
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_leaf_only_data_source() {
        let port = reserve_tcp_port().expect("OS should have ephemeral ports available");

        let storage = SqlDataSource::create_storage().await;
        let options =
            SqlDataSource::leaf_only_ds_options(&storage, Options::with_port(port)).unwrap();

        let network_config = TestConfigBuilder::default().build();
        let config = TestNetworkConfigBuilder::default()
            .api_config(options)
            .network_config(network_config)
            .build();
        let _network = TestNetwork::new(config, MOCK_SEQUENCER_VERSIONS).await;
        let url = format!("http://localhost:{port}").parse().unwrap();
        let client: Client<ServerError, SequencerApiVersion> = Client::new(url);

        tracing::info!("waiting for blocks");
        client.connect(Some(Duration::from_secs(15))).await;
        // Wait until some blocks have been decided.

        let account = TestConfig::<5>::builder_key().fee_account();

        // Consume 10 streamed headers to ensure enough blocks exist.
        let _headers = client
            .socket("availability/stream/headers/0")
            .subscribe::<Header>()
            .await
            .unwrap()
            .take(10)
            .try_collect::<Vec<_>>()
            .await
            .unwrap();

        // Spot-check the supported endpoints at the first few heights.
        for i in 1..5 {
            let leaf = client
                .get::<LeafQueryData<SeqTypes>>(&format!("availability/leaf/{i}"))
                .send()
                .await
                .unwrap();

            assert_eq!(leaf.height(), i);

            let header = client
                .get::<Header>(&format!("availability/header/{i}"))
                .send()
                .await
                .unwrap();

            assert_eq!(header.height(), i);

            let vid = client
                .get::<VidCommonQueryData<SeqTypes>>(&format!("availability/vid/common/{i}"))
                .send()
                .await
                .unwrap();

            assert_eq!(vid.height(), i);

            // Block Merkle tree proof for the previous header at this height.
            client
                .get::<MerkleProof<Commitment<Header>, u64, Sha3Node, 3>>(&format!(
                    "block-state/{i}/{}",
                    i - 1
                ))
                .send()
                .await
                .unwrap();

            // Fee Merkle tree proof for the builder's fee account.
            client
                .get::<MerkleProof<FeeAmount, FeeAccount, Sha3Node, 256>>(&format!(
                    "fee-state/{}/{}",
                    i + 1,
                    account
                ))
                .send()
                .await
                .unwrap();
        }

        // This would fail even though we have processed atleast 10 leaves
        // this is because light weight nodes only support leaves, headers and VID
        client
            .get::<BlockQueryData<SeqTypes>>("availability/block/1")
            .send()
            .await
            .unwrap_err();
    }
3214
    /// Shared body of the catchup tests: restart a node from genesis with no
    /// persistent storage and verify it rebuilds its state from peers reached
    /// through the API mounted under `url_suffix` (e.g. "", "/v0", "/v1").
    async fn run_catchup_test(url_suffix: &str) {
        // Start a sequencer network, using the query service for catchup.
        let port = reserve_tcp_port().expect("OS should have ephemeral ports available");
        const NUM_NODES: usize = 5;

        let url: url::Url = format!("http://localhost:{port}{url_suffix}")
            .parse()
            .unwrap();

        let config = TestNetworkConfigBuilder::<NUM_NODES, _, _>::with_num_nodes()
            .api_config(Options::with_port(port))
            .network_config(TestConfigBuilder::default().build())
            .catchups(std::array::from_fn(|_| {
                StatePeers::<StaticVersion<0, 1>>::from_urls(
                    vec![url.clone()],
                    Default::default(),
                    Duration::from_secs(2),
                    &NoMetrics,
                )
            }))
            .build();
        let mut network = TestNetwork::new(config, MOCK_SEQUENCER_VERSIONS).await;

        // Wait for replica 0 to reach a (non-genesis) decide, before disconnecting it.
        let mut events = network.peers[0].event_stream().await;
        loop {
            let event = events.next().await.unwrap();
            let EventType::Decide { leaf_chain, .. } = event.event else {
                continue;
            };
            if leaf_chain[0].leaf.height() > 0 {
                break;
            }
        }

        // Shut down and restart replica 0. We don't just stop consensus and restart it; we fully
        // drop the node and recreate it so it loses all of its temporary state and starts off from
        // genesis. It should be able to catch up by listening to proposals and then rebuild its
        // state from its peers.
        tracing::info!("shutting down node");
        network.peers.remove(0);

        // Wait for a few blocks to pass while the node is down, so it falls behind.
        network
            .server
            .event_stream()
            .await
            .filter(|event| future::ready(matches!(event.event, EventType::Decide { .. })))
            .take(3)
            .collect::<Vec<_>>()
            .await;

        tracing::info!("restarting node");
        let node = network
            .cfg
            .init_node(
                1,
                ValidatedState::default(),
                no_storage::Options,
                Some(StatePeers::<StaticVersion<0, 1>>::from_urls(
                    vec![url],
                    Default::default(),
                    Duration::from_secs(2),
                    &NoMetrics,
                )),
                None,
                &NoMetrics,
                test_helpers::STAKE_TABLE_CAPACITY_FOR_TEST,
                NullEventConsumer,
                MOCK_SEQUENCER_VERSIONS,
                Default::default(),
            )
            .await;
        let mut events = node.event_stream().await;

        // Wait for a (non-genesis) block proposed by each node, to prove that the lagging node has
        // caught up and all nodes are in sync.
        let mut proposers = [false; NUM_NODES];
        loop {
            let event = events.next().await.unwrap();
            let EventType::Decide { leaf_chain, .. } = event.event else {
                continue;
            };
            for LeafInfo { leaf, .. } in leaf_chain.iter().rev() {
                let height = leaf.height();
                // NOTE(review): assumes the proposer rotates round-robin by
                // view number — confirm against the test network's membership.
                let leaf_builder = (leaf.view_number().u64() as usize) % NUM_NODES;
                if height == 0 {
                    continue;
                }

                tracing::info!(
                    "waiting for blocks from {proposers:?}, block {height} is from {leaf_builder}",
                );
                proposers[leaf_builder] = true;
            }

            if proposers.iter().all(|has_proposed| *has_proposed) {
                break;
            }
        }
    }
3316
    // Catchup smoke test against the default (unversioned) API base path.
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_catchup() {
        run_catchup_test("").await;
    }
3321
    // Catchup smoke test against the explicit `/v0` API mount.
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_catchup_v0() {
        run_catchup_test("/v0").await;
    }
3326
    // Catchup smoke test against the explicit `/v1` API mount.
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_catchup_v1() {
        run_catchup_test("/v1").await;
    }
3331
    // Verifies that a node restarted with *no* configured state-catchup provider
    // (`None::<NullStateCatchup>`) can still rejoin consensus and catch up, by
    // listening to proposals and rebuilding state from its peers.
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_catchup_no_state_peers() {
        // Start a sequencer network, using the query service for catchup.
        let port = reserve_tcp_port().expect("OS should have ephemeral ports available");
        const NUM_NODES: usize = 5;
        let config = TestNetworkConfigBuilder::<NUM_NODES, _, _>::with_num_nodes()
            .api_config(Options::with_port(port))
            .network_config(TestConfigBuilder::default().build())
            .build();
        let mut network = TestNetwork::new(config, MOCK_SEQUENCER_VERSIONS).await;

        // Wait for replica 0 to reach a (non-genesis) decide, before disconnecting it.
        let mut events = network.peers[0].event_stream().await;
        loop {
            let event = events.next().await.unwrap();
            let EventType::Decide { leaf_chain, .. } = event.event else {
                continue;
            };
            if leaf_chain[0].leaf.height() > 0 {
                break;
            }
        }

        // Shut down and restart replica 0. We don't just stop consensus and restart it; we fully
        // drop the node and recreate it so it loses all of its temporary state and starts off from
        // genesis. It should be able to catch up by listening to proposals and then rebuild its
        // state from its peers.
        tracing::info!("shutting down node");
        network.peers.remove(0);

        // Wait for a few blocks to pass while the node is down, so it falls behind.
        network
            .server
            .event_stream()
            .await
            .filter(|event| future::ready(matches!(event.event, EventType::Decide { .. })))
            .take(3)
            .collect::<Vec<_>>()
            .await;

        tracing::info!("restarting node");
        let node = network
            .cfg
            .init_node(
                1,
                ValidatedState::default(),
                no_storage::Options,
                // Deliberately no state-catchup provider: this is the scenario
                // under test.
                None::<NullStateCatchup>,
                None,
                &NoMetrics,
                test_helpers::STAKE_TABLE_CAPACITY_FOR_TEST,
                NullEventConsumer,
                MOCK_SEQUENCER_VERSIONS,
                Default::default(),
            )
            .await;
        let mut events = node.event_stream().await;

        // Wait for a (non-genesis) block proposed by each node, to prove that the lagging node has
        // caught up and all nodes are in sync.
        let mut proposers = [false; NUM_NODES];
        loop {
            let event = events.next().await.unwrap();
            let EventType::Decide { leaf_chain, .. } = event.event else {
                continue;
            };
            for LeafInfo { leaf, .. } in leaf_chain.iter().rev() {
                let height = leaf.height();
                // Maps view number to the proposing node — presumably round-robin
                // leader rotation; TODO confirm against the test network's config.
                let leaf_builder = (leaf.view_number().u64() as usize) % NUM_NODES;
                if height == 0 {
                    continue;
                }

                tracing::info!(
                    "waiting for blocks from {proposers:?}, block {height} is from {leaf_builder}",
                );
                proposers[leaf_builder] = true;
            }

            if proposers.iter().all(|has_proposed| *has_proposed) {
                break;
            }
        }
    }
3416
3417    #[ignore]
3418    #[test_log::test(tokio::test(flavor = "multi_thread"))]
3419    async fn test_catchup_epochs_no_state_peers() {
3420        // Start a sequencer network, using the query service for catchup.
3421        let port = reserve_tcp_port().expect("OS should have ephemeral ports available");
3422        const EPOCH_HEIGHT: u64 = 5;
3423        let network_config = TestConfigBuilder::default()
3424            .epoch_height(EPOCH_HEIGHT)
3425            .build();
3426        const NUM_NODES: usize = 5;
3427        let config = TestNetworkConfigBuilder::<NUM_NODES, _, _>::with_num_nodes()
3428            .api_config(Options::with_port(port))
3429            .network_config(network_config)
3430            .build();
3431        let mut network = TestNetwork::new(config, Upgrade::trivial(EPOCH_VERSION)).await;
3432
3433        // Wait for replica 0 to decide in the third epoch.
3434        let mut events = network.peers[0].event_stream().await;
3435        loop {
3436            let event = events.next().await.unwrap();
3437            let EventType::Decide { leaf_chain, .. } = event.event else {
3438                continue;
3439            };
3440            tracing::error!("got decide height {}", leaf_chain[0].leaf.height());
3441
3442            if leaf_chain[0].leaf.height() > EPOCH_HEIGHT * 3 {
3443                tracing::error!("decided past one epoch");
3444                break;
3445            }
3446        }
3447
3448        // Shut down and restart replica 0. We don't just stop consensus and restart it; we fully
3449        // drop the node and recreate it so it loses all of its temporary state and starts off from
3450        // genesis. It should be able to catch up by listening to proposals and then rebuild its
3451        // state from its peers.
3452        tracing::info!("shutting down node");
3453        network.peers.remove(0);
3454
3455        // Wait for a few blocks to pass while the node is down, so it falls behind.
3456        network
3457            .server
3458            .event_stream()
3459            .await
3460            .filter(|event| future::ready(matches!(event.event, EventType::Decide { .. })))
3461            .take(3)
3462            .collect::<Vec<_>>()
3463            .await;
3464
3465        tracing::error!("restarting node");
3466        let node = network
3467            .cfg
3468            .init_node(
3469                1,
3470                ValidatedState::default(),
3471                no_storage::Options,
3472                None::<NullStateCatchup>,
3473                None,
3474                &NoMetrics,
3475                test_helpers::STAKE_TABLE_CAPACITY_FOR_TEST,
3476                NullEventConsumer,
3477                MOCK_SEQUENCER_VERSIONS,
3478                Default::default(),
3479            )
3480            .await;
3481        let mut events = node.event_stream().await;
3482
3483        // Wait for a (non-genesis) block proposed by each node, to prove that the lagging node has
3484        // caught up and all nodes are in sync.
3485        let mut proposers = [false; NUM_NODES];
3486        loop {
3487            let event = events.next().await.unwrap();
3488            let EventType::Decide { leaf_chain, .. } = event.event else {
3489                continue;
3490            };
3491            for LeafInfo { leaf, .. } in leaf_chain.iter().rev() {
3492                let height = leaf.height();
3493                let leaf_builder = (leaf.view_number().u64() as usize) % NUM_NODES;
3494                if height == 0 {
3495                    continue;
3496                }
3497
3498                tracing::info!(
3499                    "waiting for blocks from {proposers:?}, block {height} is from {leaf_builder}",
3500                );
3501                proposers[leaf_builder] = true;
3502            }
3503
3504            if proposers.iter().all(|has_proposed| *has_proposed) {
3505                break;
3506            }
3507        }
3508    }
3509
3510    #[test_log::test(tokio::test(flavor = "multi_thread"))]
3511    async fn test_chain_config_from_instance() {
3512        // This test uses a ValidatedState which only has the default chain config commitment.
3513        // The NodeState has the full chain config.
3514        // Both chain config commitments will match, so the ValidatedState should have the
3515        // full chain config after a non-genesis block is decided.
3516
3517        let port = reserve_tcp_port().expect("OS should have ephemeral ports available");
3518
3519        let chain_config: ChainConfig = ChainConfig::default();
3520
3521        let state = ValidatedState {
3522            chain_config: chain_config.commit().into(),
3523            ..Default::default()
3524        };
3525
3526        let states = std::array::from_fn(|_| state.clone());
3527
3528        let config = TestNetworkConfigBuilder::default()
3529            .api_config(Options::with_port(port))
3530            .states(states)
3531            .catchups(std::array::from_fn(|_| {
3532                StatePeers::<StaticVersion<0, 1>>::from_urls(
3533                    vec![format!("http://localhost:{port}").parse().unwrap()],
3534                    Default::default(),
3535                    Duration::from_secs(2),
3536                    &NoMetrics,
3537                )
3538            }))
3539            .network_config(TestConfigBuilder::default().build())
3540            .build();
3541
3542        let mut network = TestNetwork::new(config, MOCK_SEQUENCER_VERSIONS).await;
3543
3544        // Wait for few blocks to be decided.
3545        network
3546            .server
3547            .event_stream()
3548            .await
3549            .filter(|event| future::ready(matches!(event.event, EventType::Decide { .. })))
3550            .take(3)
3551            .collect::<Vec<_>>()
3552            .await;
3553
3554        for peer in &network.peers {
3555            let state = peer.consensus().read().await.decided_state().await;
3556
3557            assert_eq!(state.chain_config.resolve().unwrap(), chain_config)
3558        }
3559
3560        network.server.shut_down().await;
3561        drop(network);
3562    }
3563
3564    #[test_log::test(tokio::test(flavor = "multi_thread"))]
3565    async fn test_chain_config_catchup() {
3566        // This test uses a ValidatedState with a non-default chain config
3567        // so it will be different from the NodeState chain config used by the TestNetwork.
3568        // However, for this test to work, at least one node should have a full chain config
3569        // to allow other nodes to catch up.
3570
3571        let port = reserve_tcp_port().expect("OS should have ephemeral ports available");
3572
3573        let cf = ChainConfig {
3574            max_block_size: 300.into(),
3575            base_fee: 1.into(),
3576            ..Default::default()
3577        };
3578
3579        // State1 contains only the chain config commitment
3580        let state1 = ValidatedState {
3581            chain_config: cf.commit().into(),
3582            ..Default::default()
3583        };
3584
3585        //state 2 contains the full chain config
3586        let state2 = ValidatedState {
3587            chain_config: cf.into(),
3588            ..Default::default()
3589        };
3590
3591        let mut states = std::array::from_fn(|_| state1.clone());
3592        // only one node has the full chain config
3593        // all the other nodes should do a catchup to get the full chain config from peer 0
3594        states[0] = state2;
3595
3596        const NUM_NODES: usize = 5;
3597        let config = TestNetworkConfigBuilder::<NUM_NODES, _, _>::with_num_nodes()
3598            .api_config(Options::from(options::Http {
3599                port,
3600                max_connections: None,
3601            }))
3602            .states(states)
3603            .catchups(std::array::from_fn(|_| {
3604                StatePeers::<StaticVersion<0, 1>>::from_urls(
3605                    vec![format!("http://localhost:{port}").parse().unwrap()],
3606                    Default::default(),
3607                    Duration::from_secs(2),
3608                    &NoMetrics,
3609                )
3610            }))
3611            .network_config(TestConfigBuilder::default().build())
3612            .build();
3613
3614        let mut network = TestNetwork::new(config, MOCK_SEQUENCER_VERSIONS).await;
3615
3616        // Wait for a few blocks to be decided.
3617        network
3618            .server
3619            .event_stream()
3620            .await
3621            .filter(|event| future::ready(matches!(event.event, EventType::Decide { .. })))
3622            .take(3)
3623            .collect::<Vec<_>>()
3624            .await;
3625
3626        for peer in &network.peers {
3627            let state = peer.consensus().read().await.decided_state().await;
3628
3629            assert_eq!(state.chain_config.resolve().unwrap(), cf)
3630        }
3631
3632        network.server.shut_down().await;
3633        drop(network);
3634    }
3635
    // View-based upgrade from the fee version to the epoch (PoS) version.
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_pos_upgrade_view_based() {
        test_upgrade_helper(Upgrade::new(FEE_VERSION, EPOCH_VERSION)).await;
    }
3640
    // Upgrade from the DRB-and-header version to the epoch-reward version.
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_epoch_reward_upgrade() {
        test_upgrade_helper(Upgrade::new(
            versions::DRB_AND_HEADER_UPGRADE_VERSION,
            versions::EPOCH_REWARD_VERSION,
        ))
        .await;
    }
3649
    // Drives a protocol upgrade from `upgrade.base` to `upgrade.target` on a live
    // network and verifies that, once the upgrade view has passed, every peer's
    // decided chain config matches the upgraded config and decided headers carry
    // the target version.
    async fn test_upgrade_helper(upgrade: Upgrade) {
        // wait this number of views beyond the configured first view
        // before asserting anything.
        let wait_extra_views = 10;
        // Number of nodes running in the test network.
        const NUM_NODES: usize = 5;
        let port = reserve_tcp_port().expect("OS should have ephemeral ports available");
        // If the base version already has epochs, epochs start at genesis;
        // otherwise push the epoch start out to a later block.
        // NOTE(review): 321 looks chosen to land past the upgrade horizon — confirm.
        let epoch_start_block = if upgrade.base >= versions::EPOCH_VERSION {
            0
        } else {
            321
        };

        let test_config = TestConfigBuilder::default()
            .epoch_height(200)
            .epoch_start_block(epoch_start_block)
            .set_upgrades(upgrade.target)
            .await
            .build();

        // Sanity check: the upgrade must actually change the chain config,
        // otherwise the assertions below would pass vacuously.
        let chain_config_genesis = ValidatedState::default().chain_config.resolve().unwrap();
        let chain_config_upgrade = test_config.get_upgrade_map().chain_config(upgrade.target);
        assert_ne!(chain_config_genesis, chain_config_upgrade);
        tracing::debug!(?chain_config_genesis, ?chain_config_upgrade);

        // One SQL storage plus matching persistence options per node.
        let storage = join_all((0..NUM_NODES).map(|_| SqlDataSource::create_storage())).await;
        let persistence: [_; NUM_NODES] = storage
            .iter()
            .map(<SqlDataSource as TestableSequencerDataSource>::persistence_options)
            .collect::<Vec<_>>()
            .try_into()
            .unwrap();

        let mut builder = TestNetworkConfigBuilder::<NUM_NODES, _, _>::with_num_nodes()
            .api_config(SqlDataSource::options(
                &storage[0],
                Options::with_port(port),
            ))
            .persistences(persistence)
            .catchups(std::array::from_fn(|_| {
                StatePeers::<SequencerApiVersion>::from_urls(
                    vec![format!("http://localhost:{port}").parse().unwrap()],
                    Default::default(),
                    Duration::from_secs(2),
                    &NoMetrics,
                )
            }))
            .network_config(test_config);

        // When the base version already has epochs, the base chain config must
        // include the stake_table_contract
        if upgrade.base >= versions::EPOCH_VERSION {
            let state = ValidatedState {
                chain_config: chain_config_upgrade.into(),
                ..Default::default()
            };
            builder = builder.states(std::array::from_fn(|_| state.clone()));
        }

        let config = builder.build();

        let mut network = TestNetwork::new(config, upgrade).await;
        let mut events = network.server.event_stream().await;

        let target = upgrade.target;

        // First loop to get an `UpgradeProposal`. Note that the
        // actual upgrade will take several to many subsequent views for
        // voting and finally the actual upgrade.
        let upgrade = loop {
            let event = events.next().await.unwrap();
            match event.event {
                EventType::UpgradeProposal { proposal, .. } => {
                    tracing::info!(?proposal, "proposal");
                    let upgrade = proposal.data.upgrade_proposal;
                    let new_version = upgrade.new_version;
                    tracing::info!(?new_version, "upgrade proposal new version");
                    assert_eq!(new_version, target);
                    break upgrade;
                },
                _ => continue,
            }
        };

        // Assert only after the network has had `wait_extra_views` views past the
        // activation point to settle.
        let wanted_view = upgrade.new_version_first_view + wait_extra_views;
        // Loop until we get the `new_version_first_view`, then test the upgrade.
        loop {
            let event = events.next().await.unwrap();
            let view_number = event.view_number;

            tracing::debug!(?view_number, ?upgrade.new_version_first_view, "upgrade_new_view");
            if view_number > wanted_view {
                tracing::info!(?view_number, ?upgrade.new_version_first_view, "passed upgrade view");
                // Snapshot every peer's decided state and decided leaf.
                let states = join_all(
                    network
                        .peers
                        .iter()
                        .map(|peer| async { peer.consensus().read().await.decided_state().await }),
                )
                .await;
                let leaves = join_all(
                    network
                        .peers
                        .iter()
                        .map(|peer| async { peer.consensus().read().await.decided_leaf().await }),
                )
                .await;
                let configs: Vec<ChainConfig> = states
                    .iter()
                    .map(|state| state.chain_config.resolve().unwrap())
                    .collect();

                tracing::info!(?leaves, ?configs, "post upgrade state");
                // Every peer must have switched to the upgraded chain config...
                for config in configs {
                    assert_eq!(config, chain_config_upgrade);
                }
                // ...and decided headers must report the target version.
                for leaf in leaves {
                    assert_eq!(leaf.block_header().version(), target);
                }
                break;
            }
            sleep(Duration::from_millis(200)).await;
        }

        network.server.shut_down().await;
    }
3776
    // End-to-end restart test: decide some blocks, stop every consensus node,
    // tear the network down completely, then rebuild it from the same SQL
    // persistence and verify the chain resumes from — and stays consistent
    // with — the pre-restart decided chain.
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    pub(crate) async fn test_restart() {
        const NUM_NODES: usize = 5;
        // Initialize nodes.
        let storage = join_all((0..NUM_NODES).map(|_| SqlDataSource::create_storage())).await;
        let persistence: [_; NUM_NODES] = storage
            .iter()
            .map(<SqlDataSource as TestableSequencerDataSource>::persistence_options)
            .collect::<Vec<_>>()
            .try_into()
            .unwrap();
        let port = reserve_tcp_port().expect("OS should have ephemeral ports available");
        let config = TestNetworkConfigBuilder::default()
            .api_config(SqlDataSource::options(
                &storage[0],
                Options::with_port(port),
            ))
            // Cloned because the same persistence is reused after the restart.
            .persistences(persistence.clone())
            .network_config(TestConfigBuilder::default().build())
            .build();
        let mut network = TestNetwork::new(config, MOCK_SEQUENCER_VERSIONS).await;

        // Connect client.
        let client: Client<ServerError, SequencerApiVersion> =
            Client::new(format!("http://localhost:{port}").parse().unwrap());
        client.connect(None).await;
        tracing::info!(port, "server running");

        // Wait until some blocks have been decided.
        client
            .socket("availability/stream/blocks/0")
            .subscribe::<BlockQueryData<SeqTypes>>()
            .await
            .unwrap()
            .take(3)
            .collect::<Vec<_>>()
            .await;

        // Shut down the consensus nodes.
        tracing::info!("shutting down nodes");
        network.stop_consensus().await;

        // Get the block height we reached.
        let height = client
            .get::<usize>("status/block-height")
            .send()
            .await
            .unwrap();
        tracing::info!("decided {height} blocks before shutting down");

        // Get the decided chain, so we can check consistency after the restart.
        let chain: Vec<LeafQueryData<SeqTypes>> = client
            .socket("availability/stream/leaves/0")
            .subscribe()
            .await
            .unwrap()
            .take(height)
            .try_collect()
            .await
            .unwrap();
        let decided_view = chain.last().unwrap().leaf().view_number();

        // Get the most recent state, for catchup.

        let state = network.server.decided_state().await;
        tracing::info!(?decided_view, ?state, "consensus state");

        // Fully shut down the API servers.
        drop(network);

        // Start up again, resuming from the last decided leaf.
        let port = reserve_tcp_port().expect("OS should have ephemeral ports available");

        let config = TestNetworkConfigBuilder::default()
            .api_config(SqlDataSource::options(
                &storage[0],
                Options::with_port(port),
            ))
            .persistences(persistence)
            .catchups(std::array::from_fn(|_| {
                // Catchup using node 0 as a peer. Node 0 was running the archival state service
                // before the restart, so it should be able to resume without catching up by loading
                // state from storage.
                StatePeers::<StaticVersion<0, 1>>::from_urls(
                    vec![format!("http://localhost:{port}").parse().unwrap()],
                    Default::default(),
                    Duration::from_secs(2),
                    &NoMetrics,
                )
            }))
            .network_config(TestConfigBuilder::default().build())
            .build();
        let _network = TestNetwork::new(config, MOCK_SEQUENCER_VERSIONS).await;
        let client: Client<ServerError, StaticVersion<0, 1>> =
            Client::new(format!("http://localhost:{port}").parse().unwrap());
        client.connect(None).await;
        tracing::info!(port, "server running");

        // Make sure we can decide new blocks after the restart.
        tracing::info!("waiting for decide, height {height}");
        let new_leaf: LeafQueryData<SeqTypes> = client
            .socket(&format!("availability/stream/leaves/{height}"))
            .subscribe()
            .await
            .unwrap()
            .next()
            .await
            .unwrap()
            .unwrap();
        // The first post-restart leaf must extend the pre-restart chain.
        assert_eq!(new_leaf.height(), height as u64);
        assert_eq!(
            new_leaf.leaf().parent_commitment(),
            chain[height - 1].hash()
        );

        // Ensure the new chain is consistent with the old chain.
        let new_chain: Vec<LeafQueryData<SeqTypes>> = client
            .socket("availability/stream/leaves/0")
            .subscribe()
            .await
            .unwrap()
            .take(height)
            .try_collect()
            .await
            .unwrap();
        assert_eq!(chain, new_chain);
    }
3904
3905    #[test_log::test(tokio::test(flavor = "multi_thread"))]
3906    async fn test_fetch_config() {
3907        let port = reserve_tcp_port().expect("OS should have ephemeral ports available");
3908        let url: surf_disco::Url = format!("http://localhost:{port}").parse().unwrap();
3909        let client: Client<ServerError, StaticVersion<0, 1>> = Client::new(url.clone());
3910
3911        let options = Options::with_port(port).config(Default::default());
3912        let network_config = TestConfigBuilder::default().build();
3913        let config = TestNetworkConfigBuilder::default()
3914            .api_config(options)
3915            .network_config(network_config)
3916            .build();
3917        let network = TestNetwork::new(config, MOCK_SEQUENCER_VERSIONS).await;
3918        client.connect(None).await;
3919
3920        // Fetch a network config from the API server. The first peer URL is bogus, to test the
3921        // failure/retry case.
3922        let peers = StatePeers::<StaticVersion<0, 1>>::from_urls(
3923            vec!["https://notarealnode.network".parse().unwrap(), url],
3924            Default::default(),
3925            Duration::from_secs(2),
3926            &NoMetrics,
3927        );
3928
3929        // Fetch the config from node 1, a different node than the one running the service.
3930        let validator =
3931            ValidatorConfig::generated_from_seed_indexed([0; 32], 1, U256::from(1), false);
3932        let config = peers.fetch_config(validator.clone()).await.unwrap();
3933
3934        // Check the node-specific information in the recovered config is correct.
3935        assert_eq!(config.node_index, 1);
3936
3937        // Check the public information is also correct (with respect to the node that actually
3938        // served the config, for public keys).
3939        pretty_assertions::assert_eq!(
3940            serde_json::to_value(PublicHotShotConfig::from(config.config)).unwrap(),
3941            serde_json::to_value(PublicHotShotConfig::from(
3942                network.cfg.hotshot_config().clone()
3943            ))
3944            .unwrap()
3945        );
3946    }
3947
3948    async fn run_hotshot_event_streaming_test(url_suffix: &str) {
3949        let query_service_port =
3950            reserve_tcp_port().expect("OS should have ephemeral ports available");
3951
3952        let url = format!("http://localhost:{query_service_port}{url_suffix}")
3953            .parse()
3954            .unwrap();
3955
3956        let client: Client<ServerError, SequencerApiVersion> = Client::new(url);
3957
3958        let options = Options::with_port(query_service_port).hotshot_events(HotshotEvents);
3959
3960        let network_config = TestConfigBuilder::default().build();
3961        let config = TestNetworkConfigBuilder::default()
3962            .api_config(options)
3963            .network_config(network_config)
3964            .build();
3965        let _network = TestNetwork::new(config, MOCK_SEQUENCER_VERSIONS).await;
3966
3967        let mut subscribed_events = client
3968            .socket("hotshot-events/events")
3969            .subscribe::<Event<SeqTypes>>()
3970            .await
3971            .unwrap();
3972
3973        let total_count = 5;
3974        // wait for these events to receive on client 1
3975        let mut receive_count = 0;
3976        loop {
3977            let event = subscribed_events.next().await.unwrap();
3978            tracing::info!("Received event in hotshot event streaming Client 1: {event:?}");
3979            receive_count += 1;
3980            if receive_count > total_count {
3981                tracing::info!("Client Received at least desired events, exiting loop");
3982                break;
3983            }
3984        }
3985        assert_eq!(receive_count, total_count + 1);
3986    }
3987
    // Event streaming against the explicit `/v0` API mount.
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_hotshot_event_streaming_v0() {
        run_hotshot_event_streaming_test("/v0").await;
    }
3992
    // Event streaming against the explicit `/v1` API mount.
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_hotshot_event_streaming_v1() {
        run_hotshot_event_streaming_test("/v1").await;
    }
3997
    // Event streaming against the default (unversioned) API base path.
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_hotshot_event_streaming() {
        run_hotshot_event_streaming_test("").await;
    }
4002
    // TODO when `EPOCH_VERSION` becomes base version we can merge this
    // w/ above test.
    /// Subscribes to the `hotshot-events/events` socket and verifies that, with
    /// epochs enabled (PoS v3), both views and epochs keep progressing:
    /// - every `Decide` event carries an epoch and a block number, and
    /// - the QC's epoch matches the epoch derived from its block number via
    ///   `epoch_from_block_number`.
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_hotshot_event_streaming_epoch_progression() {
        // Blocks per epoch, and how many full epochs the test waits to observe.
        let epoch_height = 35;
        let wanted_epochs = 4;

        let network_config = TestConfigBuilder::default()
            .epoch_height(epoch_height)
            .build();

        // Grab a free port for the query service so parallel tests don't collide.
        let query_service_port =
            reserve_tcp_port().expect("OS should have ephemeral ports available");

        let hotshot_url = format!("http://localhost:{query_service_port}")
            .parse()
            .unwrap();

        let client: Client<ServerError, SequencerApiVersion> = Client::new(hotshot_url);
        let options = Options::with_port(query_service_port).hotshot_events(HotshotEvents);

        let config = TestNetworkConfigBuilder::default()
            .api_config(options)
            .network_config(network_config.clone())
            .pos_hook(
                DelegationConfig::VariableAmounts,
                Default::default(),
                POS_V3,
            )
            .await
            .expect("Pos Deployment")
            .build();

        // Keep the network alive for the duration of the test.
        let _network = TestNetwork::new(config, POS_V3).await;

        let mut subscribed_events = client
            .socket("hotshot-events/events")
            .subscribe::<Event<SeqTypes>>()
            .await
            .unwrap();

        // Target view number corresponding to `wanted_epochs` full epochs.
        let wanted_views = epoch_height * wanted_epochs;

        let mut views = HashSet::new();
        let mut epochs = HashSet::new();
        // Bounded loop so the test fails (via the asserts below) instead of
        // hanging if views stop progressing.
        for _ in 0..=600 {
            let event = subscribed_events.next().await.unwrap();
            let event = event.unwrap();
            let view_number = event.view_number;
            views.insert(view_number.u64());

            if let hotshot::types::EventType::Decide { committing_qc, .. } = event.event {
                // Epochs are active from genesis in this configuration, so every
                // committing QC must carry an epoch and a block number.
                assert!(committing_qc.epoch().is_some(), "epochs are live");
                assert!(committing_qc.block_number().is_some());

                let epoch = committing_qc.epoch().unwrap().u64();
                epochs.insert(epoch);

                tracing::debug!(
                    "Got decide: epoch: {:?}, block: {:?} ",
                    epoch,
                    committing_qc.block_number()
                );

                // The QC's epoch must agree with the epoch computed from its
                // block number and the configured epoch height.
                let expected_epoch =
                    epoch_from_block_number(committing_qc.block_number().unwrap(), epoch_height);
                tracing::debug!("expected epoch: {expected_epoch}, qc epoch: {epoch}");

                assert_eq!(expected_epoch, epoch);
            }
            if views.contains(&wanted_views) {
                tracing::info!("Client Received at least desired views, exiting loop");
                break;
            }
        }

        // prevent false positive when we overflow the range
        assert!(views.contains(&wanted_views), "Views are not progressing");
        assert!(
            epochs.contains(&wanted_epochs),
            "Epochs are not progressing"
        );
    }
4086
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_pos_rewards_basic() -> anyhow::Result<()> {
        // Basic PoS rewards test:
        // - Sets up a single validator and a single delegator (the node itself).
        // - Sets the number of blocks in each epoch to 20.
        // - Rewards begin applying from block 41 (i.e., the start of the 3rd epoch).
        // - Since the validator is also the delegator, it receives the full reward.
        // - Verifies that the reward at block height 60 matches the expected amount.
        let epoch_height = 20;

        let network_config = TestConfigBuilder::default()
            .epoch_height(epoch_height)
            .build();

        // Free port for the sequencer API so parallel tests don't collide.
        let api_port = reserve_tcp_port().expect("OS should have ephemeral ports available");

        const NUM_NODES: usize = 1;
        // Initialize nodes.
        let storage = join_all((0..NUM_NODES).map(|_| SqlDataSource::create_storage())).await;
        let persistence: [_; NUM_NODES] = storage
            .iter()
            .map(<SqlDataSource as TestableSequencerDataSource>::persistence_options)
            .collect::<Vec<_>>()
            .try_into()
            .unwrap();

        let config = TestNetworkConfigBuilder::with_num_nodes()
            .api_config(SqlDataSource::options(
                &storage[0],
                Options::with_port(api_port),
            ))
            .network_config(network_config.clone())
            .persistences(persistence.clone())
            // Each node does state catchup against the single node's own API.
            .catchups(std::array::from_fn(|_| {
                StatePeers::<StaticVersion<0, 1>>::from_urls(
                    vec![format!("http://localhost:{api_port}").parse().unwrap()],
                    Default::default(),
                    Duration::from_secs(2),
                    &NoMetrics,
                )
            }))
            .pos_hook(
                DelegationConfig::VariableAmounts,
                Default::default(),
                POS_V4,
            )
            .await
            .unwrap()
            .build();

        let network = TestNetwork::new(config, POS_V4).await;
        let client: Client<ServerError, SequencerApiVersion> =
            Client::new(format!("http://localhost:{api_port}").parse().unwrap());

        // first two epochs will be 1 and 2
        // rewards are distributed starting third epoch
        // third epoch starts from block 40 as epoch height is 20
        // wait for atleast 65 blocks
        let _blocks = client
            .socket("availability/stream/blocks/0")
            .subscribe::<BlockQueryData<SeqTypes>>()
            .await
            .unwrap()
            .take(65)
            .try_collect::<Vec<_>>()
            .await
            .unwrap();

        // The sole staker's Ethereum address: with one validator that is also the
        // only delegator, all rewards accrue to this account.
        let staking_priv_keys = network_config.staking_priv_keys();
        let account = staking_priv_keys[0].0.clone();
        let address = account.address();

        let block_height = 60;

        // Expected total: 20 reward-earning blocks (41..=60) times the per-block
        // reward for epoch 3, all paid to the single staker.
        let node_state = network.server.node_state();
        let membership = node_state.coordinator.membership().read().await;
        let expected_amount = U256::from(20)
            * (membership
                .epoch_block_reward(3.into())
                .expect("block reward is not None"))
            .0;
        // Release the membership read lock before issuing API calls.
        drop(membership);

        // get the validator address balance at block height 60
        let amount = client
            .get::<Option<RewardAmount>>(&format!(
                "reward-state/reward-balance/{block_height}/{address}"
            ))
            .send()
            .await
            .unwrap()
            .unwrap();

        tracing::info!("amount={amount:?}");

        assert_eq!(amount.0, expected_amount, "reward amount don't match");

        Ok(())
    }
4186
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_cumulative_pos_rewards() -> anyhow::Result<()> {
        // This test registers 5 validators and multiple delegators for each validator.
        // One of the delegators is also a validator.
        // The test verifies that the cumulative reward at each block height equals
        // the total block reward, which is a constant.

        let epoch_height = 20;

        let network_config = TestConfigBuilder::default()
            .epoch_height(epoch_height)
            .build();

        // Free port for the sequencer API so parallel tests don't collide.
        let api_port = reserve_tcp_port().expect("OS should have ephemeral ports available");

        const NUM_NODES: usize = 5;
        // Initialize nodes.
        let storage = join_all((0..NUM_NODES).map(|_| SqlDataSource::create_storage())).await;
        let persistence: [_; NUM_NODES] = storage
            .iter()
            .map(<SqlDataSource as TestableSequencerDataSource>::persistence_options)
            .collect::<Vec<_>>()
            .try_into()
            .unwrap();

        let config = TestNetworkConfigBuilder::with_num_nodes()
            .api_config(SqlDataSource::options(
                &storage[0],
                Options::with_port(api_port),
            ))
            .network_config(network_config)
            .persistences(persistence.clone())
            // All nodes do state catchup against node 0's API.
            .catchups(std::array::from_fn(|_| {
                StatePeers::<StaticVersion<0, 1>>::from_urls(
                    vec![format!("http://localhost:{api_port}").parse().unwrap()],
                    Default::default(),
                    Duration::from_secs(2),
                    &NoMetrics,
                )
            }))
            .pos_hook(
                DelegationConfig::MultipleDelegators,
                Default::default(),
                POS_V4,
            )
            .await
            .unwrap()
            .build();

        let network = TestNetwork::new(config, POS_V4).await;
        let node_state = network.server.node_state();
        let client: Client<ServerError, SequencerApiVersion> =
            Client::new(format!("http://localhost:{api_port}").parse().unwrap());

        // wait for atleast 75 blocks
        let _blocks = client
            .socket("availability/stream/blocks/0")
            .subscribe::<BlockQueryData<SeqTypes>>()
            .await
            .unwrap()
            .take(75)
            .try_collect::<Vec<_>>()
            .await
            .unwrap();

        // We are going to check cumulative blocks from block height 40 to 67
        // Basically epoch 3 and epoch 4 as epoch height is 20
        // get all the validators
        let validators = client
            .get::<AuthenticatedValidatorMap>("node/validators/3")
            .send()
            .await
            .expect("failed to get validator");

        // insert all the address in a map
        // We will query the reward-balance at each block height for all the addresses
        // We don't know which validator was the leader because we don't have access to Membership
        let mut addresses = HashSet::new();
        for v in validators.values() {
            addresses.insert(v.account);
            addresses.extend(v.clone().delegators.keys().collect::<Vec<_>>());
        }
        // get all the validators
        // Epoch 4's validator set may differ from epoch 3's; include both so every
        // potential reward recipient over the checked range is tracked.
        let validators = client
            .get::<AuthenticatedValidatorMap>("node/validators/4")
            .send()
            .await
            .expect("failed to get validator");
        for v in validators.values() {
            addresses.insert(v.account);
            addresses.extend(v.clone().delegators.keys().collect::<Vec<_>>());
        }

        let mut prev_cumulative_amount = U256::ZERO;
        // Check Cumulative rewards for epochs 3 (= block height 41 to 59) & 4 (= block height 60 to 67)
        for block in 41..=67 {
            // Look up the per-block reward for this block's epoch, then release
            // the membership lock before making API calls.
            let membership = node_state.coordinator.membership().read().await;
            let block_reward = membership
                .epoch_block_reward(epoch_from_block_number(block, epoch_height).into())
                .expect("block reward is not None");
            drop(membership);

            let mut cumulative_amount = U256::ZERO;
            for address in addresses.clone() {
                // Balance may be absent for addresses that earned nothing yet;
                // treat errors and `None` alike as "no balance".
                let amount = client
                    .get::<Option<RewardAmount>>(&format!(
                        "reward-state/reward-balance/{block}/{address}"
                    ))
                    .send()
                    .await
                    .ok()
                    .flatten();

                if let Some(amount) = amount {
                    tracing::info!("address={address}, amount={amount}");
                    cumulative_amount += amount.0;
                };
            }

            // assert cumulative reward is equal to block reward
            // (difference between consecutive cumulative totals = one block's reward)
            assert_eq!(cumulative_amount - prev_cumulative_amount, block_reward.0);
            tracing::info!("cumulative_amount is correct for block={block}");
            prev_cumulative_amount = cumulative_amount;
        }

        Ok(())
    }
4314
4315    #[test_log::test(tokio::test(flavor = "multi_thread"))]
4316    async fn test_stake_table_duplicate_events_from_contract() -> anyhow::Result<()> {
4317        // TODO(abdul): This test currently uses TestNetwork only for contract deployment and for L1 block number.
4318        // Once the stake table deployment logic is refactored and isolated, TestNetwork here will be unnecessary
4319
4320        let epoch_height = 20;
4321
4322        let network_config = TestConfigBuilder::default()
4323            .epoch_height(epoch_height)
4324            .build();
4325
4326        let api_port = reserve_tcp_port().expect("OS should have ephemeral ports available");
4327
4328        const NUM_NODES: usize = 5;
4329        // Initialize nodes.
4330        let storage = join_all((0..NUM_NODES).map(|_| SqlDataSource::create_storage())).await;
4331        let persistence: [_; NUM_NODES] = storage
4332            .iter()
4333            .map(<SqlDataSource as TestableSequencerDataSource>::persistence_options)
4334            .collect::<Vec<_>>()
4335            .try_into()
4336            .unwrap();
4337
4338        let l1_url = network_config.l1_url();
4339        let config = TestNetworkConfigBuilder::with_num_nodes()
4340            .api_config(SqlDataSource::options(
4341                &storage[0],
4342                Options::with_port(api_port),
4343            ))
4344            .network_config(network_config)
4345            .persistences(persistence.clone())
4346            .catchups(std::array::from_fn(|_| {
4347                StatePeers::<StaticVersion<0, 1>>::from_urls(
4348                    vec![format!("http://localhost:{api_port}").parse().unwrap()],
4349                    Default::default(),
4350                    Duration::from_secs(2),
4351                    &NoMetrics,
4352                )
4353            }))
4354            .pos_hook(
4355                DelegationConfig::MultipleDelegators,
4356                Default::default(),
4357                POS_V3,
4358            )
4359            .await
4360            .unwrap()
4361            .build();
4362
4363        let network = TestNetwork::new(config, POS_V3).await;
4364
4365        let mut prev_st = None;
4366        let state = network.server.decided_state().await;
4367        let chain_config = state.chain_config.resolve().expect("resolve chain config");
4368        let stake_table = chain_config.stake_table_contract.unwrap();
4369
4370        let l1_client = L1ClientOptions::default()
4371            .connect(vec![l1_url])
4372            .expect("failed to connect to l1");
4373
4374        let client: Client<ServerError, SequencerApiVersion> =
4375            Client::new(format!("http://localhost:{api_port}").parse().unwrap());
4376
4377        let mut headers = client
4378            .socket("availability/stream/headers/0")
4379            .subscribe::<Header>()
4380            .await
4381            .unwrap();
4382
4383        let mut target_bh = 0;
4384        while let Some(header) = headers.next().await {
4385            let header = header.unwrap();
4386            println!("got header with height {}", header.height());
4387            if header.height() == 0 {
4388                continue;
4389            }
4390            let l1_block = header.l1_finalized().expect("l1 block not found");
4391
4392            let sorted_events = Fetcher::fetch_events_from_contract(
4393                l1_client.clone(),
4394                stake_table,
4395                None,
4396                l1_block.number(),
4397            )
4398            .await?;
4399
4400            let mut sorted_dedup_removed = sorted_events.clone();
4401            sorted_dedup_removed.dedup();
4402
4403            assert_eq!(
4404                sorted_events.len(),
4405                sorted_dedup_removed.len(),
4406                "duplicates found"
4407            );
4408
4409            // This also checks if there is a duplicate registration
4410            let stake_table =
4411                validators_from_l1_events(sorted_events.into_iter().map(|(_, e)| e)).unwrap();
4412            if let Some(prev_st) = prev_st {
4413                assert_eq!(stake_table, prev_st);
4414            }
4415
4416            prev_st = Some(stake_table);
4417
4418            if target_bh == 100 {
4419                break;
4420            }
4421
4422            target_bh = header.height();
4423        }
4424
4425        Ok(())
4426    }
4427
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_rewards_v4() -> anyhow::Result<()> {
        // This test verifies PoS reward distribution logic for multiple delegators per validator.
        //
        //  assertions:
        // - No rewards are distributed during the first 2 epochs.
        // - Rewards begin from epoch 3 onward.
        // - Delegator stake sums match the corresponding validator stake.
        // - Reward values match those returned by the reward state API.
        // - Commission calculations are within a small acceptable rounding tolerance.
        // - Ensure that the `total_reward_distributed` field in the block header matches the total block reward distributed
        const EPOCH_HEIGHT: u64 = 20;

        let network_config = TestConfigBuilder::default()
            .epoch_height(EPOCH_HEIGHT)
            .build();

        // Free port for the sequencer API so parallel tests don't collide.
        let api_port = reserve_tcp_port().expect("OS should have ephemeral ports available");

        const NUM_NODES: usize = 5;

        // Per-node SQL storage and persistence options.
        let storage = join_all((0..NUM_NODES).map(|_| SqlDataSource::create_storage())).await;
        let persistence: [_; NUM_NODES] = storage
            .iter()
            .map(<SqlDataSource as TestableSequencerDataSource>::persistence_options)
            .collect::<Vec<_>>()
            .try_into()
            .unwrap();

        let config = TestNetworkConfigBuilder::with_num_nodes()
            .api_config(SqlDataSource::options(
                &storage[0],
                Options::with_port(api_port),
            ))
            .network_config(network_config)
            .persistences(persistence.clone())
            .catchups(std::array::from_fn(|_| {
                StatePeers::<StaticVersion<0, 1>>::from_urls(
                    vec![format!("http://localhost:{api_port}").parse().unwrap()],
                    Default::default(),
                    Duration::from_secs(2),
                    &NoMetrics,
                )
            }))
            .pos_hook(
                DelegationConfig::MultipleDelegators,
                Default::default(),
                POS_V4,
            )
            .await
            .unwrap()
            .build();

        let network = TestNetwork::new(config, POS_V4).await;
        let client: Client<ServerError, SequencerApiVersion> =
            Client::new(format!("http://localhost:{api_port}").parse().unwrap());

        // Wait for the chain to progress beyond epoch 3 so rewards start being distributed.
        let mut events = network.peers[0].event_stream().await;
        while let Some(event) = events.next().await {
            if let EventType::Decide { leaf_chain, .. } = event.event {
                let height = leaf_chain[0].leaf.height();
                tracing::info!("Node 0 decided at height: {height}");
                if height > EPOCH_HEIGHT * 3 {
                    break;
                }
            }
        }

        // Verify that there are no validators for epoch # 1 and epoch # 2
        // NOTE(review): `is_empty()` results are not asserted here, so these calls
        // only verify that the endpoints respond successfully — confirm intent.
        {
            client
                .get::<AuthenticatedValidatorMap>("node/validators/1")
                .send()
                .await
                .unwrap()
                .is_empty();

            client
                .get::<AuthenticatedValidatorMap>("node/validators/2")
                .send()
                .await
                .unwrap()
                .is_empty();
        }

        // Get the epoch # 3 validators
        let validators = client
            .get::<AuthenticatedValidatorMap>("node/validators/3")
            .send()
            .await
            .expect("validators");

        assert!(!validators.is_empty());

        // Collect addresses to track rewards for all participants.
        let mut addresses = HashSet::new();
        for v in validators.values() {
            addresses.insert(v.account);
            addresses.extend(v.clone().delegators.keys().collect::<Vec<_>>());
        }

        // Replay the chain from genesis so both the "no rewards yet" and the
        // "rewards flowing" phases can be checked leaf by leaf.
        let mut leaves = client
            .socket("availability/stream/leaves/0")
            .subscribe::<LeafQueryData<SeqTypes>>()
            .await
            .unwrap();

        let node_state = network.server.node_state();
        let coordinator = node_state.coordinator;

        let membership = coordinator.membership().read().await;

        // Ensure rewards remain zero up for the first two epochs
        while let Some(leaf) = leaves.next().await {
            let leaf = leaf.unwrap();
            let header = leaf.header();
            assert_eq!(header.total_reward_distributed().unwrap().0, U256::ZERO);

            let epoch_number =
                EpochNumber::new(epoch_from_block_number(leaf.height(), EPOCH_HEIGHT));

            // No block reward is defined at all before rewards activate.
            assert!(membership.epoch_block_reward(epoch_number).is_none());

            let height = header.height();
            for address in addresses.clone() {
                let amount = client
                    .get::<Option<RewardAmount>>(&format!(
                        "reward-state-v2/reward-balance/{height}/{address}"
                    ))
                    .send()
                    .await
                    .ok()
                    .flatten();
                assert!(amount.is_none(), "amount is not none for block {height}")
            }

            // Stop the zero-reward phase at the last block of epoch 2.
            if leaf.height() == EPOCH_HEIGHT * 2 {
                break;
            }
        }

        // Release the long-held membership read lock; the loop below re-acquires
        // it per block.
        drop(membership);

        // Running per-address reward totals, cumulative distributed total, and the
        // per-epoch block reward (to check it is constant within an epoch).
        let mut rewards_map = HashMap::new();
        let mut total_distributed = U256::ZERO;
        let mut epoch_rewards = HashMap::<EpochNumber, U256>::new();

        while let Some(leaf) = leaves.next().await {
            let leaf = leaf.unwrap();

            let header = leaf.header();
            let distributed = header
                .total_reward_distributed()
                .expect("rewards distributed is none");

            let block = leaf.height();
            tracing::info!("verify rewards for block={block:?}");
            // Look up this block's reward, leader, and the leader's Ethereum
            // address, then drop the lock before any API calls.
            let membership = coordinator.membership().read().await;
            let epoch_number =
                EpochNumber::new(epoch_from_block_number(leaf.height(), EPOCH_HEIGHT));

            let block_reward = membership.epoch_block_reward(epoch_number).unwrap();
            let leader = membership
                .leader(leaf.leaf().view_number(), Some(epoch_number))
                .expect("leader");
            let leader_eth_address = membership.address(&epoch_number, leader).expect("address");

            drop(membership);

            let validators = client
                .get::<AuthenticatedValidatorMap>(&format!("node/validators/{epoch_number}"))
                .send()
                .await
                .expect("validators");

            let leader_validator = validators
                .get(&leader_eth_address)
                .expect("leader not found");

            // Recompute the expected reward split locally for comparison with the
            // reward-state API.
            let distributor =
                RewardDistributor::new(leader_validator.clone(), block_reward, distributed);
            // Verify that the sum of delegator stakes equals the validator's total stake.
            for validator in validators.values() {
                let delegator_stake_sum: U256 = validator.delegators.values().cloned().sum();

                assert_eq!(delegator_stake_sum, validator.stake);
            }

            let computed_rewards = distributor.compute_rewards().expect("reward computation");

            // Validate that the leader's commission is within a 10 wei tolerance of the expected value.
            let total_reward = block_reward.0;
            let leader_commission_basis_points = U256::from(leader_validator.commission);
            let calculated_leader_commission_reward = leader_commission_basis_points
                .checked_mul(total_reward)
                .context("overflow")?
                .checked_div(U256::from(COMMISSION_BASIS_POINTS))
                .context("overflow")?;

            assert!(
                computed_rewards.leader_commission().0 - calculated_leader_commission_reward
                    <= U256::from(10_u64)
            );

            // Aggregate rewards by address (both delegator and leader).
            let leader_commission = *computed_rewards.leader_commission();
            for (address, amount) in computed_rewards.delegators().clone() {
                rewards_map
                    .entry(address)
                    .and_modify(|entry| *entry += amount)
                    .or_insert(amount);
            }

            // add leader commission reward
            rewards_map
                .entry(leader_eth_address)
                .and_modify(|entry| *entry += leader_commission)
                .or_insert(leader_commission);

            // assert that the reward matches to what is in the reward merkle tree
            for (address, calculated_amount) in rewards_map.iter() {
                // The API may briefly lag behind the decided block, so retry up to
                // 3 times with a 2s pause before failing.
                let mut attempt = 0;
                let amount_from_api = loop {
                    let result = client
                        .get::<Option<RewardAmount>>(&format!(
                            "reward-state-v2/reward-balance/{block}/{address}"
                        ))
                        .send()
                        .await
                        .ok()
                        .flatten();

                    if let Some(amount) = result {
                        break amount;
                    }

                    attempt += 1;
                    if attempt >= 3 {
                        panic!(
                            "Failed to fetch reward amount for address {address} after 3 retries"
                        );
                    }

                    sleep(Duration::from_secs(2)).await;
                };

                assert_eq!(amount_from_api, *calculated_amount);
            }

            // Confirm the header's total distributed field matches the cumulative expected amount.
            total_distributed += block_reward.0;
            assert_eq!(
                header.total_reward_distributed().unwrap().0,
                total_distributed
            );

            // Block reward shouldn't change for the same epoch
            epoch_rewards
                .entry(epoch_number)
                .and_modify(|r| assert_eq!(*r, block_reward.0))
                .or_insert(block_reward.0);

            // Stop the test after verifying 5 full epochs.
            if leaf.height() == EPOCH_HEIGHT * 5 {
                break;
            }
        }

        Ok(())
    }
4699
4700    #[test_log::test(tokio::test(flavor = "multi_thread"))]
4701    async fn test_epoch_reward_distribution_basic() -> anyhow::Result<()> {
4702        const EPOCH_HEIGHT: u64 = 10;
4703        const NUM_NODES: usize = 5;
4704
4705        const V6: Upgrade = Upgrade::trivial(version(0, 6));
4706
4707        let network_config = TestConfigBuilder::default()
4708            .epoch_height(EPOCH_HEIGHT)
4709            .build();
4710
4711        let api_port = reserve_tcp_port().expect("No ports free for query service");
4712
4713        let storage = join_all((0..NUM_NODES).map(|_| SqlDataSource::create_storage())).await;
4714        let persistence: [_; NUM_NODES] = storage
4715            .iter()
4716            .map(<SqlDataSource as TestableSequencerDataSource>::persistence_options)
4717            .collect::<Vec<_>>()
4718            .try_into()
4719            .unwrap();
4720
4721        let config = TestNetworkConfigBuilder::with_num_nodes()
4722            .api_config(SqlDataSource::options(
4723                &storage[0],
4724                Options::with_port(api_port),
4725            ))
4726            .network_config(network_config)
4727            .persistences(persistence.clone())
4728            .catchups(std::array::from_fn(|_| {
4729                StatePeers::<StaticVersion<0, 1>>::from_urls(
4730                    vec![format!("http://localhost:{api_port}").parse().unwrap()],
4731                    Default::default(),
4732                    Duration::from_secs(2),
4733                    &NoMetrics,
4734                )
4735            }))
4736            .pos_hook(DelegationConfig::MultipleDelegators, Default::default(), V6)
4737            .await
4738            .unwrap()
4739            .build();
4740
4741        let _network = TestNetwork::new(config, V6).await;
4742        let client: Client<ServerError, SequencerApiVersion> =
4743            Client::new(format!("http://localhost:{api_port}").parse().unwrap());
4744
4745        // Wait for chain to reach epoch 5
4746        let height_client: Client<ServerError, StaticVersion<0, 1>> =
4747            Client::new(format!("http://localhost:{api_port}").parse().unwrap());
4748        wait_until_block_height(&height_client, "node/block-height", EPOCH_HEIGHT * 5).await;
4749
4750        let mut leaves = client
4751            .socket("availability/stream/leaves/0")
4752            .subscribe::<LeafQueryData<SeqTypes>>()
4753            .await
4754            .unwrap();
4755
4756        // Epochs 1-3: verify no rewards
4757        while let Some(leaf) = leaves.next().await {
4758            let leaf = leaf.unwrap();
4759            let header = leaf.header();
4760            let height = header.height();
4761
4762            let total_distributed = header.total_reward_distributed().unwrap();
4763            assert_eq!(
4764                total_distributed.0,
4765                U256::ZERO,
4766                "epochs 1-3 should have no rewards, height={height}"
4767            );
4768
4769            if height == EPOCH_HEIGHT * 3 {
4770                break;
4771            }
4772        }
4773
4774        while let Some(leaf) = leaves.next().await {
4775            let leaf = leaf.unwrap();
4776            let header = leaf.header();
4777            let height = header.height();
4778
4779            if height == EPOCH_HEIGHT * 4 {
4780                let total_distributed = header.total_reward_distributed().unwrap();
4781                assert!(total_distributed.0 > U256::ZERO,);
4782                break;
4783            }
4784        }
4785
4786        while let Some(leaf) = leaves.next().await {
4787            let leaf = leaf.unwrap();
4788            let header = leaf.header();
4789            let height = header.height();
4790
4791            if height == EPOCH_HEIGHT * 5 {
4792                let total_distributed = header.total_reward_distributed().unwrap();
4793                assert!(total_distributed.0 > U256::ZERO,);
4794                break;
4795            }
4796        }
4797
4798        Ok(())
4799    }
4800
4801    #[test_log::test(tokio::test(flavor = "multi_thread"))]
4802    async fn test_epoch_reward_total_distributed_rewards() -> anyhow::Result<()> {
4803        // Epochs 1-3: No rewards distributed (total_reward_distributed = 0)
4804        // Epoch 4: Rewards only distributed in the LAST block
4805        // Epoch 5: All blocks before last have same total as epoch 4 last block,
4806        //          last block has higher total because of new distribution
4807        const EPOCH_HEIGHT: u64 = 10;
4808        const NUM_NODES: usize = 5;
4809
4810        const V6: Upgrade = Upgrade::trivial(version(0, 6));
4811
4812        let network_config = TestConfigBuilder::default()
4813            .epoch_height(EPOCH_HEIGHT)
4814            .build();
4815
4816        let api_port = reserve_tcp_port().expect("No ports free for query service");
4817
4818        let storage = join_all((0..NUM_NODES).map(|_| SqlDataSource::create_storage())).await;
4819        let persistence: [_; NUM_NODES] = storage
4820            .iter()
4821            .map(<SqlDataSource as TestableSequencerDataSource>::persistence_options)
4822            .collect::<Vec<_>>()
4823            .try_into()
4824            .unwrap();
4825
4826        let config = TestNetworkConfigBuilder::with_num_nodes()
4827            .api_config(SqlDataSource::options(
4828                &storage[0],
4829                Options::with_port(api_port),
4830            ))
4831            .network_config(network_config)
4832            .persistences(persistence.clone())
4833            .catchups(std::array::from_fn(|_| {
4834                StatePeers::<StaticVersion<0, 1>>::from_urls(
4835                    vec![format!("http://localhost:{api_port}").parse().unwrap()],
4836                    Default::default(),
4837                    Duration::from_secs(2),
4838                    &NoMetrics,
4839                )
4840            }))
4841            .pos_hook(DelegationConfig::MultipleDelegators, Default::default(), V6)
4842            .await
4843            .unwrap()
4844            .build();
4845
4846        let _network = TestNetwork::new(config, V6).await;
4847        let client: Client<ServerError, SequencerApiVersion> =
4848            Client::new(format!("http://localhost:{api_port}").parse().unwrap());
4849
4850        let height_client: Client<ServerError, StaticVersion<0, 1>> =
4851            Client::new(format!("http://localhost:{api_port}").parse().unwrap());
4852        wait_until_block_height(&height_client, "node/block-height", EPOCH_HEIGHT * 5).await;
4853
4854        let mut leaves = client
4855            .socket("availability/stream/leaves/0")
4856            .subscribe::<LeafQueryData<SeqTypes>>()
4857            .await
4858            .unwrap();
4859
4860        while let Some(leaf) = leaves.next().await {
4861            let leaf = leaf.unwrap();
4862            let header = leaf.header();
4863            let height = header.height();
4864
4865            let total_distributed = header.total_reward_distributed().unwrap();
4866            assert_eq!(total_distributed.0, U256::ZERO,);
4867
4868            if height == EPOCH_HEIGHT * 3 {
4869                break;
4870            }
4871        }
4872
4873        while let Some(leaf) = leaves.next().await {
4874            let leaf = leaf.unwrap();
4875            let header = leaf.header();
4876            let height = header.height();
4877
4878            let total_distributed = header.total_reward_distributed().unwrap();
4879
4880            if height < EPOCH_HEIGHT * 4 {
4881                assert_eq!(total_distributed.0, U256::ZERO,);
4882            } else {
4883                assert!(total_distributed.0 > U256::ZERO,);
4884                break;
4885            }
4886        }
4887
4888        let epoch4_last_reward = {
4889            let header = client
4890                .get::<Header>(&format!("availability/header/{}", EPOCH_HEIGHT * 4))
4891                .send()
4892                .await
4893                .unwrap();
4894            header.total_reward_distributed().unwrap()
4895        };
4896
4897        assert!(
4898            epoch4_last_reward.0 > U256::ZERO,
4899            "epoch 4 last block should have positive rewards"
4900        );
4901
4902        while let Some(leaf) = leaves.next().await {
4903            let leaf = leaf.unwrap();
4904            let header = leaf.header();
4905            let height = header.height();
4906
4907            let total_distributed = header.total_reward_distributed().unwrap();
4908
4909            if height < EPOCH_HEIGHT * 5 {
4910                assert_eq!(total_distributed, epoch4_last_reward,);
4911            } else {
4912                assert!(total_distributed.0 > epoch4_last_reward.0,);
4913                break;
4914            }
4915        }
4916
4917        Ok(())
4918    }
4919
    // Checks the actual reward amounts: the running `total_reward_distributed`
    // in each header is compared against an expectation rebuilt from the
    // per-epoch block reward reported by the membership coordinator.
    // todo: test each account rewards by querying merklized state api
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_reward_state_v2_epoch_distribution() -> anyhow::Result<()> {
        const EPOCH_HEIGHT: u64 = 10;
        const NUM_NODES: usize = 5;
        // Stop checking once this many epochs have been observed.
        const NUM_EPOCHS: u64 = 6;
        const V6: Upgrade = Upgrade::trivial(version(0, 6));

        let network_config = TestConfigBuilder::default()
            .epoch_height(EPOCH_HEIGHT)
            .build();

        let api_port = reserve_tcp_port().expect("No ports free for query service");

        // One SQL-backed storage + persistence option per node.
        let storage = join_all((0..NUM_NODES).map(|_| SqlDataSource::create_storage())).await;
        let persistence: [_; NUM_NODES] = storage
            .iter()
            .map(<SqlDataSource as TestableSequencerDataSource>::persistence_options)
            .collect::<Vec<_>>()
            .try_into()
            .unwrap();

        let config = TestNetworkConfigBuilder::with_num_nodes()
            .api_config(SqlDataSource::options(
                &storage[0],
                Options::with_port(api_port),
            ))
            .network_config(network_config)
            .persistences(persistence.clone())
            .catchups(std::array::from_fn(|_| {
                StatePeers::<StaticVersion<0, 1>>::from_urls(
                    vec![format!("http://localhost:{api_port}").parse().unwrap()],
                    Default::default(),
                    Duration::from_secs(2),
                    &NoMetrics,
                )
            }))
            .pos_hook(DelegationConfig::MultipleDelegators, Default::default(), V6)
            .await
            .unwrap()
            .build();

        let network = TestNetwork::new(config, V6).await;
        let client: Client<ServerError, SequencerApiVersion> =
            Client::new(format!("http://localhost:{api_port}").parse().unwrap());

        // The coordinator gives access to per-epoch membership/reward data.
        let node_state = network.server.node_state();
        let coordinator = node_state.coordinator;

        // Running expectation for the header's total_reward_distributed field.
        let mut expected_total_distributed = U256::ZERO;

        let mut leaves = client
            .socket("availability/stream/leaves/0")
            .subscribe::<LeafQueryData<SeqTypes>>()
            .await
            .unwrap();

        while let Some(leaf) = leaves.next().await {
            let leaf = leaf.unwrap();
            let header = leaf.header();
            let height = header.height();

            let epoch = epoch_from_block_number(height, EPOCH_HEIGHT);

            // Heights are 1-based, so an epoch's last block has
            // height % EPOCH_HEIGHT == 0.
            let is_epoch_last_block = height % EPOCH_HEIGHT == 0;

            // No rewards are distributed during the first three epochs; skip them.
            if epoch <= 3 {
                continue;
            }

            let header_total_distributed = header
                .total_reward_distributed()
                .expect("total_reward_distributed should exist");

            if is_epoch_last_block {
                // At each epoch boundary the running total grows by the
                // PREVIOUS epoch's per-block reward times the epoch length.
                // NOTE(review): this implies rewards for epoch N are credited
                // at the end of epoch N+1 — confirm against the protocol spec.
                let prev_epoch = epoch - 1;
                let prev_epoch_number = EpochNumber::new(prev_epoch);
                let membership = coordinator.membership().read().await;
                let prev_block_reward = membership
                    .epoch_block_reward(prev_epoch_number)
                    .expect("epoch block reward should exist");
                // Release the read lock before doing arithmetic/assertions.
                drop(membership);

                let epoch_total = prev_block_reward.0 * U256::from(EPOCH_HEIGHT);
                expected_total_distributed += epoch_total;
            }

            assert_eq!(
                header_total_distributed.0, expected_total_distributed,
                "total_reward_distributed mismatch at height {height}"
            );

            if height >= NUM_EPOCHS * EPOCH_HEIGHT {
                break;
            }
        }

        Ok(())
    }
5020
    /// Verifies that the `leader_counts` array in V6 headers is correct.
    ///
    /// For every block past epoch 2, the expected per-validator leader tallies
    /// are rebuilt locally (leader looked up via the membership coordinator)
    /// and compared with the counts decoded from the header. At each epoch
    /// boundary the tallies must sum to exactly `EPOCH_HEIGHT`.
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_epoch_leader_counts() -> anyhow::Result<()> {
        const EPOCH_HEIGHT: u64 = 10;
        const NUM_NODES: usize = 5;
        // Stop checking once this many epochs have been observed.
        const NUM_EPOCHS: u64 = 6;
        const V6: Upgrade = Upgrade::trivial(version(0, 6));

        let network_config = TestConfigBuilder::default()
            .epoch_height(EPOCH_HEIGHT)
            .build();

        let api_port = reserve_tcp_port().expect("No ports free for query service");

        // One SQL-backed storage + persistence option per node.
        let storage = join_all((0..NUM_NODES).map(|_| SqlDataSource::create_storage())).await;
        let persistence: [_; NUM_NODES] = storage
            .iter()
            .map(<SqlDataSource as TestableSequencerDataSource>::persistence_options)
            .collect::<Vec<_>>()
            .try_into()
            .unwrap();

        let config = TestNetworkConfigBuilder::with_num_nodes()
            .api_config(SqlDataSource::options(
                &storage[0],
                Options::with_port(api_port),
            ))
            .network_config(network_config)
            .persistences(persistence.clone())
            .catchups(std::array::from_fn(|_| {
                StatePeers::<StaticVersion<0, 1>>::from_urls(
                    vec![format!("http://localhost:{api_port}").parse().unwrap()],
                    Default::default(),
                    Duration::from_secs(2),
                    &NoMetrics,
                )
            }))
            .pos_hook(DelegationConfig::MultipleDelegators, Default::default(), V6)
            .await
            .unwrap()
            .build();

        let network = TestNetwork::new(config, V6).await;
        let client: Client<ServerError, SequencerApiVersion> =
            Client::new(format!("http://localhost:{api_port}").parse().unwrap());

        let node_state = network.server.node_state();
        let coordinator = node_state.coordinator;

        // Track expected leader counts by address
        let mut expected_counts: HashMap<Address, u16> = HashMap::new();

        let mut leaves = client
            .socket("availability/stream/leaves/0")
            .subscribe::<LeafQueryData<SeqTypes>>()
            .await
            .unwrap();

        while let Some(leaf) = leaves.next().await {
            let leaf = leaf.unwrap();
            let header = leaf.header();
            let height = header.height();
            let epoch = epoch_from_block_number(height, EPOCH_HEIGHT);
            let epoch_number = EpochNumber::new(epoch);

            // Only headers from epoch 3 onward are checked.
            if epoch <= 2 {
                continue;
            }

            let header_leader_counts = header
                .leader_counts()
                .expect("V6 header must have leader_counts");

            // Reset counts at the start of a new epoch
            // (heights are 1-based, so the first block of an epoch satisfies
            // (height - 1) % EPOCH_HEIGHT == 0).
            let is_epoch_start = (height - 1) % EPOCH_HEIGHT == 0;
            if is_epoch_start {
                expected_counts.clear();
            }

            // Determine the leader for this block and track by address
            let view_number = leaf.leaf().view_number();
            let membership = coordinator.membership().read().await;
            let leader = membership
                .leader(view_number, Some(epoch_number))
                .expect("leader should exist");
            let leader_address = membership
                .address(&epoch_number, leader)
                .expect("leader should have an address");

            // Decode the header's raw leader_counts into per-validator counts
            // while the membership read lock is still held.
            let validator_leader_counts =
                ValidatorLeaderCounts::new(&membership, &epoch_number, *header_leader_counts)
                    .expect("ValidatorLeaderCounts should build from header leader_counts");
            drop(membership);

            *expected_counts.entry(leader_address).or_insert(0) += 1;

            let header_counts: HashMap<Address, u16> = validator_leader_counts
                .active_leaders()
                .map(|(v, count)| (v.account, count))
                .collect();

            assert_eq!(
                header_counts, expected_counts,
                "leader_counts mismatch at height {height} (epoch {epoch})"
            );

            // At the epoch's last block, the tallies must cover every block.
            if height % EPOCH_HEIGHT == 0 {
                let total: u16 = expected_counts.values().sum();
                assert_eq!(
                    total, EPOCH_HEIGHT as u16,
                    "total leader_counts at epoch boundary should equal EPOCH_HEIGHT at height \
                     {height}"
                );
            }

            if height >= NUM_EPOCHS * EPOCH_HEIGHT {
                break;
            }
        }

        Ok(())
    }
5143
5144    #[rstest]
5145    #[case(POS_V3)]
5146    #[case(POS_V4)]
5147    #[test_log::test(tokio::test(flavor = "multi_thread"))]
5148    async fn test_node_stake_table_api(#[case] upgrade: Upgrade) {
5149        let epoch_height = 20;
5150
5151        let network_config = TestConfigBuilder::default()
5152            .epoch_height(epoch_height)
5153            .build();
5154
5155        let api_port = reserve_tcp_port().expect("OS should have ephemeral ports available");
5156
5157        const NUM_NODES: usize = 2;
5158        // Initialize nodes.
5159        let storage = join_all((0..NUM_NODES).map(|_| SqlDataSource::create_storage())).await;
5160        let persistence: [_; NUM_NODES] = storage
5161            .iter()
5162            .map(<SqlDataSource as TestableSequencerDataSource>::persistence_options)
5163            .collect::<Vec<_>>()
5164            .try_into()
5165            .unwrap();
5166
5167        let config = TestNetworkConfigBuilder::with_num_nodes()
5168            .api_config(SqlDataSource::options(
5169                &storage[0],
5170                Options::with_port(api_port),
5171            ))
5172            .network_config(network_config)
5173            .persistences(persistence.clone())
5174            .catchups(std::array::from_fn(|_| {
5175                StatePeers::<StaticVersion<0, 1>>::from_urls(
5176                    vec![format!("http://localhost:{api_port}").parse().unwrap()],
5177                    Default::default(),
5178                    Duration::from_secs(2),
5179                    &NoMetrics,
5180                )
5181            }))
5182            .pos_hook(
5183                DelegationConfig::MultipleDelegators,
5184                Default::default(),
5185                upgrade,
5186            )
5187            .await
5188            .unwrap()
5189            .build();
5190
5191        let _network = TestNetwork::new(config, upgrade).await;
5192
5193        let client: Client<ServerError, SequencerApiVersion> =
5194            Client::new(format!("http://localhost:{api_port}").parse().unwrap());
5195
5196        // wait for atleast 2 epochs
5197        let _blocks = client
5198            .socket("availability/stream/blocks/0")
5199            .subscribe::<BlockQueryData<SeqTypes>>()
5200            .await
5201            .unwrap()
5202            .take(40)
5203            .try_collect::<Vec<_>>()
5204            .await
5205            .unwrap();
5206
5207        for i in 1..=3 {
5208            let _st = client
5209                .get::<Vec<PeerConfig<SeqTypes>>>(&format!("node/stake-table/{}", i as u64))
5210                .send()
5211                .await
5212                .expect("failed to get stake table");
5213        }
5214
5215        let _st = client
5216            .get::<StakeTableWithEpochNumber<SeqTypes>>("node/stake-table/current")
5217            .send()
5218            .await
5219            .expect("failed to get stake table");
5220    }
5221
    /// Verifies that a node restarted with fresh storage can catch up on the
    /// stake tables of epochs it missed while offline.
    ///
    /// Scenario: run six nodes, let peer 0 decide past epoch 3, shut it down,
    /// let the rest of the network progress past epoch 7, then re-init the
    /// node with empty storage and check its per-epoch stake tables match the
    /// server's.
    #[rstest]
    #[case(POS_V3)]
    #[case(POS_V4)]
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_epoch_stake_table_catchup(#[case] upgrade: Upgrade) {
        const EPOCH_HEIGHT: u64 = 10;
        const NUM_NODES: usize = 6;

        let port = reserve_tcp_port().expect("OS should have ephemeral ports available");

        let network_config = TestConfigBuilder::default()
            .epoch_height(EPOCH_HEIGHT)
            .build();

        // Initialize storage for each node
        let storage = join_all((0..NUM_NODES).map(|_| SqlDataSource::create_storage())).await;

        let persistence_options: [_; NUM_NODES] = storage
            .iter()
            .map(<SqlDataSource as TestableSequencerDataSource>::persistence_options)
            .collect::<Vec<_>>()
            .try_into()
            .unwrap();

        // setup catchup peers
        let catchup_peers = std::array::from_fn(|_| {
            StatePeers::<StaticVersion<0, 1>>::from_urls(
                vec![format!("http://localhost:{port}").parse().unwrap()],
                Default::default(),
                Duration::from_secs(2),
                &NoMetrics,
            )
        });
        let config = TestNetworkConfigBuilder::<NUM_NODES, _, _>::with_num_nodes()
            .api_config(SqlDataSource::options(
                &storage[0],
                Options::with_port(port),
            ))
            .network_config(network_config)
            .persistences(persistence_options.clone())
            .catchups(catchup_peers)
            .pos_hook(
                DelegationConfig::MultipleDelegators,
                Default::default(),
                upgrade,
            )
            .await
            .unwrap()
            .build();

        // Keep a copy of node 0's validated state so the node can be re-created
        // with the same genesis state after it is removed below.
        let state = config.states()[0].clone();
        let mut network = TestNetwork::new(config, upgrade).await;

        // Wait for the peer 0 (node 1) to advance past three epochs
        let mut events = network.peers[0].event_stream().await;
        while let Some(event) = events.next().await {
            if let EventType::Decide { leaf_chain, .. } = event.event {
                let height = leaf_chain[0].leaf.height();
                tracing::info!("Node 0 decided at height: {height}");
                if height > EPOCH_HEIGHT * 3 {
                    break;
                }
            }
        }

        // Shutdown and remove node 1 to simulate falling behind
        tracing::info!("Shutting down peer 0");
        network.peers.remove(0);

        // Wait for epochs to progress with node 1 offline
        // (watch the server's decide events until past epoch 7).
        let mut events = network.server.event_stream().await;
        while let Some(event) = events.next().await {
            if let EventType::Decide { leaf_chain, .. } = event.event {
                let height = leaf_chain[0].leaf.height();
                if height > EPOCH_HEIGHT * 7 {
                    break;
                }
            }
        }

        // add node 1 to the network with fresh storage
        // (fresh storage forces it to fetch missed epochs from its peers)
        let storage = SqlDataSource::create_storage().await;
        let options = <SqlDataSource as TestableSequencerDataSource>::persistence_options(&storage);
        tracing::info!("Restarting peer 0");
        let node = network
            .cfg
            .init_node(
                1,
                state,
                options,
                Some(StatePeers::<StaticVersion<0, 1>>::from_urls(
                    vec![format!("http://localhost:{port}").parse().unwrap()],
                    Default::default(),
                    Duration::from_secs(2),
                    &NoMetrics,
                )),
                None,
                &NoMetrics,
                test_helpers::STAKE_TABLE_CAPACITY_FOR_TEST,
                NullEventConsumer,
                upgrade,
                Default::default(),
            )
            .await;

        let coordinator = node.node_state().coordinator;
        let server_node_state = network.server.node_state();
        let server_coordinator = server_node_state.coordinator;
        // Verify that the restarted node catches up for each epoch
        for epoch_num in 1..=7 {
            let epoch = EpochNumber::new(epoch_num);
            // If the epoch's membership isn't available yet, block until the
            // coordinator finishes catchup for it.
            let membership_for_epoch = coordinator.membership_for_epoch(Some(epoch)).await;
            if membership_for_epoch.is_err() {
                coordinator.wait_for_catchup(epoch).await.unwrap();
            }

            println!("have stake table for epoch = {epoch_num}");

            // Compare the restarted node's stake table against the server's.
            let node_stake_table = coordinator
                .membership()
                .read()
                .await
                .stake_table(Some(epoch));
            let stake_table = server_coordinator
                .membership()
                .read()
                .await
                .stake_table(Some(epoch));
            println!("asserting stake table for epoch = {epoch_num}");

            assert_eq!(
                node_stake_table, stake_table,
                "Stake table mismatch for epoch {epoch_num}",
            );
        }
    }
5358
    /// Stress variant of the stake-table catchup test: after the restarted
    /// node comes back with fresh storage, catchup is triggered for all missed
    /// epochs in quick succession and in RANDOM order, then each epoch's stake
    /// table is verified against the server's.
    #[rstest]
    #[case(POS_V3)]
    #[case(POS_V4)]
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_epoch_stake_table_catchup_stress(#[case] upgrade: Upgrade) {
        const EPOCH_HEIGHT: u64 = 10;
        const NUM_NODES: usize = 6;

        let port = reserve_tcp_port().expect("OS should have ephemeral ports available");

        let network_config = TestConfigBuilder::default()
            .epoch_height(EPOCH_HEIGHT)
            .build();

        // Initialize storage for each node
        let storage = join_all((0..NUM_NODES).map(|_| SqlDataSource::create_storage())).await;

        let persistence_options: [_; NUM_NODES] = storage
            .iter()
            .map(<SqlDataSource as TestableSequencerDataSource>::persistence_options)
            .collect::<Vec<_>>()
            .try_into()
            .unwrap();

        // setup catchup peers
        let catchup_peers = std::array::from_fn(|_| {
            StatePeers::<StaticVersion<0, 1>>::from_urls(
                vec![format!("http://localhost:{port}").parse().unwrap()],
                Default::default(),
                Duration::from_secs(2),
                &NoMetrics,
            )
        });
        let config = TestNetworkConfigBuilder::<NUM_NODES, _, _>::with_num_nodes()
            .api_config(SqlDataSource::options(
                &storage[0],
                Options::with_port(port),
            ))
            .network_config(network_config)
            .persistences(persistence_options.clone())
            .catchups(catchup_peers)
            .pos_hook(
                DelegationConfig::MultipleDelegators,
                Default::default(),
                upgrade,
            )
            .await
            .unwrap()
            .build();

        // Keep a copy of node 0's validated state so the node can be re-created
        // with the same genesis state after it is removed below.
        let state = config.states()[0].clone();
        let mut network = TestNetwork::new(config, upgrade).await;

        // Wait for the peer 0 (node 1) to advance past three epochs
        let mut events = network.peers[0].event_stream().await;
        while let Some(event) = events.next().await {
            if let EventType::Decide { leaf_chain, .. } = event.event {
                let height = leaf_chain[0].leaf.height();
                tracing::info!("Node 0 decided at height: {height}");
                if height > EPOCH_HEIGHT * 3 {
                    break;
                }
            }
        }

        // Shutdown and remove node 1 to simulate falling behind
        tracing::info!("Shutting down peer 0");
        network.peers.remove(0);

        // Wait for epochs to progress with node 1 offline
        let mut events = network.server.event_stream().await;
        while let Some(event) = events.next().await {
            if let EventType::Decide { leaf_chain, .. } = event.event {
                let height = leaf_chain[0].leaf.height();
                tracing::info!("Server decided at height: {height}");
                //  until 7 epochs
                if height > EPOCH_HEIGHT * 7 {
                    break;
                }
            }
        }

        // add node 1 to the network with fresh storage
        // (fresh storage forces it to fetch missed epochs from its peers)
        let storage = SqlDataSource::create_storage().await;
        let options = <SqlDataSource as TestableSequencerDataSource>::persistence_options(&storage);

        tracing::info!("Restarting peer 0");
        let node = network
            .cfg
            .init_node(
                1,
                state,
                options,
                Some(StatePeers::<StaticVersion<0, 1>>::from_urls(
                    vec![format!("http://localhost:{port}").parse().unwrap()],
                    Default::default(),
                    Duration::from_secs(2),
                    &NoMetrics,
                )),
                None,
                &NoMetrics,
                test_helpers::STAKE_TABLE_CAPACITY_FOR_TEST,
                NullEventConsumer,
                upgrade,
                Default::default(),
            )
            .await;

        let coordinator = node.node_state().coordinator;

        let server_node_state = network.server.node_state();
        let server_coordinator = server_node_state.coordinator;

        // Trigger catchup for all epochs in quick succession and in random order
        // (results intentionally ignored; we only care about kicking off the
        // concurrent catchup tasks).
        let mut rand_epochs: Vec<_> = (1..=7).collect();
        rand_epochs.shuffle(&mut rand::thread_rng());
        println!("trigger catchup in this order: {rand_epochs:?}");
        for epoch_num in rand_epochs {
            let epoch = EpochNumber::new(epoch_num);
            let _ = coordinator.membership_for_epoch(Some(epoch)).await;
        }

        // Verify that the restarted node catches up for each epoch
        for epoch_num in 1..=7 {
            println!("getting stake table for epoch = {epoch_num}");
            let epoch = EpochNumber::new(epoch_num);
            // Block until the coordinator has finished catchup for this epoch.
            let _ = coordinator.wait_for_catchup(epoch).await.unwrap();

            println!("have stake table for epoch = {epoch_num}");

            // Compare the restarted node's stake table against the server's.
            let node_stake_table = coordinator
                .membership()
                .read()
                .await
                .stake_table(Some(epoch));
            let stake_table = server_coordinator
                .membership()
                .read()
                .await
                .stake_table(Some(epoch));

            println!("asserting stake table for epoch = {epoch_num}");

            assert_eq!(
                node_stake_table, stake_table,
                "Stake table mismatch for epoch {epoch_num}",
            );
        }
    }
5508
5509    #[rstest]
5510    #[case(POS_V3)]
5511    #[case(POS_V4)]
5512    #[test_log::test(tokio::test(flavor = "multi_thread"))]
5513    async fn test_merklized_state_catchup_on_restart(
5514        #[case] upgrade: Upgrade,
5515    ) -> anyhow::Result<()> {
5516        // This test verifies that a query node can catch up on
5517        // merklized state after being offline for multiple epochs.
5518        //
5519        // Steps:
5520        // 1. Start a test network with 5 sequencer nodes.
5521        // 2. Start a separate node with the query module enabled, connected to the network.
5522        //    - This node stores merklized state
5523        // 3. Shut down the query node after 1 epoch.
5524        // 4. Allow the network to progress 3 more epochs (query node remains offline).
5525        // 5. Restart the query node.
5526        //    - The node is expected to reconstruct or catch up on its own
5527        const EPOCH_HEIGHT: u64 = 10;
5528
5529        let network_config = TestConfigBuilder::default()
5530            .epoch_height(EPOCH_HEIGHT)
5531            .build();
5532
5533        let api_port = reserve_tcp_port().expect("OS should have ephemeral ports available");
5534
5535        tracing::info!("API PORT = {api_port}");
5536        const NUM_NODES: usize = 5;
5537
5538        let storage = join_all((0..NUM_NODES).map(|_| SqlDataSource::create_storage())).await;
5539        let persistence: [_; NUM_NODES] = storage
5540            .iter()
5541            .map(<SqlDataSource as TestableSequencerDataSource>::persistence_options)
5542            .collect::<Vec<_>>()
5543            .try_into()
5544            .unwrap();
5545
5546        let config = TestNetworkConfigBuilder::with_num_nodes()
5547            .api_config(SqlDataSource::options(
5548                &storage[0],
5549                Options::with_port(api_port).catchup(Default::default()),
5550            ))
5551            .network_config(network_config)
5552            .persistences(persistence.clone())
5553            .catchups(std::array::from_fn(|_| {
5554                StatePeers::<StaticVersion<0, 1>>::from_urls(
5555                    vec![format!("http://localhost:{api_port}").parse().unwrap()],
5556                    Default::default(),
5557                    Duration::from_secs(2),
5558                    &NoMetrics,
5559                )
5560            }))
5561            .pos_hook(
5562                DelegationConfig::MultipleDelegators,
5563                hotshot_contract_adapter::stake_table::StakeTableContractVersion::V2,
5564                upgrade,
5565            )
5566            .await
5567            .unwrap()
5568            .build();
5569        let state = config.states()[0].clone();
5570        let mut network = TestNetwork::new(config, upgrade).await;
5571
5572        // Remove peer 0 and restart it with the query module enabled.
5573        // Adding an additional node to the test network is not straight forward,
5574        // as the keys have already been initialized in the config above.
5575        // So, we remove this node and re-add it using the same index.
5576        network.peers[0].shut_down().await;
5577        network.peers.remove(0);
5578        let node_0_storage = &storage[1];
5579        let node_0_persistence = persistence[1].clone();
5580        let node_0_port = reserve_tcp_port().expect("OS should have ephemeral ports available");
5581        tracing::info!("node_0_port {node_0_port}");
5582        // enable query module with api peers
5583        let opt = Options::with_port(node_0_port).query_sql(
5584            Query {
5585                peers: vec![format!("http://localhost:{api_port}").parse().unwrap()],
5586            },
5587            tmp_options(node_0_storage),
5588        );
5589
5590        // start the query node so that it builds the merklized state
5591        let node_0 = opt
5592            .clone()
5593            .serve(|metrics, consumer, storage| {
5594                let cfg = network.cfg.clone();
5595                let node_0_persistence = node_0_persistence.clone();
5596                let state = state.clone();
5597                async move {
5598                    Ok(cfg
5599                        .init_node(
5600                            1,
5601                            state,
5602                            node_0_persistence.clone(),
5603                            Some(StatePeers::<StaticVersion<0, 1>>::from_urls(
5604                                vec![format!("http://localhost:{api_port}").parse().unwrap()],
5605                                Default::default(),
5606                                Duration::from_secs(2),
5607                                &NoMetrics,
5608                            )),
5609                            storage,
5610                            &*metrics,
5611                            test_helpers::STAKE_TABLE_CAPACITY_FOR_TEST,
5612                            consumer,
5613                            upgrade,
5614                            Default::default(),
5615                        )
5616                        .await)
5617                }
5618                .boxed()
5619            })
5620            .await
5621            .unwrap();
5622
5623        let mut events = network.peers[2].event_stream().await;
5624        // wait for 1 epoch
5625        wait_for_epochs(&mut events, EPOCH_HEIGHT, 1).await;
5626
5627        // shutdown the node for 3 epochs
5628        drop(node_0);
5629
5630        // wait for 4 epochs
5631        wait_for_epochs(&mut events, EPOCH_HEIGHT, 4).await;
5632
5633        // start the node again.
5634        tracing::info!("restarting node");
5635        let node_0 = opt
5636            .serve(|metrics, consumer, storage| {
5637                let cfg = network.cfg.clone();
5638                async move {
5639                    Ok(cfg
5640                        .init_node(
5641                            1,
5642                            state,
5643                            node_0_persistence,
5644                            Some(StatePeers::<StaticVersion<0, 1>>::from_urls(
5645                                vec![format!("http://localhost:{api_port}").parse().unwrap()],
5646                                Default::default(),
5647                                Duration::from_secs(2),
5648                                &NoMetrics,
5649                            )),
5650                            storage,
5651                            &*metrics,
5652                            test_helpers::STAKE_TABLE_CAPACITY_FOR_TEST,
5653                            consumer,
5654                            upgrade,
5655                            Default::default(),
5656                        )
5657                        .await)
5658                }
5659                .boxed()
5660            })
5661            .await
5662            .unwrap();
5663
5664        let client: Client<ServerError, SequencerApiVersion> =
5665            Client::new(format!("http://localhost:{node_0_port}").parse().unwrap());
5666        client.connect(None).await;
5667
5668        wait_for_epochs(&mut events, EPOCH_HEIGHT, 6).await;
5669
5670        let epoch_7_block = EPOCH_HEIGHT * 6 + 1;
5671
5672        // check that the node's state has reward accounts
5673        let mut retries = 0;
5674        loop {
5675            sleep(Duration::from_secs(1)).await;
5676            let state = node_0.decided_state().await;
5677
5678            let leaves = if upgrade.base == EPOCH_VERSION {
5679                // Use legacy tree for V3
5680                state.reward_merkle_tree_v1.num_leaves()
5681            } else {
5682                // Use new tree for V4 and above
5683                state.reward_merkle_tree_v2.num_leaves()
5684            };
5685
5686            if leaves > 0 {
5687                tracing::info!("Node's state has reward accounts");
5688                break;
5689            }
5690
5691            retries += 1;
5692            if retries > 120 {
5693                panic!("max retries reached. failed to catchup reward state");
5694            }
5695        }
5696
5697        retries = 0;
5698        // check that the node has stored atleast 6 epochs merklized state in persistence
5699        loop {
5700            sleep(Duration::from_secs(3)).await;
5701
5702            let bh = client
5703                .get::<u64>("block-state/block-height")
5704                .send()
5705                .await
5706                .expect("block height not found");
5707
5708            tracing::info!("block state: block height={bh}");
5709            if bh > epoch_7_block {
5710                break;
5711            }
5712
5713            retries += 1;
5714            if retries > 30 {
5715                panic!(
5716                    "max retries reached. block state block height is less than epoch 7 start \
5717                     block"
5718                );
5719            }
5720        }
5721
5722        // shutdown consensus to freeze the state
5723        node_0.shutdown_consensus().await;
5724        let decided_leaf = node_0.decided_leaf().await;
5725        let state = node_0.decided_state().await;
5726        tracing::info!(
5727            height = decided_leaf.height(),
5728            ?decided_leaf,
5729            ?state,
5730            "final state"
5731        );
5732
5733        let height = decided_leaf.height();
5734        let num_leaves = state.block_merkle_tree.num_leaves();
5735        tracing::info!(height, num_leaves, "checking block merkle tree state");
5736        state
5737            .block_merkle_tree
5738            .lookup(height - 1)
5739            .expect_ok()
5740            .unwrap_or_else(|err| {
5741                panic!(
5742                    "block state not found ({err:#}):\n{:#?}",
5743                    state.block_merkle_tree
5744                )
5745            });
5746
5747        Ok(())
5748    }
5749
    #[rstest]
    #[case(POS_V3)]
    #[case(POS_V4)]
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_state_reconstruction(#[case] upgrade: Upgrade) -> anyhow::Result<()> {
        // This test verifies that a query node can successfully reconstruct its state
        // after being shut down from the database
        //
        // Steps:
        // 1. Start a test network with 5 nodes.
        // 2. Add a query node connected to the network.
        // 3. Let the network run until 3 epochs have passed.
        // 4. Shut down the query node.
        // 5. Attempt to reconstruct its state from storage using:
        //    - No fee/reward accounts
        //    - Only fee accounts
        //    - Only reward accounts
        //    - Both fee and reward accounts
        // 6. Assert that the reconstructed state is correct in all scenarios.

        // Short epochs so multiple epoch boundaries pass within the test's runtime.
        const EPOCH_HEIGHT: u64 = 10;

        let network_config = TestConfigBuilder::default()
            .epoch_height(EPOCH_HEIGHT)
            .build();

        let api_port = reserve_tcp_port().expect("OS should have ephemeral ports available");

        tracing::info!("API PORT = {api_port}");
        const NUM_NODES: usize = 5;

        // One SQL storage and one persistence option per node, created concurrently.
        let storage = join_all((0..NUM_NODES).map(|_| SqlDataSource::create_storage())).await;
        let persistence: [_; NUM_NODES] = storage
            .iter()
            .map(<SqlDataSource as TestableSequencerDataSource>::persistence_options)
            .collect::<Vec<_>>()
            .try_into()
            .unwrap();

        // Node 0 serves the API on `api_port`; every node uses that API as its
        // state-catchup peer. The PoS hook registers multiple delegators against
        // the V2 stake table contract for the `upgrade` case under test.
        let config = TestNetworkConfigBuilder::with_num_nodes()
            .api_config(SqlDataSource::options(
                &storage[0],
                Options::with_port(api_port),
            ))
            .network_config(network_config)
            .persistences(persistence.clone())
            .catchups(std::array::from_fn(|_| {
                StatePeers::<StaticVersion<0, 1>>::from_urls(
                    vec![format!("http://localhost:{api_port}").parse().unwrap()],
                    Default::default(),
                    Duration::from_secs(2),
                    &NoMetrics,
                )
            }))
            .pos_hook(
                DelegationConfig::MultipleDelegators,
                hotshot_contract_adapter::stake_table::StakeTableContractVersion::V2,
                upgrade,
            )
            .await
            .unwrap()
            .build();
        let state = config.states()[0].clone();
        let mut network = TestNetwork::new(config, upgrade).await;
        // Remove peer 0 and restart it with the query module enabled.
        // Adding an additional node to the test network is not straight forward,
        // as the keys have already been initialized in the config above.
        // So, we remove this node and re-add it using the same index.
        // NOTE(review): the sibling test above calls `peers[0].shut_down().await`
        // before removing the peer; here the peer is only removed — confirm the
        // omission is intentional (e.g. the drop is expected to tear it down).
        network.peers.remove(0);

        let node_0_storage = &storage[1];
        let node_0_persistence = persistence[1].clone();
        let node_0_port = reserve_tcp_port().expect("OS should have ephemeral ports available");
        tracing::info!("node_0_port {node_0_port}");
        // Re-serve the removed node with the SQL query module enabled, pointed at
        // node 0's API as its query peer.
        let opt = Options::with_port(node_0_port).query_sql(
            Query {
                peers: vec![format!("http://localhost:{api_port}").parse().unwrap()],
            },
            tmp_options(node_0_storage),
        );
        let node_0 = opt
            .clone()
            .serve(|metrics, consumer, storage| {
                let cfg = network.cfg.clone();
                let node_0_persistence = node_0_persistence.clone();
                let state = state.clone();
                async move {
                    Ok(cfg
                        .init_node(
                            1,
                            state,
                            node_0_persistence.clone(),
                            Some(StatePeers::<StaticVersion<0, 1>>::from_urls(
                                vec![format!("http://localhost:{api_port}").parse().unwrap()],
                                Default::default(),
                                Duration::from_secs(2),
                                &NoMetrics,
                            )),
                            storage,
                            &*metrics,
                            test_helpers::STAKE_TABLE_CAPACITY_FOR_TEST,
                            consumer,
                            upgrade,
                            Default::default(),
                        )
                        .await)
                }
                .boxed()
            })
            .await
            .unwrap();

        // Observe consensus progress through another peer's event stream.
        let mut events = network.peers[2].event_stream().await;
        // Wait until at least 3 epochs have passed
        wait_for_epochs(&mut events, EPOCH_HEIGHT, 3).await;

        tracing::warn!("shutting down node 0");

        node_0.shutdown_consensus().await;

        // Snapshot the decided state so we know which fee/reward accounts the
        // reconstructed state must be able to prove.
        let instance = node_0.node_state();
        let state = node_0.decided_state().await;
        let fee_accounts = state
            .fee_merkle_tree
            .clone()
            .into_iter()
            .map(|(acct, _)| acct)
            .collect::<Vec<_>>();
        // V3 (EPOCH_VERSION) uses the legacy reward tree; V4
        // (DRB_AND_HEADER_UPGRADE_VERSION) uses the new one. Accounts are
        // normalized to the V2 account type either way.
        let reward_accounts = match upgrade.base {
            EPOCH_VERSION => state
                .reward_merkle_tree_v1
                .clone()
                .into_iter()
                .map(|(acct, _)| RewardAccountV2::from(acct))
                .collect::<Vec<_>>(),
            DRB_AND_HEADER_UPGRADE_VERSION => state
                .reward_merkle_tree_v2
                .clone()
                .into_iter()
                .map(|(acct, _)| acct)
                .collect::<Vec<_>>(),
            _ => panic!("invalid version"),
        };

        let client: Client<ServerError, SequencerApiVersion> =
            Client::new(format!("http://localhost:{node_0_port}").parse().unwrap());
        client.connect(Some(Duration::from_secs(10))).await;

        // wait 3s to be sure that all the
        // transactions have been committed
        sleep(Duration::from_secs(3)).await;

        tracing::info!("getting node block height");
        let node_block_height = client
            .get::<u64>("node/block-height")
            .send()
            .await
            .context("getting Espresso block height")
            .unwrap();

        tracing::info!("node block height={node_block_height}");

        // Fetch the latest available leaf; reconstruction targets the view right
        // after it.
        let leaf_query_data = client
            .get::<LeafQueryData<SeqTypes>>(&format!("availability/leaf/{}", node_block_height - 1))
            .send()
            .await
            .context("error getting leaf")
            .unwrap();

        tracing::info!("leaf={leaf_query_data:?}");
        let leaf = leaf_query_data.leaf();
        let to_view = leaf.view_number() + 1;

        // Connect directly to the node's SQL storage to drive reconstruction.
        let ds = SqlStorage::connect(
            Config::try_from(&node_0_persistence).unwrap(),
            StorageConnectionType::Sequencer,
        )
        .await
        .unwrap();
        let mut tx = ds.read().await?;

        // Scenario 1: reconstruct with no fee/reward accounts requested.
        let (state, leaf) = reconstruct_state(
            &instance,
            &ds,
            &mut tx,
            node_block_height - 1,
            to_view,
            &[],
            &[],
        )
        .await
        .unwrap();
        assert_eq!(leaf.view_number(), to_view);
        assert!(
            state
                .block_merkle_tree
                .lookup(node_block_height - 1)
                .expect_ok()
                .is_ok(),
            "inconsistent block merkle tree"
        );

        // Reconstruct fee state
        // Scenario 2: fee accounts only.
        let (state, leaf) = reconstruct_state(
            &instance,
            &ds,
            &mut tx,
            node_block_height - 1,
            to_view,
            &fee_accounts,
            &[],
        )
        .await
        .unwrap();

        assert_eq!(leaf.view_number(), to_view);
        assert!(
            state
                .block_merkle_tree
                .lookup(node_block_height - 1)
                .expect_ok()
                .is_ok(),
            "inconsistent block merkle tree"
        );

        // Every requested fee account must be provable in the reconstructed tree.
        for account in &fee_accounts {
            state.fee_merkle_tree.lookup(account).expect_ok().unwrap();
        }

        // Reconstruct reward state

        // Scenario 3: reward accounts only.
        let (state, leaf) = reconstruct_state(
            &instance,
            &ds,
            &mut tx,
            node_block_height - 1,
            to_view,
            &[],
            &reward_accounts,
        )
        .await
        .unwrap();

        // Check the version-appropriate reward tree for every requested account.
        match upgrade.base {
            EPOCH_VERSION => {
                for account in reward_accounts.clone() {
                    state
                        .reward_merkle_tree_v1
                        .lookup(RewardAccountV1::from(account))
                        .expect_ok()
                        .unwrap();
                }
            },
            DRB_AND_HEADER_UPGRADE_VERSION => {
                for account in &reward_accounts {
                    state
                        .reward_merkle_tree_v2
                        .lookup(account)
                        .expect_ok()
                        .unwrap();
                }
            },
            _ => panic!("invalid version"),
        };

        assert_eq!(leaf.view_number(), to_view);
        assert!(
            state
                .block_merkle_tree
                .lookup(node_block_height - 1)
                .expect_ok()
                .is_ok(),
            "inconsistent block merkle tree"
        );
        // Reconstruct reward and fee state

        // Scenario 4: both fee and reward accounts together.
        let (state, leaf) = reconstruct_state(
            &instance,
            &ds,
            &mut tx,
            node_block_height - 1,
            to_view,
            &fee_accounts,
            &reward_accounts,
        )
        .await
        .unwrap();

        assert!(
            state
                .block_merkle_tree
                .lookup(node_block_height - 1)
                .expect_ok()
                .is_ok(),
            "inconsistent block merkle tree"
        );
        assert_eq!(leaf.view_number(), to_view);

        match upgrade.base {
            EPOCH_VERSION => {
                for account in reward_accounts.clone() {
                    state
                        .reward_merkle_tree_v1
                        .lookup(RewardAccountV1::from(account))
                        .expect_ok()
                        .unwrap();
                }
            },
            DRB_AND_HEADER_UPGRADE_VERSION => {
                for account in &reward_accounts {
                    state
                        .reward_merkle_tree_v2
                        .lookup(account)
                        .expect_ok()
                        .unwrap();
                }
            },
            _ => panic!("invalid version"),
        };

        for account in &fee_accounts {
            state.fee_merkle_tree.lookup(account).expect_ok().unwrap();
        }

        Ok(())
    }
6076
    #[rstest]
    #[case(POS_V3)]
    #[case(POS_V4)]
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_block_reward_api(#[case] upgrade: Upgrade) -> anyhow::Result<()> {
        // Verifies that once a PoS network is producing blocks, the
        // `node/block-reward` endpoint returns a non-zero reward amount.
        let epoch_height = 10;

        let network_config = TestConfigBuilder::default()
            .epoch_height(epoch_height)
            .build();

        let api_port = reserve_tcp_port().expect("OS should have ephemeral ports available");

        // A single node is enough: we only exercise its API surface.
        const NUM_NODES: usize = 1;
        // Initialize nodes.
        let storage = join_all((0..NUM_NODES).map(|_| SqlDataSource::create_storage())).await;
        let persistence: [_; NUM_NODES] = storage
            .iter()
            .map(<SqlDataSource as TestableSequencerDataSource>::persistence_options)
            .collect::<Vec<_>>()
            .try_into()
            .unwrap();

        // The node serves its own API and also uses it as its catchup peer; the
        // PoS hook sets up delegations with variable amounts for `upgrade`.
        let config = TestNetworkConfigBuilder::with_num_nodes()
            .api_config(SqlDataSource::options(
                &storage[0],
                Options::with_port(api_port),
            ))
            .network_config(network_config.clone())
            .persistences(persistence.clone())
            .catchups(std::array::from_fn(|_| {
                StatePeers::<StaticVersion<0, 1>>::from_urls(
                    vec![format!("http://localhost:{api_port}").parse().unwrap()],
                    Default::default(),
                    Duration::from_secs(2),
                    &NoMetrics,
                )
            }))
            .pos_hook(
                DelegationConfig::VariableAmounts,
                Default::default(),
                upgrade,
            )
            .await
            .unwrap()
            .build();

        let _network = TestNetwork::new(config, upgrade).await;
        let client: Client<ServerError, SequencerApiVersion> =
            Client::new(format!("http://localhost:{api_port}").parse().unwrap());

        // Wait for three blocks to stream so the chain is demonstrably live
        // before querying the reward endpoint.
        let _blocks = client
            .socket("availability/stream/blocks/0")
            .subscribe::<BlockQueryData<SeqTypes>>()
            .await
            .unwrap()
            .take(3)
            .try_collect::<Vec<_>>()
            .await
            .unwrap();

        // The endpoint returns `Option<RewardAmount>`; both a transport failure
        // and a `None` payload fail the test.
        let block_reward = client
            .get::<Option<RewardAmount>>("node/block-reward")
            .send()
            .await
            .expect("failed to get block reward")
            .expect("block reward is None");
        tracing::info!("block_reward={block_reward:?}");

        // Under PoS the per-block reward must be strictly positive.
        assert!(block_reward.0 > U256::ZERO);

        Ok(())
    }
6150
    /// `chain_id`: None = default (35353, non-mainnet), Some(1) = mainnet
    ///
    /// Checks the `token/*` supply endpoints: total minted supply must equal the
    /// configured initial supply, and the circulating-supply figures must be
    /// internally consistent (circulating-on-Ethereum <= minted, overall
    /// circulating >= circulating-on-Ethereum, and > 0). On mainnet chain id 1
    /// the unlock schedule must leave some supply locked.
    #[rstest]
    #[case(POS_V4, None)]
    #[case(POS_V4, Some(1u64))]
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_token_supply_api(
        #[case] upgrade: Upgrade,
        #[case] chain_id: Option<u64>,
    ) -> anyhow::Result<()> {
        use alloy::primitives::utils::parse_ether;
        use espresso_types::v0_3::ChainConfig;

        let epoch_height = 10;
        let network_config = TestConfigBuilder::default()
            .epoch_height(epoch_height)
            .build();

        let api_port = reserve_tcp_port().expect("OS should have ephemeral ports available");

        // Single node: only its HTTP API is exercised.
        const NUM_NODES: usize = 1;
        let storage = join_all((0..NUM_NODES).map(|_| SqlDataSource::create_storage())).await;
        let persistence: [_; NUM_NODES] = storage
            .iter()
            .map(<SqlDataSource as TestableSequencerDataSource>::persistence_options)
            .collect::<Vec<_>>()
            .try_into()
            .unwrap();

        // Use the real initial supply (3.59B tokens) so the unlock schedule
        // produces realistic locked/unlocked values in the supply calculations.
        let initial_supply_tokens = U256::from(3_590_000_000u64);
        // Same amount in wei (18 decimals), for comparing against the
        // parse_ether'd endpoint responses below.
        let initial_supply_wei = parse_ether("3590000000").unwrap();

        let mut builder = TestNetworkConfigBuilder::with_num_nodes()
            .api_config(SqlDataSource::options(
                &storage[0],
                Options::with_port(api_port),
            ))
            .network_config(network_config.clone())
            .persistences(persistence.clone())
            .catchups(std::array::from_fn(|_| {
                StatePeers::<StaticVersion<0, 1>>::from_urls(
                    vec![format!("http://localhost:{api_port}").parse().unwrap()],
                    Default::default(),
                    Duration::from_secs(2),
                    &NoMetrics,
                )
            }))
            .initial_token_supply(initial_supply_tokens);

        // Must set states before pos_hook, which preserves chain_id from state[0].
        if let Some(id) = chain_id {
            let state = ValidatedState {
                chain_config: ChainConfig {
                    chain_id: U256::from(id).into(),
                    ..Default::default()
                }
                .into(),
                ..Default::default()
            };
            builder = builder.states(std::array::from_fn(|_| state.clone()));
        }

        let config = builder
            .pos_hook(
                DelegationConfig::VariableAmounts,
                Default::default(),
                upgrade,
            )
            .await
            .unwrap()
            .build();

        let _network = TestNetwork::new(config, upgrade).await;
        let client: Client<ServerError, SequencerApiVersion> =
            Client::new(format!("http://localhost:{api_port}").parse().unwrap());

        // Wait for three blocks so the network is live before querying supplies.
        let _blocks = client
            .socket("availability/stream/blocks/0")
            .subscribe::<BlockQueryData<SeqTypes>>()
            .await
            .unwrap()
            .take(3)
            .try_collect::<Vec<_>>()
            .await
            .unwrap();

        // The supply endpoints return decimal strings in whole-token units.
        let minted: String = client
            .get("token/total-minted-supply")
            .send()
            .await
            .expect("total-minted-supply");
        let circ_eth: String = client
            .get("token/circulating-supply-ethereum")
            .send()
            .await
            .expect("circulating-supply-ethereum");
        let circulating: String = client
            .get("token/circulating-supply")
            .send()
            .await
            .expect("circulating-supply");
        tracing::info!(%minted, %circ_eth, %circulating);

        // Convert the string responses to wei for exact U256 comparisons.
        let minted = parse_ether(&minted)?;
        let circ_eth = parse_ether(&circ_eth)?;
        let circ = parse_ether(&circulating)?;

        assert_eq!(minted, initial_supply_wei);
        assert!(circ_eth <= minted);
        assert!(circ >= circ_eth);
        assert!(circ > U256::ZERO);

        if chain_id == Some(1) {
            // Proves the unlock schedule is hooked up: locked > 0 means
            // the mainnet code path ran. Vesting ends ~2032; delete after.
            assert!(circ_eth < minted);
        }

        Ok(())
    }
6272
    /// Deploys the full contract suite to a local L1, then verifies that
    /// `Fetcher::scan_token_contract_initialized_event_log` locates the ESP
    /// token's `Initialized` event, and that the same transaction contains the
    /// initial mint (a `Transfer` from the zero address with a positive value).
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_scanning_token_contract_initialized_event() -> anyhow::Result<()> {
        use espresso_types::v0_3::ChainConfig;

        let blocks_per_epoch = 10;

        let network_config = TestConfigBuilder::<1>::default()
            .epoch_height(blocks_per_epoch)
            .build();

        // Derive the light-client genesis (state + stake) from the test stake table.
        let (genesis_state, genesis_stake) = light_client_genesis_from_stake_table(
            &network_config.hotshot_config().hotshot_stake_table(),
            STAKE_TABLE_CAPACITY_FOR_TEST,
        )
        .unwrap();

        // L1 provider signing with the test network's deployer key.
        let deployer = ProviderBuilder::new()
            .wallet(EthereumWallet::from(network_config.signer().clone()))
            .connect_http(network_config.l1_url().clone());

        // Deploy everything (token, stake table, timelocks, mock light client)
        // with the deployer as admin/proposer/executor and zero timelock delays.
        let mut contracts = Contracts::new();
        let args = DeployerArgsBuilder::default()
            .deployer(deployer.clone())
            .rpc_url(network_config.l1_url().clone())
            .mock_light_client(true)
            .genesis_lc_state(genesis_state)
            .genesis_st_state(genesis_stake)
            .blocks_per_epoch(blocks_per_epoch)
            .epoch_start_block(1)
            .multisig_pauser(network_config.signer().address())
            .token_name("Espresso".to_string())
            .token_symbol("ESP".to_string())
            .initial_token_supply(U256::from(3590000000u64))
            .ops_timelock_delay(U256::from(0))
            .ops_timelock_admin(network_config.signer().address())
            .ops_timelock_proposers(vec![network_config.signer().address()])
            .ops_timelock_executors(vec![network_config.signer().address()])
            .safe_exit_timelock_delay(U256::from(0))
            .safe_exit_timelock_admin(network_config.signer().address())
            .safe_exit_timelock_proposers(vec![network_config.signer().address()])
            .safe_exit_timelock_executors(vec![network_config.signer().address()])
            .build()
            .unwrap();

        args.deploy_all(&mut contracts).await.unwrap();

        let st_addr = contracts
            .address(Contract::StakeTableProxy)
            .expect("StakeTableProxy deployed");

        let l1_url = network_config.l1_url().clone();

        let storage = SqlDataSource::create_storage().await;
        let mut opt = <SqlDataSource as TestableSequencerDataSource>::persistence_options(&storage);
        let persistence = opt.create().await.unwrap();

        // L1 client with fast retries and a wide event-scan block range so the
        // scan below completes in one query against the local chain.
        let l1_client = L1ClientOptions {
            stake_table_update_interval: Duration::from_secs(7),
            l1_retry_delay: Duration::from_millis(10),
            l1_events_max_block_range: 10000,
            ..Default::default()
        }
        .connect(vec![l1_url])
        .unwrap();
        l1_client.spawn_tasks().await;

        // Fetcher under test, pointed at the freshly deployed stake table.
        let fetcher = Fetcher::new(
            Arc::new(NullStateCatchup::default()),
            Arc::new(Mutex::new(persistence.clone())),
            l1_client.clone(),
            ChainConfig {
                stake_table_contract: Some(st_addr),
                base_fee: 0.into(),
                ..Default::default()
            },
        );

        let provider = l1_client.provider;
        let stake_table = StakeTableV2::new(st_addr, provider.clone());

        // The scan starts from the stake table's own initialization block and
        // walks backwards/around it — read that block from the contract.
        let stake_table_init_block = stake_table
            .initializedAtBlock()
            .block(BlockId::finalized())
            .call()
            .await?
            .to::<u64>();

        tracing::info!("stake table init block = {stake_table_init_block}");

        // Token address is read from the stake table contract itself.
        let token_address = stake_table
            .token()
            .block(BlockId::finalized())
            .call()
            .await
            .context("Failed to get token address")?;

        let token = EspToken::new(token_address, provider.clone());

        // The call under test: find the token's `Initialized` event log.
        let init_log = fetcher
            .scan_token_contract_initialized_event_log(stake_table_init_block, token.clone())
            .await
            .unwrap();

        let init_block = init_log.block_number.context("missing block number")?;
        let init_tx_hash = init_log
            .transaction_hash
            .context("missing transaction hash")?;

        // Fetch all Transfer events in the block where initialization happened.
        let transfer_logs = token
            .Transfer_filter()
            .from_block(init_block)
            .to_block(init_block)
            .query()
            .await
            .unwrap();

        // The initial mint is a Transfer from the zero address within the same
        // transaction as the Initialized event.
        let (mint_transfer, _) = transfer_logs
            .iter()
            .find(|(transfer, log)| {
                log.transaction_hash == Some(init_tx_hash) && transfer.from == Address::ZERO
            })
            .context("no mint transfer event in init tx")?;

        assert!(mint_transfer.value > U256::ZERO);

        Ok(())
    }
6400
    // Verifies per-namespace transaction metadata:
    // 1. The availability block summary reports, for each namespace in a block,
    //    the transaction count and payload size.
    // 2. The explorer's namespace-scoped listing returns that namespace's
    //    transactions in descending (latest-first) order.
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_tx_metadata() {
        let port = reserve_tcp_port().expect("OS should have ephemeral ports available");

        let url = format!("http://localhost:{port}").parse().unwrap();
        let client: Client<ServerError, StaticVersion<0, 1>> = Client::new(url);

        // Single-node test network with the submit and explorer API modules enabled.
        let storage = SqlDataSource::create_storage().await;
        let network_config = TestConfigBuilder::default().build();
        let config = TestNetworkConfigBuilder::default()
            .api_config(
                SqlDataSource::options(&storage, Options::with_port(port))
                    .submit(Default::default())
                    .explorer(Default::default()),
            )
            .network_config(network_config)
            .build();
        let network = TestNetwork::new(config, MOCK_SEQUENCER_VERSIONS).await;
        let mut events = network.server.event_stream().await;

        client.connect(None).await;

        // Submit a few transactions in different namespaces.
        // Each entry is (namespace id, number of transactions to submit).
        let namespace_counts = [(101, 1), (102, 2), (103, 3)];
        for (ns, count) in &namespace_counts {
            for i in 0..*count {
                let ns_id = NamespaceId::from(*ns as u64);
                // Payload encodes (namespace, index) so every transaction is unique.
                let txn = Transaction::new(ns_id, vec![*ns, i]);
                client
                    .post::<()>("submit/submit")
                    .body_json(&txn)
                    .unwrap()
                    .send()
                    .await
                    .unwrap();
                let (block, _) = wait_for_decide_on_handle(&mut events, &txn).await;

                // Block summary should contain information about the namespace.
                let summary: BlockSummaryQueryData<SeqTypes> = client
                    .get(&format!("availability/block/summary/{block}"))
                    .send()
                    .await
                    .unwrap();
                let ns_info = summary.namespaces();
                // The decided block contains exactly the one transaction we just
                // submitted, in exactly one namespace.
                assert_eq!(ns_info.len(), 1);
                assert_eq!(ns_info.keys().copied().collect::<Vec<_>>(), vec![ns_id]);
                assert_eq!(ns_info[&ns_id].num_transactions, 1);
                assert_eq!(ns_info[&ns_id].size, txn.size_in_block(true));
            }
        }

        // List transactions in each namespace.
        for (ns, count) in &namespace_counts {
            tracing::info!(ns, "list transactions in namespace");

            let ns_id = NamespaceId::from(*ns as u64);
            let summaries: TransactionSummariesResponse<SeqTypes> = client
                .get(&format!(
                    "explorer/transactions/latest/{count}/namespace/{ns_id}"
                ))
                .send()
                .await
                .unwrap();
            let txs = summaries.transaction_summaries;
            assert_eq!(txs.len(), *count as usize);

            // Check that transactions are listed in descending order.
            for i in 0..*count {
                let summary = &txs[i as usize];
                // Latest-first: entry `i` corresponds to the (count - i - 1)-th
                // transaction submitted above.
                let expected = Transaction::new(ns_id, vec![*ns, count - i - 1]);
                assert_eq!(summary.rollups, vec![ns_id]);
                assert_eq!(summary.hash, expected.commit());
            }
        }
    }
6476
6477    use std::time::Instant;
6478
6479    use rand::thread_rng;
6480
6481    #[test_log::test(tokio::test(flavor = "multi_thread"))]
6482    async fn test_aggregator_namespace_endpoints() {
6483        let mut rng = thread_rng();
6484
6485        let port = reserve_tcp_port().expect("OS should have ephemeral ports available");
6486
6487        let url = format!("http://localhost:{port}").parse().unwrap();
6488        tracing::info!("Sequencer URL = {url}");
6489        let client: Client<ServerError, StaticVersion<0, 1>> = Client::new(url);
6490
6491        let options = Options::with_port(port).submit(Default::default());
6492        const NUM_NODES: usize = 2;
6493        // Initialize storage for each node
6494        let storage = join_all((0..NUM_NODES).map(|_| SqlDataSource::create_storage())).await;
6495
6496        let persistence_options: [_; NUM_NODES] = storage
6497            .iter()
6498            .map(<SqlDataSource as TestableSequencerDataSource>::persistence_options)
6499            .collect::<Vec<_>>()
6500            .try_into()
6501            .unwrap();
6502
6503        let network_config = TestConfigBuilder::default().build();
6504
6505        let config = TestNetworkConfigBuilder::<NUM_NODES, _, _>::with_num_nodes()
6506            .api_config(SqlDataSource::options(&storage[0], options))
6507            .network_config(network_config)
6508            .persistences(persistence_options.clone())
6509            .build();
6510        let network = TestNetwork::new(config, MOCK_SEQUENCER_VERSIONS).await;
6511        let mut events = network.server.event_stream().await;
6512        let start = Instant::now();
6513        let mut total_transactions = 0;
6514        let mut tx_heights = Vec::new();
6515        let mut sizes = HashMap::new();
6516        // inserting transactions for some namespaces
6517        // the number of transactions inserted is equal to namespace number.
6518        for namespace in 1..=4 {
6519            for _count in 0..namespace {
6520                // Generate a random payload length between 4 and 10 bytes
6521                let payload_len = rng.gen_range(4..=10);
6522                let payload: Vec<u8> = (0..payload_len).map(|_| rng.r#gen()).collect();
6523
6524                let txn = Transaction::new(NamespaceId::from(namespace as u32), payload);
6525
6526                client.connect(None).await;
6527
6528                let hash = client
6529                    .post("submit/submit")
6530                    .body_json(&txn)
6531                    .unwrap()
6532                    .send()
6533                    .await
6534                    .unwrap();
6535                assert_eq!(txn.commit(), hash);
6536
6537                // Wait for a Decide event containing transaction matching the one we sent
6538                let (height, size) = wait_for_decide_on_handle(&mut events, &txn).await;
6539                tx_heights.push(height);
6540                total_transactions += 1;
6541                *sizes.entry(namespace).or_insert(0) += size;
6542            }
6543        }
6544
6545        let duration = start.elapsed();
6546
6547        println!("Time elapsed to submit transactions: {duration:?}");
6548
6549        let last_tx_height = tx_heights.last().unwrap();
6550        for namespace in 1..=4 {
6551            let count = client
6552                .get::<u64>(&format!("node/transactions/count/namespace/{namespace}"))
6553                .send()
6554                .await
6555                .unwrap();
6556            assert_eq!(
6557                count, namespace as u64,
6558                "Incorrect transaction count for namespace {namespace}: expected {namespace}, got \
6559                 {count}"
6560            );
6561
6562            // check the range endpoint
6563            let to_endpoint_count = client
6564                .get::<u64>(&format!(
6565                    "node/transactions/count/namespace/{namespace}/{last_tx_height}"
6566                ))
6567                .send()
6568                .await
6569                .unwrap();
6570            assert_eq!(
6571                to_endpoint_count, namespace as u64,
6572                "Incorrect transaction count for range endpoint (to only) for namespace \
6573                 {namespace}: expected {namespace}, got {to_endpoint_count}"
6574            );
6575
6576            // check the range endpoint
6577            let from_to_endpoint_count = client
6578                .get::<u64>(&format!(
6579                    "node/transactions/count/namespace/{namespace}/0/{last_tx_height}"
6580                ))
6581                .send()
6582                .await
6583                .unwrap();
6584            assert_eq!(
6585                from_to_endpoint_count, namespace as u64,
6586                "Incorrect transaction count for range endpoint (from-to) for namespace \
6587                 {namespace}: expected {namespace}, got {from_to_endpoint_count}"
6588            );
6589
6590            let ns_size = client
6591                .get::<usize>(&format!("node/payloads/size/namespace/{namespace}"))
6592                .send()
6593                .await
6594                .unwrap();
6595
6596            let expected_ns_size = *sizes.get(&namespace).unwrap();
6597            assert_eq!(
6598                ns_size, expected_ns_size,
6599                "Incorrect payload size for namespace {namespace}: expected {expected_ns_size}, \
6600                 got {ns_size}"
6601            );
6602
6603            let ns_size_to = client
6604                .get::<usize>(&format!(
6605                    "node/payloads/size/namespace/{namespace}/{last_tx_height}"
6606                ))
6607                .send()
6608                .await
6609                .unwrap();
6610            assert_eq!(
6611                ns_size_to, expected_ns_size,
6612                "Incorrect payload size for namespace {namespace} up to height {last_tx_height}: \
6613                 expected {expected_ns_size}, got {ns_size_to}"
6614            );
6615
6616            let ns_size_from_to = client
6617                .get::<usize>(&format!(
6618                    "node/payloads/size/namespace/{namespace}/0/{last_tx_height}"
6619                ))
6620                .send()
6621                .await
6622                .unwrap();
6623            assert_eq!(
6624                ns_size_from_to, expected_ns_size,
6625                "Incorrect payload size for namespace {namespace} from 0 to height \
6626                 {last_tx_height}: expected {expected_ns_size}, got {ns_size_from_to}"
6627            );
6628        }
6629
6630        let total_tx_count = client
6631            .get::<u64>("node/transactions/count")
6632            .send()
6633            .await
6634            .unwrap();
6635        assert_eq!(
6636            total_tx_count, total_transactions,
6637            "Incorrect total transaction count: expected {total_transactions}, got \
6638             {total_tx_count}"
6639        );
6640
6641        let total_payload_size = client
6642            .get::<usize>("node/payloads/size")
6643            .send()
6644            .await
6645            .unwrap();
6646
6647        let expected_total_size: usize = sizes.values().copied().sum();
6648        assert_eq!(
6649            total_payload_size, expected_total_size,
6650            "Incorrect total payload size: expected {expected_total_size}, got \
6651             {total_payload_size}"
6652        );
6653    }
6654
6655    #[test_log::test(tokio::test(flavor = "multi_thread"))]
6656    async fn test_stream_transactions_endpoint() {
6657        // This test submits transactions to a sequencer for multiple namespaces,
6658        // waits for them to be decided, and then verifies that:
6659        // 1. All transactions appear in the transaction stream.
6660        // 2. Each namespace-specific transaction stream only includes the transactions of that namespace.
6661
6662        let mut rng = thread_rng();
6663
6664        let port = reserve_tcp_port().expect("OS should have ephemeral ports available");
6665
6666        let url = format!("http://localhost:{port}").parse().unwrap();
6667        tracing::info!("Sequencer URL = {url}");
6668        let client: Client<ServerError, StaticVersion<0, 1>> = Client::new(url);
6669
6670        let options = Options::with_port(port).submit(Default::default());
6671        const NUM_NODES: usize = 2;
6672        // Initialize storage for each node
6673        let storage = join_all((0..NUM_NODES).map(|_| SqlDataSource::create_storage())).await;
6674
6675        let persistence_options: [_; NUM_NODES] = storage
6676            .iter()
6677            .map(<SqlDataSource as TestableSequencerDataSource>::persistence_options)
6678            .collect::<Vec<_>>()
6679            .try_into()
6680            .unwrap();
6681
6682        let network_config = TestConfigBuilder::default().build();
6683
6684        let config = TestNetworkConfigBuilder::<NUM_NODES, _, _>::with_num_nodes()
6685            .api_config(SqlDataSource::options(&storage[0], options))
6686            .network_config(network_config)
6687            .persistences(persistence_options.clone())
6688            .build();
6689        let network = TestNetwork::new(config, MOCK_SEQUENCER_VERSIONS).await;
6690        let mut events = network.server.event_stream().await;
6691        let mut all_transactions = HashMap::new();
6692        let mut namespace_tx: HashMap<_, HashSet<_>> = HashMap::new();
6693
6694        // Submit transactions to namespaces 1 through 4
6695
6696        for namespace in 1..=4 {
6697            for _count in 0..namespace {
6698                let payload_len = rng.gen_range(4..=10);
6699                let payload: Vec<u8> = (0..payload_len).map(|_| rng.r#gen()).collect();
6700
6701                let txn = Transaction::new(NamespaceId::from(namespace as u32), payload);
6702
6703                client.connect(None).await;
6704
6705                let hash = client
6706                    .post("submit/submit")
6707                    .body_json(&txn)
6708                    .unwrap()
6709                    .send()
6710                    .await
6711                    .unwrap();
6712                assert_eq!(txn.commit(), hash);
6713
6714                // Wait for a Decide event containing transaction matching the one we sent
6715                wait_for_decide_on_handle(&mut events, &txn).await;
6716                // Store transaction for later validation
6717
6718                all_transactions.insert(txn.commit(), txn.clone());
6719                namespace_tx.entry(namespace).or_default().insert(txn);
6720            }
6721        }
6722
6723        let mut transactions = client
6724            .socket("availability/stream/transactions/0")
6725            .subscribe::<TransactionQueryData<SeqTypes>>()
6726            .await
6727            .expect("failed to subscribe to transactions endpoint");
6728
6729        let mut count = 0;
6730        while let Some(tx) = transactions.next().await {
6731            let tx = tx.unwrap();
6732            let expected = all_transactions
6733                .get(&tx.transaction().commit())
6734                .expect("txn not found ");
6735            assert_eq!(tx.transaction(), expected, "invalid transaction");
6736            count += 1;
6737
6738            if count == all_transactions.len() {
6739                break;
6740            }
6741        }
6742
6743        // Validate namespace-specific stream endpoint
6744
6745        for (namespace, expected_ns_txns) in &namespace_tx {
6746            let mut api_namespace_txns = client
6747                .socket(&format!(
6748                    "availability/stream/transactions/0/namespace/{namespace}",
6749                ))
6750                .subscribe::<TransactionQueryData<SeqTypes>>()
6751                .await
6752                .unwrap_or_else(|_| {
6753                    panic!("failed to subscribe to transactions namespace {namespace}")
6754                });
6755
6756            let mut received = HashSet::new();
6757
6758            while let Some(res) = api_namespace_txns.next().await {
6759                let tx = res.expect("stream error");
6760                received.insert(tx.transaction().clone());
6761
6762                if received.len() == expected_ns_txns.len() {
6763                    break;
6764                }
6765            }
6766
6767            assert_eq!(
6768                received, *expected_ns_txns,
6769                "Mismatched transactions for namespace {namespace}"
6770            );
6771        }
6772    }
6773
6774    #[rstest]
6775    #[case(POS_V3)]
6776    #[case(POS_V4)]
6777    #[test_log::test(tokio::test(flavor = "multi_thread"))]
6778    async fn test_v3_and_v4_reward_tree_updates(#[case] upgrade: Upgrade) -> anyhow::Result<()> {
6779        // This test checks that the correct merkle tree is updated based on version
6780        //
6781        // When the protocol version is v3:
6782        // - The v3 Merkle tree is updated
6783        // - The v4 Merkle tree must be empty.
6784        //
6785        // When the protocol version is v4:
6786        // - The v4 Merkle tree is updated
6787        // - The v3 Merkle tree must be empty.
6788        const EPOCH_HEIGHT: u64 = 10;
6789
6790        let network_config = TestConfigBuilder::default()
6791            .epoch_height(EPOCH_HEIGHT)
6792            .build();
6793
6794        let api_port = reserve_tcp_port().expect("OS should have ephemeral ports available");
6795
6796        tracing::info!("API PORT = {api_port}");
6797        const NUM_NODES: usize = 5;
6798
6799        let storage = join_all((0..NUM_NODES).map(|_| SqlDataSource::create_storage())).await;
6800        let persistence: [_; NUM_NODES] = storage
6801            .iter()
6802            .map(<SqlDataSource as TestableSequencerDataSource>::persistence_options)
6803            .collect::<Vec<_>>()
6804            .try_into()
6805            .unwrap();
6806
6807        let config = TestNetworkConfigBuilder::with_num_nodes()
6808            .api_config(SqlDataSource::options(
6809                &storage[0],
6810                Options::with_port(api_port).catchup(Default::default()),
6811            ))
6812            .network_config(network_config)
6813            .persistences(persistence.clone())
6814            .catchups(std::array::from_fn(|_| {
6815                StatePeers::<StaticVersion<0, 1>>::from_urls(
6816                    vec![format!("http://localhost:{api_port}").parse().unwrap()],
6817                    Default::default(),
6818                    Duration::from_secs(2),
6819                    &NoMetrics,
6820                )
6821            }))
6822            .pos_hook(
6823                DelegationConfig::MultipleDelegators,
6824                hotshot_contract_adapter::stake_table::StakeTableContractVersion::V2,
6825                upgrade,
6826            )
6827            .await
6828            .unwrap()
6829            .build();
6830        let mut network = TestNetwork::new(config, upgrade).await;
6831
6832        let mut events = network.peers[2].event_stream().await;
6833        // wait for 4 epochs
6834        wait_for_epochs(&mut events, EPOCH_HEIGHT, 4).await;
6835
6836        let validated_state = network.server.decided_state().await;
6837        if upgrade.base == EPOCH_VERSION {
6838            let v1_tree = &validated_state.reward_merkle_tree_v1;
6839            assert!(v1_tree.num_leaves() > 0, "v1 reward tree tree is empty");
6840            let v2_tree = &validated_state.reward_merkle_tree_v2;
6841            assert!(
6842                v2_tree.num_leaves() == 0,
6843                "v2 reward tree tree is not empty"
6844            );
6845        } else {
6846            let v1_tree = &validated_state.reward_merkle_tree_v1;
6847            assert!(
6848                v1_tree.num_leaves() == 0,
6849                "v1 reward tree tree is not empty"
6850            );
6851            let v2_tree = &validated_state.reward_merkle_tree_v2;
6852            assert!(v2_tree.num_leaves() > 0, "v2 reward tree tree is empty");
6853        }
6854
6855        network.stop_consensus().await;
6856        Ok(())
6857    }
6858
6859    #[rstest]
6860    #[case(POS_V3)]
6861    #[case(POS_V4)]
6862    #[test_log::test(tokio::test(flavor = "multi_thread"))]
6863    pub(crate) async fn test_state_cert_query(#[case] upgrade: Upgrade) {
6864        const TEST_EPOCH_HEIGHT: u64 = 10;
6865        const TEST_EPOCHS: u64 = 5;
6866
6867        let network_config = TestConfigBuilder::default()
6868            .epoch_height(TEST_EPOCH_HEIGHT)
6869            .build();
6870
6871        let api_port = reserve_tcp_port().expect("OS should have ephemeral ports available");
6872
6873        tracing::info!("API PORT = {api_port}");
6874        const NUM_NODES: usize = 2;
6875
6876        let storage = join_all((0..NUM_NODES).map(|_| SqlDataSource::create_storage())).await;
6877        let persistence: [_; NUM_NODES] = storage
6878            .iter()
6879            .map(<SqlDataSource as TestableSequencerDataSource>::persistence_options)
6880            .collect::<Vec<_>>()
6881            .try_into()
6882            .unwrap();
6883
6884        let config = TestNetworkConfigBuilder::with_num_nodes()
6885            .api_config(SqlDataSource::options(
6886                &storage[0],
6887                Options::with_port(api_port).catchup(Default::default()),
6888            ))
6889            .network_config(network_config)
6890            .persistences(persistence.clone())
6891            .catchups(std::array::from_fn(|_| {
6892                StatePeers::<StaticVersion<0, 1>>::from_urls(
6893                    vec![format!("http://localhost:{api_port}").parse().unwrap()],
6894                    Default::default(),
6895                    Duration::from_secs(2),
6896                    &NoMetrics,
6897                )
6898            }))
6899            .pos_hook(
6900                DelegationConfig::MultipleDelegators,
6901                hotshot_contract_adapter::stake_table::StakeTableContractVersion::V2,
6902                upgrade,
6903            )
6904            .await
6905            .unwrap()
6906            .build();
6907
6908        let network = TestNetwork::new(config, upgrade).await;
6909        let mut events = network.server.event_stream().await;
6910
6911        // Wait until 5 epochs have passed.
6912        loop {
6913            let event = events.next().await.unwrap();
6914            tracing::info!("Received event from handle: {event:?}");
6915
6916            if let hotshot::types::EventType::Decide { leaf_chain, .. } = event.event {
6917                println!(
6918                    "Decide event received: {:?}",
6919                    leaf_chain.first().unwrap().leaf.height()
6920                );
6921                if let Some(first_leaf) = leaf_chain.first() {
6922                    let height = first_leaf.leaf.height();
6923                    tracing::info!("Decide event received at height: {height}");
6924
6925                    if height >= TEST_EPOCHS * TEST_EPOCH_HEIGHT {
6926                        break;
6927                    }
6928                }
6929            }
6930        }
6931
6932        // Connect client.
6933        let client: Client<ServerError, StaticVersion<0, 1>> =
6934            Client::new(format!("http://localhost:{api_port}").parse().unwrap());
6935        client.connect(Some(Duration::from_secs(10))).await;
6936
6937        // Get the state cert for the epoch 3 to 5
6938        for i in 3..=TEST_EPOCHS {
6939            // v2
6940
6941            let state_query_data_v2 = client
6942                .get::<StateCertQueryDataV2<SeqTypes>>(&format!("availability/state-cert-v2/{i}"))
6943                .send()
6944                .await
6945                .unwrap();
6946            let state_cert_v2 = state_query_data_v2.0.clone();
6947            tracing::info!("state_cert_v2: {state_cert_v2:?}");
6948            assert_eq!(state_cert_v2.epoch.u64(), i);
6949            assert_eq!(
6950                state_cert_v2.light_client_state.block_height,
6951                i * TEST_EPOCH_HEIGHT - 5
6952            );
6953            let block_height = state_cert_v2.light_client_state.block_height;
6954
6955            let header: Header = client
6956                .get(&format!("availability/header/{block_height}"))
6957                .send()
6958                .await
6959                .unwrap();
6960
6961            // verify auth root if the consensus version is v4
6962            if header.version() == DRB_AND_HEADER_UPGRADE_VERSION {
6963                let auth_root = state_cert_v2.auth_root;
6964                let header_auth_root = header.auth_root().unwrap();
6965                if auth_root.is_zero() || header_auth_root.is_zero() {
6966                    panic!("auth root shouldn't be zero");
6967                }
6968
6969                assert_eq!(auth_root, header_auth_root, "auth root mismatch");
6970            }
6971
6972            // v1
6973            let state_query_data_v1 = client
6974                .get::<StateCertQueryDataV1<SeqTypes>>(&format!("availability/state-cert/{i}"))
6975                .send()
6976                .await
6977                .unwrap();
6978
6979            let state_cert_v1 = state_query_data_v1.0.clone();
6980            tracing::info!("state_cert_v1: {state_cert_v1:?}");
6981            assert_eq!(state_query_data_v1, state_query_data_v2.into());
6982        }
6983    }
6984
6985    /// Test state certificate catchup functionality by simulating a node that falls behind and needs
6986    /// to catch up. This test starts a 5-node network with epoch height 10, waits for 3 epochs to
6987    /// pass, then removes and restarts node 0 with a fresh storage. The
6988    /// restarted node catches up for the missing state certificates.
6989
    #[rstest]
    #[case(POS_V3)]
    #[case(POS_V4)]
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    pub(crate) async fn test_state_cert_catchup(#[case] upgrade: Upgrade) {
        const EPOCH_HEIGHT: u64 = 10;

        let network_config = TestConfigBuilder::default()
            .epoch_height(EPOCH_HEIGHT)
            .build();

        let api_port = reserve_tcp_port().expect("OS should have ephemeral ports available");

        tracing::info!("API PORT = {api_port}");
        const NUM_NODES: usize = 5;

        let storage = join_all((0..NUM_NODES).map(|_| SqlDataSource::create_storage())).await;
        let persistence: [_; NUM_NODES] = storage
            .iter()
            .map(<SqlDataSource as TestableSequencerDataSource>::persistence_options)
            .collect::<Vec<_>>()
            .try_into()
            .unwrap();

        // 5-node network; node 0 (the server) hosts the API the others catch up from.
        let config = TestNetworkConfigBuilder::with_num_nodes()
            .api_config(SqlDataSource::options(
                &storage[0],
                Options::with_port(api_port),
            ))
            .network_config(network_config)
            .persistences(persistence.clone())
            .catchups(std::array::from_fn(|_| {
                StatePeers::<StaticVersion<0, 1>>::from_urls(
                    vec![format!("http://localhost:{api_port}").parse().unwrap()],
                    Default::default(),
                    Duration::from_secs(2),
                    &NoMetrics,
                )
            }))
            .pos_hook(
                DelegationConfig::MultipleDelegators,
                hotshot_contract_adapter::stake_table::StakeTableContractVersion::V2,
                upgrade,
            )
            .await
            .unwrap()
            .build();
        // Keep a copy of the initial node state so the restarted node below can
        // be re-initialized with the same genesis view of the network.
        let state = config.states()[0].clone();
        let mut network = TestNetwork::new(config, upgrade).await;

        let mut events = network.peers[2].event_stream().await;
        // Wait until at least 3 epochs have passed
        wait_for_epochs(&mut events, EPOCH_HEIGHT, 3).await;

        // Remove peer 0 and restart it with the query module enabled.
        // Adding an additional node to the test network is not straight forward,
        // as the keys have already been initialized in the config above.
        // So, we remove this node and re-add it using the same index.
        network.peers.remove(0);

        // Fresh (empty) storage forces the restarted node to fetch the state
        // certificates it missed via catchup rather than reading them locally.
        let new_storage: hotshot_query_service::data_source::sql::testing::TmpDb =
            SqlDataSource::create_storage().await;
        let new_persistence: persistence::sql::Options =
            <SqlDataSource as TestableSequencerDataSource>::persistence_options(&new_storage);

        let node_0_port = reserve_tcp_port().expect("OS should have ephemeral ports available");
        tracing::info!("node_0_port {node_0_port}");
        // The restarted node peers with the original server for query catchup.
        let opt = Options::with_port(node_0_port).query_sql(
            Query {
                peers: vec![format!("http://localhost:{api_port}").parse().unwrap()],
            },
            tmp_options(&new_storage),
        );
        let node_0 = opt
            .clone()
            .serve(|metrics, consumer, storage| {
                let cfg = network.cfg.clone();
                let new_persistence = new_persistence.clone();
                let state = state.clone();
                async move {
                    Ok(cfg
                        .init_node(
                            1,
                            state,
                            new_persistence.clone(),
                            Some(StatePeers::<StaticVersion<0, 1>>::from_urls(
                                vec![format!("http://localhost:{api_port}").parse().unwrap()],
                                Default::default(),
                                Duration::from_secs(2),
                                &NoMetrics,
                            )),
                            storage,
                            &*metrics,
                            test_helpers::STAKE_TABLE_CAPACITY_FOR_TEST,
                            consumer,
                            upgrade,
                            Default::default(),
                        )
                        .await)
                }
                .boxed()
            })
            .await
            .unwrap();

        let mut events = node_0.event_stream().await;
        // Wait until at least 5 epochs have passed
        wait_for_epochs(&mut events, EPOCH_HEIGHT, 5).await;

        let client: Client<ServerError, StaticVersion<0, 1>> =
            Client::new(format!("http://localhost:{node_0_port}").parse().unwrap());
        client.connect(Some(Duration::from_secs(60))).await;

        // Epochs 3..=5 were decided while the node was down or restarting, so
        // serving them proves state-cert catchup worked.
        for epoch in 3..=5 {
            let state_cert = client
                .get::<StateCertQueryDataV2<SeqTypes>>(&format!(
                    "availability/state-cert-v2/{epoch}"
                ))
                .send()
                .await
                .unwrap();
            assert_eq!(state_cert.0.epoch.u64(), epoch);
        }
    }
7114
    /// Integration test for commission updates via the stake table contract.
    ///
    /// Starts a 3-node network with a V1 stake table, upgrades the contract to
    /// V2, then applies a different commission change to each validator
    /// (zero, +500 bps, -100 bps). After waiting for the new stake table to
    /// take effect, verifies that:
    /// - `node/validators/{epoch}` reports the old commissions for the last
    ///   epoch before the change and the new commissions from the target
    ///   epoch on;
    /// - reward accrual reacts: the zero-commission validator (with no self
    ///   delegation) earns essentially nothing, while the others keep earning.
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_integration_commission_updates() -> anyhow::Result<()> {
        const NUM_NODES: usize = 3;
        const EPOCH_HEIGHT: u64 = 10;

        // Use version that supports epochs (V3 or V4)
        let versions = POS_V4;

        let api_port = reserve_tcp_port().expect("OS should have ephemeral ports available");

        // Initialize storage for nodes
        let storage = join_all((0..NUM_NODES).map(|_| SqlDataSource::create_storage())).await;
        let persistence: [_; NUM_NODES] = storage
            .iter()
            .map(<SqlDataSource as TestableSequencerDataSource>::persistence_options)
            .collect::<Vec<_>>()
            .try_into()
            .unwrap();

        // Configure test network with epochs
        let network_config = TestConfigBuilder::default()
            .epoch_height(EPOCH_HEIGHT)
            .build();

        // Build test network configuration starting with V1 stake table
        let config = TestNetworkConfigBuilder::<NUM_NODES, _, _>::with_num_nodes()
            .api_config(SqlDataSource::options(
                &storage[0],
                Options::with_port(api_port),
            ))
            .network_config(network_config.clone())
            .persistences(persistence.clone())
            .catchups(std::array::from_fn(|_| {
                StatePeers::<SequencerApiVersion>::from_urls(
                    vec![format!("http://localhost:{api_port}").parse().unwrap()],
                    Default::default(),
                    Duration::from_secs(2),
                    &NoMetrics,
                )
            }))
            .pos_hook(
                // We want no new rewards after setting the commission to zero.
                DelegationConfig::NoSelfDelegation,
                StakeTableContractVersion::V1, // upgraded later
                POS_V4,
            )
            .await
            .unwrap()
            .build();

        let network = TestNetwork::new(config, versions).await;
        let provider = network.cfg.anvil().unwrap();
        let deployer_addr = network.cfg.signer().address();
        let mut contracts = network.contracts.unwrap();
        let st_addr = contracts.address(Contract::StakeTableProxy).unwrap();
        // Upgrade the stake table contract to V2 so commissions can be updated.
        upgrade_stake_table_v2(
            provider,
            L1Client::new(vec![network.cfg.l1_url()])?,
            &mut contracts,
            deployer_addr,
            deployer_addr,
        )
        .await?;

        // Record (validator, old commission, new commission) and push each
        // update on-chain. Validator 0 -> zero, 1 -> +500 bps, 2 -> -100 bps.
        let mut commissions = vec![];
        for (i, (validator, provider)) in
            network_config.validator_providers().into_iter().enumerate()
        {
            let commission = fetch_commission(provider.clone(), st_addr, validator).await?;
            let new_commission = match i {
                0 => 0u16,
                1 => commission.to_evm() + 500u16,
                2 => commission.to_evm() - 100u16,
                _ => unreachable!(),
            }
            .try_into()?;
            commissions.push((validator, commission, new_commission));
            tracing::info!(%validator, %commission, %new_commission, "Update commission");
            update_commission(provider, st_addr, new_commission)
                .await?
                .get_receipt()
                .await?;
        }

        // wait until new stake table takes effect
        let current_epoch = network.peers[0]
            .decided_leaf()
            .await
            .epoch(EPOCH_HEIGHT)
            .unwrap();
        let target_epoch = current_epoch.u64() + 3;
        println!("target epoch for new stake table: {target_epoch}");
        let mut events = network.peers[0].event_stream().await;
        wait_for_epochs(&mut events, EPOCH_HEIGHT, target_epoch).await;

        // the last epoch with the old commissions
        let client: Client<ServerError, SequencerApiVersion> =
            Client::new(format!("http://localhost:{api_port}").parse().unwrap());
        let validators = client
            .get::<AuthenticatedValidatorMap>(&format!("node/validators/{}", target_epoch - 1))
            .send()
            .await
            .expect("validators");
        assert!(!validators.is_empty());
        for (val, old_comm, _) in commissions.clone() {
            assert_eq!(validators.get(&val).unwrap().commission, old_comm.to_evm());
        }

        // the first epoch with the new commissions
        let client: Client<ServerError, SequencerApiVersion> =
            Client::new(format!("http://localhost:{api_port}").parse().unwrap());
        let validators = client
            .get::<AuthenticatedValidatorMap>(&format!("node/validators/{target_epoch}"))
            .send()
            .await
            .expect("validators");
        assert!(!validators.is_empty());
        for (val, _, new_comm) in commissions.clone() {
            assert_eq!(validators.get(&val).unwrap().commission, new_comm.to_evm());
        }

        // Compare each validator's reward balance at the last block governed
        // by the old commissions with the first block governed by the new ones.
        let last_block_with_old_commissions = EPOCH_HEIGHT * (target_epoch - 1);
        let block_with_new_commissions = EPOCH_HEIGHT * target_epoch;
        let mut new_amounts = vec![];
        for (val, ..) in commissions {
            let before = client
                .get::<Option<RewardAmount>>(&format!(
                    "reward-state-v2/reward-balance/{last_block_with_old_commissions}/{val}"
                ))
                .send()
                .await?
                .unwrap();
            let after = client
                .get::<Option<RewardAmount>>(&format!(
                    "reward-state-v2/reward-balance/{block_with_new_commissions}/{val}"
                ))
                .send()
                .await?
                .unwrap();
            new_amounts.push(after - before);
        }

        // Small allowance for rounding remainders in reward distribution.
        let tolerance = U256::from(10 * EPOCH_HEIGHT).into();
        // validator zero got no new rewards except remainders
        assert!(new_amounts[0] < tolerance);

        // other validators are still receiving rewards
        assert!(new_amounts[1] + new_amounts[2] > tolerance);

        Ok(())
    }
7266
    /// Checks the reward-state Merkle proof endpoints for both protocol
    /// variants (parameterized over the upgrade via `rstest`).
    ///
    /// Runs a 5-node network for a few epochs, then — depending on whether the
    /// upgrade's base version is the epoch version (V1 reward tree) or later
    /// (V2 reward tree) — fetches a proof for every reward account present in
    /// the decided state and compares it against the proof looked up directly
    /// in the local Merkle tree. For V2 it additionally checks that the
    /// `reward-claim-input` endpoint agrees with the claim input derived from
    /// the proof response.
    #[rstest]
    #[case(POS_V3)]
    #[case(POS_V4)]
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_reward_proof_endpoint(#[case] upgrade: Upgrade) -> anyhow::Result<()> {
        const EPOCH_HEIGHT: u64 = 10;
        const NUM_NODES: usize = 5;

        let network_config = TestConfigBuilder::default()
            .epoch_height(EPOCH_HEIGHT)
            .build();

        let api_port = reserve_tcp_port().expect("OS should have ephemeral ports available");
        println!("API PORT = {api_port}");

        let storage = join_all((0..NUM_NODES).map(|_| SqlDataSource::create_storage())).await;
        let persistence: [_; NUM_NODES] = storage
            .iter()
            .map(<SqlDataSource as TestableSequencerDataSource>::persistence_options)
            .collect::<Vec<_>>()
            .try_into()
            .unwrap();

        let config = TestNetworkConfigBuilder::with_num_nodes()
            .api_config(SqlDataSource::options(
                &storage[0],
                Options::with_port(api_port).catchup(Default::default()),
            ))
            .network_config(network_config)
            .persistences(persistence.clone())
            .catchups(std::array::from_fn(|_| {
                StatePeers::<StaticVersion<0, 1>>::from_urls(
                    vec![format!("http://localhost:{api_port}").parse().unwrap()],
                    Default::default(),
                    Duration::from_secs(2),
                    &NoMetrics,
                )
            }))
            .pos_hook(
                DelegationConfig::MultipleDelegators,
                hotshot_contract_adapter::stake_table::StakeTableContractVersion::V2,
                upgrade,
            )
            .await
            .unwrap()
            .build();

        let mut network = TestNetwork::new(config, upgrade).await;

        // wait for 4 epochs
        let mut events = network.server.event_stream().await;
        wait_for_epochs(&mut events, EPOCH_HEIGHT, 4).await;

        let url = format!("http://localhost:{api_port}").parse().unwrap();
        let client: Client<ServerError, StaticVersion<0, 1>> = Client::new(url);

        // Snapshot the decided state/leaf; all proofs are checked at this height.
        let validated_state = network.server.decided_state().await;
        let decided_leaf = network.server.decided_leaf().await;
        let height = decided_leaf.height();

        // validate proof returned from the api
        if upgrade.base == EPOCH_VERSION {
            // V1 case
            wait_until_block_height(&client, "reward-state/block-height", height).await;

            // Freeze the chain so the decided state cannot advance past `height`
            // while we query proofs.
            network.stop_consensus().await;

            for (address, _) in validated_state.reward_merkle_tree_v1.iter() {
                // Proof computed locally from the in-memory tree is ground truth.
                let (_, expected_proof) = validated_state
                    .reward_merkle_tree_v1
                    .lookup(*address)
                    .expect_ok()
                    .unwrap();

                let res = client
                    .get::<RewardAccountQueryDataV1>(&format!(
                        "reward-state/proof/{height}/{address}"
                    ))
                    .send()
                    .await
                    .unwrap();

                match res.proof.proof {
                    RewardMerkleProofV1::Presence(p) => {
                        assert_eq!(
                            p, expected_proof,
                            "Proof mismatch for V1 at {height}, addr={address}"
                        );
                    },
                    other => panic!(
                        "Expected Present proof for V1 at {height}, addr={address}, got {other:?}"
                    ),
                }
            }
        } else {
            // V2 case
            wait_until_block_height(&client, "reward-state-v2/block-height", height).await;

            network.stop_consensus().await;

            for (address, _) in validated_state.reward_merkle_tree_v2.iter() {
                let (_, expected_proof) = validated_state
                    .reward_merkle_tree_v2
                    .lookup(*address)
                    .expect_ok()
                    .unwrap();

                let res = client
                    .get::<RewardAccountQueryDataV2>(&format!(
                        "reward-state-v2/proof/{height}/{address}"
                    ))
                    .send()
                    .await
                    .unwrap();

                match res.proof.proof.clone() {
                    RewardMerkleProofV2::Presence(p) => {
                        assert_eq!(
                            p, expected_proof,
                            "Proof mismatch for V2 at {height}, addr={address}"
                        );
                    },
                    other => panic!(
                        "Expected Present proof for V2 at {height}, addr={address}, got {other:?}"
                    ),
                }

                // V2 only: the claim-input endpoint must agree with the claim
                // input derived from the proof response itself.
                let reward_claim_input = client
                    .get::<RewardClaimInput>(&format!(
                        "reward-state-v2/reward-claim-input/{height}/{address}"
                    ))
                    .send()
                    .await
                    .unwrap();

                assert_eq!(reward_claim_input, res.to_reward_claim_input()?);
            }
        }

        Ok(())
    }
7408
7409    #[test_log::test(tokio::test(flavor = "multi_thread"))]
7410    async fn test_all_validators_endpoint() -> anyhow::Result<()> {
7411        const EPOCH_HEIGHT: u64 = 20;
7412
7413        let network_config = TestConfigBuilder::default()
7414            .epoch_height(EPOCH_HEIGHT)
7415            .build();
7416
7417        let api_port = reserve_tcp_port().expect("OS should have ephemeral ports available");
7418
7419        const NUM_NODES: usize = 5;
7420
7421        let storage = join_all((0..NUM_NODES).map(|_| SqlDataSource::create_storage())).await;
7422        let persistence: [_; NUM_NODES] = storage
7423            .iter()
7424            .map(<SqlDataSource as TestableSequencerDataSource>::persistence_options)
7425            .collect::<Vec<_>>()
7426            .try_into()
7427            .unwrap();
7428
7429        let config = TestNetworkConfigBuilder::with_num_nodes()
7430            .api_config(SqlDataSource::options(
7431                &storage[0],
7432                Options::with_port(api_port),
7433            ))
7434            .network_config(network_config)
7435            .persistences(persistence.clone())
7436            .catchups(std::array::from_fn(|_| {
7437                StatePeers::<StaticVersion<0, 1>>::from_urls(
7438                    vec![format!("http://localhost:{api_port}").parse().unwrap()],
7439                    Default::default(),
7440                    Duration::from_secs(2),
7441                    &NoMetrics,
7442                )
7443            }))
7444            .pos_hook(
7445                DelegationConfig::MultipleDelegators,
7446                Default::default(),
7447                POS_V4,
7448            )
7449            .await
7450            .unwrap()
7451            .build();
7452
7453        let network = TestNetwork::new(config, POS_V4).await;
7454        let client: Client<ServerError, SequencerApiVersion> =
7455            Client::new(format!("http://localhost:{api_port}").parse().unwrap());
7456
7457        let err = client
7458            .get::<Vec<RegisteredValidator<PubKey>>>("node/all-validators/1/0/1001")
7459            .header("Accept", "application/json")
7460            .send()
7461            .await
7462            .unwrap_err();
7463
7464        assert_matches!(err, ServerError { status, message} if
7465                status == StatusCode::BAD_REQUEST
7466                && message.contains("Limit cannot be greater than 1000")
7467        );
7468
7469        // Wait for the chain to progress beyond epoch 3
7470        let mut events = network.peers[0].event_stream().await;
7471        wait_for_epochs(&mut events, EPOCH_HEIGHT, 3).await;
7472
7473        // Verify that there are no validators for epoch # 1 and epoch # 2
7474        {
7475            client
7476                .get::<Vec<RegisteredValidator<PubKey>>>("node/all-validators/1/0/100")
7477                .send()
7478                .await
7479                .unwrap()
7480                .is_empty();
7481
7482            client
7483                .get::<Vec<RegisteredValidator<PubKey>>>("node/all-validators/2/0/100")
7484                .send()
7485                .await
7486                .unwrap()
7487                .is_empty();
7488        }
7489
7490        // Get the epoch # 3 validators
7491        let validators = client
7492            .get::<Vec<RegisteredValidator<PubKey>>>("node/all-validators/3/0/100")
7493            .send()
7494            .await
7495            .expect("validators");
7496
7497        assert!(!validators.is_empty());
7498
7499        Ok(())
7500    }
7501
    /// Checks the paginated catchup endpoint
    /// `reward-state-v2/reward-amounts/{height}/{offset}/{limit}`:
    /// - a limit above the server's maximum is rejected with `400 Bad Request`;
    /// - a valid query returns all reward accounts of the decided state,
    ///   sorted by account address in descending order.
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_reward_accounts_catchup_endpoint() -> anyhow::Result<()> {
        const EPOCH_HEIGHT: u64 = 10;
        const NUM_NODES: usize = 3;

        let network_config = TestConfigBuilder::default()
            .epoch_height(EPOCH_HEIGHT)
            .build();

        let api_port = reserve_tcp_port().expect("OS should have ephemeral ports available");
        println!("API PORT = {api_port}");

        let storage = join_all((0..NUM_NODES).map(|_| SqlDataSource::create_storage())).await;
        let persistence: [_; NUM_NODES] = storage
            .iter()
            .map(<SqlDataSource as TestableSequencerDataSource>::persistence_options)
            .collect::<Vec<_>>()
            .try_into()
            .unwrap();

        let config = TestNetworkConfigBuilder::with_num_nodes()
            .api_config(SqlDataSource::options(
                &storage[0],
                Options::with_port(api_port).catchup(Default::default()),
            ))
            .network_config(network_config)
            .persistences(persistence.clone())
            .catchups(std::array::from_fn(|_| {
                StatePeers::<StaticVersion<0, 1>>::from_urls(
                    vec![format!("http://localhost:{api_port}").parse().unwrap()],
                    Default::default(),
                    Duration::from_secs(2),
                    &NoMetrics,
                )
            }))
            .pos_hook(
                DelegationConfig::MultipleDelegators,
                hotshot_contract_adapter::stake_table::StakeTableContractVersion::V2,
                POS_V4,
            )
            .await
            .unwrap()
            .build();

        let mut network = TestNetwork::new(config, POS_V4).await;

        let client: Client<ServerError, StaticVersion<0, 1>> =
            Client::new(format!("http://localhost:{api_port}").parse().unwrap());

        client.connect(None).await;

        // Let a few epochs pass so reward accounts accumulate.
        let mut events = network.server.event_stream().await;
        wait_for_epochs(&mut events, EPOCH_HEIGHT, 3).await;

        // Freeze consensus so the decided state stays fixed at `height`.
        network.stop_consensus().await;
        let height = network.server.decided_leaf().await.height();
        wait_until_block_height(&client, "reward-state-v2/block-height", height).await;

        // A limit of 10001 exceeds the server maximum and must be rejected.
        let err = client
            .get::<Vec<(RewardAccountV2, RewardAmount)>>(&format!(
                "reward-state-v2/reward-amounts/{height}/0/10001"
            ))
            .send()
            .await
            .unwrap_err();

        assert_matches!(err, ServerError { status, .. } if
            status == StatusCode::BAD_REQUEST

        );

        // Ground truth: all accounts from the local reward Merkle tree.
        let mut expected: Vec<_> = network
            .server
            .decided_state()
            .await
            .reward_merkle_tree_v2
            .iter()
            .map(|(addr, amt)| (*addr, *amt))
            .collect();
        // Results are sorted by account address descending
        expected.sort_by_key(|(acct, _)| std::cmp::Reverse(*acct));

        tracing::info!("expected accounts = {expected:?}");
        let limit = expected.len().min(10_000) as u64;
        let offset = 0u64;
        let expected: Vec<_> = expected.into_iter().take(limit as usize).collect();

        let res = client
            .get::<Vec<(RewardAccountV2, RewardAmount)>>(&format!(
                "reward-state-v2/reward-amounts/{height}/{offset}/{limit}"
            ))
            .send()
            .await
            .unwrap();

        assert_eq!(res, expected);

        Ok(())
    }
7601
    /// Namespace query compatibility on the pre-epoch fee version (v0.2).
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_namespace_query_compat_v0_2() {
        test_namespace_query_compat_helper(Upgrade::trivial(FEE_VERSION)).await;
    }
7606
    /// Namespace query compatibility on the epoch version (v0.3).
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_namespace_query_compat_v0_3() {
        test_namespace_query_compat_helper(Upgrade::trivial(EPOCH_VERSION)).await;
    }
7611
    /// Shared driver for the namespace-query compatibility tests.
    ///
    /// Submits one transaction to a fresh namespace, waits for it to be
    /// decided, then checks namespace proofs across API versions:
    /// - the latest API (`/v1` and the redirect default) must return a valid
    ///   proof regardless of the VID version in use;
    /// - the legacy `/v0` (ADVZ) API only works for the old VID and must fail
    ///   on blocks produced with the new VID;
    /// - range and streaming variants of both endpoints are also exercised.
    async fn test_namespace_query_compat_helper(upgrade: Upgrade) {
        // Number of nodes running in the test network.
        const NUM_NODES: usize = 5;

        let port = reserve_tcp_port().expect("OS should have ephemeral ports available");
        let url: Url = format!("http://localhost:{port}").parse().unwrap();

        let test_config = TestConfigBuilder::default().build();
        let config = TestNetworkConfigBuilder::<NUM_NODES, _, _>::with_num_nodes()
            .api_config(Options::from(options::Http {
                port,
                max_connections: None,
            }))
            .catchups(std::array::from_fn(|_| {
                StatePeers::<SequencerApiVersion>::from_urls(
                    vec![url.clone()],
                    Default::default(),
                    Duration::from_secs(2),
                    &NoMetrics,
                )
            }))
            .network_config(test_config)
            .build();

        let mut network = TestNetwork::new(config, upgrade).await;
        let mut events = network.server.event_stream().await;

        // Submit a transaction.
        let ns = NamespaceId::from(10_000u64);
        let tx = Transaction::new(ns, vec![1, 2, 3]);
        network.server.submit_transaction(tx.clone()).await.unwrap();
        let block = wait_for_decide_on_handle(&mut events, &tx).await.0;

        // Check namespace proof queries.
        let client: Client<ServerError, StaticVersion<0, 1>> = Client::new(url);
        client.connect(None).await;

        // Fetch the header and VID common data needed to verify proofs.
        let (header, common): (Header, VidCommonQueryData<SeqTypes>) = try_join!(
            client.get(&format!("availability/header/{block}")).send(),
            client
                .get(&format!("availability/vid/common/{block}"))
                .send()
        )
        .unwrap();
        let version = header.version();

        // The latest version of the API (whether we specifically ask for v1 or let the redirect
        // occur) will give us a namespace proof no matter which VID version is in use.
        for api_ver in ["/v1", ""] {
            tracing::info!("test namespace API version: {api_ver}");

            let ns_proof: NamespaceProofQueryData = client
                .get(&format!(
                    "{api_ver}/availability/block/{block}/namespace/{ns}"
                ))
                .send()
                .await
                .unwrap();
            let proof = ns_proof.proof.as_ref().unwrap();
            // The proof variant must match the VID version of the block.
            if version < EPOCH_VERSION {
                assert!(matches!(proof, NsProof::V0(..)));
            } else {
                assert!(matches!(proof, NsProof::V1(..)));
            }
            // Verify the proof cryptographically and check it yields exactly
            // the submitted transaction.
            let (txs, ns_from_proof) = proof
                .verify(
                    header.ns_table(),
                    &header.payload_commitment(),
                    common.common(),
                )
                .unwrap();
            assert_eq!(ns_from_proof, ns);
            assert_eq!(txs, ns_proof.transactions);
            assert_eq!(txs, std::slice::from_ref(&tx));

            // Test range endpoint.
            let ns_proofs: Vec<NamespaceProofQueryData> = client
                .get(&format!(
                    "{api_ver}/availability/block/{}/{}/namespace/{ns}",
                    block,
                    block + 1
                ))
                .send()
                .await
                .unwrap();
            assert_eq!(&ns_proofs, std::slice::from_ref(&ns_proof));

            // Any API version can correctly tell us that the namespace does not exist.
            let ns_proof: NamespaceProofQueryData = client
                .get(&format!(
                    "{api_ver}/availability/block/{}/namespace/{ns}",
                    block - 1
                ))
                .send()
                .await
                .unwrap();
            assert_eq!(ns_proof.proof, None);
            assert_eq!(ns_proof.transactions, vec![]);

            // Test streaming.
            let mut proofs = client
                .socket(&format!(
                    "{api_ver}/availability/stream/blocks/0/namespace/{ns}"
                ))
                .subscribe()
                .await
                .unwrap();
            // Skip trivial (empty) proofs until the stream reaches the block
            // containing our transaction.
            for i in 0.. {
                tracing::info!(i, "stream proof");
                let proof: NamespaceProofQueryData = proofs.next().await.unwrap().unwrap();
                if proof.proof.is_none() {
                    tracing::info!("waiting for non-trivial proof from stream");
                    continue;
                }
                assert_eq!(&proof.transactions, std::slice::from_ref(&tx));
                break;
            }
        }

        // The legacy version of the API only works for old VID.
        tracing::info!("test namespace API version: v0");
        if version < EPOCH_VERSION {
            let ns_proof: ADVZNamespaceProofQueryData = client
                .get(&format!("v0/availability/block/{block}/namespace/{ns}"))
                .send()
                .await
                .unwrap();
            let proof = ns_proof.proof.as_ref().unwrap();
            // The legacy verifier only accepts V0 common data.
            let VidCommon::V0(common) = common.common() else {
                panic!("wrong VID common version");
            };
            let (txs, ns_from_proof) = proof
                .verify(header.ns_table(), &header.payload_commitment(), common)
                .unwrap();
            assert_eq!(ns_from_proof, ns);
            assert_eq!(txs, ns_proof.transactions);
            assert_eq!(&txs, std::slice::from_ref(&tx));

            // Test range endpoint.
            let ns_proofs: Vec<ADVZNamespaceProofQueryData> = client
                .get(&format!(
                    "v0/availability/block/{}/{}/namespace/{ns}",
                    block,
                    block + 1
                ))
                .send()
                .await
                .unwrap();
            assert_eq!(&ns_proofs, std::slice::from_ref(&ns_proof));
        } else {
            // It will fail if we ask for a proof for a block using new VID.
            client
                .get::<ADVZNamespaceProofQueryData>(&format!(
                    "v0/availability/block/{block}/namespace/{ns}"
                ))
                .send()
                .await
                .unwrap_err();
        }

        // Any API version can correctly tell us that the namespace does not exist.
        let ns_proof: ADVZNamespaceProofQueryData = client
            .get(&format!(
                "v0/availability/block/{}/namespace/{ns}",
                block - 1
            ))
            .send()
            .await
            .unwrap();
        assert_eq!(ns_proof.proof, None);
        assert_eq!(ns_proof.transactions, vec![]);

        // Use the legacy API to stream namespace proofs until we get to a non-trivial proof or a
        // VID version we can't deal with.
        let mut proofs = client
            .socket(&format!("v0/availability/stream/blocks/0/namespace/{ns}"))
            .subscribe()
            .await
            .unwrap();
        for i in 0.. {
            tracing::info!(i, "stream proof");
            let proof: ADVZNamespaceProofQueryData = match proofs.next().await {
                Some(proof) => proof.unwrap(),
                None => {
                    // Stream not expected to end on legacy consensus version.
                    // NOTE(review): the "steam" typo below is in a runtime
                    // assert message; left byte-identical here.
                    assert!(
                        version >= EPOCH_VERSION,
                        "legacy steam ended while still on legacy consensus"
                    );
                    break;
                },
            };
            if proof.proof.is_none() {
                tracing::info!("waiting for non-trivial proof from stream");
                continue;
            }
            assert_eq!(&proof.transactions, std::slice::from_ref(&tx));
            break;
        }

        network.server.shut_down().await;
    }
7814
7815    #[test_log::test(tokio::test(flavor = "multi_thread"))]
7816    async fn test_light_client_completeness() {
7817        // Run the through a protocol upgrade and epoch change, then check that we are able to get a
7818        // correct light client proof for every finalized leaf.
7819
7820        const NUM_NODES: usize = 1;
7821        const EPOCH_HEIGHT: u64 = 200;
7822
7823        let upgrade = Upgrade::new(LEGACY_VERSION, EPOCH_VERSION);
7824        let port = reserve_tcp_port().expect("OS should have ephemeral ports available");
7825        let url: Url = format!("http://localhost:{port}").parse().unwrap();
7826
7827        let test_config = TestConfigBuilder::default()
7828            .epoch_height(EPOCH_HEIGHT)
7829            .epoch_start_block(321)
7830            .set_upgrades(upgrade.target)
7831            .await
7832            .build();
7833
7834        let storage = join_all((0..NUM_NODES).map(|_| SqlDataSource::create_storage())).await;
7835        let persistence: [_; NUM_NODES] = storage
7836            .iter()
7837            .map(<SqlDataSource as TestableSequencerDataSource>::persistence_options)
7838            .collect::<Vec<_>>()
7839            .try_into()
7840            .unwrap();
7841
7842        let config = TestNetworkConfigBuilder::<NUM_NODES, _, _>::with_num_nodes()
7843            .api_config(
7844                SqlDataSource::options(&storage[0], Options::with_port(port))
7845                    .light_client(Default::default()),
7846            )
7847            .persistences(persistence.clone())
7848            .catchups(std::array::from_fn(|_| {
7849                StatePeers::<SequencerApiVersion>::from_urls(
7850                    vec![url.clone()],
7851                    Default::default(),
7852                    Duration::from_secs(2),
7853                    &NoMetrics,
7854                )
7855            }))
7856            .network_config(test_config)
7857            .build();
7858
7859        let mut network = TestNetwork::new(config, upgrade).await;
7860        let client: Client<ServerError, StaticVersion<0, 1>> = Client::new(url);
7861        client.connect(None).await;
7862
7863        // Get a leaf stream so that we can wait for various events. Also keep track of each leaf
7864        // yielded, which we can use as ground truth later in the test.
7865        let mut actual_leaves = vec![];
7866        let mut actual_blocks = vec![];
7867        let mut leaves = client
7868            .socket("availability/stream/leaves/0")
7869            .subscribe::<LeafQueryData<SeqTypes>>()
7870            .await
7871            .unwrap()
7872            .zip(
7873                client
7874                    .socket("availability/stream/blocks/0")
7875                    .subscribe::<BlockQueryData<SeqTypes>>()
7876                    .await
7877                    .unwrap(),
7878            )
7879            .map(|(leaf, block)| {
7880                let leaf = leaf.unwrap();
7881                let block = block.unwrap();
7882                actual_leaves.push(leaf.clone());
7883                actual_blocks.push(block);
7884                leaf
7885            });
7886
7887        // Wait for the upgrade to take effect.
7888        let (upgrade_height, first_epoch) = loop {
7889            let leaf: LeafQueryData<SeqTypes> = leaves.next().await.unwrap();
7890            if leaf.header().version() < EPOCH_VERSION {
7891                tracing::info!(version = %leaf.header().version(), height = leaf.header().height(), view = ?leaf.leaf().view_number(), "waiting for epoch upgrade");
7892                continue;
7893            }
7894            break (leaf.height(), leaf.leaf().epoch(EPOCH_HEIGHT).unwrap());
7895        };
7896        tracing::info!(upgrade_height, ?first_epoch, "epochs enabled");
7897
7898        // Wait for two epoch changes (so we get to the first epoch that actually uses the stake
7899        // table).
7900        let mut epoch_heights = [0; 2];
7901        for (i, epoch_height) in epoch_heights.iter_mut().enumerate() {
7902            let desired_epoch = first_epoch + (i as u64) + 1;
7903            *epoch_height = loop {
7904                let leaf = leaves.next().await.unwrap();
7905                let epoch = leaf.leaf().epoch(EPOCH_HEIGHT).unwrap();
7906                if epoch > desired_epoch {
7907                    tracing::info!(
7908                        height = leaf.height(),
7909                        ?desired_epoch,
7910                        ?epoch,
7911                        "changed epoch"
7912                    );
7913                    break leaf.height();
7914                }
7915                tracing::info!(
7916                    ?desired_epoch,
7917                    height = leaf.header().height(),
7918                    view = ?leaf.leaf().view_number(),
7919                    "waiting for epoch change"
7920                );
7921            };
7922        }
7923
7924        // Wait a few more blocks.
7925        let max_block = epoch_heights[1] + 1;
7926        loop {
7927            let leaf = leaves.next().await.unwrap();
7928            if leaf.height() > max_block {
7929                break;
7930            }
7931            tracing::info!(max_block, height = leaf.height(), "waiting for block");
7932        }
7933
7934        // Stop consensus. All the blocks we are going to query have already been produced.
7935        // Continuing to run consensus would just waste resources while we check stuff.
7936        network.stop_consensus().await;
7937
7938        // Check light client. Querying every single block is too slow, so we'll check a few blocks
7939        // around various critical points:
7940        let heights =
7941        // * The first few blocks, including genesis
7942            (0..=1)
7943        // * A few blocks just before and after the upgrade
7944            .chain(upgrade_height-1..=upgrade_height+1)
7945        // * A few blocks just before and after the first epoch change
7946            .chain(epoch_heights[0]-1..=epoch_heights[0] + 1)
7947        // * A few blocks just before and after the stake table comes into effect
7948            .chain(epoch_heights[1]-1..=max_block);
7949
7950        let quorum = EpochChangeQuorum::new(EPOCH_HEIGHT);
7951        for i in heights {
7952            let leaf = &actual_leaves[i as usize];
7953            let block = &actual_blocks[i as usize];
7954            tracing::info!(i, ?leaf, ?block, "check leaf");
7955
7956            // Get the same leaf proof by various IDs.
7957            let client = &client;
7958            let proofs = try_join_all(
7959                [
7960                    format!("light-client/leaf/{i}"),
7961                    format!("light-client/leaf/hash/{}", leaf.hash()),
7962                    format!("light-client/leaf/block-hash/{}", leaf.block_hash()),
7963                ]
7964                .into_iter()
7965                .map(|path| async move {
7966                    tracing::info!(i, path, "fetch leaf proof");
7967                    let proof = client.get::<LeafProof>(&path).send().await?;
7968                    Ok::<_, anyhow::Error>((path, proof))
7969                }),
7970            )
7971            .await
7972            .unwrap();
7973
7974            // Check proofs against expected leaf.
7975            for (path, proof) in proofs {
7976                tracing::info!(i, path, ?proof, "check leaf proof");
7977                assert_eq!(
7978                    proof.verify(LeafProofHint::Quorum(&quorum)).await.unwrap(),
7979                    *leaf
7980                );
7981            }
7982
7983            // Get the corresponding header.
7984            let root_height = i + 1;
7985            let root = actual_leaves[root_height as usize].header();
7986            let proofs = try_join_all(
7987                [
7988                    format!("light-client/header/{root_height}/{i}"),
7989                    format!(
7990                        "light-client/header/{root_height}/hash/{}",
7991                        leaf.block_hash()
7992                    ),
7993                ]
7994                .into_iter()
7995                .map(|path| async move {
7996                    tracing::info!(i, path, "get header proof");
7997                    let proof = client.get::<HeaderProof>(&path).send().await?;
7998                    Ok::<_, anyhow::Error>((path, proof))
7999                }),
8000            )
8001            .await
8002            .unwrap();
8003            for (path, proof) in proofs {
8004                tracing::info!(i, path, ?proof, "check header proof");
8005                assert_eq!(
8006                    proof.verify_ref(root.block_merkle_tree_root()).unwrap(),
8007                    leaf.header()
8008                );
8009            }
8010
8011            // Get the corresponding payload.
8012            let proof = client
8013                .get::<PayloadProof>(&format!("light-client/payload/{i}"))
8014                .send()
8015                .await
8016                .unwrap();
8017            assert_eq!(proof.verify(leaf.header()).unwrap(), *block.payload());
8018        }
8019
8020        // Check light client stake table.
8021        let events: Vec<StakeTableEvent> = client
8022            .get(&format!("light-client/stake-table/{}", first_epoch + 2))
8023            .send()
8024            .await
8025            .unwrap();
8026        let mut state_from_events = StakeTableState::default();
8027        for event in events {
8028            state_from_events.apply_event(event).unwrap().unwrap();
8029        }
8030
8031        assert_eq!(
8032            state_from_events.into_validators(),
8033            network
8034                .server
8035                .consensus()
8036                .read()
8037                .await
8038                .storage()
8039                .load_all_validators(first_epoch + 2, 0, 1_000_000)
8040                .await
8041                .unwrap()
8042                .into_iter()
8043                .map(|v| (v.account, v))
8044                .collect::<RegisteredValidatorMap>()
8045        );
8046
8047        // Querying for a stake table before the first real epoch is an error.
8048        let err = client
8049            .get::<Vec<StakeTableEvent>>(&format!("light-client/stake-table/{}", first_epoch + 1))
8050            .send()
8051            .await
8052            .unwrap_err();
8053        assert_eq!(err.status(), StatusCode::BAD_REQUEST);
8054    }
8055
8056    /// Test that `fetch_leaf` returns a leaf with exactly the requested block height.
8057    #[test_log::test(tokio::test(flavor = "multi_thread"))]
8058    async fn test_fetch_leaf_returns_exact_height() -> anyhow::Result<()> {
8059        const EPOCH_HEIGHT: u64 = 10;
8060        const NUM_NODES: usize = 5;
8061        const TARGET_HEIGHT: u64 = EPOCH_HEIGHT * 3 + 2;
8062
8063        let network_config = TestConfigBuilder::default()
8064            .epoch_height(EPOCH_HEIGHT)
8065            .build();
8066
8067        let port = reserve_tcp_port().expect("No ports free for query service");
8068
8069        let storage = join_all((0..NUM_NODES).map(|_| SqlDataSource::create_storage())).await;
8070        let persistence: [_; NUM_NODES] = storage
8071            .iter()
8072            .map(<SqlDataSource as TestableSequencerDataSource>::persistence_options)
8073            .collect::<Vec<_>>()
8074            .try_into()
8075            .unwrap();
8076
8077        let catchup_peers = std::array::from_fn(|_| {
8078            StatePeers::<StaticVersion<0, 1>>::from_urls(
8079                vec![format!("http://localhost:{port}").parse().unwrap()],
8080                Default::default(),
8081                Duration::from_secs(2),
8082                &NoMetrics,
8083            )
8084        });
8085
8086        let config = TestNetworkConfigBuilder::with_num_nodes()
8087            .api_config(SqlDataSource::options(
8088                &storage[0],
8089                Options::with_port(port),
8090            ))
8091            .network_config(network_config)
8092            .persistences(persistence)
8093            .catchups(catchup_peers)
8094            .pos_hook(
8095                DelegationConfig::MultipleDelegators,
8096                Default::default(),
8097                POS_V4,
8098            )
8099            .await?
8100            .build();
8101
8102        let network = TestNetwork::new(config, POS_V4).await;
8103
8104        // Wait for chain to advance past our target height
8105        let height_client: Client<ServerError, StaticVersion<0, 1>> =
8106            Client::new(format!("http://localhost:{port}").parse().unwrap());
8107        wait_until_block_height(&height_client, "node/block-height", TARGET_HEIGHT + 5).await;
8108
8109        // Get the stake table and threshold for the epoch containing TARGET_HEIGHT
8110        let coordinator = network.server.node_state().coordinator;
8111        let epoch = EpochNumber::new(epoch_from_block_number(TARGET_HEIGHT, EPOCH_HEIGHT));
8112        let membership = coordinator.membership().read().await;
8113        let stake_table = membership.stake_table(Some(epoch));
8114        let success_threshold = membership.success_threshold(Some(epoch));
8115        drop(membership);
8116
8117        // Use StatePeers to fetch the leaf at the exact target height
8118        let catchup = StatePeers::<StaticVersion<0, 1>>::from_urls(
8119            vec![format!("http://localhost:{port}").parse().unwrap()],
8120            Default::default(),
8121            Duration::from_secs(5),
8122            &NoMetrics,
8123        );
8124
8125        let leaf = catchup
8126            .fetch_leaf(TARGET_HEIGHT, stake_table, success_threshold)
8127            .await?;
8128
8129        assert_eq!(
8130            leaf.height(),
8131            TARGET_HEIGHT,
8132            "fetch_leaf must return the leaf at exactly the requested height"
8133        );
8134
8135        Ok(())
8136    }
8137}