1use std::{collections::HashMap, pin::Pin, sync::Arc, time::Duration};
2
3use ::light_client::{
4 LightClient,
5 client::{FallbackClient, QueryServiceClient},
6 state::{Genesis, LightClientOptions},
7 storage::{LightClientSqliteOptions, SqliteStorage},
8};
9use alloy::primitives::U256;
10use anyhow::{Context, bail, ensure};
11use async_lock::RwLock;
12use async_once_cell::Lazy;
13use async_trait::async_trait;
14use committable::Commitment;
15use data_source::{
16 CatchupDataSource, RequestResponseDataSource, StakeTableDataSource, StakeTableWithEpochNumber,
17 StateCertDataSource, StateCertFetchingDataSource, SubmitDataSource,
18};
19use derivative::Derivative;
20use espresso_types::{
21 AccountQueryData, AuthenticatedValidatorMap, BlockMerkleTree, FeeAccount, FeeMerkleTree, Leaf2,
22 NodeState, PubKey, Transaction,
23 config::PublicNetworkConfig,
24 retain_accounts,
25 traits::EventsPersistenceRead,
26 v0::traits::{SequencerPersistence, StateCatchup},
27 v0_3::{
28 ChainConfig, RegisteredValidator, RewardAccountQueryDataV1, RewardAccountV1, RewardAmount,
29 RewardMerkleTreeV1, StakeTableEvent,
30 },
31 v0_4::{
32 PermittedRewardMerkleTreeV2, RewardAccountQueryDataV2, RewardAccountV2, RewardMerkleTreeV2,
33 },
34};
35use futures::{
36 future::{BoxFuture, Future, FutureExt},
37 stream::BoxStream,
38};
39use hotshot_contract_adapter::sol_types::EspToken;
40use hotshot_events_service::events_source::{
41 EventFilterSet, EventsSource, EventsStreamer, StartupInfo,
42};
43use hotshot_query_service::{
44 availability::VidCommonQueryData,
45 data_source::ExtensibleDataSource,
46 fetching::{self, Provider},
47};
48use hotshot_types::{
49 PeerConfig,
50 data::{EpochNumber, VidCommitment, VidCommon, VidShare, ViewNumber},
51 event::{Event, LegacyEvent},
52 light_client::LCV3StateSignatureRequestBody,
53 network::NetworkConfig,
54 simple_certificate::LightClientStateUpdateCertificateV2,
55 stake_table::HSStakeTable,
56 traits::{
57 election::{Membership, MembershipSnapshot, NonEpochMembershipSnapshot},
58 network::ConnectedNetwork,
59 },
60 utils::epoch_from_block_number,
61 vid::avidm::{AvidMScheme, init_avidm_param},
62 vote::HasViewNumber,
63};
64use itertools::Itertools;
65use jf_merkle_tree_compat::MerkleTreeScheme;
66use moka::future::Cache;
67use rand::Rng;
68use request_response::RequestType;
69use serde::{Deserialize, Serialize};
70use tokio::time::timeout;
71use url::Url;
72use vbs::version::Version;
73
74use self::data_source::{HotShotConfigDataSource, NodeStateDataSource, StateSignatureDataSource};
75use crate::{
76 SeqTypes, SequencerApiVersion, SequencerContext,
77 api::data_source::TokenDataSource,
78 catchup::{
79 CatchupStorage, add_fee_accounts_to_state, add_v1_reward_accounts_to_state,
80 add_v2_reward_accounts_to_state,
81 },
82 consensus_handle::ConsensusHandle,
83 context::ConsensusNode,
84 request_response::{
85 data_source::{retain_v1_reward_accounts, retain_v2_reward_accounts},
86 request::{Request, Response},
87 },
88 state_cert::{StateCertFetchError, validate_state_cert},
89 state_signature::StateSigner,
90};
91
92pub mod data_source;
93pub mod endpoints;
94pub mod fs;
95pub mod light_client;
96pub mod options;
97pub mod sql;
98pub mod state;
99pub mod unlock_schedule;
100mod update;
101
102pub use options::Options;
103
104pub type BlocksFrontier = <BlockMerkleTree as MerkleTreeScheme>::MembershipProof;
105
106type BoxLazy<T> = Pin<Arc<Lazy<T, BoxFuture<'static, T>>>>;
107
108#[derive(Derivative)]
109#[derivative(Clone(bound = ""), Debug(bound = ""))]
110struct ApiState<N: ConnectedNetwork<PubKey>, P: SequencerPersistence> {
111 #[derivative(Debug = "ignore")]
116 sequencer_context: BoxLazy<SequencerContext<N, P>>,
117
118 token_supply: Cache<(), U256>,
120}
121
122impl<N: ConnectedNetwork<PubKey>, P: SequencerPersistence> ApiState<N, P> {
123 fn new(context_init: impl Future<Output = SequencerContext<N, P>> + Send + 'static) -> Self {
124 Self {
125 sequencer_context: Arc::pin(Lazy::from_future(context_init.boxed())),
126 token_supply: Cache::builder()
127 .max_capacity(1)
128 .time_to_live(Duration::from_secs(3600))
129 .build(),
130 }
131 }
132
133 async fn state_signer(&self) -> Arc<RwLock<StateSigner<SequencerApiVersion>>> {
134 self.sequencer_context
135 .as_ref()
136 .get()
137 .await
138 .get_ref()
139 .state_signer()
140 }
141
142 async fn event_streamer(&self) -> Arc<RwLock<EventsStreamer<SeqTypes>>> {
143 self.sequencer_context
144 .as_ref()
145 .get()
146 .await
147 .get_ref()
148 .event_streamer()
149 }
150
151 async fn consensus_handle(&self) -> Arc<ConsensusHandle<SeqTypes, ConsensusNode<N, P>>> {
152 self.sequencer_context
153 .as_ref()
154 .get()
155 .await
156 .get_ref()
157 .consensus_handle()
158 }
159
160 async fn network_config(&self) -> NetworkConfig<SeqTypes> {
161 self.sequencer_context
162 .as_ref()
163 .get()
164 .await
165 .get_ref()
166 .network_config()
167 }
168}
169
170type StorageState<N, P, D> = ExtensibleDataSource<D, ApiState<N, P>>;
171
172#[async_trait]
173impl<N: ConnectedNetwork<PubKey>, P: SequencerPersistence> EventsSource<SeqTypes>
174 for ApiState<N, P>
175{
176 type EventStream = BoxStream<'static, Arc<Event<SeqTypes>>>;
177 type LegacyEventStream = BoxStream<'static, Arc<LegacyEvent<SeqTypes>>>;
178
179 async fn get_event_stream(
180 &self,
181 _filter: Option<EventFilterSet<SeqTypes>>,
182 ) -> Self::EventStream {
183 self.event_streamer()
184 .await
185 .read()
186 .await
187 .get_event_stream(None)
188 .await
189 }
190
191 async fn get_legacy_event_stream(
192 &self,
193 _filter: Option<EventFilterSet<SeqTypes>>,
194 ) -> Self::LegacyEventStream {
195 self.event_streamer()
196 .await
197 .read()
198 .await
199 .get_legacy_event_stream(None)
200 .await
201 }
202
203 async fn get_startup_info(&self) -> StartupInfo<SeqTypes> {
204 self.event_streamer()
205 .await
206 .read()
207 .await
208 .get_startup_info()
209 .await
210 }
211}
212
213impl<N: ConnectedNetwork<PubKey>, D: Send + Sync, P: SequencerPersistence> TokenDataSource<SeqTypes>
214 for StorageState<N, P, D>
215{
216 async fn get_initial_supply_l1(&self) -> anyhow::Result<U256> {
217 self.as_ref().get_initial_supply_l1().await
218 }
219
220 async fn get_total_supply_l1(&self) -> anyhow::Result<U256> {
221 self.as_ref().get_total_supply_l1().await
222 }
223
224 async fn get_decided_header(&self) -> espresso_types::Header {
225 self.as_ref().get_decided_header().await
226 }
227}
228
229impl<N: ConnectedNetwork<PubKey>, D: Send + Sync, P: SequencerPersistence> SubmitDataSource<N, P>
230 for StorageState<N, P, D>
231{
232 async fn submit(&self, tx: Transaction) -> anyhow::Result<()> {
233 self.as_ref().submit(tx).await
234 }
235}
236
237impl<N: ConnectedNetwork<PubKey>, D: Sync, P: SequencerPersistence> StakeTableDataSource<SeqTypes>
238 for StorageState<N, P, D>
239{
240 async fn get_stake_table(
242 &self,
243 epoch: Option<EpochNumber>,
244 ) -> anyhow::Result<Vec<PeerConfig<SeqTypes>>> {
245 self.as_ref().get_stake_table(epoch).await
246 }
247
248 async fn get_stake_table_current(&self) -> anyhow::Result<StakeTableWithEpochNumber<SeqTypes>> {
250 self.as_ref().get_stake_table_current().await
251 }
252
253 async fn get_da_stake_table(
255 &self,
256 epoch: Option<EpochNumber>,
257 ) -> anyhow::Result<Vec<PeerConfig<SeqTypes>>> {
258 self.as_ref().get_da_stake_table(epoch).await
259 }
260
261 async fn get_da_stake_table_current(
263 &self,
264 ) -> anyhow::Result<StakeTableWithEpochNumber<SeqTypes>> {
265 self.as_ref().get_da_stake_table_current().await
266 }
267
268 async fn get_validators(
270 &self,
271 epoch: EpochNumber,
272 ) -> anyhow::Result<AuthenticatedValidatorMap> {
273 self.as_ref().get_validators(epoch).await
274 }
275
276 async fn get_block_reward(
277 &self,
278 epoch: Option<EpochNumber>,
279 ) -> anyhow::Result<Option<RewardAmount>> {
280 self.as_ref().get_block_reward(epoch).await
281 }
282 async fn current_proposal_participation(&self) -> HashMap<PubKey, f64> {
284 self.as_ref().current_proposal_participation().await
285 }
286 async fn proposal_participation(&self, epoch: EpochNumber) -> HashMap<PubKey, f64> {
288 self.as_ref().proposal_participation(epoch).await
289 }
290 async fn current_vote_participation(&self) -> HashMap<PubKey, f64> {
292 self.as_ref().current_vote_participation().await
293 }
294 async fn vote_participation(&self, epoch: EpochNumber) -> HashMap<PubKey, f64> {
296 self.as_ref().vote_participation(epoch).await
297 }
298
299 async fn get_all_validators(
300 &self,
301 epoch: EpochNumber,
302 offset: u64,
303 limit: u64,
304 ) -> anyhow::Result<Vec<RegisteredValidator<PubKey>>> {
305 self.as_ref().get_all_validators(epoch, offset, limit).await
306 }
307
308 async fn stake_table_events(
309 &self,
310 from_l1_block: u64,
311 to_l1_block: u64,
312 ) -> anyhow::Result<Vec<StakeTableEvent>> {
313 self.as_ref()
314 .stake_table_events(from_l1_block, to_l1_block)
315 .await
316 }
317}
318
319impl<N: ConnectedNetwork<PubKey>, P: SequencerPersistence> TokenDataSource<SeqTypes>
320 for ApiState<N, P>
321{
322 async fn get_initial_supply_l1(&self) -> anyhow::Result<U256> {
323 let node_state = self.sequencer_context.as_ref().get().await.node_state();
324 let fetcher = node_state.coordinator.membership().fetcher().clone();
325 let cached = *fetcher.initial_supply.read().await;
326 match cached {
327 Some(supply) => Ok(supply),
328 None => Ok(fetcher.fetch_and_update_initial_supply().await?),
329 }
330 }
331
332 async fn get_total_supply_l1(&self) -> anyhow::Result<U256> {
333 match self.token_supply.get(&()).await {
334 Some(supply) => Ok(supply),
335 None => {
336 let node_state = self.sequencer_context.as_ref().get().await.node_state();
337 let token_contract_address = node_state.token_contract_address().await?;
338
339 let provider = node_state.l1_client.provider;
340
341 let token = EspToken::new(token_contract_address, provider.clone());
342
343 let supply = token
344 .totalSupply()
345 .call()
346 .await
347 .context("Failed to retrieve totalSupply from the contract")?;
348
349 self.token_supply.insert((), supply).await;
350
351 Ok(supply)
352 },
353 }
354 }
355
356 async fn get_decided_header(&self) -> espresso_types::Header {
357 self.consensus_handle()
358 .await
359 .decided_leaf()
360 .await
361 .block_header()
362 .clone()
363 }
364}
365
366impl<N: ConnectedNetwork<PubKey>, P: SequencerPersistence> StakeTableDataSource<SeqTypes>
367 for ApiState<N, P>
368{
369 async fn get_stake_table(
371 &self,
372 epoch: Option<EpochNumber>,
373 ) -> anyhow::Result<Vec<PeerConfig<SeqTypes>>> {
374 let handle = self.consensus_handle().await;
375 if let Some(requested) = epoch {
376 let first_epoch = handle
377 .membership_coordinator()
378 .await
379 .membership()
380 .first_epoch();
381 if let Some(first_epoch) = first_epoch
382 && requested < first_epoch
383 {
384 return Err(anyhow::anyhow!(
385 "requested stake table for epoch {requested:?} is below the first epoch \
386 {first_epoch:?}"
387 ));
388 }
389 }
390 let highest_epoch = handle.current_epoch().await.map(|e| e + 1);
391 if epoch > highest_epoch {
392 return Err(anyhow::anyhow!(
393 "requested stake table for epoch {epoch:?} is beyond the current epoch + 1 \
394 {highest_epoch:?}"
395 ));
396 }
397 let mem = handle
398 .membership_coordinator()
399 .await
400 .stake_table_for_epoch(epoch)?;
401
402 Ok(mem.stake_table().cloned().collect())
403 }
404
405 async fn get_stake_table_current(&self) -> anyhow::Result<StakeTableWithEpochNumber<SeqTypes>> {
407 let epoch = self.consensus_handle().await.current_epoch().await;
408
409 Ok(StakeTableWithEpochNumber {
410 epoch,
411 stake_table: self.get_stake_table(epoch).await?,
412 })
413 }
414
415 async fn get_da_stake_table(
417 &self,
418 epoch: Option<EpochNumber>,
419 ) -> anyhow::Result<Vec<PeerConfig<SeqTypes>>> {
420 let coordinator = self.consensus_handle().await.membership_coordinator().await;
421 Ok(match epoch {
422 Some(e) => coordinator
423 .membership()
424 .snapshot(e)
425 .map(|s| s.da_stake_table().cloned().collect())
426 .unwrap_or_default(),
427 None => coordinator
428 .membership()
429 .non_epoch_snapshot()
430 .da_stake_table()
431 .cloned()
432 .collect(),
433 })
434 }
435
436 async fn get_da_stake_table_current(
438 &self,
439 ) -> anyhow::Result<StakeTableWithEpochNumber<SeqTypes>> {
440 let epoch = self.consensus_handle().await.current_epoch().await;
441
442 Ok(StakeTableWithEpochNumber {
443 epoch,
444 stake_table: self.get_da_stake_table(epoch).await?,
445 })
446 }
447
448 async fn get_block_reward(
449 &self,
450 epoch: Option<EpochNumber>,
451 ) -> anyhow::Result<Option<RewardAmount>> {
452 let coordinator = self.consensus_handle().await.membership_coordinator().await;
453
454 let membership = coordinator.membership();
455 let block_reward = match epoch {
456 None => membership.fixed_block_reward(),
457 Some(e) => membership.epoch_block_reward(e),
458 };
459
460 Ok(block_reward)
461 }
462
463 async fn get_validators(&self, e: EpochNumber) -> anyhow::Result<AuthenticatedValidatorMap> {
465 Ok(self
466 .consensus_handle()
467 .await
468 .membership_coordinator()
469 .await
470 .membership_for_epoch(Some(e))
471 .context("membership not found")?
472 .snapshot()
473 .with_context(|| format!("no committee for epoch={e}"))?
474 .validators()
475 .clone())
476 }
477
478 async fn current_proposal_participation(&self) -> HashMap<PubKey, f64> {
480 self.consensus_handle()
481 .await
482 .current_proposal_participation()
483 .await
484 }
485
486 async fn proposal_participation(&self, epoch: EpochNumber) -> HashMap<PubKey, f64> {
488 self.consensus_handle()
489 .await
490 .proposal_participation(epoch)
491 .await
492 }
493
494 async fn current_vote_participation(&self) -> HashMap<PubKey, f64> {
496 self.consensus_handle()
497 .await
498 .current_vote_participation()
499 .await
500 }
501
502 async fn vote_participation(&self, epoch: EpochNumber) -> HashMap<PubKey, f64> {
504 self.consensus_handle()
505 .await
506 .vote_participation(Some(epoch))
507 .await
508 }
509
510 async fn get_all_validators(
511 &self,
512 epoch: EpochNumber,
513 offset: u64,
514 limit: u64,
515 ) -> anyhow::Result<Vec<RegisteredValidator<PubKey>>> {
516 let storage = self.consensus_handle().await.storage().await;
517 storage.load_all_validators(epoch, offset, limit).await
518 }
519
520 async fn stake_table_events(
521 &self,
522 from_l1_block: u64,
523 to_l1_block: u64,
524 ) -> anyhow::Result<Vec<StakeTableEvent>> {
525 let storage = self.consensus_handle().await.storage().await;
526 let (status, events) = storage.load_events(from_l1_block, to_l1_block).await?;
527 ensure!(
528 status == Some(EventsPersistenceRead::Complete),
529 "some events in range [{from_l1_block}, {to_l1_block}] are not available ({status:?})"
530 );
531 Ok(events.into_iter().map(|(_, event)| event).collect())
532 }
533}
534
535impl<N: ConnectedNetwork<PubKey>, D: Sync, P: SequencerPersistence>
536 RequestResponseDataSource<SeqTypes> for StorageState<N, P, D>
537{
538 async fn request_vid_shares(
539 &self,
540 block_number: u64,
541 vid_common_data: VidCommonQueryData<SeqTypes>,
542 timeout_duration: Duration,
543 ) -> BoxFuture<'static, anyhow::Result<Vec<VidShare>>> {
544 self.as_ref()
545 .request_vid_shares(block_number, vid_common_data, timeout_duration)
546 .await
547 }
548}
549
550#[async_trait]
551impl<N: ConnectedNetwork<PubKey>, D: Sync, P: SequencerPersistence>
552 StateCertFetchingDataSource<SeqTypes> for StorageState<N, P, D>
553{
554 async fn request_state_cert(
555 &self,
556 epoch: u64,
557 timeout: Duration,
558 ) -> Result<LightClientStateUpdateCertificateV2<SeqTypes>, StateCertFetchError> {
559 self.as_ref().request_state_cert(epoch, timeout).await
560 }
561}
562
563impl<N: ConnectedNetwork<PubKey>, P: SequencerPersistence> RequestResponseDataSource<SeqTypes>
564 for ApiState<N, P>
565{
566 async fn request_vid_shares(
567 &self,
568 block_number: u64,
569 vid_common_data: VidCommonQueryData<SeqTypes>,
570 duration: Duration,
571 ) -> BoxFuture<'static, anyhow::Result<Vec<VidShare>>> {
572 let request_response_protocol = self
574 .sequencer_context
575 .as_ref()
576 .get()
577 .await
578 .request_response_protocol
579 .clone();
580
581 async move {
582 let total_weight = match vid_common_data.common() {
584 VidCommon::V0(_) => {
585 return Err(anyhow::anyhow!(
587 "V0 total weight calculation not supported yet"
588 ));
589 },
590 VidCommon::V1(v1) => v1.total_weights,
591 VidCommon::V2(v2) => v2.param.total_weights,
592 };
593
594 let avidm_param = init_avidm_param(total_weight)
596 .with_context(|| "failed to initialize avidm param")?;
597
598 let VidCommitment::V1(local_payload_hash) = vid_common_data.payload_hash() else {
600 bail!("V0 share verification not supported yet");
601 };
602
603 let request_id = rand::thread_rng().r#gen();
605
606 let received_shares = Arc::new(parking_lot::Mutex::new(Vec::new()));
608 let received_shares_clone = received_shares.clone();
609 let request_result: anyhow::Result<_, _> = timeout(
610 duration,
611 request_response_protocol.request_indefinitely::<_, _, _>(
612 Request::VidShare(block_number, request_id),
613 RequestType::Broadcast,
614 move |_request, response| {
615 let avidm_param = avidm_param.clone();
616 let received_shares = received_shares_clone.clone();
617 async move {
618 let Response::VidShare(VidShare::V1(received_share)) = response else {
620 bail!("V0 share verification not supported yet");
621 };
622
623 let Ok(Ok(_)) = AvidMScheme::verify_share(
625 &avidm_param,
626 &local_payload_hash,
627 &received_share,
628 ) else {
629 bail!("share verification failed");
630 };
631
632 received_shares.lock().push(received_share);
634
635 bail!("waiting for more shares");
636
637 #[allow(unreachable_code)]
638 Ok(())
639 }
640 },
641 ),
642 )
643 .await;
644
645 match request_result {
647 Err(_) => {
648 Ok(received_shares
650 .lock()
651 .clone()
652 .into_iter()
653 .map(VidShare::V1)
654 .collect())
655 },
656
657 Ok(Err(e)) => Err(e).with_context(|| "failed to request vid shares"),
659
660 Ok(Ok(_)) => bail!("this should not be possible"),
662 }
663 }
664 .boxed()
665 }
666}
667
668#[async_trait]
669impl<N: ConnectedNetwork<PubKey>, P: SequencerPersistence> StateCertFetchingDataSource<SeqTypes>
670 for ApiState<N, P>
671{
672 async fn request_state_cert(
673 &self,
674 epoch: u64,
675 timeout: Duration,
676 ) -> Result<LightClientStateUpdateCertificateV2<SeqTypes>, StateCertFetchError> {
677 tracing::info!("fetching state certificate for epoch={epoch}");
678 let handle = self.consensus_handle().await;
679
680 let current_epoch = handle.current_epoch().await;
681
682 let highest_epoch = current_epoch.map(|e| e.u64() + 1);
685
686 if Some(epoch) > highest_epoch {
687 return Err(StateCertFetchError::Other(anyhow::anyhow!(
688 "requested state certificate for epoch {epoch} is beyond the highest possible \
689 epoch {highest_epoch:?}"
690 )));
691 }
692
693 let coordinator = handle.membership_coordinator().await;
695 if let Err(err) = coordinator.stake_table_for_epoch(Some(EpochNumber::new(epoch))) {
696 tracing::warn!(
697 "Failed to get membership for epoch {epoch}: {err:#}. Waiting for catchup"
698 );
699
700 coordinator
701 .wait_for_catchup(EpochNumber::new(epoch))
702 .await
703 .map_err(|e| {
704 StateCertFetchError::Other(
705 anyhow::Error::new(e)
706 .context(format!("failed to catch up for stake table epoch={epoch}")),
707 )
708 })?;
709 }
710
711 let membership = coordinator
712 .stake_table_for_epoch(Some(EpochNumber::new(epoch)))
713 .map_err(|e| {
714 StateCertFetchError::Other(
715 anyhow::Error::new(e)
716 .context(format!("failed to get stake table for epoch={epoch}")),
717 )
718 })?;
719
720 let stake_table = HSStakeTable::from_iter(membership.stake_table());
721
722 let state_catchup = self
723 .sequencer_context
724 .as_ref()
725 .get()
726 .await
727 .node_state()
728 .state_catchup
729 .clone();
730
731 let result = tokio::time::timeout(timeout, state_catchup.fetch_state_cert(epoch)).await;
732
733 match result {
734 Err(_) => Err(StateCertFetchError::FetchError(anyhow::anyhow!(
735 "timeout while fetching state cert for epoch {epoch}"
736 ))),
737 Ok(Ok(cert)) => {
738 validate_state_cert(&cert, &stake_table).map_err(|e| {
740 StateCertFetchError::ValidationError(e.context(format!(
741 "state certificate validation failed for epoch={epoch}"
742 )))
743 })?;
744
745 tracing::info!("fetched and validated state certificate for epoch {epoch}");
746 Ok(cert)
747 },
748 Ok(Err(e)) => Err(StateCertFetchError::FetchError(
749 e.context(format!("failed to fetch state cert for epoch {epoch}")),
750 )),
751 }
752 }
753}
754
755#[async_trait]
757impl<N: ConnectedNetwork<PubKey>, D: Sync, P: SequencerPersistence> StateCertDataSource
758 for StorageState<N, P, D>
759{
760 async fn get_state_cert_by_epoch(
761 &self,
762 epoch: u64,
763 ) -> anyhow::Result<Option<LightClientStateUpdateCertificateV2<SeqTypes>>> {
764 self.as_ref().get_state_cert_by_epoch(epoch).await
765 }
766
767 async fn insert_state_cert(
768 &self,
769 epoch: u64,
770 cert: LightClientStateUpdateCertificateV2<SeqTypes>,
771 ) -> anyhow::Result<()> {
772 self.as_ref().insert_state_cert(epoch, cert).await
773 }
774}
775
776#[async_trait]
777impl<N: ConnectedNetwork<PubKey>, P: SequencerPersistence> StateCertDataSource for ApiState<N, P> {
778 async fn get_state_cert_by_epoch(
779 &self,
780 epoch: u64,
781 ) -> anyhow::Result<Option<LightClientStateUpdateCertificateV2<SeqTypes>>> {
782 let storage = self.consensus_handle().await.storage().await;
783 storage.get_state_cert_by_epoch(epoch).await
784 }
785
786 async fn insert_state_cert(
787 &self,
788 epoch: u64,
789 cert: LightClientStateUpdateCertificateV2<SeqTypes>,
790 ) -> anyhow::Result<()> {
791 let storage = self.consensus_handle().await.storage().await;
792 storage.insert_state_cert(epoch, cert).await
793 }
794}
795
796impl<N: ConnectedNetwork<PubKey>, P: SequencerPersistence> SubmitDataSource<N, P>
797 for ApiState<N, P>
798{
799 async fn submit(&self, tx: Transaction) -> anyhow::Result<()> {
800 let handle = self.consensus_handle().await;
801
802 let cf = handle
806 .decided_state()
807 .await
808 .and_then(|state| state.chain_config.resolve());
809
810 let cf = match cf {
814 Some(cf) => cf,
815 None => self.node_state().await.chain_config,
816 };
817
818 let max_block_size: u64 = cf.max_block_size.into();
819 let txn_size = tx.payload().len() as u64;
820
821 if txn_size > max_block_size {
823 bail!("transaction size ({txn_size}) is greater than max_block_size ({max_block_size})")
824 }
825
826 handle.submit_transaction(tx).await?;
827 Ok(())
828 }
829}
830
831impl<N, P, D> NodeStateDataSource for StorageState<N, P, D>
832where
833 N: ConnectedNetwork<PubKey>,
834 P: SequencerPersistence,
835 D: Sync,
836{
837 async fn node_state(&self) -> NodeState {
838 self.as_ref().node_state().await
839 }
840}
841
842impl<N: ConnectedNetwork<PubKey>, P: SequencerPersistence, D: CatchupStorage + Send + Sync>
843 data_source::DatabaseMetadataSource for StorageState<N, P, D>
844where
845 N: ConnectedNetwork<PubKey>,
846 P: SequencerPersistence,
847 D: data_source::DatabaseMetadataSource + Send + Sync,
848{
849 async fn get_table_sizes(&self) -> anyhow::Result<Vec<data_source::TableSize>> {
850 self.inner().get_table_sizes().await
851 }
852}
853
854impl<N: ConnectedNetwork<PubKey>, P: SequencerPersistence, D: CatchupStorage + Send + Sync>
855 data_source::PruningDataSource for StorageState<N, P, D>
856where
857 N: ConnectedNetwork<PubKey>,
858 P: SequencerPersistence,
859 D: data_source::PruningDataSource + Send + Sync,
860{
861 async fn get_oldest_block(
862 &self,
863 ) -> anyhow::Result<Option<hotshot_query_service::availability::BlockQueryData<crate::SeqTypes>>>
864 {
865 self.inner().get_oldest_block().await
866 }
867
868 async fn get_oldest_leaf(
869 &self,
870 ) -> anyhow::Result<Option<hotshot_query_service::availability::LeafQueryData<crate::SeqTypes>>>
871 {
872 self.inner().get_oldest_leaf().await
873 }
874}
875
876impl<N: ConnectedNetwork<PubKey>, P: SequencerPersistence, D: CatchupStorage + Send + Sync>
877 CatchupDataSource for StorageState<N, P, D>
878{
879 #[tracing::instrument(skip(self, instance))]
880 async fn get_accounts(
881 &self,
882 instance: &NodeState,
883 height: u64,
884 view: ViewNumber,
885 accounts: &[FeeAccount],
886 ) -> anyhow::Result<FeeMerkleTree> {
887 match self
889 .as_ref()
890 .get_accounts(instance, height, view, accounts)
891 .await
892 {
893 Ok(accounts) => return Ok(accounts),
894 Err(err) => {
895 tracing::info!("accounts not in memory, trying storage: {err:#}");
896 },
897 }
898
899 let (tree, leaf) = self
901 .inner()
902 .get_accounts(instance, height, view, accounts)
903 .await
904 .context("accounts not in memory, and could not fetch from storage")?;
905 let handle = self.as_ref().consensus_handle().await;
909 if let Err(err) = add_fee_accounts_to_state(&*handle, &view, accounts, &tree, leaf).await {
910 tracing::warn!(?view, "cannot update fetched account state: {err:#}");
911 }
912 tracing::info!(?view, "updated with fetched account state");
913
914 Ok(tree)
915 }
916
917 #[tracing::instrument(skip(self, instance))]
918 async fn get_frontier(
919 &self,
920 instance: &NodeState,
921 height: u64,
922 view: ViewNumber,
923 ) -> anyhow::Result<BlocksFrontier> {
924 match self.as_ref().get_frontier(instance, height, view).await {
926 Ok(frontier) => return Ok(frontier),
927 Err(err) => {
928 tracing::info!("frontier is not in memory, trying storage: {err:#}");
929 },
930 }
931
932 self.inner().get_frontier(instance, height, view).await
934 }
935
936 async fn get_chain_config(
937 &self,
938 commitment: Commitment<ChainConfig>,
939 ) -> anyhow::Result<ChainConfig> {
940 match self.as_ref().get_chain_config(commitment).await {
942 Ok(cf) => return Ok(cf),
943 Err(err) => {
944 tracing::info!("chain config is not in memory, trying storage: {err:#}");
945 },
946 }
947
948 self.inner().get_chain_config(commitment).await
950 }
951 async fn get_leaf_chain(&self, height: u64) -> anyhow::Result<Vec<Leaf2>> {
952 match self.as_ref().get_leaf_chain(height).await {
954 Ok(cf) => return Ok(cf),
955 Err(err) => {
956 tracing::info!("leaf chain is not in memory, trying storage: {err:#}");
957 },
958 }
959
960 self.inner().get_leaf_chain(height).await
962 }
963
964 async fn get_cert2(
965 &self,
966 height: u64,
967 ) -> anyhow::Result<Option<espresso_types::Certificate2<SeqTypes>>> {
968 self.inner().load_earliest_cert2(height).await
969 }
970
971 #[tracing::instrument(skip(self, instance))]
972 async fn get_reward_accounts_v2(
973 &self,
974 instance: &NodeState,
975 height: u64,
976 view: ViewNumber,
977 accounts: &[RewardAccountV2],
978 ) -> anyhow::Result<RewardMerkleTreeV2> {
979 match self
981 .as_ref()
982 .get_reward_accounts_v2(instance, height, view, accounts)
983 .await
984 {
985 Ok(accounts) => return Ok(accounts),
986 Err(err) => {
987 tracing::info!("reward accounts not in memory, trying storage: {err:#}");
988 },
989 }
990
991 let (tree, leaf) = self
993 .inner()
994 .get_reward_accounts_v2(instance, height, view, accounts)
995 .await
996 .context("accounts not in memory, and could not fetch from storage")?;
997
998 let handle = self.as_ref().consensus_handle().await;
1001 if let Err(err) =
1002 add_v2_reward_accounts_to_state(&*handle, &view, accounts, &tree, leaf).await
1003 {
1004 tracing::warn!(?view, "cannot update fetched account state: {err:#}");
1005 }
1006 tracing::info!(?view, "updated with fetched account state");
1007
1008 Ok(tree)
1009 }
1010
1011 #[tracing::instrument(skip(self, instance))]
1012 async fn get_reward_accounts_v1(
1013 &self,
1014 instance: &NodeState,
1015 height: u64,
1016 view: ViewNumber,
1017 accounts: &[RewardAccountV1],
1018 ) -> anyhow::Result<RewardMerkleTreeV1> {
1019 match self
1021 .as_ref()
1022 .get_reward_accounts_v1(instance, height, view, accounts)
1023 .await
1024 {
1025 Ok(accounts) => return Ok(accounts),
1026 Err(err) => {
1027 tracing::info!("reward accounts not in memory, trying storage: {err:#}");
1028 },
1029 }
1030
1031 let (tree, leaf) = self
1033 .inner()
1034 .get_reward_accounts_v1(instance, height, view, accounts)
1035 .await
1036 .context("accounts not in memory, and could not fetch from storage")?;
1037
1038 let handle = self.as_ref().consensus_handle().await;
1041 if let Err(err) =
1042 add_v1_reward_accounts_to_state(&*handle, &view, accounts, &tree, leaf).await
1043 {
1044 tracing::warn!(?view, "cannot update fetched account state: {err:#}");
1045 }
1046 tracing::info!(?view, "updated with fetched account state");
1047
1048 Ok(tree)
1049 }
1050
1051 async fn get_reward_merkle_tree_v2(
1052 &self,
1053 height: u64,
1054 view: ViewNumber,
1055 ) -> anyhow::Result<Vec<u8>> {
1056 self.as_ref().get_reward_merkle_tree_v2(height, view).await
1057 }
1058
1059 #[tracing::instrument(skip(self))]
1060 async fn get_state_cert(
1061 &self,
1062 epoch: u64,
1063 ) -> anyhow::Result<LightClientStateUpdateCertificateV2<SeqTypes>> {
1064 let storage = self.as_ref().consensus_handle().await.storage().await;
1065 storage
1066 .get_state_cert_by_epoch(epoch)
1067 .await?
1068 .context(format!("state cert for epoch {epoch} not found"))
1069 }
1070}
1071
1072impl<N, P> NodeStateDataSource for ApiState<N, P>
1073where
1074 N: ConnectedNetwork<PubKey>,
1075 P: SequencerPersistence,
1076{
1077 async fn node_state(&self) -> NodeState {
1078 self.sequencer_context.as_ref().get().await.node_state()
1079 }
1080}
1081
1082impl<N: ConnectedNetwork<PubKey>, P: SequencerPersistence> CatchupDataSource for ApiState<N, P> {
1083 #[tracing::instrument(skip(self, _instance))]
1084 async fn get_accounts(
1085 &self,
1086 _instance: &NodeState,
1087 height: u64,
1088 view: ViewNumber,
1089 accounts: &[FeeAccount],
1090 ) -> anyhow::Result<FeeMerkleTree> {
1091 let state = self
1092 .consensus_handle()
1093 .await
1094 .state(view)
1095 .await
1096 .context(format!(
1097 "state not available for height {height}, view {view}"
1098 ))?;
1099 retain_accounts(&state.fee_merkle_tree, accounts.iter().copied())
1100 }
1101
1102 #[tracing::instrument(skip(self, _instance))]
1103 async fn get_frontier(
1104 &self,
1105 _instance: &NodeState,
1106 height: u64,
1107 view: ViewNumber,
1108 ) -> anyhow::Result<BlocksFrontier> {
1109 let state = self
1110 .consensus_handle()
1111 .await
1112 .state(view)
1113 .await
1114 .context(format!(
1115 "state not available for height {height}, view {view}"
1116 ))?;
1117 let tree = &state.block_merkle_tree;
1118 let frontier = tree.lookup(tree.num_leaves() - 1).expect_ok()?.1;
1119 Ok(frontier)
1120 }
1121
1122 async fn get_chain_config(
1123 &self,
1124 commitment: Commitment<ChainConfig>,
1125 ) -> anyhow::Result<ChainConfig> {
1126 let state = self
1127 .consensus_handle()
1128 .await
1129 .decided_state()
1130 .await
1131 .context("decided state not available")?;
1132 let chain_config = state.chain_config;
1133
1134 if chain_config.commit() == commitment {
1135 chain_config.resolve().context("chain config found")
1136 } else {
1137 bail!("chain config not found")
1138 }
1139 }
1140
1141 async fn get_leaf_chain(&self, height: u64) -> anyhow::Result<Vec<Leaf2>> {
1142 let mut leaves = self.consensus_handle().await.undecided_leaves().await;
1145 leaves.sort_by_key(|l| l.view_number());
1146 let (position, mut last_leaf) = leaves
1147 .iter()
1148 .find_position(|l| l.height() == height)
1149 .context(format!("leaf chain not available for {height}"))?;
1150 let mut chain = vec![last_leaf.clone()];
1151 for leaf in leaves.iter().skip(position + 1) {
1152 if leaf.justify_qc().view_number() == last_leaf.view_number() {
1153 chain.push(leaf.clone());
1154 } else {
1155 continue;
1156 }
1157 if leaf.view_number() == last_leaf.view_number() + 1 {
1158 last_leaf = leaf;
1160 break;
1161 }
1162 last_leaf = leaf;
1163 }
1164 for leaf in leaves
1166 .iter()
1167 .skip_while(|l| l.view_number() <= last_leaf.view_number())
1168 {
1169 if leaf.justify_qc().view_number() == last_leaf.view_number() {
1170 chain.push(leaf.clone());
1171 return Ok(chain);
1172 }
1173 }
1174 bail!(format!("leaf chain not available for {height}"))
1175 }
1176
1177 #[tracing::instrument(skip(self, _instance))]
1178 async fn get_reward_accounts_v2(
1179 &self,
1180 _instance: &NodeState,
1181 height: u64,
1182 view: ViewNumber,
1183 accounts: &[RewardAccountV2],
1184 ) -> anyhow::Result<RewardMerkleTreeV2> {
1185 let state = self
1186 .consensus_handle()
1187 .await
1188 .state(view)
1189 .await
1190 .context(format!(
1191 "state not available for height {height}, view {view}"
1192 ))?;
1193
1194 retain_v2_reward_accounts(&state.reward_merkle_tree_v2, accounts.iter().copied())
1195 }
1196
1197 #[tracing::instrument(skip(self, _instance))]
1198 async fn get_reward_accounts_v1(
1199 &self,
1200 _instance: &NodeState,
1201 height: u64,
1202 view: ViewNumber,
1203 accounts: &[RewardAccountV1],
1204 ) -> anyhow::Result<RewardMerkleTreeV1> {
1205 let state = self
1206 .consensus_handle()
1207 .await
1208 .state(view)
1209 .await
1210 .context(format!(
1211 "state not available for height {height}, view {view}"
1212 ))?;
1213
1214 retain_v1_reward_accounts(&state.reward_merkle_tree_v1, accounts.iter().copied())
1215 }
1216
1217 async fn get_reward_merkle_tree_v2(
1218 &self,
1219 height: u64,
1220 view: ViewNumber,
1221 ) -> anyhow::Result<Vec<u8>> {
1222 let state = self
1223 .consensus_handle()
1224 .await
1225 .state(view)
1226 .await
1227 .context(format!(
1228 "state not available for height {height}, view {view}"
1229 ))?;
1230
1231 let merkle_tree_bytes = bincode::serialize(&TryInto::<RewardMerkleTreeV2Data>::try_into(
1232 &state.reward_merkle_tree_v2,
1233 )?)
1234 .context("Merkle tree serialization failed; this should never happen.")?;
1235
1236 Ok(merkle_tree_bytes)
1237 }
1238
1239 async fn get_state_cert(
1240 &self,
1241 epoch: u64,
1242 ) -> anyhow::Result<LightClientStateUpdateCertificateV2<SeqTypes>> {
1243 self.get_state_cert_by_epoch(epoch)
1244 .await?
1245 .context(format!("state cert not found for epoch {epoch}"))
1246 }
1247}
1248
1249impl<N: ConnectedNetwork<PubKey>, D: Sync, P: SequencerPersistence> HotShotConfigDataSource
1250 for StorageState<N, P, D>
1251{
1252 async fn get_config(&self) -> PublicNetworkConfig {
1253 self.as_ref().network_config().await.into()
1254 }
1255}
1256
1257impl<N: ConnectedNetwork<PubKey>, P: SequencerPersistence> HotShotConfigDataSource
1258 for ApiState<N, P>
1259{
1260 async fn get_config(&self) -> PublicNetworkConfig {
1261 self.network_config().await.into()
1262 }
1263}
1264
1265#[async_trait]
1266impl<N: ConnectedNetwork<PubKey>, D: Sync, P: SequencerPersistence> StateSignatureDataSource<N>
1267 for StorageState<N, P, D>
1268{
1269 async fn get_state_signature(&self, height: u64) -> Option<LCV3StateSignatureRequestBody> {
1270 self.as_ref().get_state_signature(height).await
1271 }
1272}
1273
1274#[async_trait]
1275impl<N: ConnectedNetwork<PubKey>, P: SequencerPersistence> StateSignatureDataSource<N>
1276 for ApiState<N, P>
1277{
1278 async fn get_state_signature(&self, height: u64) -> Option<LCV3StateSignatureRequestBody> {
1279 self.state_signer()
1280 .await
1281 .read()
1282 .await
1283 .get_state_signature(height)
1284 .await
1285 }
1286}
1287
1288#[derive(Serialize, Deserialize, Debug, Clone)]
1289pub struct RewardMerkleTreeV2Data {
1291 pub balances: Vec<(RewardAccountV2, RewardAmount)>,
1292}
1293
1294impl TryInto<RewardMerkleTreeV2Data> for &RewardMerkleTreeV2 {
1295 type Error = anyhow::Error;
1296 fn try_into(self) -> anyhow::Result<RewardMerkleTreeV2Data> {
1298 let num_leaves = self.num_leaves();
1299
1300 let balances: Vec<_> = self
1301 .iter()
1302 .map(|(account, balance)| (*account, *balance))
1303 .collect();
1304
1305 if balances.len() as u64 == num_leaves {
1306 Ok(RewardMerkleTreeV2Data { balances })
1307 } else {
1308 tracing::error!(
1309 "Attempted to serialize an incomplete RewardMerkleTreeV2. This is not a fatal \
1310 error, but it should never happen and indicates that something may be seriously \
1311 wrong. Balances length: {}, num_leaves: {}.",
1312 balances.len(),
1313 num_leaves
1314 );
1315 bail!(
1316 "Failed to convert RewardMerkleTreeV2 into key-value pairs. Some accounts are \
1317 missing."
1318 );
1319 }
1320 }
1321}
1322
1323pub(crate) trait RewardMerkleTreeDataSource: Send + Sync + Clone + 'static {
1324 fn load_v1_reward_account_proof(
1325 &self,
1326 _height: u64,
1327 _account: RewardAccountV1,
1328 ) -> impl Send + Future<Output = anyhow::Result<RewardAccountQueryDataV1>>;
1329
1330 fn save_and_gc_reward_tree_v2(
1331 &self,
1332 node_state: &NodeState,
1333 height: u64,
1334 version: Version,
1335 merkle_tree: &RewardMerkleTreeV2,
1336 ) -> impl Send + Future<Output = anyhow::Result<()>> {
1337 async move {
1338 let serialization =
1339 bincode::serialize(&TryInto::<RewardMerkleTreeV2Data>::try_into(merkle_tree)?)
1340 .context("Merkle tree serialization failed")?;
1341 self.persist_tree(height, serialization).await?;
1342
1343 if cfg!(any(test, feature = "testing")) {
1345 return Ok(());
1346 }
1347
1348 let finalized_hotshot_height = match node_state.finalized_hotshot_height().await {
1349 Ok(h) => h,
1350 Err(err) => {
1351 tracing::warn!("failed to get finalized hotshot height: {err:#}");
1352 return Ok(());
1353 },
1354 };
1355
1356 let epoch_height = node_state
1371 .epoch_height
1372 .context("reward tree gc requires an epoch height")?;
1373 let epochs_to_retain = if version >= versions::EPOCH_REWARD_VERSION {
1379 5
1380 } else {
1381 1
1382 };
1383 let current_epoch = epoch_from_block_number(height, epoch_height);
1384 let epoch_start_block = current_epoch.saturating_sub(epochs_to_retain) * epoch_height;
1386
1387 let gc_height = epoch_start_block.min(finalized_hotshot_height);
1388
1389 if let Err(err) = self.garbage_collect(gc_height).await {
1390 tracing::info!(gc_height, "failed to garbage collect: {err:#}");
1391 }
1392
1393 Ok(())
1394 }
1395 }
1396
1397 fn persist_reward_proofs(
1398 &self,
1399 node_state: &NodeState,
1400 height: u64,
1401 version: Version,
1402 ) -> impl Send + Future<Output = anyhow::Result<()>>;
1403
1404 fn load_reward_merkle_tree_v2(
1405 &self,
1406 height: u64,
1407 ) -> impl Send + Future<Output = anyhow::Result<PermittedRewardMerkleTreeV2>> {
1408 async move {
1409 let tree_bytes = self.load_tree(height).await?;
1410
1411 let tree_data = bincode::deserialize::<RewardMerkleTreeV2Data>(&tree_bytes).context(
1412 "Failed to deserialize RewardMerkleTreeV2 for height {height} from storage; this \
1413 should never happen.",
1414 )?;
1415
1416 PermittedRewardMerkleTreeV2::try_from_kv_set(tree_data.balances)
1417 .await
1418 .context("Failed to reconstruct reward merkle tree from storage")
1419 }
1420 }
1421
1422 fn load_latest_reward_merkle_tree_v2(
1431 &self,
1432 height: u64,
1433 ) -> impl Send + Future<Output = anyhow::Result<PermittedRewardMerkleTreeV2>> {
1434 async move {
1435 let tree_bytes = self.load_latest_tree(height).await?;
1436
1437 let tree_data = bincode::deserialize::<RewardMerkleTreeV2Data>(&tree_bytes)
1438 .context("Failed to deserialize RewardMerkleTreeV2 from storage")?;
1439
1440 PermittedRewardMerkleTreeV2::try_from_kv_set(tree_data.balances)
1441 .await
1442 .context("Failed to reconstruct reward merkle tree from storage")
1443 }
1444 }
1445
1446 fn load_reward_account_proof_v2(
1447 &self,
1448 _height: u64,
1449 _account: RewardAccountV2,
1450 ) -> impl Send + Future<Output = anyhow::Result<RewardAccountQueryDataV2>> {
1451 async {
1452 bail!("load_reward_account_proof_v2 is not supported for this data source");
1453 }
1454 }
1455
1456 fn load_latest_reward_account_proof_v2(
1457 &self,
1458 account: RewardAccountV2,
1459 ) -> impl Send + Future<Output = anyhow::Result<RewardAccountQueryDataV2>> {
1460 async move {
1461 let serialized_account = bincode::serialize(&account).context(
1462 "Failed to serialize RewardAccountV2 for lookup; this should never happen.",
1463 )?;
1464 let proof_bytes = self.load_latest_proof(serialized_account).await?;
1465
1466 bincode::deserialize::<RewardAccountQueryDataV2>(&proof_bytes).context(
1467 "Failed to deserialize RewardAccountQueryDataV2 for account {account} from \
1468 storage; this should never happen.",
1469 )
1470 }
1471 }
1472
1473 fn persist_tree(
1474 &self,
1475 height: u64,
1476 merkle_tree: Vec<u8>,
1477 ) -> impl Send + Future<Output = anyhow::Result<()>>;
1478
1479 fn load_tree(&self, height: u64) -> impl Send + Future<Output = anyhow::Result<Vec<u8>>>;
1480
1481 fn load_latest_tree(&self, height: u64)
1483 -> impl Send + Future<Output = anyhow::Result<Vec<u8>>>;
1484
1485 fn persist_proofs(
1486 &self,
1487 height: u64,
1488 proofs: impl Iterator<Item = (Vec<u8>, Vec<u8>)> + Send,
1489 ) -> impl Send + Future<Output = anyhow::Result<()>>;
1490
1491 fn load_proof(
1492 &self,
1493 height: u64,
1494 account: Vec<u8>,
1495 epoch_height: u64,
1496 ) -> impl Send + Future<Output = anyhow::Result<Vec<u8>>>;
1497
1498 fn load_latest_proof(
1499 &self,
1500 account: Vec<u8>,
1501 ) -> impl Send + Future<Output = anyhow::Result<Vec<u8>>>;
1502
1503 fn proof_exists(&self, height: u64) -> impl Send + Future<Output = bool>;
1504
1505 fn garbage_collect(&self, height: u64) -> impl Send + Future<Output = anyhow::Result<()>>;
1507}
1508
1509impl RewardMerkleTreeDataSource for hotshot_query_service::data_source::MetricsDataSource {
1510 fn load_v1_reward_account_proof(
1511 &self,
1512 _height: u64,
1513 _account: RewardAccountV1,
1514 ) -> impl Send + Future<Output = anyhow::Result<RewardAccountQueryDataV1>> {
1515 async {
1516 bail!("reward merklized state is not supported for this data source");
1517 }
1518 }
1519
1520 fn persist_reward_proofs(
1521 &self,
1522 _node_state: &NodeState,
1523 _height: u64,
1524 _version: Version,
1525 ) -> impl Send + Future<Output = anyhow::Result<()>> {
1526 async {
1527 bail!("reward merklized state is not supported for this data source");
1528 }
1529 }
1530
1531 fn persist_tree(
1532 &self,
1533 _height: u64,
1534 _merkle_tree: Vec<u8>,
1535 ) -> impl Send + Future<Output = anyhow::Result<()>> {
1536 async move {
1537 bail!("reward merklized state is not supported for this data source");
1538 }
1539 }
1540
1541 fn load_tree(&self, _height: u64) -> impl Send + Future<Output = anyhow::Result<Vec<u8>>> {
1542 async move {
1543 bail!("reward merklized state is not supported for this data source");
1544 }
1545 }
1546
1547 fn load_latest_tree(
1548 &self,
1549 _height: u64,
1550 ) -> impl Send + Future<Output = anyhow::Result<Vec<u8>>> {
1551 async move {
1552 bail!("reward merklized state is not supported for this data source");
1553 }
1554 }
1555
1556 fn garbage_collect(&self, _height: u64) -> impl Send + Future<Output = anyhow::Result<()>> {
1557 async move {
1558 bail!("reward merklized state is not supported for this data source");
1559 }
1560 }
1561
1562 fn persist_proofs(
1563 &self,
1564 _height: u64,
1565 _proofs: impl Iterator<Item = (Vec<u8>, Vec<u8>)> + Send,
1566 ) -> impl Send + Future<Output = anyhow::Result<()>> {
1567 async move {
1568 bail!("reward merklized state is not supported for this data source");
1569 }
1570 }
1571
1572 fn load_proof(
1573 &self,
1574 _height: u64,
1575 _account: Vec<u8>,
1576 _epoch_height: u64,
1577 ) -> impl Send + Future<Output = anyhow::Result<Vec<u8>>> {
1578 async move {
1579 bail!("reward merklized state is not supported for this data source");
1580 }
1581 }
1582
1583 fn load_latest_proof(
1584 &self,
1585 _account: Vec<u8>,
1586 ) -> impl Send + Future<Output = anyhow::Result<Vec<u8>>> {
1587 async move {
1588 bail!("reward merklized state is not supported for this data source");
1589 }
1590 }
1591
1592 fn proof_exists(&self, _height: u64) -> impl Send + Future<Output = bool> {
1593 async move { false }
1594 }
1595}
1596
1597impl<T, S> RewardMerkleTreeDataSource
1598 for hotshot_query_service::data_source::ExtensibleDataSource<T, S>
1599where
1600 T: RewardMerkleTreeDataSource,
1601 S: Send + Sync + Clone + NodeStateDataSource + 'static,
1602{
1603 async fn load_v1_reward_account_proof(
1604 &self,
1605 height: u64,
1606 account: RewardAccountV1,
1607 ) -> anyhow::Result<RewardAccountQueryDataV1> {
1608 self.inner()
1609 .load_v1_reward_account_proof(height, account)
1610 .await
1611 }
1612
1613 async fn load_reward_account_proof_v2(
1614 &self,
1615 height: u64,
1616 account: RewardAccountV2,
1617 ) -> anyhow::Result<RewardAccountQueryDataV2> {
1618 let epoch_height = self
1619 .as_ref()
1620 .node_state()
1621 .await
1622 .epoch_height
1623 .context("epoch height not found")?;
1624 let serialized_account = bincode::serialize(&account)
1625 .context("Failed to serialize RewardAccountV2 for lookup; this should never happen.")?;
1626 let proof_bytes = self
1627 .inner()
1628 .load_proof(height, serialized_account, epoch_height)
1629 .await?;
1630
1631 bincode::deserialize::<RewardAccountQueryDataV2>(&proof_bytes)
1632 .context("Failed to deserialize RewardAccountQueryDataV2 from storage")
1633 }
1634
1635 fn persist_tree(
1636 &self,
1637 height: u64,
1638 merkle_tree: Vec<u8>,
1639 ) -> impl Send + Future<Output = anyhow::Result<()>> {
1640 async move { self.inner().persist_tree(height, merkle_tree).await }
1641 }
1642
1643 fn load_tree(&self, height: u64) -> impl Send + Future<Output = anyhow::Result<Vec<u8>>> {
1644 async move { self.inner().load_tree(height).await }
1645 }
1646
1647 fn load_latest_tree(
1648 &self,
1649 height: u64,
1650 ) -> impl Send + Future<Output = anyhow::Result<Vec<u8>>> {
1651 async move { self.inner().load_latest_tree(height).await }
1652 }
1653
1654 fn garbage_collect(&self, height: u64) -> impl Send + Future<Output = anyhow::Result<()>> {
1655 async move { self.inner().garbage_collect(height).await }
1656 }
1657
1658 fn persist_proofs(
1659 &self,
1660 height: u64,
1661 proofs: impl Iterator<Item = (Vec<u8>, Vec<u8>)> + Send,
1662 ) -> impl Send + Future<Output = anyhow::Result<()>> {
1663 async move { self.inner().persist_proofs(height, proofs).await }
1664 }
1665
1666 fn load_proof(
1667 &self,
1668 height: u64,
1669 account: Vec<u8>,
1670 epoch_height: u64,
1671 ) -> impl Send + Future<Output = anyhow::Result<Vec<u8>>> {
1672 async move { self.inner().load_proof(height, account, epoch_height).await }
1673 }
1674
1675 fn load_latest_proof(
1676 &self,
1677 account: Vec<u8>,
1678 ) -> impl Send + Future<Output = anyhow::Result<Vec<u8>>> {
1679 async move { self.inner().load_latest_proof(account).await }
1680 }
1681
1682 fn proof_exists(&self, height: u64) -> impl Send + Future<Output = bool> {
1683 async move { self.inner().proof_exists(height).await }
1684 }
1685
1686 fn persist_reward_proofs(
1687 &self,
1688 node_state: &NodeState,
1689 height: u64,
1690 version: Version,
1691 ) -> impl Send + Future<Output = anyhow::Result<()>> {
1692 async move {
1693 self.inner()
1694 .persist_reward_proofs(node_state, height, version)
1695 .await
1696 }
1697 }
1698}
1699
1700impl<D> RewardMerkleTreeDataSource for Arc<D>
1702where
1703 D: RewardMerkleTreeDataSource,
1704{
1705 async fn load_v1_reward_account_proof(
1706 &self,
1707 height: u64,
1708 account: RewardAccountV1,
1709 ) -> anyhow::Result<RewardAccountQueryDataV1> {
1710 (**self).load_v1_reward_account_proof(height, account).await
1711 }
1712
1713 fn persist_tree(
1714 &self,
1715 height: u64,
1716 merkle_tree: Vec<u8>,
1717 ) -> impl Send + Future<Output = anyhow::Result<()>> {
1718 async move { (**self).persist_tree(height, merkle_tree).await }
1719 }
1720
1721 fn load_tree(&self, height: u64) -> impl Send + Future<Output = anyhow::Result<Vec<u8>>> {
1722 async move { (**self).load_tree(height).await }
1723 }
1724
1725 fn load_latest_tree(
1726 &self,
1727 height: u64,
1728 ) -> impl Send + Future<Output = anyhow::Result<Vec<u8>>> {
1729 async move { (**self).load_latest_tree(height).await }
1730 }
1731
1732 fn load_reward_merkle_tree_v2(
1733 &self,
1734 height: u64,
1735 ) -> impl Send + Future<Output = anyhow::Result<PermittedRewardMerkleTreeV2>> {
1736 async move { (**self).load_reward_merkle_tree_v2(height).await }
1737 }
1738
1739 fn load_reward_account_proof_v2(
1740 &self,
1741 height: u64,
1742 account: RewardAccountV2,
1743 ) -> impl Send + Future<Output = anyhow::Result<RewardAccountQueryDataV2>> {
1744 async move { (**self).load_reward_account_proof_v2(height, account).await }
1745 }
1746
1747 fn persist_proofs(
1748 &self,
1749 height: u64,
1750 proofs: impl Iterator<Item = (Vec<u8>, Vec<u8>)> + Send,
1751 ) -> impl Send + Future<Output = anyhow::Result<()>> {
1752 async move { (**self).persist_proofs(height, proofs).await }
1753 }
1754
1755 fn persist_reward_proofs(
1756 &self,
1757 node_state: &NodeState,
1758 height: u64,
1759 version: Version,
1760 ) -> impl Send + Future<Output = anyhow::Result<()>> {
1761 async move {
1762 (**self)
1763 .persist_reward_proofs(node_state, height, version)
1764 .await
1765 }
1766 }
1767
1768 fn load_proof(
1769 &self,
1770 height: u64,
1771 account: Vec<u8>,
1772 epoch_height: u64,
1773 ) -> impl Send + Future<Output = anyhow::Result<Vec<u8>>> {
1774 async move { (**self).load_proof(height, account, epoch_height).await }
1775 }
1776
1777 fn proof_exists(&self, height: u64) -> impl Send + Future<Output = bool> {
1778 async move { (**self).proof_exists(height).await }
1779 }
1780
1781 fn load_latest_proof(
1782 &self,
1783 account: Vec<u8>,
1784 ) -> impl Send + Future<Output = anyhow::Result<Vec<u8>>> {
1785 async move { (**self).load_latest_proof(account).await }
1786 }
1787
1788 fn garbage_collect(&self, height: u64) -> impl Send + Future<Output = anyhow::Result<()>> {
1789 async move { (**self).garbage_collect(height).await }
1790 }
1791}
1792
1793#[derive(Debug)]
1802struct LightClientProvider {
1803 light_client: BoxLazy<LightClient<SqliteStorage, FallbackClient<QueryServiceClient>>>,
1804}
1805
1806impl LightClientProvider {
1807 pub async fn new<N, P>(
1808 peers: impl IntoIterator<Item = Url>,
1809 state: ApiState<N, P>,
1810 opt: LightClientOptions,
1811 db_opt: LightClientSqliteOptions,
1812 ) -> anyhow::Result<Self>
1813 where
1814 N: ConnectedNetwork<PubKey>,
1815 P: SequencerPersistence,
1816 {
1817 let db = db_opt
1818 .connect()
1819 .await
1820 .context("creating SQLite database for light client")?;
1821 let client = FallbackClient::new(peers.into_iter().map(QueryServiceClient::new).collect())?;
1822 let init_light_client = async move {
1823 let config = state.network_config().await;
1824 let epoch_height = config.config.epoch_height;
1825 let first_epoch =
1826 epoch_from_block_number(config.config.epoch_start_block, epoch_height);
1827
1828 let genesis = Genesis {
1829 epoch_height,
1830
1831 first_epoch_with_dynamic_stake_table: EpochNumber::new(first_epoch + 2),
1834
1835 stake_table: config
1836 .config
1837 .known_nodes_with_stake
1838 .into_iter()
1839 .map(|peer| peer.stake_table_entry)
1840 .collect(),
1841 };
1842 LightClient::from_genesis_with_options(db, client, genesis, opt)
1843 };
1844 Ok(Self {
1845 light_client: Arc::pin(Lazy::from_future(init_light_client.boxed())),
1846 })
1847 }
1848}
1849
1850#[async_trait]
1851impl<T> Provider<SeqTypes, T> for LightClientProvider
1852where
1853 T: fetching::Request<SeqTypes> + 'static,
1854 LightClient<SqliteStorage, FallbackClient<QueryServiceClient>>: Provider<SeqTypes, T>,
1855{
1856 async fn fetch(&self, req: T) -> Option<T::Response> {
1857 self.light_client.as_ref().get().await.fetch(req).await
1858 }
1859}
1860
1861#[cfg(any(test, feature = "testing"))]
1862pub mod test_helpers {
1863 use std::{cmp::max, time::Duration};
1864
1865 use alloy::{
1866 network::EthereumWallet,
1867 primitives::{Address, U256},
1868 providers::{ProviderBuilder, ext::AnvilApi},
1869 };
1870 use committable::Committable;
1871 use espresso_contract_deployer::{
1872 Contract, Contracts, DEFAULT_EXIT_ESCROW_PERIOD_SECONDS, builder::DeployerArgsBuilder,
1873 network_config::light_client_genesis_from_stake_table,
1874 };
1875 use espresso_types::{
1876 MOCK_SEQUENCER_VERSIONS, NamespaceId, ValidatedState,
1877 v0::traits::{NullEventConsumer, PersistenceOptions, StateCatchup},
1878 };
1879 use futures::{
1880 future::{FutureExt, join_all},
1881 stream::StreamExt,
1882 };
1883 use hotshot::types::{Event, EventType};
1884 use hotshot_contract_adapter::stake_table::StakeTableContractVersion;
1885 use hotshot_types::{
1886 event::LeafInfo, light_client::LCV3StateSignatureRequestBody,
1887 new_protocol::CoordinatorEvent, traits::metrics::NoMetrics,
1888 };
1889 use itertools::izip;
1890 use jf_merkle_tree_compat::{MerkleCommitment, MerkleTreeScheme};
1891 use staking_cli::demo::{DelegationConfig, StakingTransactions};
1892 use surf_disco::Client;
1893 use tempfile::TempDir;
1894 use test_utils::reserve_tcp_port;
1895 use tide_disco::{Api, App, Error, StatusCode, error::ServerError};
1896 use tokio::{spawn, task::JoinHandle, time::sleep};
1897 use url::Url;
1898 use vbs::version::{StaticVersion, StaticVersionType};
1899 use versions::{EPOCH_VERSION, Upgrade};
1900
1901 use super::*;
1902 use crate::{
1903 catchup::NullStateCatchup,
1904 network,
1905 persistence::no_storage,
1906 testing::{TestConfig, TestConfigBuilder, run_legacy_builder, wait_for_decide_on_handle},
1907 };
1908
1909 pub const STAKE_TABLE_CAPACITY_FOR_TEST: usize = 10;
1910
1911 pub struct TestNetwork<P: PersistenceOptions, const NUM_NODES: usize> {
1912 pub server: SequencerContext<network::Memory, P::Persistence>,
1913 pub peers: Vec<SequencerContext<network::Memory, P::Persistence>>,
1914 pub cfg: TestConfig<{ NUM_NODES }>,
1915 pub temp_dir: Option<TempDir>,
1917 pub contracts: Option<Contracts>,
1918 }
1919
1920 pub struct TestNetworkConfig<const NUM_NODES: usize, P, C>
1921 where
1922 P: PersistenceOptions,
1923 C: StateCatchup + 'static,
1924 {
1925 state: [ValidatedState; NUM_NODES],
1926 persistence: [P; NUM_NODES],
1927 catchup: [C; NUM_NODES],
1928 network_config: TestConfig<{ NUM_NODES }>,
1929 api_config: Options,
1930 contracts: Option<Contracts>,
1931 }
1932
1933 impl<const NUM_NODES: usize, P, C> TestNetworkConfig<{ NUM_NODES }, P, C>
1934 where
1935 P: PersistenceOptions,
1936 C: StateCatchup + 'static,
1937 {
1938 pub fn states(&self) -> [ValidatedState; NUM_NODES] {
1939 self.state.clone()
1940 }
1941 }
1942
1943 #[derive(Clone)]
1944 pub struct TestNetworkConfigBuilder<const NUM_NODES: usize, P, C>
1945 where
1946 P: PersistenceOptions,
1947 C: StateCatchup + 'static,
1948 {
1949 state: [ValidatedState; NUM_NODES],
1950 persistence: Option<[P; NUM_NODES]>,
1951 catchup: Option<[C; NUM_NODES]>,
1952 api_config: Option<Options>,
1953 network_config: Option<TestConfig<{ NUM_NODES }>>,
1954 contracts: Option<Contracts>,
1955 initial_token_supply: Option<U256>,
1956 }
1957
1958 impl Default for TestNetworkConfigBuilder<5, no_storage::Options, NullStateCatchup> {
1959 fn default() -> Self {
1960 TestNetworkConfigBuilder {
1961 state: std::array::from_fn(|_| ValidatedState::default()),
1962 persistence: Some([no_storage::Options; 5]),
1963 catchup: Some(std::array::from_fn(|_| NullStateCatchup::default())),
1964 network_config: None,
1965 api_config: None,
1966 contracts: None,
1967 initial_token_supply: None,
1968 }
1969 }
1970 }
1971
1972 impl<const NUM_NODES: usize>
1973 TestNetworkConfigBuilder<{ NUM_NODES }, no_storage::Options, NullStateCatchup>
1974 {
1975 pub fn with_num_nodes()
1976 -> TestNetworkConfigBuilder<{ NUM_NODES }, no_storage::Options, NullStateCatchup> {
1977 TestNetworkConfigBuilder {
1978 state: std::array::from_fn(|_| ValidatedState::default()),
1979 persistence: Some([no_storage::Options; { NUM_NODES }]),
1980 catchup: Some(std::array::from_fn(|_| NullStateCatchup::default())),
1981 network_config: None,
1982 api_config: None,
1983 contracts: None,
1984 initial_token_supply: None,
1985 }
1986 }
1987 }
1988
1989 impl<const NUM_NODES: usize, P, C> TestNetworkConfigBuilder<{ NUM_NODES }, P, C>
1990 where
1991 P: PersistenceOptions,
1992 C: StateCatchup + 'static,
1993 {
1994 pub fn states(mut self, state: [ValidatedState; NUM_NODES]) -> Self {
1995 self.state = state;
1996 self
1997 }
1998
1999 pub fn initial_token_supply(mut self, supply: U256) -> Self {
2000 self.initial_token_supply = Some(supply);
2001 self
2002 }
2003
2004 pub fn persistences<NP: PersistenceOptions>(
2005 self,
2006 persistence: [NP; NUM_NODES],
2007 ) -> TestNetworkConfigBuilder<{ NUM_NODES }, NP, C> {
2008 TestNetworkConfigBuilder {
2009 state: self.state,
2010 catchup: self.catchup,
2011 network_config: self.network_config,
2012 api_config: self.api_config,
2013 persistence: Some(persistence),
2014 contracts: self.contracts,
2015 initial_token_supply: self.initial_token_supply,
2016 }
2017 }
2018
2019 pub fn api_config(mut self, api_config: Options) -> Self {
2020 self.api_config = Some(api_config);
2021 self
2022 }
2023
2024 pub fn catchups<NC: StateCatchup + 'static>(
2025 self,
2026 catchup: [NC; NUM_NODES],
2027 ) -> TestNetworkConfigBuilder<{ NUM_NODES }, P, NC> {
2028 TestNetworkConfigBuilder {
2029 state: self.state,
2030 catchup: Some(catchup),
2031 network_config: self.network_config,
2032 api_config: self.api_config,
2033 persistence: self.persistence,
2034 contracts: self.contracts,
2035 initial_token_supply: self.initial_token_supply,
2036 }
2037 }
2038
2039 pub fn network_config(mut self, network_config: TestConfig<{ NUM_NODES }>) -> Self {
2040 self.network_config = Some(network_config);
2041 self
2042 }
2043
2044 pub fn contracts(mut self, contracts: Contracts) -> Self {
2045 self.contracts = Some(contracts);
2046 self
2047 }
2048
2049 pub async fn pos_hook(
2052 self,
2053 delegation_config: DelegationConfig,
2054 stake_table_version: StakeTableContractVersion,
2055 upgrade: Upgrade,
2056 ) -> anyhow::Result<Self> {
2057 if upgrade.base < EPOCH_VERSION && upgrade.target < EPOCH_VERSION {
2058 panic!("given version does not require pos deployment");
2059 };
2060
2061 let network_config = self
2062 .network_config
2063 .as_ref()
2064 .expect("network_config is required");
2065
2066 let l1_url = network_config.l1_url();
2067 let signer = network_config.signer();
2068 let deployer = ProviderBuilder::new()
2069 .wallet(EthereumWallet::from(signer.clone()))
2070 .connect_http(l1_url.clone());
2071
2072 let blocks_per_epoch = network_config.hotshot_config().epoch_height;
2073 let epoch_start_block = network_config.hotshot_config().epoch_start_block;
2074 let (genesis_state, genesis_stake) = light_client_genesis_from_stake_table(
2075 &network_config.hotshot_config().hotshot_stake_table(),
2076 STAKE_TABLE_CAPACITY_FOR_TEST,
2077 )
2078 .unwrap();
2079
2080 let mut contracts = Contracts::new();
2081 let args = DeployerArgsBuilder::default()
2082 .deployer(deployer.clone())
2083 .rpc_url(l1_url.clone())
2084 .mock_light_client(true)
2085 .genesis_lc_state(genesis_state)
2086 .genesis_st_state(genesis_stake)
2087 .blocks_per_epoch(blocks_per_epoch)
2088 .epoch_start_block(epoch_start_block)
2089 .exit_escrow_period(U256::from(max(
2090 blocks_per_epoch * 15 + 100,
2091 DEFAULT_EXIT_ESCROW_PERIOD_SECONDS,
2092 )))
2093 .multisig_pauser(signer.address())
2094 .token_name("Espresso".to_string())
2095 .token_symbol("ESP".to_string())
2096 .initial_token_supply(self.initial_token_supply.unwrap_or(U256::from(100000u64)))
2097 .ops_timelock_delay(U256::from(0))
2098 .ops_timelock_admin(signer.address())
2099 .ops_timelock_proposers(vec![signer.address()])
2100 .ops_timelock_executors(vec![signer.address()])
2101 .safe_exit_timelock_delay(U256::from(10))
2102 .safe_exit_timelock_admin(signer.address())
2103 .safe_exit_timelock_proposers(vec![signer.address()])
2104 .safe_exit_timelock_executors(vec![signer.address()])
2105 .build()
2106 .unwrap();
2107
2108 match stake_table_version {
2109 StakeTableContractVersion::V1 => {
2110 args.deploy_to_stake_table_v1(&mut contracts).await
2111 },
2112 StakeTableContractVersion::V2 => {
2113 args.deploy_to_stake_table_v2(&mut contracts).await
2114 },
2115 StakeTableContractVersion::V3 => {
2116 args.deploy_to_stake_table_v3(&mut contracts).await
2117 },
2118 }
2119 .context("failed to deploy contracts")?;
2120
2121 let stake_table_address = contracts
2122 .address(Contract::StakeTableProxy)
2123 .expect("StakeTableProxy address not found");
2124
2125 StakingTransactions::create(
2126 l1_url.clone(),
2127 &deployer,
2128 stake_table_address,
2129 network_config.staking_priv_keys(),
2130 None,
2131 delegation_config,
2132 )
2133 .await
2134 .expect("stake table setup failed")
2135 .apply_all()
2136 .await
2137 .expect("send all txns failed");
2138
2139 if let Some(anvil) = network_config.anvil() {
2144 anvil
2145 .anvil_set_interval_mining(1)
2146 .await
2147 .expect("interval mining");
2148 }
2149
2150 let state = self.state[0].clone();
2154 let chain_config = if let Some(cf) = state.chain_config.resolve() {
2155 ChainConfig {
2156 base_fee: 0.into(),
2157 stake_table_contract: Some(stake_table_address),
2158 ..cf
2159 }
2160 } else {
2161 ChainConfig {
2162 base_fee: 0.into(),
2163 stake_table_contract: Some(stake_table_address),
2164 ..Default::default()
2165 }
2166 };
2167
2168 let state = ValidatedState {
2169 chain_config: chain_config.into(),
2170 ..state
2171 };
2172 Ok(self
2173 .states(std::array::from_fn(|_| state.clone()))
2174 .contracts(contracts))
2175 }
2176
2177 pub fn build(self) -> TestNetworkConfig<{ NUM_NODES }, P, C> {
2178 TestNetworkConfig {
2179 state: self.state,
2180 persistence: self.persistence.unwrap(),
2181 catchup: self.catchup.unwrap(),
2182 network_config: self.network_config.unwrap(),
2183 api_config: self.api_config.unwrap(),
2184 contracts: self.contracts,
2185 }
2186 }
2187 }
2188
2189 impl<P: PersistenceOptions, const NUM_NODES: usize> TestNetwork<P, { NUM_NODES }> {
2190 pub async fn new<C: StateCatchup + 'static>(
2191 cfg: TestNetworkConfig<{ NUM_NODES }, P, C>,
2192 upgrade: versions::Upgrade,
2193 ) -> Self {
2194 let mut cfg = cfg;
2195 let mut builder_tasks = Vec::new();
2196
2197 let chain_config = cfg.state[0].chain_config.resolve();
2198 if chain_config.is_none() {
2199 tracing::warn!("Chain config is not set, using default max_block_size");
2200 }
2201 let (task, builder_url) = run_legacy_builder::<{ NUM_NODES }>(
2202 cfg.network_config.builder_port(),
2203 chain_config.map(|c| *c.max_block_size),
2204 )
2205 .await;
2206 builder_tasks.push(task);
2207 cfg.network_config
2208 .set_builder_urls(vec1::vec1![builder_url.clone()]);
2209
2210 let mut opt = cfg.api_config.clone();
2212 let temp_dir = if opt.storage_fs.is_none() && opt.storage_sql.is_none() {
2213 let temp_dir = tempfile::tempdir().unwrap();
2214 opt = opt.query_fs(
2215 Default::default(),
2216 crate::persistence::fs::Options::new(temp_dir.path().to_path_buf()),
2217 );
2218 Some(temp_dir)
2219 } else {
2220 None
2221 };
2222
2223 let mut nodes = join_all(
2224 izip!(cfg.state, cfg.persistence, cfg.catchup)
2225 .enumerate()
2226 .map(|(i, (state, persistence, state_peers))| {
2227 let opt = opt.clone();
2228 let cfg = &cfg.network_config;
2229 let upgrades_map = cfg.upgrades();
2230 async move {
2231 if i == 0 {
2232 opt.serve(|metrics, consumer, storage| {
2233 let cfg = cfg.clone();
2234 async move {
2235 Ok(cfg
2236 .init_node(
2237 0,
2238 state,
2239 persistence,
2240 Some(state_peers),
2241 storage,
2242 &*metrics,
2243 STAKE_TABLE_CAPACITY_FOR_TEST,
2244 consumer,
2245 upgrade,
2246 upgrades_map,
2247 )
2248 .await)
2249 }
2250 .boxed()
2251 })
2252 .await
2253 .unwrap()
2254 } else {
2255 cfg.init_node(
2256 i,
2257 state,
2258 persistence,
2259 Some(state_peers),
2260 None,
2261 &NoMetrics,
2262 STAKE_TABLE_CAPACITY_FOR_TEST,
2263 NullEventConsumer,
2264 upgrade,
2265 upgrades_map,
2266 )
2267 .await
2268 }
2269 }
2270 .boxed()
2271 }),
2272 )
2273 .await;
2274
2275 let handle_0 = &nodes[0];
2276
2277 for builder_task in builder_tasks {
2279 builder_task.start(Box::new(
2280 handle_0
2281 .consensus_handle()
2282 .legacy_consensus()
2283 .read()
2284 .await
2285 .event_stream(),
2286 ));
2287 }
2288
2289 for ctx in &nodes {
2290 ctx.start_consensus().await;
2291 }
2292
2293 let server = nodes.remove(0);
2294 let peers = nodes;
2295
2296 Self {
2297 server,
2298 peers,
2299 cfg: cfg.network_config,
2300 temp_dir,
2301 contracts: cfg.contracts,
2302 }
2303 }
2304
2305 pub async fn stop_consensus(&mut self) {
2306 self.server.shutdown_consensus().await;
2307
2308 for ctx in &mut self.peers {
2309 ctx.shutdown_consensus().await;
2310 }
2311 }
2312 }
2313
2314 pub async fn status_test_helper(opt: impl FnOnce(Options) -> Options) {
2322 let port = reserve_tcp_port().expect("OS should have ephemeral ports available");
2323 let url = format!("http://localhost:{port}").parse().unwrap();
2324 let client: Client<ServerError, StaticVersion<0, 1>> = Client::new(url);
2325
2326 let options = opt(Options::with_port(port));
2327 let network_config = TestConfigBuilder::default().build();
2328 let config = TestNetworkConfigBuilder::default()
2329 .api_config(options)
2330 .network_config(network_config)
2331 .build();
2332 let _network = TestNetwork::new(config, MOCK_SEQUENCER_VERSIONS).await;
2333 client.connect(None).await;
2334
2335 while client
2339 .get::<u64>("status/block-height")
2340 .send()
2341 .await
2342 .unwrap()
2343 <= 1
2344 {
2345 sleep(Duration::from_secs(1)).await;
2346 }
2347 let success_rate = client
2348 .get::<f64>("status/success-rate")
2349 .send()
2350 .await
2351 .unwrap();
2352 assert!(success_rate.is_finite(), "{success_rate}");
2355 assert!(success_rate > 0.0, "{success_rate}");
2357 }
2358
2359 pub async fn submit_test_helper(opt: impl FnOnce(Options) -> Options) {
2367 let txn = Transaction::new(NamespaceId::from(1_u32), vec![1, 2, 3, 4]);
2368
2369 let port = reserve_tcp_port().expect("OS should have ephemeral ports available");
2370
2371 let url = format!("http://localhost:{port}").parse().unwrap();
2372 let client: Client<ServerError, StaticVersion<0, 1>> = Client::new(url);
2373
2374 let options = opt(Options::with_port(port).submit(Default::default()));
2375 let network_config = TestConfigBuilder::default().build();
2376 let config = TestNetworkConfigBuilder::default()
2377 .api_config(options)
2378 .network_config(network_config)
2379 .build();
2380 let network = TestNetwork::new(config, MOCK_SEQUENCER_VERSIONS).await;
2381 let mut events = network.server.event_stream();
2382
2383 client.connect(None).await;
2384
2385 let hash = client
2386 .post("submit/submit")
2387 .body_json(&txn)
2388 .unwrap()
2389 .send()
2390 .await
2391 .unwrap();
2392 assert_eq!(txn.commit(), hash);
2393
2394 wait_for_decide_on_handle(&mut events, &txn).await;
2396 }
2397
2398 pub async fn state_signature_test_helper(opt: impl FnOnce(Options) -> Options) {
2400 let port = reserve_tcp_port().expect("OS should have ephemeral ports available");
2401
2402 let url = format!("http://localhost:{port}").parse().unwrap();
2403
2404 let client: Client<ServerError, StaticVersion<0, 1>> = Client::new(url);
2405
2406 let options = opt(Options::with_port(port));
2407 let network_config = TestConfigBuilder::default().build();
2408 let config = TestNetworkConfigBuilder::default()
2409 .api_config(options)
2410 .network_config(network_config)
2411 .build();
2412 let network = TestNetwork::new(config, MOCK_SEQUENCER_VERSIONS).await;
2413
2414 let mut height: u64;
2415 loop {
2418 height = network.server.decided_leaf().await.height();
2419 sleep(std::time::Duration::from_secs(1)).await;
2420 if height >= 2 {
2421 break;
2422 }
2423 }
2424 client
2426 .get::<LCV3StateSignatureRequestBody>(&format!("state-signature/block/{height}"))
2427 .send()
2428 .await
2429 .unwrap();
2430 }
2431
2432 pub async fn catchup_test_helper(opt: impl FnOnce(Options) -> Options) {
2440 let port = reserve_tcp_port().expect("OS should have ephemeral ports available");
2441 let url = format!("http://localhost:{port}").parse().unwrap();
2442 let client: Client<ServerError, StaticVersion<0, 1>> = Client::new(url);
2443
2444 let options = opt(Options::with_port(port));
2445 let network_config = TestConfigBuilder::default().build();
2446 let config = TestNetworkConfigBuilder::default()
2447 .api_config(options)
2448 .network_config(network_config)
2449 .build();
2450 let network = TestNetwork::new(config, MOCK_SEQUENCER_VERSIONS).await;
2451 client.connect(None).await;
2452
2453 let mut events = network.server.event_stream();
2455 loop {
2456 if let CoordinatorEvent::LegacyEvent(Event {
2457 event: EventType::Decide { leaf_chain, .. },
2458 ..
2459 }) = events.next().await.unwrap()
2460 && leaf_chain
2461 .iter()
2462 .any(|LeafInfo { leaf, .. }| leaf.block_header().height() > 2)
2463 {
2464 break;
2465 }
2466 }
2467
2468 {
2471 network.server.shutdown_consensus().await;
2472 }
2473
2474 let leaf = network.server.decided_leaf().await;
2476 let height = leaf.height() + 1;
2477 let view = leaf.view_number() + 1;
2478 let res = client
2479 .get::<AccountQueryData>(&format!(
2480 "catchup/{height}/{}/account/{:x}",
2481 view.u64(),
2482 Address::default()
2483 ))
2484 .send()
2485 .await
2486 .unwrap();
2487 assert_eq!(res.balance, U256::ZERO);
2488 assert_eq!(
2489 res.proof
2490 .verify(
2491 &network
2492 .server
2493 .state(view)
2494 .await
2495 .unwrap()
2496 .fee_merkle_tree
2497 .commitment()
2498 )
2499 .unwrap(),
2500 U256::ZERO,
2501 );
2502
2503 let res = client
2505 .get::<BlocksFrontier>(&format!("catchup/{height}/{}/blocks", view.u64()))
2506 .send()
2507 .await
2508 .unwrap();
2509 let root = &network
2510 .server
2511 .state(view)
2512 .await
2513 .unwrap()
2514 .block_merkle_tree
2515 .commitment();
2516 BlockMerkleTree::verify(root, root.size() - 1, res)
2517 .unwrap()
2518 .unwrap();
2519 }
2520
2521 pub async fn spawn_dishonest_peer_catchup_api() -> anyhow::Result<(Url, JoinHandle<()>)> {
2522 let toml = toml::from_str::<toml::Value>(include_str!("../api/catchup.toml")).unwrap();
2523 let mut api =
2524 Api::<(), hotshot_query_service::Error, SequencerApiVersion>::new(toml).unwrap();
2525
2526 api.get("account", |_req, _state: &()| {
2527 async move {
2528 Result::<AccountQueryData, _>::Err(hotshot_query_service::Error::catch_all(
2529 StatusCode::BAD_REQUEST,
2530 "no account found".to_string(),
2531 ))
2532 }
2533 .boxed()
2534 })?
2535 .get("blocks", |_req, _state| {
2536 async move {
2537 Result::<BlocksFrontier, _>::Err(hotshot_query_service::Error::catch_all(
2538 StatusCode::BAD_REQUEST,
2539 "no block found".to_string(),
2540 ))
2541 }
2542 .boxed()
2543 })?
2544 .get("chainconfig", |_req, _state| {
2545 async move {
2546 Result::<ChainConfig, _>::Ok(ChainConfig {
2547 max_block_size: 300.into(),
2548 base_fee: 1.into(),
2549 fee_recipient: "0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48"
2550 .parse()
2551 .unwrap(),
2552 ..Default::default()
2553 })
2554 }
2555 .boxed()
2556 })?
2557 .get("leafchain", |_req, _state| {
2558 async move {
2559 Result::<Vec<Leaf2>, _>::Err(hotshot_query_service::Error::catch_all(
2560 StatusCode::BAD_REQUEST,
2561 "No leafchain found".to_string(),
2562 ))
2563 }
2564 .boxed()
2565 })?;
2566
2567 let mut app = App::<_, hotshot_query_service::Error>::with_state(());
2568 app.with_version(env!("CARGO_PKG_VERSION").parse().unwrap());
2569
2570 app.register_module::<_, _>("catchup", api).unwrap();
2571
2572 let port = reserve_tcp_port().expect("OS should have ephemeral ports available");
2573 let url: Url = Url::parse(&format!("http://localhost:{port}")).unwrap();
2574
2575 let handle = spawn({
2576 let url = url.clone();
2577 async move {
2578 let _ = app.serve(url, SequencerApiVersion::instance()).await;
2579 }
2580 });
2581
2582 Ok((url, handle))
2583 }
2584}
2585
2586#[cfg(test)]
2587mod api_tests {
2588 use std::{fmt::Debug, marker::PhantomData};
2589
2590 use committable::Committable;
2591 use data_source::testing::TestableSequencerDataSource;
2592 use espresso_types::{
2593 Header, Leaf2, MOCK_SEQUENCER_VERSIONS, NamespaceId, NamespaceProofQueryData,
2594 ValidatedState,
2595 traits::{EventConsumer, PersistenceOptions},
2596 };
2597 use futures::{future, stream::StreamExt};
2598 use hotshot_example_types::node_types::TEST_VERSIONS;
2599 use hotshot_query_service::availability::{
2600 AvailabilityDataSource, BlockQueryData, VidCommonQueryData,
2601 };
2602 use hotshot_types::{
2603 data::{
2604 DaProposal2, EpochNumber, QuorumProposal2, QuorumProposalWrapper, VidCommitment,
2605 VidDisperseShare, ns_table::parse_ns_table, vid_disperse::AvidMDisperseShare,
2606 },
2607 event::LeafInfo,
2608 message::Proposal,
2609 simple_certificate::{CertificatePair, QuorumCertificate2},
2610 traits::{EncodeBytes, signature_key::SignatureKey},
2611 utils::EpochTransitionIndicator,
2612 vid::avidm::{AvidMScheme, init_avidm_param},
2613 };
2614 use surf_disco::Client;
2615 use test_helpers::{
2616 TestNetwork, TestNetworkConfigBuilder, catchup_test_helper, state_signature_test_helper,
2617 status_test_helper, submit_test_helper,
2618 };
2619 use test_utils::reserve_tcp_port;
2620 use tide_disco::error::ServerError;
2621 use vbs::version::StaticVersion;
2622
2623 use super::{update::ApiEventConsumer, *};
2624 use crate::{
2625 network,
2626 persistence::no_storage::NoStorage,
2627 testing::{TestConfigBuilder, wait_for_decide_on_handle},
2628 };
2629
2630 #[rstest_reuse::template]
2631 #[rstest::rstest]
2632 #[case(PhantomData::<crate::api::sql::DataSource>)]
2633 #[case(PhantomData::<crate::api::fs::DataSource>)]
2634 #[test_log::test(tokio::test(flavor = "multi_thread"))]
2635 pub fn testable_sequencer_data_source<D: TestableSequencerDataSource>(
2636 #[case] _d: PhantomData<D>,
2637 ) {
2638 }
2639
2640 #[rstest_reuse::apply(testable_sequencer_data_source)]
2641 pub(crate) async fn submit_test_with_query_module<D: TestableSequencerDataSource>(
2642 _d: PhantomData<D>,
2643 ) {
2644 let storage = D::create_storage().await;
2645 submit_test_helper(|opt| D::options(&storage, opt)).await
2646 }
2647
2648 #[rstest_reuse::apply(testable_sequencer_data_source)]
2649 pub(crate) async fn status_test_with_query_module<D: TestableSequencerDataSource>(
2650 _d: PhantomData<D>,
2651 ) {
2652 let storage = D::create_storage().await;
2653 status_test_helper(|opt| D::options(&storage, opt)).await
2654 }
2655
2656 #[rstest_reuse::apply(testable_sequencer_data_source)]
2657 pub(crate) async fn state_signature_test_with_query_module<D: TestableSequencerDataSource>(
2658 _d: PhantomData<D>,
2659 ) {
2660 let storage = D::create_storage().await;
2661 state_signature_test_helper(|opt| D::options(&storage, opt)).await
2662 }
2663
2664 #[rstest_reuse::apply(testable_sequencer_data_source)]
2665 pub(crate) async fn test_namespace_query<D: TestableSequencerDataSource>(_d: PhantomData<D>) {
2666 let ns_id = NamespaceId::from(42_u32);
2668 let txn = Transaction::new(ns_id, vec![1, 2, 3, 4]);
2669
2670 let port = reserve_tcp_port().expect("OS should have ephemeral ports available");
2672 let storage = D::create_storage().await;
2673 let network_config = TestConfigBuilder::default().build();
2674 let config = TestNetworkConfigBuilder::default()
2675 .api_config(D::options(&storage, Options::with_port(port)).submit(Default::default()))
2676 .network_config(network_config)
2677 .build();
2678 let network = TestNetwork::new(config, MOCK_SEQUENCER_VERSIONS).await;
2679 let mut events = network.server.event_stream();
2680
2681 let client: Client<ServerError, StaticVersion<0, 1>> =
2683 Client::new(format!("http://localhost:{port}").parse().unwrap());
2684 client.connect(None).await;
2685
2686 let hash = client
2687 .post("submit/submit")
2688 .body_json(&txn)
2689 .unwrap()
2690 .send()
2691 .await
2692 .unwrap();
2693 assert_eq!(txn.commit(), hash);
2694
2695 let block_height = wait_for_decide_on_handle(&mut events, &txn).await.0 as usize;
2697 tracing::info!(block_height, "transaction sequenced");
2698
2699 let txn2 = Transaction::new(ns_id, vec![5, 6, 7, 8]);
2701 client
2702 .post::<Commitment<Transaction>>("submit/submit")
2703 .body_json(&txn2)
2704 .unwrap()
2705 .send()
2706 .await
2707 .unwrap();
2708 let block_height2 = wait_for_decide_on_handle(&mut events, &txn2).await.0 as usize;
2709 tracing::info!(block_height2, "transaction sequenced");
2710
2711 client
2713 .socket(&format!("availability/stream/blocks/{block_height2}"))
2714 .subscribe::<BlockQueryData<SeqTypes>>()
2715 .await
2716 .unwrap()
2717 .next()
2718 .await
2719 .unwrap()
2720 .unwrap();
2721
2722 let mut found_txn = false;
2723 let mut found_empty_block = false;
2724 for block_num in 0..=block_height {
2725 let header: Header = client
2726 .get(&format!("availability/header/{block_num}"))
2727 .send()
2728 .await
2729 .unwrap();
2730 let ns_query_res: NamespaceProofQueryData = client
2731 .get(&format!("availability/block/{block_num}/namespace/{ns_id}"))
2732 .send()
2733 .await
2734 .unwrap();
2735
2736 assert_eq!(
2738 ns_query_res,
2739 client
2740 .get(&format!(
2741 "availability/block/hash/{}/namespace/{ns_id}",
2742 header.commit()
2743 ))
2744 .send()
2745 .await
2746 .unwrap()
2747 );
2748 assert_eq!(
2749 ns_query_res,
2750 client
2751 .get(&format!(
2752 "availability/block/payload-hash/{}/namespace/{ns_id}",
2753 header.payload_commitment()
2754 ))
2755 .send()
2756 .await
2757 .unwrap()
2758 );
2759
2760 if let Some(ns_proof) = ns_query_res.proof {
2762 let vid_common: VidCommonQueryData<SeqTypes> = client
2763 .get(&format!("availability/vid/common/{block_num}"))
2764 .send()
2765 .await
2766 .unwrap();
2767 ns_proof
2768 .verify(
2769 header.ns_table(),
2770 &header.payload_commitment(),
2771 vid_common.common(),
2772 )
2773 .unwrap();
2774 } else {
2775 assert!(header.ns_table().find_ns_id(&ns_id).is_none());
2777 assert!(ns_query_res.transactions.is_empty());
2778 }
2779
2780 found_empty_block = found_empty_block || ns_query_res.transactions.is_empty();
2781
2782 for txn in ns_query_res.transactions {
2783 if txn.commit() == hash {
2784 found_txn = true;
2786 }
2787 }
2788 }
2789 assert!(found_txn);
2790 assert!(found_empty_block);
2791
2792 let ns_proofs: Vec<NamespaceProofQueryData> = client
2794 .get(&format!(
2795 "availability/block/{block_height}/{}/namespace/{ns_id}",
2796 block_height2 + 1
2797 ))
2798 .send()
2799 .await
2800 .unwrap();
2801 assert_eq!(ns_proofs.len(), block_height2 + 1 - block_height);
2802 assert_eq!(&ns_proofs[0].transactions, std::slice::from_ref(&txn));
2803 assert_eq!(
2804 &ns_proofs[ns_proofs.len() - 1].transactions,
2805 std::slice::from_ref(&txn2)
2806 );
2807 for proof in &ns_proofs[1..ns_proofs.len() - 1] {
2808 assert_eq!(proof.transactions, &[]);
2809 }
2810 }
2811
2812 #[rstest_reuse::apply(testable_sequencer_data_source)]
2813 pub(crate) async fn catchup_test_with_query_module<D: TestableSequencerDataSource>(
2814 _d: PhantomData<D>,
2815 ) {
2816 let storage = D::create_storage().await;
2817 catchup_test_helper(|opt| D::options(&storage, opt)).await
2818 }
2819
2820 #[rstest_reuse::apply(testable_sequencer_data_source)]
2821 pub async fn test_non_consecutive_decide_with_failing_event_consumer<D>(_d: PhantomData<D>)
2822 where
2823 D: TestableSequencerDataSource + Debug + 'static,
2824 {
2825 use hotshot_types::new_protocol::CoordinatorEvent;
2826
2827 #[derive(Clone, Copy, Debug)]
2828 struct FailConsumer;
2829
2830 #[async_trait]
2831 impl EventConsumer for FailConsumer {
2832 async fn handle_event(&self, _: &CoordinatorEvent<SeqTypes>) -> anyhow::Result<()> {
2833 bail!("mock error injection");
2834 }
2835 }
2836
2837 let (pubkey, privkey) = PubKey::generated_from_seed_indexed([0; 32], 1);
2838
2839 let storage = D::create_storage().await;
2840 let persistence = D::persistence_options(&storage).create().await.unwrap();
2841 let data_source: Arc<StorageState<network::Memory, NoStorage, _>> =
2842 Arc::new(StorageState::new(
2843 D::create(D::persistence_options(&storage), Default::default(), false)
2844 .await
2845 .unwrap(),
2846 ApiState::new(future::pending()),
2847 ));
2848
2849 let mut chain1 = vec![];
2851
2852 let genesis = Leaf2::genesis(
2853 &Default::default(),
2854 &NodeState::mock(),
2855 TEST_VERSIONS.test.base,
2856 )
2857 .await;
2858 let payload = genesis.block_payload().unwrap();
2859 let payload_bytes_arc = payload.encode();
2860
2861 let avidm_param = init_avidm_param(2).unwrap();
2862 let weights = vec![1u32; 2];
2863
2864 let ns_table = parse_ns_table(payload.byte_len().as_usize(), &payload.ns_table().encode());
2865 let (payload_commitment, shares) =
2866 AvidMScheme::ns_disperse(&avidm_param, &weights, &payload_bytes_arc, ns_table).unwrap();
2867
2868 let mut quorum_proposal = QuorumProposalWrapper::<SeqTypes> {
2869 proposal: QuorumProposal2::<SeqTypes> {
2870 block_header: genesis.block_header().clone(),
2871 view_number: ViewNumber::genesis(),
2872 justify_qc: QuorumCertificate2::genesis(
2873 &ValidatedState::default(),
2874 &NodeState::mock(),
2875 MOCK_SEQUENCER_VERSIONS,
2876 )
2877 .await,
2878 upgrade_certificate: None,
2879 view_change_evidence: None,
2880 next_drb_result: None,
2881 next_epoch_justify_qc: None,
2882 epoch: None,
2883 state_cert: None,
2884 },
2885 };
2886 let mut qc = QuorumCertificate2::genesis(
2887 &ValidatedState::default(),
2888 &NodeState::mock(),
2889 MOCK_SEQUENCER_VERSIONS,
2890 )
2891 .await;
2892
2893 let mut justify_qc = qc.clone();
2894 for i in 0..5 {
2895 *quorum_proposal.proposal.block_header.height_mut() = i;
2896 quorum_proposal.proposal.view_number = ViewNumber::new(i);
2897 quorum_proposal.proposal.justify_qc = justify_qc;
2898 let leaf = Leaf2::from_quorum_proposal(&quorum_proposal);
2899 qc.view_number = leaf.view_number();
2900 qc.data.leaf_commit = Committable::commit(&leaf);
2901 justify_qc = qc.clone();
2902 chain1.push((leaf.clone(), CertificatePair::non_epoch_change(qc.clone())));
2903
2904 let quorum_proposal_signature =
2906 PubKey::sign(&privkey, &bincode::serialize(&quorum_proposal).unwrap())
2907 .expect("Failed to sign quorum_proposal");
2908 persistence
2909 .append_quorum_proposal2(&Proposal {
2910 data: quorum_proposal.clone(),
2911 signature: quorum_proposal_signature,
2912 _pd: Default::default(),
2913 })
2914 .await
2915 .unwrap();
2916
2917 let share: VidDisperseShare<SeqTypes> = AvidMDisperseShare {
2919 view_number: leaf.view_number(),
2920 payload_commitment,
2921 share: shares[0].clone(),
2922 recipient_key: pubkey,
2923 epoch: Some(EpochNumber::new(0)),
2924 target_epoch: Some(EpochNumber::new(0)),
2925 common: avidm_param.clone(),
2926 }
2927 .into();
2928
2929 persistence
2930 .append_vid(&share.to_proposal(&privkey).unwrap())
2931 .await
2932 .unwrap();
2933
2934 let block_payload_signature =
2936 PubKey::sign(&privkey, &payload_bytes_arc).expect("Failed to sign block payload");
2937 let da_proposal_inner = DaProposal2::<SeqTypes> {
2938 encoded_transactions: payload_bytes_arc.clone(),
2939 metadata: payload.ns_table().clone(),
2940 view_number: leaf.view_number(),
2941 epoch: Some(EpochNumber::new(0)),
2942 epoch_transition_indicator: EpochTransitionIndicator::NotInTransition,
2943 };
2944 let da_proposal = Proposal {
2945 data: da_proposal_inner,
2946 signature: block_payload_signature,
2947 _pd: Default::default(),
2948 };
2949 persistence
2950 .append_da2(&da_proposal, VidCommitment::V1(payload_commitment))
2951 .await
2952 .unwrap();
2953 }
2954 let mut chain2 = chain1.split_off(2);
2956 chain2.remove(0);
2958
2959 let leaf_chain = chain1
2961 .iter()
2962 .map(|(leaf, qc)| (leaf_info(leaf.clone()), qc.clone()))
2963 .collect::<Vec<_>>();
2964 tracing::info!("decide with event handling failure");
2965 persistence
2966 .append_decided_leaves(
2967 ViewNumber::new(1),
2968 leaf_chain.iter().map(|(leaf, qc)| (leaf, qc.clone())),
2969 None,
2970 &FailConsumer,
2971 )
2972 .await
2973 .unwrap();
2974
2975 let consumer = ApiEventConsumer::from(data_source.clone());
2978 let leaf_chain = chain2
2979 .iter()
2980 .map(|(leaf, qc)| (leaf_info(leaf.clone()), qc.clone()))
2981 .collect::<Vec<_>>();
2982 tracing::info!("decide successfully");
2983 persistence
2984 .append_decided_leaves(
2985 ViewNumber::new(4),
2986 leaf_chain.iter().map(|(leaf, qc)| (leaf, qc.clone())),
2987 None,
2988 &consumer,
2989 )
2990 .await
2991 .unwrap();
2992
2993 for (leaf, cert) in chain1.iter().chain(&chain2) {
2996 tracing::info!(height = leaf.height(), "check archive");
2997 let qd = data_source.get_leaf(leaf.height() as usize).await.await;
2998 let stored_leaf: Leaf2 = qd.leaf().clone();
2999 let stored_qc = qd.qc().clone();
3000 assert_eq!(&stored_leaf, leaf);
3001 assert_eq!(&stored_qc, cert.qc());
3002
3003 data_source
3004 .get_block(leaf.height() as usize)
3005 .await
3006 .try_resolve()
3007 .ok()
3008 .unwrap();
3009 data_source
3010 .get_vid_common(leaf.height() as usize)
3011 .await
3012 .try_resolve()
3013 .ok()
3014 .unwrap();
3015
3016 assert!(
3018 persistence
3019 .load_da_proposal(leaf.view_number())
3020 .await
3021 .unwrap()
3022 .is_none()
3023 );
3024 assert!(
3025 persistence
3026 .load_vid_share(leaf.view_number())
3027 .await
3028 .unwrap()
3029 .is_none()
3030 );
3031 assert!(
3032 persistence
3033 .load_quorum_proposal(leaf.view_number())
3034 .await
3035 .is_err()
3036 );
3037 }
3038
3039 assert!(
3041 persistence
3042 .load_da_proposal(ViewNumber::new(2))
3043 .await
3044 .unwrap()
3045 .is_some()
3046 );
3047 assert!(
3048 persistence
3049 .load_vid_share(ViewNumber::new(2))
3050 .await
3051 .unwrap()
3052 .is_some()
3053 );
3054 persistence
3055 .load_quorum_proposal(ViewNumber::new(2))
3056 .await
3057 .unwrap();
3058 }
3059
3060 #[rstest_reuse::apply(testable_sequencer_data_source)]
3061 pub async fn test_decide_missing_data<D>(_d: PhantomData<D>)
3062 where
3063 D: TestableSequencerDataSource + Debug + 'static,
3064 {
3065 use ark_serialize::CanonicalDeserialize;
3066
3067 let storage = D::create_storage().await;
3068 let persistence = D::persistence_options(&storage).create().await.unwrap();
3069 let data_source: Arc<StorageState<network::Memory, NoStorage, _>> =
3070 Arc::new(StorageState::new(
3071 D::create(D::persistence_options(&storage), Default::default(), false)
3072 .await
3073 .unwrap(),
3074 ApiState::new(future::pending()),
3075 ));
3076 let consumer = ApiEventConsumer::from(data_source.clone());
3077
3078 let mut qc = QuorumCertificate2::genesis(
3079 &ValidatedState::default(),
3080 &NodeState::mock(),
3081 MOCK_SEQUENCER_VERSIONS,
3082 )
3083 .await;
3084 let leaf = Leaf2::genesis(
3085 &ValidatedState::default(),
3086 &NodeState::mock(),
3087 TEST_VERSIONS.test.base,
3088 )
3089 .await;
3090
3091 tracing::info!(?leaf, ?qc, "decide genesis leaf");
3095 persistence
3096 .append_decided_leaves(
3097 leaf.view_number(),
3098 [(
3099 &leaf_info(leaf.clone()),
3100 CertificatePair::non_epoch_change(qc.clone()),
3101 )],
3102 None,
3103 &consumer,
3104 )
3105 .await
3106 .unwrap();
3107
3108 let mut block_header = leaf.block_header().clone();
3112 *block_header.height_mut() += 1;
3113 *block_header.payload_commitment_mut() = VidCommitment::V1(
3114 CanonicalDeserialize::deserialize_uncompressed_unchecked([1u8; 32].as_slice()).unwrap(),
3115 );
3116 let qp = QuorumProposalWrapper {
3117 proposal: QuorumProposal2 {
3118 block_header,
3119 view_number: leaf.view_number() + 1,
3120 justify_qc: qc.clone(),
3121 upgrade_certificate: None,
3122 view_change_evidence: None,
3123 next_drb_result: None,
3124 next_epoch_justify_qc: None,
3125 epoch: None,
3126 state_cert: None,
3127 },
3128 };
3129
3130 let leaf = Leaf2::from_quorum_proposal(&qp);
3131 qc.view_number = leaf.view_number();
3132 qc.data.leaf_commit = Committable::commit(&leaf);
3133
3134 tracing::info!(?leaf, ?qc, "append leaf 1");
3136 persistence
3137 .append_decided_leaves(
3138 leaf.view_number(),
3139 [(
3140 &leaf_info(leaf.clone()),
3141 CertificatePair::non_epoch_change(qc),
3142 )],
3143 None,
3144 &consumer,
3145 )
3146 .await
3147 .unwrap();
3148
3149 assert_eq!(leaf, data_source.get_leaf(1).await.await.leaf().clone());
3151 assert!(data_source.get_vid_common(1).await.is_pending());
3152 assert!(data_source.get_block(1).await.is_pending());
3153 }
3154
3155 fn leaf_info(leaf: Leaf2) -> LeafInfo<SeqTypes> {
3156 LeafInfo {
3157 leaf,
3158 vid_share: None,
3159 state: Default::default(),
3160 delta: None,
3161 state_cert: None,
3162 }
3163 }
3164}
3165
3166#[cfg(test)]
3167mod test {
3168 use std::{
3169 collections::{HashMap, HashSet},
3170 time::Duration,
3171 };
3172
3173 use ::light_client::{
3174 consensus::{
3175 header::HeaderProof,
3176 leaf::{LeafProof, LeafProofHint},
3177 payload::PayloadProof,
3178 },
3179 testing::{EpochChangeQuorum, LEGACY_VERSION},
3180 };
3181 use alloy::{
3182 eips::BlockId,
3183 network::EthereumWallet,
3184 primitives::{Address, U256},
3185 providers::ProviderBuilder,
3186 };
3187 use async_lock::Mutex;
3188 use committable::{Commitment, Committable};
3189 use espresso_contract_deployer::{
3190 Contract, Contracts, builder::DeployerArgsBuilder,
3191 network_config::light_client_genesis_from_stake_table, upgrade_stake_table_v2,
3192 upgrade_stake_table_v3,
3193 };
3194 use espresso_types::{
3195 ADVZNamespaceProofQueryData, FeeAmount, Header, L1Client, L1ClientOptions,
3196 MOCK_SEQUENCER_VERSIONS, NamespaceId, NamespaceProofQueryData, NsProof,
3197 RegisteredValidatorMap, RewardDistributor, StakeTableState, StateCertQueryDataV1,
3198 StateCertQueryDataV2, ValidatedState, ValidatorLeaderCounts,
3199 config::PublicHotShotConfig,
3200 traits::{MembershipPersistence, NullEventConsumer, PersistenceOptions},
3201 v0_3::{COMMISSION_BASIS_POINTS, Fetcher, RewardAmount, RewardMerkleProofV1},
3202 v0_4::{RewardAccountV2, RewardMerkleProofV2},
3203 validators_from_l1_events,
3204 };
3205 use futures::{
3206 future::{self, join_all, try_join_all},
3207 stream::{StreamExt, TryStreamExt},
3208 try_join,
3209 };
3210 use hotshot::types::{Event, EventType};
3211 use hotshot_contract_adapter::{
3212 reward::RewardClaimInput,
3213 sol_types::{EspToken, StakeTableV3},
3214 stake_table::StakeTableContractVersion,
3215 };
3216 use hotshot_query_service::{
3217 availability::{
3218 BlockQueryData, BlockSummaryQueryData, LeafQueryData, TransactionQueryData,
3219 VidCommonQueryData,
3220 },
3221 data_source::{
3222 VersionedDataSource,
3223 sql::Config,
3224 storage::{SqlStorage, StorageConnectionType},
3225 },
3226 explorer::TransactionSummariesResponse,
3227 types::HeightIndexed,
3228 };
3229 use hotshot_types::{
3230 ValidatorConfig,
3231 addr::NetAddr,
3232 data::EpochNumber,
3233 event::LeafInfo,
3234 new_protocol::CoordinatorEvent,
3235 traits::{block_contents::BlockHeader, election::Membership, metrics::NoMetrics},
3236 utils::epoch_from_block_number,
3237 x25519,
3238 };
3239 use jf_merkle_tree_compat::{
3240 MerkleTreeScheme,
3241 prelude::{MerkleProof, Sha3Node},
3242 };
3243 use pretty_assertions::assert_matches;
3244 use rand::seq::SliceRandom;
3245 use rstest::rstest;
3246 use staking_cli::{
3247 demo::DelegationConfig, fetch_commission, update_commission, update_network_config,
3248 };
3249 use surf_disco::Client;
3250 use test_helpers::{
3251 TestNetwork, TestNetworkConfigBuilder, catchup_test_helper, state_signature_test_helper,
3252 status_test_helper, submit_test_helper,
3253 };
3254 use test_utils::reserve_tcp_port;
3255 use tide_disco::{
3256 Error, StatusCode, Url, app::AppHealth, error::ServerError, healthcheck::HealthStatus,
3257 };
3258 use tokio::time::sleep;
3259 use vbs::version::StaticVersion;
3260 use versions::{
3261 DRB_AND_HEADER_UPGRADE_VERSION, EPOCH_REWARD_VERSION, EPOCH_VERSION, FEE_VERSION, Upgrade,
3262 version,
3263 };
3264
3265 use self::{
3266 data_source::testing::TestableSequencerDataSource, options::HotshotEvents,
3267 sql::DataSource as SqlDataSource,
3268 };
3269 use super::*;
3270
3271 async fn wait_until_block_height(
3272 client: &Client<ServerError, StaticVersion<0, 1>>,
3273 endpoint: &str,
3274 height: u64,
3275 ) {
3276 for _retry in 0.. {
3277 let bh = client
3278 .get::<u64>(endpoint)
3279 .send()
3280 .await
3281 .expect("block height not found");
3282
3283 if bh >= height {
3284 return;
3285 }
3286 sleep(Duration::from_secs(3)).await;
3287 }
3288 }
3289 use crate::{
3290 api::{
3291 options::Query,
3292 sql::{impl_testable_data_source::tmp_options, reconstruct_state},
3293 test_helpers::STAKE_TABLE_CAPACITY_FOR_TEST,
3294 },
3295 catchup::{NullStateCatchup, StatePeers},
3296 persistence,
3297 persistence::no_storage,
3298 testing::{TestConfig, TestConfigBuilder, wait_for_decide_on_handle, wait_for_epochs},
3299 };
3300
3301 const POS_V3: Upgrade = Upgrade::trivial(version(0, 3));
3302 const POS_V4: Upgrade = Upgrade::trivial(version(0, 4));
3303
3304 #[test_log::test(tokio::test(flavor = "multi_thread"))]
3305 async fn test_healthcheck() {
3306 let port = reserve_tcp_port().expect("OS should have ephemeral ports available");
3307 let url = format!("http://localhost:{port}").parse().unwrap();
3308 let client: Client<ServerError, StaticVersion<0, 1>> = Client::new(url);
3309 let options = Options::with_port(port);
3310 let network_config = TestConfigBuilder::default().build();
3311 let config = TestNetworkConfigBuilder::<5, _, NullStateCatchup>::default()
3312 .api_config(options)
3313 .network_config(network_config)
3314 .build();
3315 let _network = TestNetwork::new(config, MOCK_SEQUENCER_VERSIONS).await;
3316
3317 client.connect(None).await;
3318 let health = client.get::<AppHealth>("healthcheck").send().await.unwrap();
3319 assert_eq!(health.status, HealthStatus::Available);
3320 }
3321
3322 #[test_log::test(tokio::test(flavor = "multi_thread"))]
3323 async fn status_test_without_query_module() {
3324 status_test_helper(|opt| opt).await
3325 }
3326
3327 #[test_log::test(tokio::test(flavor = "multi_thread"))]
3328 async fn submit_test_without_query_module() {
3329 submit_test_helper(|opt| opt).await
3330 }
3331
3332 #[test_log::test(tokio::test(flavor = "multi_thread"))]
3333 async fn state_signature_test_without_query_module() {
3334 state_signature_test_helper(|opt| opt).await
3335 }
3336
3337 #[test_log::test(tokio::test(flavor = "multi_thread"))]
3338 async fn catchup_test_without_query_module() {
3339 catchup_test_helper(|opt| opt).await
3340 }
3341
3342 #[test_log::test(tokio::test(flavor = "multi_thread"))]
3343 async fn test_leaf_only_data_source() {
3344 let port = reserve_tcp_port().expect("OS should have ephemeral ports available");
3345
3346 let storage = SqlDataSource::create_storage().await;
3347 let options =
3348 SqlDataSource::leaf_only_ds_options(&storage, Options::with_port(port)).unwrap();
3349
3350 let network_config = TestConfigBuilder::default().build();
3351 let config = TestNetworkConfigBuilder::default()
3352 .api_config(options)
3353 .network_config(network_config)
3354 .build();
3355 let _network = TestNetwork::new(config, MOCK_SEQUENCER_VERSIONS).await;
3356 let url = format!("http://localhost:{port}").parse().unwrap();
3357 let client: Client<ServerError, SequencerApiVersion> = Client::new(url);
3358
3359 tracing::info!("waiting for blocks");
3360 client.connect(Some(Duration::from_secs(15))).await;
3361 let account = TestConfig::<5>::builder_key().fee_account();
3364
3365 let _headers = client
3366 .socket("availability/stream/headers/0")
3367 .subscribe::<Header>()
3368 .await
3369 .unwrap()
3370 .take(10)
3371 .try_collect::<Vec<_>>()
3372 .await
3373 .unwrap();
3374
3375 for i in 1..5 {
3376 let leaf = client
3377 .get::<LeafQueryData<SeqTypes>>(&format!("availability/leaf/{i}"))
3378 .send()
3379 .await
3380 .unwrap();
3381
3382 assert_eq!(leaf.height(), i);
3383
3384 let header = client
3385 .get::<Header>(&format!("availability/header/{i}"))
3386 .send()
3387 .await
3388 .unwrap();
3389
3390 assert_eq!(header.height(), i);
3391
3392 let vid = client
3393 .get::<VidCommonQueryData<SeqTypes>>(&format!("availability/vid/common/{i}"))
3394 .send()
3395 .await
3396 .unwrap();
3397
3398 assert_eq!(vid.height(), i);
3399
3400 client
3401 .get::<MerkleProof<Commitment<Header>, u64, Sha3Node, 3>>(&format!(
3402 "block-state/{i}/{}",
3403 i - 1
3404 ))
3405 .send()
3406 .await
3407 .unwrap();
3408
3409 client
3410 .get::<MerkleProof<FeeAmount, FeeAccount, Sha3Node, 256>>(&format!(
3411 "fee-state/{}/{}",
3412 i + 1,
3413 account
3414 ))
3415 .send()
3416 .await
3417 .unwrap();
3418 }
3419
3420 client
3423 .get::<BlockQueryData<SeqTypes>>("availability/block/1")
3424 .send()
3425 .await
3426 .unwrap_err();
3427 }
3428
3429 async fn run_catchup_test(url_suffix: &str) {
3430 let port = reserve_tcp_port().expect("OS should have ephemeral ports available");
3432 const NUM_NODES: usize = 5;
3433
3434 let url: url::Url = format!("http://localhost:{port}{url_suffix}")
3435 .parse()
3436 .unwrap();
3437
3438 let config = TestNetworkConfigBuilder::<NUM_NODES, _, _>::with_num_nodes()
3439 .api_config(Options::with_port(port))
3440 .network_config(TestConfigBuilder::default().build())
3441 .catchups(std::array::from_fn(|_| {
3442 StatePeers::<StaticVersion<0, 1>>::from_urls(
3443 vec![url.clone()],
3444 Default::default(),
3445 Duration::from_secs(2),
3446 &NoMetrics,
3447 )
3448 }))
3449 .build();
3450 let mut network = TestNetwork::new(config, MOCK_SEQUENCER_VERSIONS).await;
3451
3452 let mut events = network.peers[0].event_stream();
3454 loop {
3455 let event = events.next().await.unwrap();
3456 let CoordinatorEvent::LegacyEvent(Event {
3457 event: EventType::Decide { leaf_chain, .. },
3458 ..
3459 }) = event
3460 else {
3461 continue;
3462 };
3463 if leaf_chain[0].leaf.height() > 0 {
3464 break;
3465 }
3466 }
3467
3468 tracing::info!("shutting down node");
3473 network.peers.remove(0);
3474
3475 network
3477 .server
3478 .event_stream()
3479 .filter(|event| {
3480 future::ready(matches!(
3481 event,
3482 CoordinatorEvent::LegacyEvent(Event {
3483 event: EventType::Decide { .. },
3484 ..
3485 })
3486 ))
3487 })
3488 .take(3)
3489 .collect::<Vec<_>>()
3490 .await;
3491
3492 tracing::info!("restarting node");
3493 let node = network
3494 .cfg
3495 .init_node(
3496 1,
3497 ValidatedState::default(),
3498 no_storage::Options,
3499 Some(StatePeers::<StaticVersion<0, 1>>::from_urls(
3500 vec![url],
3501 Default::default(),
3502 Duration::from_secs(2),
3503 &NoMetrics,
3504 )),
3505 None,
3506 &NoMetrics,
3507 test_helpers::STAKE_TABLE_CAPACITY_FOR_TEST,
3508 NullEventConsumer,
3509 MOCK_SEQUENCER_VERSIONS,
3510 Default::default(),
3511 )
3512 .await;
3513 let mut events = node.event_stream();
3514
3515 let mut proposers = [false; NUM_NODES];
3518 loop {
3519 let event = events.next().await.unwrap();
3520 let CoordinatorEvent::LegacyEvent(Event {
3521 event: EventType::Decide { leaf_chain, .. },
3522 ..
3523 }) = event
3524 else {
3525 continue;
3526 };
3527 for LeafInfo { leaf, .. } in leaf_chain.iter().rev() {
3528 let height = leaf.height();
3529 let leaf_builder = (leaf.view_number().u64() as usize) % NUM_NODES;
3530 if height == 0 {
3531 continue;
3532 }
3533
3534 tracing::info!(
3535 "waiting for blocks from {proposers:?}, block {height} is from {leaf_builder}",
3536 );
3537 proposers[leaf_builder] = true;
3538 }
3539
3540 if proposers.iter().all(|has_proposed| *has_proposed) {
3541 break;
3542 }
3543 }
3544 }
3545
3546 #[test_log::test(tokio::test(flavor = "multi_thread"))]
3547 async fn test_catchup() {
3548 run_catchup_test("").await;
3549 }
3550
3551 #[test_log::test(tokio::test(flavor = "multi_thread"))]
3552 async fn test_catchup_v0() {
3553 run_catchup_test("/v0").await;
3554 }
3555
3556 #[test_log::test(tokio::test(flavor = "multi_thread"))]
3557 async fn test_catchup_v1() {
3558 run_catchup_test("/v1").await;
3559 }
3560
3561 #[test_log::test(tokio::test(flavor = "multi_thread"))]
3562 async fn test_catchup_no_state_peers() {
3563 let port = reserve_tcp_port().expect("OS should have ephemeral ports available");
3565 const NUM_NODES: usize = 5;
3566 let config = TestNetworkConfigBuilder::<NUM_NODES, _, _>::with_num_nodes()
3567 .api_config(Options::with_port(port))
3568 .network_config(TestConfigBuilder::default().build())
3569 .build();
3570 let mut network = TestNetwork::new(config, MOCK_SEQUENCER_VERSIONS).await;
3571
3572 let mut events = network.peers[0].event_stream();
3574 loop {
3575 let event = events.next().await.unwrap();
3576 let CoordinatorEvent::LegacyEvent(Event {
3577 event: EventType::Decide { leaf_chain, .. },
3578 ..
3579 }) = event
3580 else {
3581 continue;
3582 };
3583 if leaf_chain[0].leaf.height() > 0 {
3584 break;
3585 }
3586 }
3587
3588 tracing::info!("shutting down node");
3593 network.peers.remove(0);
3594
3595 network
3597 .server
3598 .event_stream()
3599 .filter(|event| {
3600 future::ready(matches!(
3601 event,
3602 CoordinatorEvent::LegacyEvent(Event {
3603 event: EventType::Decide { .. },
3604 ..
3605 })
3606 ))
3607 })
3608 .take(3)
3609 .collect::<Vec<_>>()
3610 .await;
3611
3612 tracing::info!("restarting node");
3613 let node = network
3614 .cfg
3615 .init_node(
3616 1,
3617 ValidatedState::default(),
3618 no_storage::Options,
3619 None::<NullStateCatchup>,
3620 None,
3621 &NoMetrics,
3622 test_helpers::STAKE_TABLE_CAPACITY_FOR_TEST,
3623 NullEventConsumer,
3624 MOCK_SEQUENCER_VERSIONS,
3625 Default::default(),
3626 )
3627 .await;
3628 let mut events = node.event_stream();
3629
3630 let mut proposers = [false; NUM_NODES];
3633 loop {
3634 let event = events.next().await.unwrap();
3635 let CoordinatorEvent::LegacyEvent(Event {
3636 event: EventType::Decide { leaf_chain, .. },
3637 ..
3638 }) = event
3639 else {
3640 continue;
3641 };
3642 for LeafInfo { leaf, .. } in leaf_chain.iter().rev() {
3643 let height = leaf.height();
3644 let leaf_builder = (leaf.view_number().u64() as usize) % NUM_NODES;
3645 if height == 0 {
3646 continue;
3647 }
3648
3649 tracing::info!(
3650 "waiting for blocks from {proposers:?}, block {height} is from {leaf_builder}",
3651 );
3652 proposers[leaf_builder] = true;
3653 }
3654
3655 if proposers.iter().all(|has_proposed| *has_proposed) {
3656 break;
3657 }
3658 }
3659 }
3660
3661 #[ignore]
3662 #[test_log::test(tokio::test(flavor = "multi_thread"))]
3663 async fn test_catchup_epochs_no_state_peers() {
3664 let port = reserve_tcp_port().expect("OS should have ephemeral ports available");
3666 const EPOCH_HEIGHT: u64 = 5;
3667 let network_config = TestConfigBuilder::default()
3668 .epoch_height(EPOCH_HEIGHT)
3669 .build();
3670 const NUM_NODES: usize = 5;
3671 let config = TestNetworkConfigBuilder::<NUM_NODES, _, _>::with_num_nodes()
3672 .api_config(Options::with_port(port))
3673 .network_config(network_config)
3674 .build();
3675 let mut network = TestNetwork::new(config, Upgrade::trivial(EPOCH_VERSION)).await;
3676
3677 let mut events = network.peers[0].event_stream();
3679 loop {
3680 let event = events.next().await.unwrap();
3681 let CoordinatorEvent::LegacyEvent(Event {
3682 event: EventType::Decide { leaf_chain, .. },
3683 ..
3684 }) = event
3685 else {
3686 continue;
3687 };
3688 tracing::error!("got decide height {}", leaf_chain[0].leaf.height());
3689
3690 if leaf_chain[0].leaf.height() > EPOCH_HEIGHT * 3 {
3691 tracing::error!("decided past one epoch");
3692 break;
3693 }
3694 }
3695
3696 tracing::info!("shutting down node");
3701 network.peers.remove(0);
3702
3703 network
3705 .server
3706 .event_stream()
3707 .filter(|event| {
3708 future::ready(matches!(
3709 event,
3710 CoordinatorEvent::LegacyEvent(Event {
3711 event: EventType::Decide { .. },
3712 ..
3713 })
3714 ))
3715 })
3716 .take(3)
3717 .collect::<Vec<_>>()
3718 .await;
3719
3720 tracing::error!("restarting node");
3721 let node = network
3722 .cfg
3723 .init_node(
3724 1,
3725 ValidatedState::default(),
3726 no_storage::Options,
3727 None::<NullStateCatchup>,
3728 None,
3729 &NoMetrics,
3730 test_helpers::STAKE_TABLE_CAPACITY_FOR_TEST,
3731 NullEventConsumer,
3732 MOCK_SEQUENCER_VERSIONS,
3733 Default::default(),
3734 )
3735 .await;
3736 let mut events = node.event_stream();
3737
3738 let mut proposers = [false; NUM_NODES];
3741 loop {
3742 let event = events.next().await.unwrap();
3743 let CoordinatorEvent::LegacyEvent(Event {
3744 event: EventType::Decide { leaf_chain, .. },
3745 ..
3746 }) = event
3747 else {
3748 continue;
3749 };
3750 for LeafInfo { leaf, .. } in leaf_chain.iter().rev() {
3751 let height = leaf.height();
3752 let leaf_builder = (leaf.view_number().u64() as usize) % NUM_NODES;
3753 if height == 0 {
3754 continue;
3755 }
3756
3757 tracing::info!(
3758 "waiting for blocks from {proposers:?}, block {height} is from {leaf_builder}",
3759 );
3760 proposers[leaf_builder] = true;
3761 }
3762
3763 if proposers.iter().all(|has_proposed| *has_proposed) {
3764 break;
3765 }
3766 }
3767 }
3768
3769 #[test_log::test(tokio::test(flavor = "multi_thread"))]
3770 async fn test_chain_config_from_instance() {
3771 let port = reserve_tcp_port().expect("OS should have ephemeral ports available");
3777
3778 let chain_config: ChainConfig = ChainConfig::default();
3779
3780 let state = ValidatedState {
3781 chain_config: chain_config.commit().into(),
3782 ..Default::default()
3783 };
3784
3785 let states = std::array::from_fn(|_| state.clone());
3786
3787 let config = TestNetworkConfigBuilder::default()
3788 .api_config(Options::with_port(port))
3789 .states(states)
3790 .catchups(std::array::from_fn(|_| {
3791 StatePeers::<StaticVersion<0, 1>>::from_urls(
3792 vec![format!("http://localhost:{port}").parse().unwrap()],
3793 Default::default(),
3794 Duration::from_secs(2),
3795 &NoMetrics,
3796 )
3797 }))
3798 .network_config(TestConfigBuilder::default().build())
3799 .build();
3800
3801 let mut network = TestNetwork::new(config, MOCK_SEQUENCER_VERSIONS).await;
3802
3803 network
3805 .server
3806 .event_stream()
3807 .filter(|event| {
3808 future::ready(matches!(
3809 event,
3810 CoordinatorEvent::LegacyEvent(Event {
3811 event: EventType::Decide { .. },
3812 ..
3813 })
3814 ))
3815 })
3816 .take(3)
3817 .collect::<Vec<_>>()
3818 .await;
3819
3820 for peer in &network.peers {
3821 let state = peer.consensus_handle().decided_state().await.unwrap();
3822
3823 assert_eq!(state.chain_config.resolve().unwrap(), chain_config)
3824 }
3825
3826 network.server.shut_down().await;
3827 drop(network);
3828 }
3829
3830 #[test_log::test(tokio::test(flavor = "multi_thread"))]
3831 async fn test_chain_config_catchup() {
3832 let port = reserve_tcp_port().expect("OS should have ephemeral ports available");
3838
3839 let cf = ChainConfig {
3840 max_block_size: 300.into(),
3841 base_fee: 1.into(),
3842 ..Default::default()
3843 };
3844
3845 let state1 = ValidatedState {
3847 chain_config: cf.commit().into(),
3848 ..Default::default()
3849 };
3850
3851 let state2 = ValidatedState {
3853 chain_config: cf.into(),
3854 ..Default::default()
3855 };
3856
3857 let mut states = std::array::from_fn(|_| state1.clone());
3858 states[0] = state2;
3861
3862 const NUM_NODES: usize = 5;
3863 let config = TestNetworkConfigBuilder::<NUM_NODES, _, _>::with_num_nodes()
3864 .api_config(Options::from(options::Http {
3865 port,
3866 max_connections: None,
3867 axum_port: None,
3868 tonic_port: None,
3869 }))
3870 .states(states)
3871 .catchups(std::array::from_fn(|_| {
3872 StatePeers::<StaticVersion<0, 1>>::from_urls(
3873 vec![format!("http://localhost:{port}").parse().unwrap()],
3874 Default::default(),
3875 Duration::from_secs(2),
3876 &NoMetrics,
3877 )
3878 }))
3879 .network_config(TestConfigBuilder::default().build())
3880 .build();
3881
3882 let mut network = TestNetwork::new(config, MOCK_SEQUENCER_VERSIONS).await;
3883
3884 network
3886 .server
3887 .event_stream()
3888 .filter(|event| {
3889 future::ready(matches!(
3890 event,
3891 CoordinatorEvent::LegacyEvent(Event {
3892 event: EventType::Decide { .. },
3893 ..
3894 })
3895 ))
3896 })
3897 .take(3)
3898 .collect::<Vec<_>>()
3899 .await;
3900
3901 for peer in &network.peers {
3902 let state = peer.consensus_handle().decided_state().await.unwrap();
3903
3904 assert_eq!(state.chain_config.resolve().unwrap(), cf)
3905 }
3906
3907 network.server.shut_down().await;
3908 drop(network);
3909 }
3910
3911 #[test_log::test(tokio::test(flavor = "multi_thread"))]
3912 async fn test_pos_upgrade_view_based() {
3913 test_upgrade_helper(Upgrade::new(FEE_VERSION, EPOCH_VERSION)).await;
3914 }
3915
3916 #[test_log::test(tokio::test(flavor = "multi_thread"))]
3917 async fn test_epoch_reward_upgrade() {
3918 test_upgrade_helper_with_nodes::<3>(Upgrade::new(
3921 versions::DRB_AND_HEADER_UPGRADE_VERSION,
3922 versions::EPOCH_REWARD_VERSION,
3923 ))
3924 .await;
3925 }
3926
3927 async fn test_upgrade_helper(upgrade: Upgrade) {
3928 test_upgrade_helper_with_nodes::<5>(upgrade).await;
3929 }
3930
3931 async fn test_upgrade_helper_with_nodes<const NUM_NODES: usize>(upgrade: Upgrade) {
3932 let wait_extra_views = 10;
3935 let port = reserve_tcp_port().expect("OS should have ephemeral ports available");
3936 let epoch_start_block = if upgrade.base >= versions::EPOCH_VERSION {
3937 0
3938 } else {
3939 321
3940 };
3941
3942 let test_config = TestConfigBuilder::default()
3943 .epoch_height(200)
3944 .epoch_start_block(epoch_start_block)
3945 .set_upgrades(upgrade.target)
3946 .await
3947 .build();
3948
3949 let chain_config_genesis = ValidatedState::default().chain_config.resolve().unwrap();
3950 let chain_config_upgrade = test_config.get_upgrade_map().chain_config(upgrade.target);
3951 assert_ne!(chain_config_genesis, chain_config_upgrade);
3952 tracing::debug!(?chain_config_genesis, ?chain_config_upgrade);
3953
3954 let storage = join_all((0..NUM_NODES).map(|_| SqlDataSource::create_storage())).await;
3955 let persistence: [_; NUM_NODES] = storage
3956 .iter()
3957 .map(<SqlDataSource as TestableSequencerDataSource>::persistence_options)
3958 .collect::<Vec<_>>()
3959 .try_into()
3960 .unwrap();
3961
3962 let mut builder = TestNetworkConfigBuilder::<NUM_NODES, _, _>::with_num_nodes()
3963 .api_config(SqlDataSource::options(
3964 &storage[0],
3965 Options::with_port(port),
3966 ))
3967 .persistences(persistence)
3968 .catchups(std::array::from_fn(|_| {
3969 StatePeers::<SequencerApiVersion>::from_urls(
3970 vec![format!("http://localhost:{port}").parse().unwrap()],
3971 Default::default(),
3972 Duration::from_secs(2),
3973 &NoMetrics,
3974 )
3975 }))
3976 .network_config(test_config);
3977
3978 if upgrade.base >= versions::EPOCH_VERSION {
3981 let state = ValidatedState {
3982 chain_config: chain_config_upgrade.into(),
3983 ..Default::default()
3984 };
3985 builder = builder.states(std::array::from_fn(|_| state.clone()));
3986 }
3987
3988 let config = builder.build();
3989
3990 let mut network = TestNetwork::new(config, upgrade).await;
3991 let _events = network.server.event_stream();
3992
3993 let target = upgrade.target;
3994
3995 let mut hotshot_events = network
4002 .server
4003 .consensus_handle()
4004 .legacy_consensus()
4005 .read()
4006 .await
4007 .event_stream();
4008 let upgrade = loop {
4009 let event = hotshot_events.next().await.unwrap();
4010 if let EventType::UpgradeProposal { proposal, .. } = event.event {
4011 tracing::info!(?proposal, "proposal");
4012 let upgrade = proposal.data.upgrade_proposal;
4013 let new_version = upgrade.new_version;
4014 tracing::info!(?new_version, "upgrade proposal new version");
4015 assert_eq!(new_version, target);
4016 break upgrade;
4017 }
4018 };
4019
4020 let wanted_view = upgrade.new_version_first_view + wait_extra_views;
4021 loop {
4023 let event = hotshot_events.next().await.unwrap();
4024 let view_number = event.view_number;
4025
4026 tracing::debug!(?view_number, ?upgrade.new_version_first_view, "upgrade_new_view");
4027 if view_number > wanted_view {
4028 tracing::info!(?view_number, ?upgrade.new_version_first_view, "passed upgrade view");
4029 let states =
4030 join_all(network.peers.iter().map(|peer| async {
4031 peer.consensus_handle().decided_state().await.unwrap()
4032 }))
4033 .await;
4034 let leaves = join_all(
4035 network
4036 .peers
4037 .iter()
4038 .map(|peer| async { peer.consensus_handle().decided_leaf().await }),
4039 )
4040 .await;
4041 let configs: Vec<ChainConfig> = states
4042 .iter()
4043 .map(|state| state.chain_config.resolve().unwrap())
4044 .collect();
4045
4046 tracing::info!(?leaves, ?configs, "post upgrade state");
4047 for config in configs {
4048 assert_eq!(config, chain_config_upgrade);
4049 }
4050 for leaf in leaves {
4051 assert_eq!(leaf.block_header().version(), target);
4052 }
4053 break;
4054 }
4055 sleep(Duration::from_millis(200)).await;
4056 }
4057
4058 network.server.shut_down().await;
4059 }
4060
4061 #[test_log::test(tokio::test(flavor = "multi_thread"))]
4062 pub(crate) async fn test_restart() {
4063 const NUM_NODES: usize = 5;
4064 let storage = join_all((0..NUM_NODES).map(|_| SqlDataSource::create_storage())).await;
4066 let persistence: [_; NUM_NODES] = storage
4067 .iter()
4068 .map(<SqlDataSource as TestableSequencerDataSource>::persistence_options)
4069 .collect::<Vec<_>>()
4070 .try_into()
4071 .unwrap();
4072 let port = reserve_tcp_port().expect("OS should have ephemeral ports available");
4073 let config = TestNetworkConfigBuilder::default()
4074 .api_config(SqlDataSource::options(
4075 &storage[0],
4076 Options::with_port(port),
4077 ))
4078 .persistences(persistence.clone())
4079 .network_config(TestConfigBuilder::default().build())
4080 .build();
4081 let mut network = TestNetwork::new(config, MOCK_SEQUENCER_VERSIONS).await;
4082
4083 let client: Client<ServerError, SequencerApiVersion> =
4085 Client::new(format!("http://localhost:{port}").parse().unwrap());
4086 client.connect(None).await;
4087 tracing::info!(port, "server running");
4088
4089 client
4091 .socket("availability/stream/blocks/0")
4092 .subscribe::<BlockQueryData<SeqTypes>>()
4093 .await
4094 .unwrap()
4095 .take(3)
4096 .collect::<Vec<_>>()
4097 .await;
4098
4099 tracing::info!("shutting down nodes");
4101 network.stop_consensus().await;
4102
4103 let height = client
4105 .get::<usize>("status/block-height")
4106 .send()
4107 .await
4108 .unwrap();
4109 tracing::info!("decided {height} blocks before shutting down");
4110
4111 let chain: Vec<LeafQueryData<SeqTypes>> = client
4113 .socket("availability/stream/leaves/0")
4114 .subscribe()
4115 .await
4116 .unwrap()
4117 .take(height)
4118 .try_collect()
4119 .await
4120 .unwrap();
4121 let decided_view = chain.last().unwrap().leaf().view_number();
4122
4123 let state = network.server.decided_state().await.unwrap();
4126 tracing::info!(?decided_view, ?state, "consensus state");
4127
4128 drop(network);
4130
4131 let port = reserve_tcp_port().expect("OS should have ephemeral ports available");
4133
4134 let config = TestNetworkConfigBuilder::default()
4135 .api_config(SqlDataSource::options(
4136 &storage[0],
4137 Options::with_port(port),
4138 ))
4139 .persistences(persistence)
4140 .catchups(std::array::from_fn(|_| {
4141 StatePeers::<StaticVersion<0, 1>>::from_urls(
4145 vec![format!("http://localhost:{port}").parse().unwrap()],
4146 Default::default(),
4147 Duration::from_secs(2),
4148 &NoMetrics,
4149 )
4150 }))
4151 .network_config(TestConfigBuilder::default().build())
4152 .build();
4153 let _network = TestNetwork::new(config, MOCK_SEQUENCER_VERSIONS).await;
4154 let client: Client<ServerError, StaticVersion<0, 1>> =
4155 Client::new(format!("http://localhost:{port}").parse().unwrap());
4156 client.connect(None).await;
4157 tracing::info!(port, "server running");
4158
4159 tracing::info!("waiting for decide, height {height}");
4161 let new_leaf: LeafQueryData<SeqTypes> = client
4162 .socket(&format!("availability/stream/leaves/{height}"))
4163 .subscribe()
4164 .await
4165 .unwrap()
4166 .next()
4167 .await
4168 .unwrap()
4169 .unwrap();
4170 assert_eq!(new_leaf.height(), height as u64);
4171 assert_eq!(
4172 new_leaf.leaf().parent_commitment(),
4173 chain[height - 1].hash()
4174 );
4175
4176 let new_chain: Vec<LeafQueryData<SeqTypes>> = client
4178 .socket("availability/stream/leaves/0")
4179 .subscribe()
4180 .await
4181 .unwrap()
4182 .take(height)
4183 .try_collect()
4184 .await
4185 .unwrap();
4186 assert_eq!(chain, new_chain);
4187 }
4188
4189 #[test_log::test(tokio::test(flavor = "multi_thread"))]
4190 async fn test_fetch_config() {
4191 let port = reserve_tcp_port().expect("OS should have ephemeral ports available");
4192 let url: surf_disco::Url = format!("http://localhost:{port}").parse().unwrap();
4193 let client: Client<ServerError, StaticVersion<0, 1>> = Client::new(url.clone());
4194
4195 let options = Options::with_port(port).config(Default::default());
4196 let network_config = TestConfigBuilder::default().build();
4197 let config = TestNetworkConfigBuilder::default()
4198 .api_config(options)
4199 .network_config(network_config)
4200 .build();
4201 let network = TestNetwork::new(config, MOCK_SEQUENCER_VERSIONS).await;
4202 client.connect(None).await;
4203
4204 let peers = StatePeers::<StaticVersion<0, 1>>::from_urls(
4207 vec!["https://notarealnode.network".parse().unwrap(), url],
4208 Default::default(),
4209 Duration::from_secs(2),
4210 &NoMetrics,
4211 );
4212
4213 let validator =
4215 ValidatorConfig::generated_from_seed_indexed([0; 32], 1, U256::from(1), false);
4216 let config = peers.fetch_config(validator.clone()).await.unwrap();
4217
4218 assert_eq!(config.node_index, 1);
4220
4221 pretty_assertions::assert_eq!(
4224 serde_json::to_value(PublicHotShotConfig::from(config.config)).unwrap(),
4225 serde_json::to_value(PublicHotShotConfig::from(
4226 network.cfg.hotshot_config().clone()
4227 ))
4228 .unwrap()
4229 );
4230 }
4231
4232 async fn run_hotshot_event_streaming_test(url_suffix: &str) {
4233 let query_service_port =
4234 reserve_tcp_port().expect("OS should have ephemeral ports available");
4235
4236 let url = format!("http://localhost:{query_service_port}{url_suffix}")
4237 .parse()
4238 .unwrap();
4239
4240 let client: Client<ServerError, SequencerApiVersion> = Client::new(url);
4241
4242 let options = Options::with_port(query_service_port).hotshot_events(HotshotEvents);
4243
4244 let network_config = TestConfigBuilder::default().build();
4245 let config = TestNetworkConfigBuilder::default()
4246 .api_config(options)
4247 .network_config(network_config)
4248 .build();
4249 let _network = TestNetwork::new(config, MOCK_SEQUENCER_VERSIONS).await;
4250
4251 let mut subscribed_events = client
4252 .socket("hotshot-events/events")
4253 .subscribe::<Event<SeqTypes>>()
4254 .await
4255 .unwrap();
4256
4257 let total_count = 5;
4258 let mut receive_count = 0;
4260 loop {
4261 let event = subscribed_events.next().await.unwrap();
4262 tracing::info!("Received event in hotshot event streaming Client 1: {event:?}");
4263 receive_count += 1;
4264 if receive_count > total_count {
4265 tracing::info!("Client Received at least desired events, exiting loop");
4266 break;
4267 }
4268 }
4269 assert_eq!(receive_count, total_count + 1);
4270 }
4271
4272 #[test_log::test(tokio::test(flavor = "multi_thread"))]
4273 async fn test_hotshot_event_streaming_v0() {
4274 run_hotshot_event_streaming_test("/v0").await;
4275 }
4276
4277 #[test_log::test(tokio::test(flavor = "multi_thread"))]
4278 async fn test_hotshot_event_streaming_v1() {
4279 run_hotshot_event_streaming_test("/v1").await;
4280 }
4281
4282 #[test_log::test(tokio::test(flavor = "multi_thread"))]
4283 async fn test_hotshot_event_streaming() {
4284 run_hotshot_event_streaming_test("").await;
4285 }
4286
4287 #[test_log::test(tokio::test(flavor = "multi_thread"))]
4290 async fn test_hotshot_event_streaming_epoch_progression() {
4291 let epoch_height = 35;
4292 let wanted_epochs = 4;
4293
4294 let network_config = TestConfigBuilder::default()
4295 .epoch_height(epoch_height)
4296 .build();
4297
4298 let query_service_port =
4299 reserve_tcp_port().expect("OS should have ephemeral ports available");
4300
4301 let hotshot_url = format!("http://localhost:{query_service_port}")
4302 .parse()
4303 .unwrap();
4304
4305 let client: Client<ServerError, SequencerApiVersion> = Client::new(hotshot_url);
4306 let options = Options::with_port(query_service_port).hotshot_events(HotshotEvents);
4307
4308 let config = TestNetworkConfigBuilder::default()
4309 .api_config(options)
4310 .network_config(network_config.clone())
4311 .pos_hook(
4312 DelegationConfig::VariableAmounts,
4313 Default::default(),
4314 POS_V3,
4315 )
4316 .await
4317 .expect("Pos Deployment")
4318 .build();
4319
4320 let _network = TestNetwork::new(config, POS_V3).await;
4321
4322 let mut subscribed_events = client
4323 .socket("hotshot-events/events")
4324 .subscribe::<Event<SeqTypes>>()
4325 .await
4326 .unwrap();
4327
4328 let wanted_views = epoch_height * wanted_epochs;
4329
4330 let mut views = HashSet::new();
4331 let mut epochs = HashSet::new();
4332 for _ in 0..=600 {
4333 let event = subscribed_events.next().await.unwrap();
4334 let event = event.unwrap();
4335 let view_number = event.view_number;
4336 views.insert(view_number.u64());
4337
4338 if let hotshot::types::EventType::Decide { committing_qc, .. } = event.event {
4339 assert!(committing_qc.epoch().is_some(), "epochs are live");
4340 assert!(committing_qc.block_number().is_some());
4341
4342 let epoch = committing_qc.epoch().unwrap().u64();
4343 epochs.insert(epoch);
4344
4345 tracing::debug!(
4346 "Got decide: epoch: {:?}, block: {:?} ",
4347 epoch,
4348 committing_qc.block_number()
4349 );
4350
4351 let expected_epoch =
4352 epoch_from_block_number(committing_qc.block_number().unwrap(), epoch_height);
4353 tracing::debug!("expected epoch: {expected_epoch}, qc epoch: {epoch}");
4354
4355 assert_eq!(expected_epoch, epoch);
4356 }
4357 if views.contains(&wanted_views) {
4358 tracing::info!("Client Received at least desired views, exiting loop");
4359 break;
4360 }
4361 }
4362
4363 assert!(views.contains(&wanted_views), "Views are not progressing");
4365 assert!(
4366 epochs.contains(&wanted_epochs),
4367 "Epochs are not progressing"
4368 );
4369 }
4370
4371 #[test_log::test(tokio::test(flavor = "multi_thread"))]
4372 async fn test_pos_rewards_basic() -> anyhow::Result<()> {
4373 let epoch_height = 20;
4380
4381 let network_config = TestConfigBuilder::default()
4382 .epoch_height(epoch_height)
4383 .build();
4384
4385 let api_port = reserve_tcp_port().expect("OS should have ephemeral ports available");
4386
4387 const NUM_NODES: usize = 1;
4388 let storage = join_all((0..NUM_NODES).map(|_| SqlDataSource::create_storage())).await;
4390 let persistence: [_; NUM_NODES] = storage
4391 .iter()
4392 .map(<SqlDataSource as TestableSequencerDataSource>::persistence_options)
4393 .collect::<Vec<_>>()
4394 .try_into()
4395 .unwrap();
4396
4397 let config = TestNetworkConfigBuilder::with_num_nodes()
4398 .api_config(SqlDataSource::options(
4399 &storage[0],
4400 Options::with_port(api_port),
4401 ))
4402 .network_config(network_config.clone())
4403 .persistences(persistence.clone())
4404 .catchups(std::array::from_fn(|_| {
4405 StatePeers::<StaticVersion<0, 1>>::from_urls(
4406 vec![format!("http://localhost:{api_port}").parse().unwrap()],
4407 Default::default(),
4408 Duration::from_secs(2),
4409 &NoMetrics,
4410 )
4411 }))
4412 .pos_hook(
4413 DelegationConfig::VariableAmounts,
4414 Default::default(),
4415 POS_V4,
4416 )
4417 .await
4418 .unwrap()
4419 .build();
4420
4421 let network = TestNetwork::new(config, POS_V4).await;
4422 let client: Client<ServerError, SequencerApiVersion> =
4423 Client::new(format!("http://localhost:{api_port}").parse().unwrap());
4424
4425 let _blocks = client
4430 .socket("availability/stream/blocks/0")
4431 .subscribe::<BlockQueryData<SeqTypes>>()
4432 .await
4433 .unwrap()
4434 .take(65)
4435 .try_collect::<Vec<_>>()
4436 .await
4437 .unwrap();
4438
4439 let staking_priv_keys = network_config.staking_priv_keys();
4440 let account = staking_priv_keys[0].signer.clone();
4441 let address = account.address();
4442
4443 let block_height = 60;
4444
4445 let node_state = network.server.node_state();
4446 let membership = node_state.coordinator.membership();
4447 let expected_amount = U256::from(20)
4448 * (membership
4449 .epoch_block_reward(3.into())
4450 .expect("block reward is not None"))
4451 .0;
4452
4453 let amount = client
4455 .get::<Option<RewardAmount>>(&format!(
4456 "reward-state/reward-balance/{block_height}/{address}"
4457 ))
4458 .send()
4459 .await
4460 .unwrap()
4461 .unwrap();
4462
4463 tracing::info!("amount={amount:?}");
4464
4465 assert_eq!(amount.0, expected_amount, "reward amount don't match");
4466
4467 Ok(())
4468 }
4469
4470 #[test_log::test(tokio::test(flavor = "multi_thread"))]
4471 async fn test_cumulative_pos_rewards() -> anyhow::Result<()> {
4472 let epoch_height = 20;
4478
4479 let network_config = TestConfigBuilder::default()
4480 .epoch_height(epoch_height)
4481 .build();
4482
4483 let api_port = reserve_tcp_port().expect("OS should have ephemeral ports available");
4484
4485 const NUM_NODES: usize = 5;
4486 let storage = join_all((0..NUM_NODES).map(|_| SqlDataSource::create_storage())).await;
4488 let persistence: [_; NUM_NODES] = storage
4489 .iter()
4490 .map(<SqlDataSource as TestableSequencerDataSource>::persistence_options)
4491 .collect::<Vec<_>>()
4492 .try_into()
4493 .unwrap();
4494
4495 let config = TestNetworkConfigBuilder::with_num_nodes()
4496 .api_config(SqlDataSource::options(
4497 &storage[0],
4498 Options::with_port(api_port),
4499 ))
4500 .network_config(network_config)
4501 .persistences(persistence.clone())
4502 .catchups(std::array::from_fn(|_| {
4503 StatePeers::<StaticVersion<0, 1>>::from_urls(
4504 vec![format!("http://localhost:{api_port}").parse().unwrap()],
4505 Default::default(),
4506 Duration::from_secs(2),
4507 &NoMetrics,
4508 )
4509 }))
4510 .pos_hook(
4511 DelegationConfig::MultipleDelegators,
4512 Default::default(),
4513 POS_V4,
4514 )
4515 .await
4516 .unwrap()
4517 .build();
4518
4519 let network = TestNetwork::new(config, POS_V4).await;
4520 let node_state = network.server.node_state();
4521 let client: Client<ServerError, SequencerApiVersion> =
4522 Client::new(format!("http://localhost:{api_port}").parse().unwrap());
4523
4524 let _blocks = client
4526 .socket("availability/stream/blocks/0")
4527 .subscribe::<BlockQueryData<SeqTypes>>()
4528 .await
4529 .unwrap()
4530 .take(75)
4531 .try_collect::<Vec<_>>()
4532 .await
4533 .unwrap();
4534
4535 let validators = client
4539 .get::<AuthenticatedValidatorMap>("node/validators/3")
4540 .send()
4541 .await
4542 .expect("failed to get validator");
4543
4544 let mut addresses = HashSet::new();
4548 for v in validators.values() {
4549 addresses.insert(v.account);
4550 addresses.extend(v.clone().delegators.keys().collect::<Vec<_>>());
4551 }
4552 let validators = client
4554 .get::<AuthenticatedValidatorMap>("node/validators/4")
4555 .send()
4556 .await
4557 .expect("failed to get validator");
4558 for v in validators.values() {
4559 addresses.insert(v.account);
4560 addresses.extend(v.clone().delegators.keys().collect::<Vec<_>>());
4561 }
4562
4563 let mut prev_cumulative_amount = U256::ZERO;
4564 for block in 41..=67 {
4566 let membership = node_state.coordinator.membership();
4567 let block_reward = membership
4568 .epoch_block_reward(epoch_from_block_number(block, epoch_height).into())
4569 .expect("block reward is not None");
4570
4571 let mut cumulative_amount = U256::ZERO;
4572 for address in addresses.clone() {
4573 let amount = client
4574 .get::<Option<RewardAmount>>(&format!(
4575 "reward-state/reward-balance/{block}/{address}"
4576 ))
4577 .send()
4578 .await
4579 .ok()
4580 .flatten();
4581
4582 if let Some(amount) = amount {
4583 tracing::info!("address={address}, amount={amount}");
4584 cumulative_amount += amount.0;
4585 };
4586 }
4587
4588 assert_eq!(cumulative_amount - prev_cumulative_amount, block_reward.0);
4590 tracing::info!("cumulative_amount is correct for block={block}");
4591 prev_cumulative_amount = cumulative_amount;
4592 }
4593
4594 Ok(())
4595 }
4596
4597 #[test_log::test(tokio::test(flavor = "multi_thread"))]
4598 async fn test_stake_table_duplicate_events_from_contract() -> anyhow::Result<()> {
4599 let epoch_height = 20;
4603
4604 let network_config = TestConfigBuilder::default()
4605 .epoch_height(epoch_height)
4606 .build();
4607
4608 let api_port = reserve_tcp_port().expect("OS should have ephemeral ports available");
4609
4610 const NUM_NODES: usize = 5;
4611 let storage = join_all((0..NUM_NODES).map(|_| SqlDataSource::create_storage())).await;
4613 let persistence: [_; NUM_NODES] = storage
4614 .iter()
4615 .map(<SqlDataSource as TestableSequencerDataSource>::persistence_options)
4616 .collect::<Vec<_>>()
4617 .try_into()
4618 .unwrap();
4619
4620 let l1_url = network_config.l1_url();
4621 let config = TestNetworkConfigBuilder::with_num_nodes()
4622 .api_config(SqlDataSource::options(
4623 &storage[0],
4624 Options::with_port(api_port),
4625 ))
4626 .network_config(network_config)
4627 .persistences(persistence.clone())
4628 .catchups(std::array::from_fn(|_| {
4629 StatePeers::<StaticVersion<0, 1>>::from_urls(
4630 vec![format!("http://localhost:{api_port}").parse().unwrap()],
4631 Default::default(),
4632 Duration::from_secs(2),
4633 &NoMetrics,
4634 )
4635 }))
4636 .pos_hook(
4637 DelegationConfig::MultipleDelegators,
4638 Default::default(),
4639 POS_V3,
4640 )
4641 .await
4642 .unwrap()
4643 .build();
4644
4645 let network = TestNetwork::new(config, POS_V3).await;
4646
4647 let mut prev_st = None;
4648 let state = network.server.decided_state().await.unwrap();
4649 let chain_config = state.chain_config.resolve().expect("resolve chain config");
4650 let stake_table = chain_config.stake_table_contract.unwrap();
4651
4652 let l1_client = L1ClientOptions::default()
4653 .connect(vec![l1_url])
4654 .expect("failed to connect to l1");
4655
4656 let client: Client<ServerError, SequencerApiVersion> =
4657 Client::new(format!("http://localhost:{api_port}").parse().unwrap());
4658
4659 let mut headers = client
4660 .socket("availability/stream/headers/0")
4661 .subscribe::<Header>()
4662 .await
4663 .unwrap();
4664
4665 let mut target_bh = 0;
4666 while let Some(header) = headers.next().await {
4667 let header = header.unwrap();
4668 println!("got header with height {}", header.height());
4669 if header.height() == 0 {
4670 continue;
4671 }
4672 let l1_block = header.l1_finalized().expect("l1 block not found");
4673
4674 let sorted_events = Fetcher::fetch_events_from_contract(
4675 l1_client.clone(),
4676 stake_table,
4677 None,
4678 l1_block.number(),
4679 )
4680 .await?;
4681
4682 let mut sorted_dedup_removed = sorted_events.clone();
4683 sorted_dedup_removed.dedup();
4684
4685 assert_eq!(
4686 sorted_events.len(),
4687 sorted_dedup_removed.len(),
4688 "duplicates found"
4689 );
4690
4691 let stake_table =
4693 validators_from_l1_events(sorted_events.into_iter().map(|(_, e)| e)).unwrap();
4694 if let Some(prev_st) = prev_st {
4695 assert_eq!(stake_table, prev_st);
4696 }
4697
4698 prev_st = Some(stake_table);
4699
4700 if target_bh == 100 {
4701 break;
4702 }
4703
4704 target_bh = header.height();
4705 }
4706
4707 Ok(())
4708 }
4709
4710 #[test_log::test(tokio::test(flavor = "multi_thread"))]
4711 async fn test_rewards_v4() -> anyhow::Result<()> {
4712 const EPOCH_HEIGHT: u64 = 20;
4722
4723 let network_config = TestConfigBuilder::default()
4724 .epoch_height(EPOCH_HEIGHT)
4725 .build();
4726
4727 let api_port = reserve_tcp_port().expect("OS should have ephemeral ports available");
4728
4729 const NUM_NODES: usize = 5;
4730
4731 let storage = join_all((0..NUM_NODES).map(|_| SqlDataSource::create_storage())).await;
4732 let persistence: [_; NUM_NODES] = storage
4733 .iter()
4734 .map(<SqlDataSource as TestableSequencerDataSource>::persistence_options)
4735 .collect::<Vec<_>>()
4736 .try_into()
4737 .unwrap();
4738
4739 let config = TestNetworkConfigBuilder::with_num_nodes()
4740 .api_config(SqlDataSource::options(
4741 &storage[0],
4742 Options::with_port(api_port),
4743 ))
4744 .network_config(network_config)
4745 .persistences(persistence.clone())
4746 .catchups(std::array::from_fn(|_| {
4747 StatePeers::<StaticVersion<0, 1>>::from_urls(
4748 vec![format!("http://localhost:{api_port}").parse().unwrap()],
4749 Default::default(),
4750 Duration::from_secs(2),
4751 &NoMetrics,
4752 )
4753 }))
4754 .pos_hook(
4755 DelegationConfig::MultipleDelegators,
4756 Default::default(),
4757 POS_V4,
4758 )
4759 .await
4760 .unwrap()
4761 .build();
4762
4763 let network = TestNetwork::new(config, POS_V4).await;
4764 let client: Client<ServerError, SequencerApiVersion> =
4765 Client::new(format!("http://localhost:{api_port}").parse().unwrap());
4766
4767 let mut events = network.peers[0].event_stream();
4769 while let Some(event) = events.next().await {
4770 if let CoordinatorEvent::LegacyEvent(Event {
4771 event: EventType::Decide { leaf_chain, .. },
4772 ..
4773 }) = event
4774 {
4775 let height = leaf_chain[0].leaf.height();
4776 tracing::info!("Node 0 decided at height: {height}");
4777 if height > EPOCH_HEIGHT * 3 {
4778 break;
4779 }
4780 }
4781 }
4782
4783 {
4785 client
4786 .get::<AuthenticatedValidatorMap>("node/validators/1")
4787 .send()
4788 .await
4789 .unwrap()
4790 .is_empty();
4791
4792 client
4793 .get::<AuthenticatedValidatorMap>("node/validators/2")
4794 .send()
4795 .await
4796 .unwrap()
4797 .is_empty();
4798 }
4799
4800 let validators = client
4802 .get::<AuthenticatedValidatorMap>("node/validators/3")
4803 .send()
4804 .await
4805 .expect("validators");
4806
4807 assert!(!validators.is_empty());
4808
4809 let mut addresses = HashSet::new();
4811 for v in validators.values() {
4812 addresses.insert(v.account);
4813 addresses.extend(v.clone().delegators.keys().collect::<Vec<_>>());
4814 }
4815
4816 let mut leaves = client
4817 .socket("availability/stream/leaves/0")
4818 .subscribe::<LeafQueryData<SeqTypes>>()
4819 .await
4820 .unwrap();
4821
4822 let node_state = network.server.node_state();
4823 let coordinator = node_state.coordinator;
4824
4825 let membership = coordinator.membership();
4826
4827 while let Some(leaf) = leaves.next().await {
4829 let leaf = leaf.unwrap();
4830 let header = leaf.header();
4831 assert_eq!(header.total_reward_distributed().unwrap().0, U256::ZERO);
4832
4833 let epoch_number =
4834 EpochNumber::new(epoch_from_block_number(leaf.height(), EPOCH_HEIGHT));
4835
4836 assert!(membership.epoch_block_reward(epoch_number).is_none());
4837
4838 let height = header.height();
4839 for address in addresses.clone() {
4840 let amount = client
4841 .get::<Option<RewardAmount>>(&format!(
4842 "reward-state-v2/reward-balance/{height}/{address}"
4843 ))
4844 .send()
4845 .await
4846 .ok()
4847 .flatten();
4848 assert!(amount.is_none(), "amount is not none for block {height}")
4849 }
4850
4851 if leaf.height() == EPOCH_HEIGHT * 2 {
4852 break;
4853 }
4854 }
4855
4856 let mut rewards_map = HashMap::new();
4857 let mut total_distributed = U256::ZERO;
4858 let mut epoch_rewards = HashMap::<EpochNumber, U256>::new();
4859
4860 while let Some(leaf) = leaves.next().await {
4861 let leaf = leaf.unwrap();
4862
4863 let header = leaf.header();
4864 let distributed = header
4865 .total_reward_distributed()
4866 .expect("rewards distributed is none");
4867
4868 let block = leaf.height();
4869 tracing::info!("verify rewards for block={block:?}");
4870 let membership = coordinator.membership();
4871 let epoch_number =
4872 EpochNumber::new(epoch_from_block_number(leaf.height(), EPOCH_HEIGHT));
4873
4874 let snapshot = membership.snapshot(epoch_number).expect("snapshot");
4875 let block_reward = snapshot.epoch_block_reward().unwrap();
4876 let leader = snapshot.leader(leaf.leaf().view_number()).expect("leader");
4877 let leader_eth_address = snapshot
4878 .validator_config(&leader)
4879 .expect("validator config")
4880 .account;
4881
4882 let validators = client
4883 .get::<AuthenticatedValidatorMap>(&format!("node/validators/{epoch_number}"))
4884 .send()
4885 .await
4886 .expect("validators");
4887
4888 let leader_validator = validators
4889 .get(&leader_eth_address)
4890 .expect("leader not found");
4891
4892 let distributor =
4893 RewardDistributor::new(leader_validator.clone(), block_reward, distributed);
4894 for validator in validators.values() {
4896 let delegator_stake_sum: U256 = validator.delegators.values().cloned().sum();
4897
4898 assert_eq!(delegator_stake_sum, validator.stake);
4899 }
4900
4901 let computed_rewards = distributor.compute_rewards().expect("reward computation");
4902
4903 let total_reward = block_reward.0;
4905 let leader_commission_basis_points = U256::from(leader_validator.commission);
4906 let calculated_leader_commission_reward = leader_commission_basis_points
4907 .checked_mul(total_reward)
4908 .context("overflow")?
4909 .checked_div(U256::from(COMMISSION_BASIS_POINTS))
4910 .context("overflow")?;
4911
4912 assert!(
4913 computed_rewards.leader_commission().0 - calculated_leader_commission_reward
4914 <= U256::from(10_u64)
4915 );
4916
4917 let leader_commission = *computed_rewards.leader_commission();
4919 for (address, amount) in computed_rewards.delegators().clone() {
4920 rewards_map
4921 .entry(address)
4922 .and_modify(|entry| *entry += amount)
4923 .or_insert(amount);
4924 }
4925
4926 rewards_map
4928 .entry(leader_eth_address)
4929 .and_modify(|entry| *entry += leader_commission)
4930 .or_insert(leader_commission);
4931
4932 for (address, calculated_amount) in rewards_map.iter() {
4934 let mut attempt = 0;
4935 let amount_from_api = loop {
4936 let result = client
4937 .get::<Option<RewardAmount>>(&format!(
4938 "reward-state-v2/reward-balance/{block}/{address}"
4939 ))
4940 .send()
4941 .await
4942 .ok()
4943 .flatten();
4944
4945 if let Some(amount) = result {
4946 break amount;
4947 }
4948
4949 attempt += 1;
4950 if attempt >= 3 {
4951 panic!(
4952 "Failed to fetch reward amount for address {address} after 3 retries"
4953 );
4954 }
4955
4956 sleep(Duration::from_secs(2)).await;
4957 };
4958
4959 assert_eq!(amount_from_api, *calculated_amount);
4960 }
4961
4962 total_distributed += block_reward.0;
4964 assert_eq!(
4965 header.total_reward_distributed().unwrap().0,
4966 total_distributed
4967 );
4968
4969 epoch_rewards
4971 .entry(epoch_number)
4972 .and_modify(|r| assert_eq!(*r, block_reward.0))
4973 .or_insert(block_reward.0);
4974
4975 if leaf.height() == EPOCH_HEIGHT * 5 {
4977 break;
4978 }
4979 }
4980
4981 Ok(())
4982 }
4983
4984 #[test_log::test(tokio::test(flavor = "multi_thread"))]
4985 async fn test_epoch_reward_distribution_basic() -> anyhow::Result<()> {
4986 const EPOCH_HEIGHT: u64 = 10;
4987 const NUM_NODES: usize = 5;
4988
4989 const V5: Upgrade = Upgrade::trivial(EPOCH_REWARD_VERSION);
4990
4991 let network_config = TestConfigBuilder::default()
4992 .epoch_height(EPOCH_HEIGHT)
4993 .build();
4994
4995 let api_port = reserve_tcp_port().expect("No ports free for query service");
4996
4997 let storage = join_all((0..NUM_NODES).map(|_| SqlDataSource::create_storage())).await;
4998 let persistence: [_; NUM_NODES] = storage
4999 .iter()
5000 .map(<SqlDataSource as TestableSequencerDataSource>::persistence_options)
5001 .collect::<Vec<_>>()
5002 .try_into()
5003 .unwrap();
5004
5005 let config = TestNetworkConfigBuilder::with_num_nodes()
5006 .api_config(SqlDataSource::options(
5007 &storage[0],
5008 Options::with_port(api_port),
5009 ))
5010 .network_config(network_config)
5011 .persistences(persistence.clone())
5012 .catchups(std::array::from_fn(|_| {
5013 StatePeers::<StaticVersion<0, 1>>::from_urls(
5014 vec![format!("http://localhost:{api_port}").parse().unwrap()],
5015 Default::default(),
5016 Duration::from_secs(2),
5017 &NoMetrics,
5018 )
5019 }))
5020 .pos_hook(DelegationConfig::MultipleDelegators, Default::default(), V5)
5021 .await
5022 .unwrap()
5023 .build();
5024
5025 let _network = TestNetwork::new(config, V5).await;
5026 let client: Client<ServerError, SequencerApiVersion> =
5027 Client::new(format!("http://localhost:{api_port}").parse().unwrap());
5028
5029 let height_client: Client<ServerError, StaticVersion<0, 1>> =
5031 Client::new(format!("http://localhost:{api_port}").parse().unwrap());
5032 wait_until_block_height(&height_client, "node/block-height", EPOCH_HEIGHT * 5).await;
5033
5034 let mut leaves = client
5035 .socket("availability/stream/leaves/0")
5036 .subscribe::<LeafQueryData<SeqTypes>>()
5037 .await
5038 .unwrap();
5039
5040 while let Some(leaf) = leaves.next().await {
5042 let leaf = leaf.unwrap();
5043 let header = leaf.header();
5044 let height = header.height();
5045
5046 let total_distributed = header.total_reward_distributed().unwrap();
5047 assert_eq!(
5048 total_distributed.0,
5049 U256::ZERO,
5050 "epochs 1-3 should have no rewards, height={height}"
5051 );
5052
5053 if height == EPOCH_HEIGHT * 3 {
5054 break;
5055 }
5056 }
5057
5058 while let Some(leaf) = leaves.next().await {
5059 let leaf = leaf.unwrap();
5060 let header = leaf.header();
5061 let height = header.height();
5062
5063 if height == EPOCH_HEIGHT * 4 {
5064 let total_distributed = header.total_reward_distributed().unwrap();
5065 assert!(total_distributed.0 > U256::ZERO,);
5066 break;
5067 }
5068 }
5069
5070 while let Some(leaf) = leaves.next().await {
5071 let leaf = leaf.unwrap();
5072 let header = leaf.header();
5073 let height = header.height();
5074
5075 if height == EPOCH_HEIGHT * 5 {
5076 let total_distributed = header.total_reward_distributed().unwrap();
5077 assert!(total_distributed.0 > U256::ZERO,);
5078 break;
5079 }
5080 }
5081
5082 Ok(())
5083 }
5084
5085 #[test_log::test(tokio::test(flavor = "multi_thread"))]
5086 async fn test_epoch_reward_total_distributed_rewards() -> anyhow::Result<()> {
5087 const EPOCH_HEIGHT: u64 = 10;
5092 const NUM_NODES: usize = 5;
5093
5094 const V5: Upgrade = Upgrade::trivial(EPOCH_REWARD_VERSION);
5095
5096 let network_config = TestConfigBuilder::default()
5097 .epoch_height(EPOCH_HEIGHT)
5098 .build();
5099
5100 let api_port = reserve_tcp_port().expect("No ports free for query service");
5101
5102 let storage = join_all((0..NUM_NODES).map(|_| SqlDataSource::create_storage())).await;
5103 let persistence: [_; NUM_NODES] = storage
5104 .iter()
5105 .map(<SqlDataSource as TestableSequencerDataSource>::persistence_options)
5106 .collect::<Vec<_>>()
5107 .try_into()
5108 .unwrap();
5109
5110 let config = TestNetworkConfigBuilder::with_num_nodes()
5111 .api_config(SqlDataSource::options(
5112 &storage[0],
5113 Options::with_port(api_port),
5114 ))
5115 .network_config(network_config)
5116 .persistences(persistence.clone())
5117 .catchups(std::array::from_fn(|_| {
5118 StatePeers::<StaticVersion<0, 1>>::from_urls(
5119 vec![format!("http://localhost:{api_port}").parse().unwrap()],
5120 Default::default(),
5121 Duration::from_secs(2),
5122 &NoMetrics,
5123 )
5124 }))
5125 .pos_hook(DelegationConfig::MultipleDelegators, Default::default(), V5)
5126 .await
5127 .unwrap()
5128 .build();
5129
5130 let _network = TestNetwork::new(config, V5).await;
5131 let client: Client<ServerError, SequencerApiVersion> =
5132 Client::new(format!("http://localhost:{api_port}").parse().unwrap());
5133
5134 let height_client: Client<ServerError, StaticVersion<0, 1>> =
5135 Client::new(format!("http://localhost:{api_port}").parse().unwrap());
5136 wait_until_block_height(&height_client, "node/block-height", EPOCH_HEIGHT * 5).await;
5137
5138 let mut leaves = client
5139 .socket("availability/stream/leaves/0")
5140 .subscribe::<LeafQueryData<SeqTypes>>()
5141 .await
5142 .unwrap();
5143
5144 while let Some(leaf) = leaves.next().await {
5145 let leaf = leaf.unwrap();
5146 let header = leaf.header();
5147 let height = header.height();
5148
5149 let total_distributed = header.total_reward_distributed().unwrap();
5150 assert_eq!(total_distributed.0, U256::ZERO,);
5151
5152 if height == EPOCH_HEIGHT * 3 {
5153 break;
5154 }
5155 }
5156
5157 while let Some(leaf) = leaves.next().await {
5158 let leaf = leaf.unwrap();
5159 let header = leaf.header();
5160 let height = header.height();
5161
5162 let total_distributed = header.total_reward_distributed().unwrap();
5163
5164 if height < EPOCH_HEIGHT * 4 {
5165 assert_eq!(total_distributed.0, U256::ZERO,);
5166 } else {
5167 assert!(total_distributed.0 > U256::ZERO,);
5168 break;
5169 }
5170 }
5171
5172 let epoch4_last_reward = {
5173 let header = client
5174 .get::<Header>(&format!("availability/header/{}", EPOCH_HEIGHT * 4))
5175 .send()
5176 .await
5177 .unwrap();
5178 header.total_reward_distributed().unwrap()
5179 };
5180
5181 assert!(
5182 epoch4_last_reward.0 > U256::ZERO,
5183 "epoch 4 last block should have positive rewards"
5184 );
5185
5186 while let Some(leaf) = leaves.next().await {
5187 let leaf = leaf.unwrap();
5188 let header = leaf.header();
5189 let height = header.height();
5190
5191 let total_distributed = header.total_reward_distributed().unwrap();
5192
5193 if height < EPOCH_HEIGHT * 5 {
5194 assert_eq!(total_distributed, epoch4_last_reward,);
5195 } else {
5196 assert!(total_distributed.0 > epoch4_last_reward.0,);
5197 break;
5198 }
5199 }
5200
5201 Ok(())
5202 }
5203
5204 #[test_log::test(tokio::test(flavor = "multi_thread"))]
5207 async fn test_reward_state_v2_epoch_distribution() -> anyhow::Result<()> {
5208 const EPOCH_HEIGHT: u64 = 10;
5209 const NUM_NODES: usize = 5;
5210 const NUM_EPOCHS: u64 = 6;
5211 const V5: Upgrade = Upgrade::trivial(EPOCH_REWARD_VERSION);
5212
5213 let network_config = TestConfigBuilder::default()
5214 .epoch_height(EPOCH_HEIGHT)
5215 .build();
5216
5217 let api_port = reserve_tcp_port().expect("No ports free for query service");
5218
5219 let storage = join_all((0..NUM_NODES).map(|_| SqlDataSource::create_storage())).await;
5220 let persistence: [_; NUM_NODES] = storage
5221 .iter()
5222 .map(<SqlDataSource as TestableSequencerDataSource>::persistence_options)
5223 .collect::<Vec<_>>()
5224 .try_into()
5225 .unwrap();
5226
5227 let config = TestNetworkConfigBuilder::with_num_nodes()
5228 .api_config(SqlDataSource::options(
5229 &storage[0],
5230 Options::with_port(api_port),
5231 ))
5232 .network_config(network_config)
5233 .persistences(persistence.clone())
5234 .catchups(std::array::from_fn(|_| {
5235 StatePeers::<StaticVersion<0, 1>>::from_urls(
5236 vec![format!("http://localhost:{api_port}").parse().unwrap()],
5237 Default::default(),
5238 Duration::from_secs(2),
5239 &NoMetrics,
5240 )
5241 }))
5242 .pos_hook(DelegationConfig::MultipleDelegators, Default::default(), V5)
5243 .await
5244 .unwrap()
5245 .build();
5246
5247 let network = TestNetwork::new(config, V5).await;
5248 let client: Client<ServerError, SequencerApiVersion> =
5249 Client::new(format!("http://localhost:{api_port}").parse().unwrap());
5250
5251 let node_state = network.server.node_state();
5252 let coordinator = node_state.coordinator;
5253
5254 let mut expected_total_distributed = U256::ZERO;
5255
5256 let mut leaves = client
5257 .socket("availability/stream/leaves/0")
5258 .subscribe::<LeafQueryData<SeqTypes>>()
5259 .await
5260 .unwrap();
5261
5262 while let Some(leaf) = leaves.next().await {
5263 let leaf = leaf.unwrap();
5264 let header = leaf.header();
5265 let height = header.height();
5266
5267 let epoch = epoch_from_block_number(height, EPOCH_HEIGHT);
5268
5269 let is_epoch_last_block = height % EPOCH_HEIGHT == 0;
5270
5271 if epoch <= 3 {
5272 continue;
5273 }
5274
5275 let header_total_distributed = header
5276 .total_reward_distributed()
5277 .expect("total_reward_distributed should exist");
5278
5279 if is_epoch_last_block {
5280 let prev_epoch = epoch - 1;
5281 let prev_epoch_number = EpochNumber::new(prev_epoch);
5282 let membership = coordinator.membership();
5283 let prev_block_reward = membership
5284 .epoch_block_reward(prev_epoch_number)
5285 .expect("epoch block reward should exist");
5286
5287 let epoch_total = prev_block_reward.0 * U256::from(EPOCH_HEIGHT);
5288 expected_total_distributed += epoch_total;
5289 }
5290
5291 assert_eq!(
5292 header_total_distributed.0, expected_total_distributed,
5293 "total_reward_distributed mismatch at height {height}"
5294 );
5295
5296 if height >= NUM_EPOCHS * EPOCH_HEIGHT {
5297 break;
5298 }
5299 }
5300
5301 Ok(())
5302 }
5303
5304 #[test_log::test(tokio::test(flavor = "multi_thread"))]
5306 async fn test_epoch_leader_counts() -> anyhow::Result<()> {
5307 const EPOCH_HEIGHT: u64 = 10;
5308 const NUM_NODES: usize = 5;
5309 const NUM_EPOCHS: u64 = 6;
5310 const V5: Upgrade = Upgrade::trivial(EPOCH_REWARD_VERSION);
5311
5312 let network_config = TestConfigBuilder::default()
5313 .epoch_height(EPOCH_HEIGHT)
5314 .build();
5315
5316 let api_port = reserve_tcp_port().expect("No ports free for query service");
5317
5318 let storage = join_all((0..NUM_NODES).map(|_| SqlDataSource::create_storage())).await;
5319 let persistence: [_; NUM_NODES] = storage
5320 .iter()
5321 .map(<SqlDataSource as TestableSequencerDataSource>::persistence_options)
5322 .collect::<Vec<_>>()
5323 .try_into()
5324 .unwrap();
5325
5326 let config = TestNetworkConfigBuilder::with_num_nodes()
5327 .api_config(SqlDataSource::options(
5328 &storage[0],
5329 Options::with_port(api_port),
5330 ))
5331 .network_config(network_config)
5332 .persistences(persistence.clone())
5333 .catchups(std::array::from_fn(|_| {
5334 StatePeers::<StaticVersion<0, 1>>::from_urls(
5335 vec![format!("http://localhost:{api_port}").parse().unwrap()],
5336 Default::default(),
5337 Duration::from_secs(2),
5338 &NoMetrics,
5339 )
5340 }))
5341 .pos_hook(DelegationConfig::MultipleDelegators, Default::default(), V5)
5342 .await
5343 .unwrap()
5344 .build();
5345
5346 let network = TestNetwork::new(config, V5).await;
5347 let client: Client<ServerError, SequencerApiVersion> =
5348 Client::new(format!("http://localhost:{api_port}").parse().unwrap());
5349
5350 let node_state = network.server.node_state();
5351 let coordinator = node_state.coordinator;
5352
5353 let mut expected_counts: HashMap<Address, u16> = HashMap::new();
5355
5356 let mut leaves = client
5357 .socket("availability/stream/leaves/0")
5358 .subscribe::<LeafQueryData<SeqTypes>>()
5359 .await
5360 .unwrap();
5361
5362 while let Some(leaf) = leaves.next().await {
5363 let leaf = leaf.unwrap();
5364 let header = leaf.header();
5365 let height = header.height();
5366 let epoch = epoch_from_block_number(height, EPOCH_HEIGHT);
5367 let epoch_number = EpochNumber::new(epoch);
5368
5369 if epoch <= 2 {
5370 continue;
5371 }
5372
5373 let header_leader_counts = header
5374 .leader_counts()
5375 .expect("V5+ header must have leader_counts");
5376
5377 let is_epoch_start = (height - 1) % EPOCH_HEIGHT == 0;
5379 if is_epoch_start {
5380 expected_counts.clear();
5381 }
5382
5383 let view_number = leaf.leaf().view_number();
5385 let snapshot = coordinator
5386 .membership()
5387 .snapshot(epoch_number)
5388 .expect("committee for epoch_number");
5389 let leader = snapshot.leader(view_number).expect("leader should exist");
5390 let leader_address = snapshot
5391 .validator_config(&leader)
5392 .expect("leader should have an address")
5393 .account;
5394
5395 let validator_leader_counts =
5396 ValidatorLeaderCounts::new(&snapshot, *header_leader_counts)
5397 .expect("ValidatorLeaderCounts should build from header leader_counts");
5398
5399 *expected_counts.entry(leader_address).or_insert(0) += 1;
5400
5401 let header_counts: HashMap<Address, u16> = validator_leader_counts
5402 .active_leaders()
5403 .map(|(v, count)| (v.account, count))
5404 .collect();
5405
5406 assert_eq!(
5407 header_counts, expected_counts,
5408 "leader_counts mismatch at height {height} (epoch {epoch})"
5409 );
5410
5411 if height % EPOCH_HEIGHT == 0 {
5412 let total: u16 = expected_counts.values().sum();
5413 assert_eq!(
5414 total, EPOCH_HEIGHT as u16,
5415 "total leader_counts at epoch boundary should equal EPOCH_HEIGHT at height \
5416 {height}"
5417 );
5418 }
5419
5420 if height >= NUM_EPOCHS * EPOCH_HEIGHT {
5421 break;
5422 }
5423 }
5424
5425 Ok(())
5426 }
5427
5428 #[rstest]
5429 #[case(POS_V3)]
5430 #[case(POS_V4)]
5431 #[test_log::test(tokio::test(flavor = "multi_thread"))]
5432 async fn test_node_stake_table_api(#[case] upgrade: Upgrade) {
5433 let epoch_height = 20;
5434
5435 let network_config = TestConfigBuilder::default()
5436 .epoch_height(epoch_height)
5437 .build();
5438
5439 let api_port = reserve_tcp_port().expect("OS should have ephemeral ports available");
5440
5441 const NUM_NODES: usize = 2;
5442 let storage = join_all((0..NUM_NODES).map(|_| SqlDataSource::create_storage())).await;
5444 let persistence: [_; NUM_NODES] = storage
5445 .iter()
5446 .map(<SqlDataSource as TestableSequencerDataSource>::persistence_options)
5447 .collect::<Vec<_>>()
5448 .try_into()
5449 .unwrap();
5450
5451 let config = TestNetworkConfigBuilder::with_num_nodes()
5452 .api_config(SqlDataSource::options(
5453 &storage[0],
5454 Options::with_port(api_port),
5455 ))
5456 .network_config(network_config)
5457 .persistences(persistence.clone())
5458 .catchups(std::array::from_fn(|_| {
5459 StatePeers::<StaticVersion<0, 1>>::from_urls(
5460 vec![format!("http://localhost:{api_port}").parse().unwrap()],
5461 Default::default(),
5462 Duration::from_secs(2),
5463 &NoMetrics,
5464 )
5465 }))
5466 .pos_hook(
5467 DelegationConfig::MultipleDelegators,
5468 Default::default(),
5469 upgrade,
5470 )
5471 .await
5472 .unwrap()
5473 .build();
5474
5475 let _network = TestNetwork::new(config, upgrade).await;
5476
5477 let client: Client<ServerError, SequencerApiVersion> =
5478 Client::new(format!("http://localhost:{api_port}").parse().unwrap());
5479
5480 let _blocks = client
5482 .socket("availability/stream/blocks/0")
5483 .subscribe::<BlockQueryData<SeqTypes>>()
5484 .await
5485 .unwrap()
5486 .take(40)
5487 .try_collect::<Vec<_>>()
5488 .await
5489 .unwrap();
5490
5491 for i in 1..=3 {
5492 let _st = client
5493 .get::<Vec<PeerConfig<SeqTypes>>>(&format!("node/stake-table/{}", i as u64))
5494 .send()
5495 .await
5496 .expect("failed to get stake table");
5497 }
5498
5499 let _st = client
5500 .get::<StakeTableWithEpochNumber<SeqTypes>>("node/stake-table/current")
5501 .send()
5502 .await
5503 .expect("failed to get stake table");
5504 }
5505
5506 #[rstest]
5507 #[case(POS_V3)]
5508 #[case(POS_V4)]
5509 #[test_log::test(tokio::test(flavor = "multi_thread"))]
5510 async fn test_epoch_stake_table_catchup(#[case] upgrade: Upgrade) {
5511 const EPOCH_HEIGHT: u64 = 10;
5512 const NUM_NODES: usize = 6;
5513
5514 let port = reserve_tcp_port().expect("OS should have ephemeral ports available");
5515
5516 let network_config = TestConfigBuilder::default()
5517 .epoch_height(EPOCH_HEIGHT)
5518 .build();
5519
5520 let storage = join_all((0..NUM_NODES).map(|_| SqlDataSource::create_storage())).await;
5522
5523 let persistence_options: [_; NUM_NODES] = storage
5524 .iter()
5525 .map(<SqlDataSource as TestableSequencerDataSource>::persistence_options)
5526 .collect::<Vec<_>>()
5527 .try_into()
5528 .unwrap();
5529
5530 let catchup_peers = std::array::from_fn(|_| {
5532 StatePeers::<StaticVersion<0, 1>>::from_urls(
5533 vec![format!("http://localhost:{port}").parse().unwrap()],
5534 Default::default(),
5535 Duration::from_secs(2),
5536 &NoMetrics,
5537 )
5538 });
5539 let config = TestNetworkConfigBuilder::<NUM_NODES, _, _>::with_num_nodes()
5540 .api_config(SqlDataSource::options(
5541 &storage[0],
5542 Options::with_port(port),
5543 ))
5544 .network_config(network_config)
5545 .persistences(persistence_options.clone())
5546 .catchups(catchup_peers)
5547 .pos_hook(
5548 DelegationConfig::MultipleDelegators,
5549 Default::default(),
5550 upgrade,
5551 )
5552 .await
5553 .unwrap()
5554 .build();
5555
5556 let state = config.states()[0].clone();
5557 let mut network = TestNetwork::new(config, upgrade).await;
5558
5559 let mut events = network.peers[0].event_stream();
5561 while let Some(event) = events.next().await {
5562 if let CoordinatorEvent::LegacyEvent(Event {
5563 event: EventType::Decide { leaf_chain, .. },
5564 ..
5565 }) = event
5566 {
5567 let height = leaf_chain[0].leaf.height();
5568 tracing::info!("Node 0 decided at height: {height}");
5569 if height > EPOCH_HEIGHT * 3 {
5570 break;
5571 }
5572 }
5573 }
5574
5575 tracing::info!("Shutting down peer 0");
5577 network.peers.remove(0);
5578
5579 let mut events = network.server.event_stream();
5581 while let Some(event) = events.next().await {
5582 if let CoordinatorEvent::LegacyEvent(Event {
5583 event: EventType::Decide { leaf_chain, .. },
5584 ..
5585 }) = event
5586 {
5587 let height = leaf_chain[0].leaf.height();
5588 if height > EPOCH_HEIGHT * 7 {
5589 break;
5590 }
5591 }
5592 }
5593
5594 let storage = SqlDataSource::create_storage().await;
5596 let options = <SqlDataSource as TestableSequencerDataSource>::persistence_options(&storage);
5597 tracing::info!("Restarting peer 0");
5598 let node = network
5599 .cfg
5600 .init_node(
5601 1,
5602 state,
5603 options,
5604 Some(StatePeers::<StaticVersion<0, 1>>::from_urls(
5605 vec![format!("http://localhost:{port}").parse().unwrap()],
5606 Default::default(),
5607 Duration::from_secs(2),
5608 &NoMetrics,
5609 )),
5610 None,
5611 &NoMetrics,
5612 test_helpers::STAKE_TABLE_CAPACITY_FOR_TEST,
5613 NullEventConsumer,
5614 upgrade,
5615 Default::default(),
5616 )
5617 .await;
5618
5619 let coordinator = node.node_state().coordinator;
5620 let server_node_state = network.server.node_state();
5621 let server_coordinator = server_node_state.coordinator;
5622 for epoch_num in 1..=7 {
5624 let epoch = EpochNumber::new(epoch_num);
5625 let node_em = match coordinator.membership_for_epoch(Some(epoch)) {
5626 Ok(em) => em,
5627 Err(_) => coordinator.wait_for_catchup(epoch).await.unwrap(),
5628 };
5629 let server_em = match server_coordinator.membership_for_epoch(Some(epoch)) {
5630 Ok(em) => em,
5631 Err(_) => server_coordinator.wait_for_catchup(epoch).await.unwrap(),
5632 };
5633
5634 println!("have stake table for epoch = {epoch_num}");
5635
5636 let node_stake_table = HSStakeTable::from_iter(node_em.stake_table());
5637 let stake_table = HSStakeTable::from_iter(server_em.stake_table());
5638 println!("asserting stake table for epoch = {epoch_num}");
5639
5640 assert_eq!(
5641 node_stake_table, stake_table,
5642 "Stake table mismatch for epoch {epoch_num}",
5643 );
5644 }
5645 }
5646
5647 #[rstest]
5648 #[case(POS_V3)]
5649 #[case(POS_V4)]
5650 #[test_log::test(tokio::test(flavor = "multi_thread"))]
5651 async fn test_epoch_stake_table_catchup_stress(#[case] upgrade: Upgrade) {
5652 const EPOCH_HEIGHT: u64 = 10;
5653 const NUM_NODES: usize = 6;
5654
5655 let port = reserve_tcp_port().expect("OS should have ephemeral ports available");
5656
5657 let network_config = TestConfigBuilder::default()
5658 .epoch_height(EPOCH_HEIGHT)
5659 .build();
5660
5661 let storage = join_all((0..NUM_NODES).map(|_| SqlDataSource::create_storage())).await;
5663
5664 let persistence_options: [_; NUM_NODES] = storage
5665 .iter()
5666 .map(<SqlDataSource as TestableSequencerDataSource>::persistence_options)
5667 .collect::<Vec<_>>()
5668 .try_into()
5669 .unwrap();
5670
5671 let catchup_peers = std::array::from_fn(|_| {
5673 StatePeers::<StaticVersion<0, 1>>::from_urls(
5674 vec![format!("http://localhost:{port}").parse().unwrap()],
5675 Default::default(),
5676 Duration::from_secs(2),
5677 &NoMetrics,
5678 )
5679 });
5680 let config = TestNetworkConfigBuilder::<NUM_NODES, _, _>::with_num_nodes()
5681 .api_config(SqlDataSource::options(
5682 &storage[0],
5683 Options::with_port(port),
5684 ))
5685 .network_config(network_config)
5686 .persistences(persistence_options.clone())
5687 .catchups(catchup_peers)
5688 .pos_hook(
5689 DelegationConfig::MultipleDelegators,
5690 Default::default(),
5691 upgrade,
5692 )
5693 .await
5694 .unwrap()
5695 .build();
5696
5697 let state = config.states()[0].clone();
5698 let mut network = TestNetwork::new(config, upgrade).await;
5699
5700 let mut events = network.peers[0].event_stream();
5702 while let Some(event) = events.next().await {
5703 if let CoordinatorEvent::LegacyEvent(Event {
5704 event: EventType::Decide { leaf_chain, .. },
5705 ..
5706 }) = event
5707 {
5708 let height = leaf_chain[0].leaf.height();
5709 tracing::info!("Node 0 decided at height: {height}");
5710 if height > EPOCH_HEIGHT * 3 {
5711 break;
5712 }
5713 }
5714 }
5715
5716 tracing::info!("Shutting down peer 0");
5718 network.peers.remove(0);
5719
5720 let mut events = network.server.event_stream();
5722 while let Some(event) = events.next().await {
5723 if let CoordinatorEvent::LegacyEvent(Event {
5724 event: EventType::Decide { leaf_chain, .. },
5725 ..
5726 }) = event
5727 {
5728 let height = leaf_chain[0].leaf.height();
5729 tracing::info!("Server decided at height: {height}");
5730 if height > EPOCH_HEIGHT * 7 {
5732 break;
5733 }
5734 }
5735 }
5736
5737 let storage = SqlDataSource::create_storage().await;
5739 let options = <SqlDataSource as TestableSequencerDataSource>::persistence_options(&storage);
5740
5741 tracing::info!("Restarting peer 0");
5742 let node = network
5743 .cfg
5744 .init_node(
5745 1,
5746 state,
5747 options,
5748 Some(StatePeers::<StaticVersion<0, 1>>::from_urls(
5749 vec![format!("http://localhost:{port}").parse().unwrap()],
5750 Default::default(),
5751 Duration::from_secs(2),
5752 &NoMetrics,
5753 )),
5754 None,
5755 &NoMetrics,
5756 test_helpers::STAKE_TABLE_CAPACITY_FOR_TEST,
5757 NullEventConsumer,
5758 upgrade,
5759 Default::default(),
5760 )
5761 .await;
5762
5763 let coordinator = node.node_state().coordinator;
5764
5765 let server_node_state = network.server.node_state();
5766 let server_coordinator = server_node_state.coordinator;
5767
5768 let mut rand_epochs: Vec<_> = (1..=7).collect();
5770 rand_epochs.shuffle(&mut rand::thread_rng());
5771 println!("trigger catchup in this order: {rand_epochs:?}");
5772 for epoch_num in rand_epochs {
5773 let epoch = EpochNumber::new(epoch_num);
5774 let _ = coordinator.membership_for_epoch(Some(epoch));
5775 }
5776
5777 for epoch_num in 1..=7 {
5779 println!("getting stake table for epoch = {epoch_num}");
5780 let epoch = EpochNumber::new(epoch_num);
5781 let node_em = coordinator.wait_for_catchup(epoch).await.unwrap();
5782 let server_em = match server_coordinator.membership_for_epoch(Some(epoch)) {
5783 Ok(em) => em,
5784 Err(_) => server_coordinator.wait_for_catchup(epoch).await.unwrap(),
5785 };
5786
5787 println!("have stake table for epoch = {epoch_num}");
5788
5789 let node_stake_table = HSStakeTable::from_iter(node_em.stake_table());
5790 let stake_table = HSStakeTable::from_iter(server_em.stake_table());
5791
5792 println!("asserting stake table for epoch = {epoch_num}");
5793
5794 assert_eq!(
5795 node_stake_table, stake_table,
5796 "Stake table mismatch for epoch {epoch_num}",
5797 );
5798 }
5799 }
5800
5801 #[rstest]
5802 #[case(POS_V3)]
5803 #[case(POS_V4)]
5804 #[test_log::test(tokio::test(flavor = "multi_thread"))]
5805 async fn test_merklized_state_catchup_on_restart(
5806 #[case] upgrade: Upgrade,
5807 ) -> anyhow::Result<()> {
5808 const EPOCH_HEIGHT: u64 = 10;
5820
5821 let network_config = TestConfigBuilder::default()
5822 .epoch_height(EPOCH_HEIGHT)
5823 .build();
5824
5825 let api_port = reserve_tcp_port().expect("OS should have ephemeral ports available");
5826
5827 tracing::info!("API PORT = {api_port}");
5828 const NUM_NODES: usize = 5;
5829
5830 let storage = join_all((0..NUM_NODES).map(|_| SqlDataSource::create_storage())).await;
5831 let persistence: [_; NUM_NODES] = storage
5832 .iter()
5833 .map(<SqlDataSource as TestableSequencerDataSource>::persistence_options)
5834 .collect::<Vec<_>>()
5835 .try_into()
5836 .unwrap();
5837
5838 let config = TestNetworkConfigBuilder::with_num_nodes()
5839 .api_config(SqlDataSource::options(
5840 &storage[0],
5841 Options::with_port(api_port)
5842 .catchup(Default::default())
5843 .light_client(Default::default()),
5844 ))
5845 .network_config(network_config)
5846 .persistences(persistence.clone())
5847 .catchups(std::array::from_fn(|_| {
5848 StatePeers::<StaticVersion<0, 1>>::from_urls(
5849 vec![format!("http://localhost:{api_port}").parse().unwrap()],
5850 Default::default(),
5851 Duration::from_secs(2),
5852 &NoMetrics,
5853 )
5854 }))
5855 .pos_hook(
5856 DelegationConfig::MultipleDelegators,
5857 hotshot_contract_adapter::stake_table::StakeTableContractVersion::V3,
5858 upgrade,
5859 )
5860 .await
5861 .unwrap()
5862 .build();
5863 let state = config.states()[0].clone();
5864 let mut network = TestNetwork::new(config, upgrade).await;
5865
5866 network.peers[0].shut_down().await;
5871 network.peers.remove(0);
5872 let node_0_storage = &storage[1];
5873 let node_0_persistence = persistence[1].clone();
5874 let node_0_port = reserve_tcp_port().expect("OS should have ephemeral ports available");
5875 tracing::info!("node_0_port {node_0_port}");
5876 let opt = Options::with_port(node_0_port).query_sql(
5878 Query {
5879 peers: vec![format!("http://localhost:{api_port}").parse().unwrap()],
5880 light_client: LightClientOptions {
5881 decaf: true,
5882 ..Default::default()
5883 },
5884 ..Default::default()
5885 },
5886 tmp_options(node_0_storage),
5887 );
5888
5889 let node_0 = opt
5891 .clone()
5892 .serve(|metrics, consumer, storage| {
5893 let cfg = network.cfg.clone();
5894 let node_0_persistence = node_0_persistence.clone();
5895 let state = state.clone();
5896 async move {
5897 Ok(cfg
5898 .init_node(
5899 1,
5900 state,
5901 node_0_persistence.clone(),
5902 Some(StatePeers::<StaticVersion<0, 1>>::from_urls(
5903 vec![format!("http://localhost:{api_port}").parse().unwrap()],
5904 Default::default(),
5905 Duration::from_secs(2),
5906 &NoMetrics,
5907 )),
5908 storage,
5909 &*metrics,
5910 test_helpers::STAKE_TABLE_CAPACITY_FOR_TEST,
5911 consumer,
5912 upgrade,
5913 Default::default(),
5914 )
5915 .await)
5916 }
5917 .boxed()
5918 })
5919 .await
5920 .unwrap();
5921
5922 let mut events = network.peers[2].event_stream();
5923 wait_for_epochs(&mut events, EPOCH_HEIGHT, 1).await;
5925
5926 drop(node_0);
5928
5929 wait_for_epochs(&mut events, EPOCH_HEIGHT, 4).await;
5931
5932 tracing::info!("restarting node");
5934 let node_0 = opt
5935 .serve(|metrics, consumer, storage| {
5936 let cfg = network.cfg.clone();
5937 async move {
5938 Ok(cfg
5939 .init_node(
5940 1,
5941 state,
5942 node_0_persistence,
5943 Some(StatePeers::<StaticVersion<0, 1>>::from_urls(
5944 vec![format!("http://localhost:{api_port}").parse().unwrap()],
5945 Default::default(),
5946 Duration::from_secs(2),
5947 &NoMetrics,
5948 )),
5949 storage,
5950 &*metrics,
5951 test_helpers::STAKE_TABLE_CAPACITY_FOR_TEST,
5952 consumer,
5953 upgrade,
5954 Default::default(),
5955 )
5956 .await)
5957 }
5958 .boxed()
5959 })
5960 .await
5961 .unwrap();
5962
5963 let client: Client<ServerError, SequencerApiVersion> =
5964 Client::new(format!("http://localhost:{node_0_port}").parse().unwrap());
5965 client.connect(None).await;
5966
5967 wait_for_epochs(&mut events, EPOCH_HEIGHT, 6).await;
5968
5969 let epoch_7_block = EPOCH_HEIGHT * 6 + 1;
5970
5971 let mut retries = 0;
5973 loop {
5974 sleep(Duration::from_secs(1)).await;
5975 let state = node_0.decided_state().await.unwrap();
5976
5977 let leaves = if upgrade.base == EPOCH_VERSION {
5978 state.reward_merkle_tree_v1.num_leaves()
5980 } else {
5981 state.reward_merkle_tree_v2.num_leaves()
5983 };
5984
5985 if leaves > 0 {
5986 tracing::info!("Node's state has reward accounts");
5987 break;
5988 }
5989
5990 retries += 1;
5991 if retries > 120 {
5992 panic!("max retries reached. failed to catchup reward state");
5993 }
5994 }
5995
5996 retries = 0;
5997 loop {
5999 sleep(Duration::from_secs(3)).await;
6000
6001 let bh = client
6002 .get::<u64>("block-state/block-height")
6003 .send()
6004 .await
6005 .expect("block height not found");
6006
6007 tracing::info!("block state: block height={bh}");
6008 if bh > epoch_7_block {
6009 break;
6010 }
6011
6012 retries += 1;
6013 if retries > 30 {
6014 panic!(
6015 "max retries reached. block state block height is less than epoch 7 start \
6016 block"
6017 );
6018 }
6019 }
6020
6021 node_0.shutdown_consensus().await;
6023 let decided_leaf = node_0.decided_leaf().await;
6024 let state = node_0.decided_state().await.unwrap();
6025 tracing::info!(
6026 height = decided_leaf.height(),
6027 ?decided_leaf,
6028 ?state,
6029 "final state"
6030 );
6031
6032 let height = decided_leaf.height();
6033 let num_leaves = state.block_merkle_tree.num_leaves();
6034 tracing::info!(height, num_leaves, "checking block merkle tree state");
6035 state
6036 .block_merkle_tree
6037 .lookup(height - 1)
6038 .expect_ok()
6039 .unwrap_or_else(|err| {
6040 panic!(
6041 "block state not found ({err:#}):\n{:#?}",
6042 state.block_merkle_tree
6043 )
6044 });
6045
6046 Ok(())
6047 }
6048
6049 #[rstest]
6050 #[case(POS_V3)]
6051 #[case(POS_V4)]
6052 #[test_log::test(tokio::test(flavor = "multi_thread"))]
6053 async fn test_state_reconstruction(#[case] upgrade: Upgrade) -> anyhow::Result<()> {
6054 const EPOCH_HEIGHT: u64 = 10;
6070
6071 let network_config = TestConfigBuilder::default()
6072 .epoch_height(EPOCH_HEIGHT)
6073 .build();
6074
6075 let api_port = reserve_tcp_port().expect("OS should have ephemeral ports available");
6076
6077 tracing::info!("API PORT = {api_port}");
6078 const NUM_NODES: usize = 5;
6079
6080 let storage = join_all((0..NUM_NODES).map(|_| SqlDataSource::create_storage())).await;
6081 let persistence: [_; NUM_NODES] = storage
6082 .iter()
6083 .map(<SqlDataSource as TestableSequencerDataSource>::persistence_options)
6084 .collect::<Vec<_>>()
6085 .try_into()
6086 .unwrap();
6087
6088 let config = TestNetworkConfigBuilder::with_num_nodes()
6089 .api_config(SqlDataSource::options(
6090 &storage[0],
6091 Options::with_port(api_port).light_client(Default::default()),
6092 ))
6093 .network_config(network_config)
6094 .persistences(persistence.clone())
6095 .catchups(std::array::from_fn(|_| {
6096 StatePeers::<StaticVersion<0, 1>>::from_urls(
6097 vec![format!("http://localhost:{api_port}").parse().unwrap()],
6098 Default::default(),
6099 Duration::from_secs(2),
6100 &NoMetrics,
6101 )
6102 }))
6103 .pos_hook(
6104 DelegationConfig::MultipleDelegators,
6105 hotshot_contract_adapter::stake_table::StakeTableContractVersion::V3,
6106 upgrade,
6107 )
6108 .await
6109 .unwrap()
6110 .build();
6111 let state = config.states()[0].clone();
6112 let mut network = TestNetwork::new(config, upgrade).await;
6113 network.peers.remove(0);
6118
6119 let node_0_storage = &storage[1];
6120 let node_0_persistence = persistence[1].clone();
6121 let node_0_port = reserve_tcp_port().expect("OS should have ephemeral ports available");
6122 tracing::info!("node_0_port {node_0_port}");
6123 let opt = Options::with_port(node_0_port).query_sql(
6124 Query {
6125 peers: vec![format!("http://localhost:{api_port}").parse().unwrap()],
6126 ..Query::test()
6127 },
6128 tmp_options(node_0_storage),
6129 );
6130 let node_0 = opt
6131 .clone()
6132 .serve(|metrics, consumer, storage| {
6133 let cfg = network.cfg.clone();
6134 let node_0_persistence = node_0_persistence.clone();
6135 let state = state.clone();
6136 async move {
6137 Ok(cfg
6138 .init_node(
6139 1,
6140 state,
6141 node_0_persistence.clone(),
6142 Some(StatePeers::<StaticVersion<0, 1>>::from_urls(
6143 vec![format!("http://localhost:{api_port}").parse().unwrap()],
6144 Default::default(),
6145 Duration::from_secs(2),
6146 &NoMetrics,
6147 )),
6148 storage,
6149 &*metrics,
6150 test_helpers::STAKE_TABLE_CAPACITY_FOR_TEST,
6151 consumer,
6152 upgrade,
6153 Default::default(),
6154 )
6155 .await)
6156 }
6157 .boxed()
6158 })
6159 .await
6160 .unwrap();
6161
6162 let mut events = network.peers[2].event_stream();
6163 wait_for_epochs(&mut events, EPOCH_HEIGHT, 3).await;
6165
6166 tracing::warn!("shutting down node 0");
6167
6168 node_0.shutdown_consensus().await;
6169
6170 let instance = node_0.node_state();
6171 let state = node_0.decided_state().await.unwrap();
6172 let fee_accounts = state
6173 .fee_merkle_tree
6174 .clone()
6175 .into_iter()
6176 .map(|(acct, _)| acct)
6177 .collect::<Vec<_>>();
6178 let reward_accounts = match upgrade.base {
6179 EPOCH_VERSION => state
6180 .reward_merkle_tree_v1
6181 .clone()
6182 .into_iter()
6183 .map(|(acct, _)| RewardAccountV2::from(acct))
6184 .collect::<Vec<_>>(),
6185 DRB_AND_HEADER_UPGRADE_VERSION => state
6186 .reward_merkle_tree_v2
6187 .clone()
6188 .into_iter()
6189 .map(|(acct, _)| acct)
6190 .collect::<Vec<_>>(),
6191 _ => panic!("invalid version"),
6192 };
6193
6194 let client: Client<ServerError, SequencerApiVersion> =
6195 Client::new(format!("http://localhost:{node_0_port}").parse().unwrap());
6196 client.connect(Some(Duration::from_secs(10))).await;
6197
6198 sleep(Duration::from_secs(3)).await;
6201
6202 tracing::info!("getting node block height");
6203 let node_block_height = client
6204 .get::<u64>("node/block-height")
6205 .send()
6206 .await
6207 .context("getting Espresso block height")
6208 .unwrap();
6209
6210 tracing::info!("node block height={node_block_height}");
6211
6212 let leaf_query_data = client
6213 .get::<LeafQueryData<SeqTypes>>(&format!("availability/leaf/{}", node_block_height - 1))
6214 .send()
6215 .await
6216 .context("error getting leaf")
6217 .unwrap();
6218
6219 tracing::info!("leaf={leaf_query_data:?}");
6220 let leaf = leaf_query_data.leaf();
6221 let to_view = leaf.view_number() + 1;
6222
6223 let ds = SqlStorage::connect(
6224 Config::try_from(&node_0_persistence).unwrap(),
6225 StorageConnectionType::Sequencer,
6226 )
6227 .await
6228 .unwrap();
6229 let mut tx = ds.read().await?;
6230
6231 let (state, leaf) = reconstruct_state(
6232 &instance,
6233 &ds,
6234 &mut tx,
6235 node_block_height - 1,
6236 to_view,
6237 &[],
6238 &[],
6239 )
6240 .await
6241 .unwrap();
6242 assert_eq!(leaf.view_number(), to_view);
6243 assert!(
6244 state
6245 .block_merkle_tree
6246 .lookup(node_block_height - 1)
6247 .expect_ok()
6248 .is_ok(),
6249 "inconsistent block merkle tree"
6250 );
6251
6252 let (state, leaf) = reconstruct_state(
6254 &instance,
6255 &ds,
6256 &mut tx,
6257 node_block_height - 1,
6258 to_view,
6259 &fee_accounts,
6260 &[],
6261 )
6262 .await
6263 .unwrap();
6264
6265 assert_eq!(leaf.view_number(), to_view);
6266 assert!(
6267 state
6268 .block_merkle_tree
6269 .lookup(node_block_height - 1)
6270 .expect_ok()
6271 .is_ok(),
6272 "inconsistent block merkle tree"
6273 );
6274
6275 for account in &fee_accounts {
6276 state.fee_merkle_tree.lookup(account).expect_ok().unwrap();
6277 }
6278
6279 let (state, leaf) = reconstruct_state(
6282 &instance,
6283 &ds,
6284 &mut tx,
6285 node_block_height - 1,
6286 to_view,
6287 &[],
6288 &reward_accounts,
6289 )
6290 .await
6291 .unwrap();
6292
6293 match upgrade.base {
6294 EPOCH_VERSION => {
6295 for account in reward_accounts.clone() {
6296 state
6297 .reward_merkle_tree_v1
6298 .lookup(RewardAccountV1::from(account))
6299 .expect_ok()
6300 .unwrap();
6301 }
6302 },
6303 DRB_AND_HEADER_UPGRADE_VERSION => {
6304 for account in &reward_accounts {
6305 state
6306 .reward_merkle_tree_v2
6307 .lookup(account)
6308 .expect_ok()
6309 .unwrap();
6310 }
6311 },
6312 _ => panic!("invalid version"),
6313 };
6314
6315 assert_eq!(leaf.view_number(), to_view);
6316 assert!(
6317 state
6318 .block_merkle_tree
6319 .lookup(node_block_height - 1)
6320 .expect_ok()
6321 .is_ok(),
6322 "inconsistent block merkle tree"
6323 );
6324 let (state, leaf) = reconstruct_state(
6327 &instance,
6328 &ds,
6329 &mut tx,
6330 node_block_height - 1,
6331 to_view,
6332 &fee_accounts,
6333 &reward_accounts,
6334 )
6335 .await
6336 .unwrap();
6337
6338 assert!(
6339 state
6340 .block_merkle_tree
6341 .lookup(node_block_height - 1)
6342 .expect_ok()
6343 .is_ok(),
6344 "inconsistent block merkle tree"
6345 );
6346 assert_eq!(leaf.view_number(), to_view);
6347
6348 match upgrade.base {
6349 EPOCH_VERSION => {
6350 for account in reward_accounts.clone() {
6351 state
6352 .reward_merkle_tree_v1
6353 .lookup(RewardAccountV1::from(account))
6354 .expect_ok()
6355 .unwrap();
6356 }
6357 },
6358 DRB_AND_HEADER_UPGRADE_VERSION => {
6359 for account in &reward_accounts {
6360 state
6361 .reward_merkle_tree_v2
6362 .lookup(account)
6363 .expect_ok()
6364 .unwrap();
6365 }
6366 },
6367 _ => panic!("invalid version"),
6368 };
6369
6370 for account in &fee_accounts {
6371 state.fee_merkle_tree.lookup(account).expect_ok().unwrap();
6372 }
6373
6374 Ok(())
6375 }
6376
6377 #[rstest]
6378 #[case(POS_V3)]
6379 #[case(POS_V4)]
6380 #[test_log::test(tokio::test(flavor = "multi_thread"))]
6381 async fn test_block_reward_api(#[case] upgrade: Upgrade) -> anyhow::Result<()> {
6382 let epoch_height = 10;
6383
6384 let network_config = TestConfigBuilder::default()
6385 .epoch_height(epoch_height)
6386 .build();
6387
6388 let api_port = reserve_tcp_port().expect("OS should have ephemeral ports available");
6389
6390 const NUM_NODES: usize = 1;
6391 let storage = join_all((0..NUM_NODES).map(|_| SqlDataSource::create_storage())).await;
6393 let persistence: [_; NUM_NODES] = storage
6394 .iter()
6395 .map(<SqlDataSource as TestableSequencerDataSource>::persistence_options)
6396 .collect::<Vec<_>>()
6397 .try_into()
6398 .unwrap();
6399
6400 let config = TestNetworkConfigBuilder::with_num_nodes()
6401 .api_config(SqlDataSource::options(
6402 &storage[0],
6403 Options::with_port(api_port),
6404 ))
6405 .network_config(network_config.clone())
6406 .persistences(persistence.clone())
6407 .catchups(std::array::from_fn(|_| {
6408 StatePeers::<StaticVersion<0, 1>>::from_urls(
6409 vec![format!("http://localhost:{api_port}").parse().unwrap()],
6410 Default::default(),
6411 Duration::from_secs(2),
6412 &NoMetrics,
6413 )
6414 }))
6415 .pos_hook(
6416 DelegationConfig::VariableAmounts,
6417 Default::default(),
6418 upgrade,
6419 )
6420 .await
6421 .unwrap()
6422 .build();
6423
6424 let _network = TestNetwork::new(config, upgrade).await;
6425 let client: Client<ServerError, SequencerApiVersion> =
6426 Client::new(format!("http://localhost:{api_port}").parse().unwrap());
6427
6428 let _blocks = client
6429 .socket("availability/stream/blocks/0")
6430 .subscribe::<BlockQueryData<SeqTypes>>()
6431 .await
6432 .unwrap()
6433 .take(3)
6434 .try_collect::<Vec<_>>()
6435 .await
6436 .unwrap();
6437
6438 let block_reward = client
6439 .get::<Option<RewardAmount>>("node/block-reward")
6440 .send()
6441 .await
6442 .expect("failed to get block reward")
6443 .expect("block reward is None");
6444 tracing::info!("block_reward={block_reward:?}");
6445
6446 assert!(block_reward.0 > U256::ZERO);
6447
6448 Ok(())
6449 }
6450
6451 #[rstest]
6453 #[case(POS_V4, None)]
6454 #[case(POS_V4, Some(1u64))]
6455 #[test_log::test(tokio::test(flavor = "multi_thread"))]
6456 async fn test_token_supply_api(
6457 #[case] upgrade: Upgrade,
6458 #[case] chain_id: Option<u64>,
6459 ) -> anyhow::Result<()> {
6460 use alloy::primitives::utils::parse_ether;
6461 use espresso_types::v0_3::ChainConfig;
6462
6463 let epoch_height = 10;
6464 let network_config = TestConfigBuilder::default()
6465 .epoch_height(epoch_height)
6466 .build();
6467
6468 let api_port = reserve_tcp_port().expect("OS should have ephemeral ports available");
6469
6470 const NUM_NODES: usize = 1;
6471 let storage = join_all((0..NUM_NODES).map(|_| SqlDataSource::create_storage())).await;
6472 let persistence: [_; NUM_NODES] = storage
6473 .iter()
6474 .map(<SqlDataSource as TestableSequencerDataSource>::persistence_options)
6475 .collect::<Vec<_>>()
6476 .try_into()
6477 .unwrap();
6478
6479 let initial_supply_tokens = U256::from(3_590_000_000u64);
6482 let initial_supply_wei = parse_ether("3590000000").unwrap();
6483
6484 let mut builder = TestNetworkConfigBuilder::with_num_nodes()
6485 .api_config(SqlDataSource::options(
6486 &storage[0],
6487 Options::with_port(api_port),
6488 ))
6489 .network_config(network_config.clone())
6490 .persistences(persistence.clone())
6491 .catchups(std::array::from_fn(|_| {
6492 StatePeers::<StaticVersion<0, 1>>::from_urls(
6493 vec![format!("http://localhost:{api_port}").parse().unwrap()],
6494 Default::default(),
6495 Duration::from_secs(2),
6496 &NoMetrics,
6497 )
6498 }))
6499 .initial_token_supply(initial_supply_tokens);
6500
6501 if let Some(id) = chain_id {
6503 let state = ValidatedState {
6504 chain_config: ChainConfig {
6505 chain_id: U256::from(id).into(),
6506 ..Default::default()
6507 }
6508 .into(),
6509 ..Default::default()
6510 };
6511 builder = builder.states(std::array::from_fn(|_| state.clone()));
6512 }
6513
6514 let config = builder
6515 .pos_hook(
6516 DelegationConfig::VariableAmounts,
6517 Default::default(),
6518 upgrade,
6519 )
6520 .await
6521 .unwrap()
6522 .build();
6523
6524 let _network = TestNetwork::new(config, upgrade).await;
6525 let client: Client<ServerError, SequencerApiVersion> =
6526 Client::new(format!("http://localhost:{api_port}").parse().unwrap());
6527
6528 let _blocks = client
6529 .socket("availability/stream/blocks/0")
6530 .subscribe::<BlockQueryData<SeqTypes>>()
6531 .await
6532 .unwrap()
6533 .take(3)
6534 .try_collect::<Vec<_>>()
6535 .await
6536 .unwrap();
6537
6538 let minted: String = client
6539 .get("token/total-minted-supply")
6540 .send()
6541 .await
6542 .expect("total-minted-supply");
6543 let circ_eth: String = client
6544 .get("token/circulating-supply-ethereum")
6545 .send()
6546 .await
6547 .expect("circulating-supply-ethereum");
6548 let circulating: String = client
6549 .get("token/circulating-supply")
6550 .send()
6551 .await
6552 .expect("circulating-supply");
6553 tracing::info!(%minted, %circ_eth, %circulating);
6554
6555 let minted = parse_ether(&minted)?;
6556 let circ_eth = parse_ether(&circ_eth)?;
6557 let circ = parse_ether(&circulating)?;
6558
6559 assert_eq!(minted, initial_supply_wei);
6560 assert!(circ_eth <= minted);
6561 assert!(circ >= circ_eth);
6562 assert!(circ > U256::ZERO);
6563
6564 if chain_id == Some(1) {
6565 assert!(circ_eth < minted);
6568 }
6569
6570 Ok(())
6571 }
6572
6573 #[test_log::test(tokio::test(flavor = "multi_thread"))]
6574 async fn test_scanning_token_contract_initialized_event() -> anyhow::Result<()> {
6575 use espresso_types::v0_3::ChainConfig;
6576
6577 let blocks_per_epoch = 10;
6578
6579 let network_config = TestConfigBuilder::<1>::default()
6580 .epoch_height(blocks_per_epoch)
6581 .build();
6582
6583 let (genesis_state, genesis_stake) = light_client_genesis_from_stake_table(
6584 &network_config.hotshot_config().hotshot_stake_table(),
6585 STAKE_TABLE_CAPACITY_FOR_TEST,
6586 )
6587 .unwrap();
6588
6589 let deployer = ProviderBuilder::new()
6590 .wallet(EthereumWallet::from(network_config.signer().clone()))
6591 .connect_http(network_config.l1_url().clone());
6592
6593 let mut contracts = Contracts::new();
6594 let args = DeployerArgsBuilder::default()
6595 .deployer(deployer.clone())
6596 .rpc_url(network_config.l1_url().clone())
6597 .mock_light_client(true)
6598 .genesis_lc_state(genesis_state)
6599 .genesis_st_state(genesis_stake)
6600 .blocks_per_epoch(blocks_per_epoch)
6601 .epoch_start_block(1)
6602 .multisig_pauser(network_config.signer().address())
6603 .token_name("Espresso".to_string())
6604 .token_symbol("ESP".to_string())
6605 .initial_token_supply(U256::from(3590000000u64))
6606 .ops_timelock_delay(U256::from(0))
6607 .ops_timelock_admin(network_config.signer().address())
6608 .ops_timelock_proposers(vec![network_config.signer().address()])
6609 .ops_timelock_executors(vec![network_config.signer().address()])
6610 .safe_exit_timelock_delay(U256::from(0))
6611 .safe_exit_timelock_admin(network_config.signer().address())
6612 .safe_exit_timelock_proposers(vec![network_config.signer().address()])
6613 .safe_exit_timelock_executors(vec![network_config.signer().address()])
6614 .build()
6615 .unwrap();
6616
6617 args.deploy_to_stake_table_v3(&mut contracts).await.unwrap();
6618
6619 let st_addr = contracts
6620 .address(Contract::StakeTableProxy)
6621 .expect("StakeTableProxy deployed");
6622
6623 let l1_url = network_config.l1_url().clone();
6624
6625 let storage = SqlDataSource::create_storage().await;
6626 let mut opt = <SqlDataSource as TestableSequencerDataSource>::persistence_options(&storage);
6627 let persistence = opt.create().await.unwrap();
6628
6629 let l1_client = L1ClientOptions {
6630 stake_table_update_interval: Duration::from_secs(7),
6631 l1_retry_delay: Duration::from_millis(10),
6632 l1_events_max_block_range: 10000,
6633 ..Default::default()
6634 }
6635 .connect(vec![l1_url])
6636 .unwrap();
6637 l1_client.spawn_tasks().await;
6638
6639 let fetcher = Fetcher::new(
6640 Arc::new(NullStateCatchup::default()),
6641 Arc::new(Mutex::new(persistence.clone())),
6642 l1_client.clone(),
6643 ChainConfig {
6644 stake_table_contract: Some(st_addr),
6645 base_fee: 0.into(),
6646 ..Default::default()
6647 },
6648 );
6649
6650 let provider = l1_client.provider;
6651 let stake_table = StakeTableV3::new(st_addr, provider.clone());
6652
6653 let stake_table_init_block = stake_table
6654 .initializedAtBlock()
6655 .block(BlockId::finalized())
6656 .call()
6657 .await?
6658 .to::<u64>();
6659
6660 tracing::info!("stake table init block = {stake_table_init_block}");
6661
6662 let token_address = stake_table
6663 .token()
6664 .block(BlockId::finalized())
6665 .call()
6666 .await
6667 .context("Failed to get token address")?;
6668
6669 let token = EspToken::new(token_address, provider.clone());
6670
6671 let init_log = fetcher
6672 .scan_token_contract_initialized_event_log(stake_table_init_block, token.clone())
6673 .await
6674 .unwrap();
6675
6676 let init_block = init_log.block_number.context("missing block number")?;
6677 let init_tx_hash = init_log
6678 .transaction_hash
6679 .context("missing transaction hash")?;
6680
6681 let transfer_logs = token
6682 .Transfer_filter()
6683 .from_block(init_block)
6684 .to_block(init_block)
6685 .query()
6686 .await
6687 .unwrap();
6688
6689 let (mint_transfer, _) = transfer_logs
6690 .iter()
6691 .find(|(transfer, log)| {
6692 log.transaction_hash == Some(init_tx_hash) && transfer.from == Address::ZERO
6693 })
6694 .context("no mint transfer event in init tx")?;
6695
6696 assert!(mint_transfer.value > U256::ZERO);
6697
6698 Ok(())
6699 }
6700
6701 #[test_log::test(tokio::test(flavor = "multi_thread"))]
6702 async fn test_tx_metadata() {
6703 let port = reserve_tcp_port().expect("OS should have ephemeral ports available");
6704
6705 let url = format!("http://localhost:{port}").parse().unwrap();
6706 let client: Client<ServerError, StaticVersion<0, 1>> = Client::new(url);
6707
6708 let storage = SqlDataSource::create_storage().await;
6709 let network_config = TestConfigBuilder::default().build();
6710 let config = TestNetworkConfigBuilder::default()
6711 .api_config(
6712 SqlDataSource::options(&storage, Options::with_port(port))
6713 .submit(Default::default())
6714 .explorer(Default::default()),
6715 )
6716 .network_config(network_config)
6717 .build();
6718 let network = TestNetwork::new(config, MOCK_SEQUENCER_VERSIONS).await;
6719 let mut events = network.server.event_stream();
6720
6721 client.connect(None).await;
6722
6723 let namespace_counts = [(101, 1), (102, 2), (103, 3)];
6725 for (ns, count) in &namespace_counts {
6726 for i in 0..*count {
6727 let ns_id = NamespaceId::from(*ns as u64);
6728 let txn = Transaction::new(ns_id, vec![*ns, i]);
6729 client
6730 .post::<()>("submit/submit")
6731 .body_json(&txn)
6732 .unwrap()
6733 .send()
6734 .await
6735 .unwrap();
6736 let (block, _) = wait_for_decide_on_handle(&mut events, &txn).await;
6737
6738 let summary: BlockSummaryQueryData<SeqTypes> = client
6740 .get(&format!("availability/block/summary/{block}"))
6741 .send()
6742 .await
6743 .unwrap();
6744 let ns_info = summary.namespaces();
6745 assert_eq!(ns_info.len(), 1);
6746 assert_eq!(ns_info.keys().copied().collect::<Vec<_>>(), vec![ns_id]);
6747 assert_eq!(ns_info[&ns_id].num_transactions, 1);
6748 assert_eq!(ns_info[&ns_id].size, txn.size_in_block(true));
6749 }
6750 }
6751
6752 for (ns, count) in &namespace_counts {
6754 tracing::info!(ns, "list transactions in namespace");
6755
6756 let ns_id = NamespaceId::from(*ns as u64);
6757 let summaries: TransactionSummariesResponse<SeqTypes> = client
6758 .get(&format!(
6759 "explorer/transactions/latest/{count}/namespace/{ns_id}"
6760 ))
6761 .send()
6762 .await
6763 .unwrap();
6764 let txs = summaries.transaction_summaries;
6765 assert_eq!(txs.len(), *count as usize);
6766
6767 for i in 0..*count {
6769 let summary = &txs[i as usize];
6770 let expected = Transaction::new(ns_id, vec![*ns, count - i - 1]);
6771 assert_eq!(summary.rollups, vec![ns_id]);
6772 assert_eq!(summary.hash, expected.commit());
6773 }
6774 }
6775 }
6776
6777 use std::time::Instant;
6778
6779 use rand::thread_rng;
6780
6781 #[test_log::test(tokio::test(flavor = "multi_thread"))]
6782 async fn test_aggregator_namespace_endpoints() {
6783 let mut rng = thread_rng();
6784
6785 let port = reserve_tcp_port().expect("OS should have ephemeral ports available");
6786
6787 let url = format!("http://localhost:{port}").parse().unwrap();
6788 tracing::info!("Sequencer URL = {url}");
6789 let client: Client<ServerError, StaticVersion<0, 1>> = Client::new(url);
6790
6791 let options = Options::with_port(port).submit(Default::default());
6792 const NUM_NODES: usize = 2;
6793 let storage = join_all((0..NUM_NODES).map(|_| SqlDataSource::create_storage())).await;
6795
6796 let persistence_options: [_; NUM_NODES] = storage
6797 .iter()
6798 .map(<SqlDataSource as TestableSequencerDataSource>::persistence_options)
6799 .collect::<Vec<_>>()
6800 .try_into()
6801 .unwrap();
6802
6803 let network_config = TestConfigBuilder::default().build();
6804
6805 let config = TestNetworkConfigBuilder::<NUM_NODES, _, _>::with_num_nodes()
6806 .api_config(SqlDataSource::options(&storage[0], options))
6807 .network_config(network_config)
6808 .persistences(persistence_options.clone())
6809 .build();
6810 let network = TestNetwork::new(config, MOCK_SEQUENCER_VERSIONS).await;
6811 let mut events = network.server.event_stream();
6812 let start = Instant::now();
6813 let mut total_transactions = 0;
6814 let mut tx_heights = Vec::new();
6815 let mut sizes = HashMap::new();
6816 for namespace in 1..=4 {
6819 for _count in 0..namespace {
6820 let payload_len = rng.gen_range(4..=10);
6822 let payload: Vec<u8> = (0..payload_len).map(|_| rng.r#gen()).collect();
6823
6824 let txn = Transaction::new(NamespaceId::from(namespace as u32), payload);
6825
6826 client.connect(None).await;
6827
6828 let hash = client
6829 .post("submit/submit")
6830 .body_json(&txn)
6831 .unwrap()
6832 .send()
6833 .await
6834 .unwrap();
6835 assert_eq!(txn.commit(), hash);
6836
6837 let (height, size) = wait_for_decide_on_handle(&mut events, &txn).await;
6839 tx_heights.push(height);
6840 total_transactions += 1;
6841 *sizes.entry(namespace).or_insert(0) += size;
6842 }
6843 }
6844
6845 let duration = start.elapsed();
6846
6847 println!("Time elapsed to submit transactions: {duration:?}");
6848
6849 let last_tx_height = tx_heights.last().unwrap();
6850
6851 let aggregator_deadline = Instant::now() + Duration::from_secs(30);
6856 loop {
6857 let count = client
6858 .get::<u64>(&format!("node/transactions/count/{last_tx_height}"))
6859 .send()
6860 .await
6861 .ok();
6862 if count == Some(total_transactions) {
6863 break;
6864 }
6865 assert!(
6866 Instant::now() < aggregator_deadline,
6867 "aggregator did not catch up to height {last_tx_height} (got {count:?}, expected \
6868 {total_transactions})"
6869 );
6870 sleep(Duration::from_secs(1)).await;
6871 }
6872
6873 for namespace in 1..=4 {
6874 let count = client
6875 .get::<u64>(&format!("node/transactions/count/namespace/{namespace}"))
6876 .send()
6877 .await
6878 .unwrap();
6879 assert_eq!(
6880 count, namespace as u64,
6881 "Incorrect transaction count for namespace {namespace}: expected {namespace}, got \
6882 {count}"
6883 );
6884
6885 let to_endpoint_count = client
6887 .get::<u64>(&format!(
6888 "node/transactions/count/namespace/{namespace}/{last_tx_height}"
6889 ))
6890 .send()
6891 .await
6892 .unwrap();
6893 assert_eq!(
6894 to_endpoint_count, namespace as u64,
6895 "Incorrect transaction count for range endpoint (to only) for namespace \
6896 {namespace}: expected {namespace}, got {to_endpoint_count}"
6897 );
6898
6899 let from_to_endpoint_count = client
6901 .get::<u64>(&format!(
6902 "node/transactions/count/namespace/{namespace}/0/{last_tx_height}"
6903 ))
6904 .send()
6905 .await
6906 .unwrap();
6907 assert_eq!(
6908 from_to_endpoint_count, namespace as u64,
6909 "Incorrect transaction count for range endpoint (from-to) for namespace \
6910 {namespace}: expected {namespace}, got {from_to_endpoint_count}"
6911 );
6912
6913 let ns_size = client
6914 .get::<usize>(&format!("node/payloads/size/namespace/{namespace}"))
6915 .send()
6916 .await
6917 .unwrap();
6918
6919 let expected_ns_size = *sizes.get(&namespace).unwrap();
6920 assert_eq!(
6921 ns_size, expected_ns_size,
6922 "Incorrect payload size for namespace {namespace}: expected {expected_ns_size}, \
6923 got {ns_size}"
6924 );
6925
6926 let ns_size_to = client
6927 .get::<usize>(&format!(
6928 "node/payloads/size/namespace/{namespace}/{last_tx_height}"
6929 ))
6930 .send()
6931 .await
6932 .unwrap();
6933 assert_eq!(
6934 ns_size_to, expected_ns_size,
6935 "Incorrect payload size for namespace {namespace} up to height {last_tx_height}: \
6936 expected {expected_ns_size}, got {ns_size_to}"
6937 );
6938
6939 let ns_size_from_to = client
6940 .get::<usize>(&format!(
6941 "node/payloads/size/namespace/{namespace}/0/{last_tx_height}"
6942 ))
6943 .send()
6944 .await
6945 .unwrap();
6946 assert_eq!(
6947 ns_size_from_to, expected_ns_size,
6948 "Incorrect payload size for namespace {namespace} from 0 to height \
6949 {last_tx_height}: expected {expected_ns_size}, got {ns_size_from_to}"
6950 );
6951 }
6952
6953 let total_tx_count = client
6954 .get::<u64>("node/transactions/count")
6955 .send()
6956 .await
6957 .unwrap();
6958 assert_eq!(
6959 total_tx_count, total_transactions,
6960 "Incorrect total transaction count: expected {total_transactions}, got \
6961 {total_tx_count}"
6962 );
6963
6964 let total_payload_size = client
6965 .get::<usize>("node/payloads/size")
6966 .send()
6967 .await
6968 .unwrap();
6969
6970 let expected_total_size: usize = sizes.values().copied().sum();
6971 assert_eq!(
6972 total_payload_size, expected_total_size,
6973 "Incorrect total payload size: expected {expected_total_size}, got \
6974 {total_payload_size}"
6975 );
6976 }
6977
6978 #[test_log::test(tokio::test(flavor = "multi_thread"))]
6979 async fn test_stream_transactions_endpoint() {
6980 let mut rng = thread_rng();
6986
6987 let port = reserve_tcp_port().expect("OS should have ephemeral ports available");
6988
6989 let url = format!("http://localhost:{port}").parse().unwrap();
6990 tracing::info!("Sequencer URL = {url}");
6991 let client: Client<ServerError, StaticVersion<0, 1>> = Client::new(url);
6992
6993 let options = Options::with_port(port).submit(Default::default());
6994 const NUM_NODES: usize = 2;
6995 let storage = join_all((0..NUM_NODES).map(|_| SqlDataSource::create_storage())).await;
6997
6998 let persistence_options: [_; NUM_NODES] = storage
6999 .iter()
7000 .map(<SqlDataSource as TestableSequencerDataSource>::persistence_options)
7001 .collect::<Vec<_>>()
7002 .try_into()
7003 .unwrap();
7004
7005 let network_config = TestConfigBuilder::default().build();
7006
7007 let config = TestNetworkConfigBuilder::<NUM_NODES, _, _>::with_num_nodes()
7008 .api_config(SqlDataSource::options(&storage[0], options))
7009 .network_config(network_config)
7010 .persistences(persistence_options.clone())
7011 .build();
7012 let network = TestNetwork::new(config, MOCK_SEQUENCER_VERSIONS).await;
7013 let mut events = network.server.event_stream();
7014 let mut all_transactions = HashMap::new();
7015 let mut namespace_tx: HashMap<_, HashSet<_>> = HashMap::new();
7016
7017 for namespace in 1..=4 {
7020 for _count in 0..namespace {
7021 let payload_len = rng.gen_range(4..=10);
7022 let payload: Vec<u8> = (0..payload_len).map(|_| rng.r#gen()).collect();
7023
7024 let txn = Transaction::new(NamespaceId::from(namespace as u32), payload);
7025
7026 client.connect(None).await;
7027
7028 let hash = client
7029 .post("submit/submit")
7030 .body_json(&txn)
7031 .unwrap()
7032 .send()
7033 .await
7034 .unwrap();
7035 assert_eq!(txn.commit(), hash);
7036
7037 wait_for_decide_on_handle(&mut events, &txn).await;
7039 all_transactions.insert(txn.commit(), txn.clone());
7042 namespace_tx.entry(namespace).or_default().insert(txn);
7043 }
7044 }
7045
7046 let mut transactions = client
7047 .socket("availability/stream/transactions/0")
7048 .subscribe::<TransactionQueryData<SeqTypes>>()
7049 .await
7050 .expect("failed to subscribe to transactions endpoint");
7051
7052 let mut count = 0;
7053 while let Some(tx) = transactions.next().await {
7054 let tx = tx.unwrap();
7055 let expected = all_transactions
7056 .get(&tx.transaction().commit())
7057 .expect("txn not found ");
7058 assert_eq!(tx.transaction(), expected, "invalid transaction");
7059 count += 1;
7060
7061 if count == all_transactions.len() {
7062 break;
7063 }
7064 }
7065
7066 for (namespace, expected_ns_txns) in &namespace_tx {
7069 let mut api_namespace_txns = client
7070 .socket(&format!(
7071 "availability/stream/transactions/0/namespace/{namespace}",
7072 ))
7073 .subscribe::<TransactionQueryData<SeqTypes>>()
7074 .await
7075 .unwrap_or_else(|_| {
7076 panic!("failed to subscribe to transactions namespace {namespace}")
7077 });
7078
7079 let mut received = HashSet::new();
7080
7081 while let Some(res) = api_namespace_txns.next().await {
7082 let tx = res.expect("stream error");
7083 received.insert(tx.transaction().clone());
7084
7085 if received.len() == expected_ns_txns.len() {
7086 break;
7087 }
7088 }
7089
7090 assert_eq!(
7091 received, *expected_ns_txns,
7092 "Mismatched transactions for namespace {namespace}"
7093 );
7094 }
7095 }
7096
7097 #[rstest]
7098 #[case(POS_V3)]
7099 #[case(POS_V4)]
7100 #[test_log::test(tokio::test(flavor = "multi_thread"))]
7101 async fn test_v3_and_v4_reward_tree_updates(#[case] upgrade: Upgrade) -> anyhow::Result<()> {
7102 const EPOCH_HEIGHT: u64 = 10;
7112
7113 let network_config = TestConfigBuilder::default()
7114 .epoch_height(EPOCH_HEIGHT)
7115 .build();
7116
7117 let api_port = reserve_tcp_port().expect("OS should have ephemeral ports available");
7118
7119 tracing::info!("API PORT = {api_port}");
7120 const NUM_NODES: usize = 5;
7121
7122 let storage = join_all((0..NUM_NODES).map(|_| SqlDataSource::create_storage())).await;
7123 let persistence: [_; NUM_NODES] = storage
7124 .iter()
7125 .map(<SqlDataSource as TestableSequencerDataSource>::persistence_options)
7126 .collect::<Vec<_>>()
7127 .try_into()
7128 .unwrap();
7129
7130 let config = TestNetworkConfigBuilder::with_num_nodes()
7131 .api_config(SqlDataSource::options(
7132 &storage[0],
7133 Options::with_port(api_port).catchup(Default::default()),
7134 ))
7135 .network_config(network_config)
7136 .persistences(persistence.clone())
7137 .catchups(std::array::from_fn(|_| {
7138 StatePeers::<StaticVersion<0, 1>>::from_urls(
7139 vec![format!("http://localhost:{api_port}").parse().unwrap()],
7140 Default::default(),
7141 Duration::from_secs(2),
7142 &NoMetrics,
7143 )
7144 }))
7145 .pos_hook(
7146 DelegationConfig::MultipleDelegators,
7147 hotshot_contract_adapter::stake_table::StakeTableContractVersion::V3,
7148 upgrade,
7149 )
7150 .await
7151 .unwrap()
7152 .build();
7153 let mut network = TestNetwork::new(config, upgrade).await;
7154
7155 let mut events = network.peers[2].event_stream();
7156 wait_for_epochs(&mut events, EPOCH_HEIGHT, 4).await;
7158
7159 let validated_state = network.server.decided_state().await.unwrap();
7160 if upgrade.base == EPOCH_VERSION {
7161 let v1_tree = &validated_state.reward_merkle_tree_v1;
7162 assert!(v1_tree.num_leaves() > 0, "v1 reward tree tree is empty");
7163 let v2_tree = &validated_state.reward_merkle_tree_v2;
7164 assert!(
7165 v2_tree.num_leaves() == 0,
7166 "v2 reward tree tree is not empty"
7167 );
7168 } else {
7169 let v1_tree = &validated_state.reward_merkle_tree_v1;
7170 assert!(
7171 v1_tree.num_leaves() == 0,
7172 "v1 reward tree tree is not empty"
7173 );
7174 let v2_tree = &validated_state.reward_merkle_tree_v2;
7175 assert!(v2_tree.num_leaves() > 0, "v2 reward tree tree is empty");
7176 }
7177
7178 network.stop_consensus().await;
7179 Ok(())
7180 }
7181
7182 #[rstest]
7183 #[case(POS_V3)]
7184 #[case(POS_V4)]
7185 #[test_log::test(tokio::test(flavor = "multi_thread"))]
7186 pub(crate) async fn test_state_cert_query(#[case] upgrade: Upgrade) {
7187 const TEST_EPOCH_HEIGHT: u64 = 10;
7188 const TEST_EPOCHS: u64 = 5;
7189
7190 let network_config = TestConfigBuilder::default()
7191 .epoch_height(TEST_EPOCH_HEIGHT)
7192 .build();
7193
7194 let api_port = reserve_tcp_port().expect("OS should have ephemeral ports available");
7195
7196 tracing::info!("API PORT = {api_port}");
7197 const NUM_NODES: usize = 2;
7198
7199 let storage = join_all((0..NUM_NODES).map(|_| SqlDataSource::create_storage())).await;
7200 let persistence: [_; NUM_NODES] = storage
7201 .iter()
7202 .map(<SqlDataSource as TestableSequencerDataSource>::persistence_options)
7203 .collect::<Vec<_>>()
7204 .try_into()
7205 .unwrap();
7206
7207 let config = TestNetworkConfigBuilder::with_num_nodes()
7208 .api_config(SqlDataSource::options(
7209 &storage[0],
7210 Options::with_port(api_port).catchup(Default::default()),
7211 ))
7212 .network_config(network_config)
7213 .persistences(persistence.clone())
7214 .catchups(std::array::from_fn(|_| {
7215 StatePeers::<StaticVersion<0, 1>>::from_urls(
7216 vec![format!("http://localhost:{api_port}").parse().unwrap()],
7217 Default::default(),
7218 Duration::from_secs(2),
7219 &NoMetrics,
7220 )
7221 }))
7222 .pos_hook(
7223 DelegationConfig::MultipleDelegators,
7224 hotshot_contract_adapter::stake_table::StakeTableContractVersion::V3,
7225 upgrade,
7226 )
7227 .await
7228 .unwrap()
7229 .build();
7230
7231 let network = TestNetwork::new(config, upgrade).await;
7232 let mut events = network.server.event_stream();
7233
7234 loop {
7236 let event = events.next().await.unwrap();
7237 tracing::info!("Received event from handle: {event:?}");
7238
7239 if let CoordinatorEvent::LegacyEvent(Event {
7240 event: EventType::Decide { leaf_chain, .. },
7241 ..
7242 }) = event
7243 {
7244 println!(
7245 "Decide event received: {:?}",
7246 leaf_chain.first().unwrap().leaf.height()
7247 );
7248 if let Some(first_leaf) = leaf_chain.first() {
7249 let height = first_leaf.leaf.height();
7250 tracing::info!("Decide event received at height: {height}");
7251
7252 if height >= TEST_EPOCHS * TEST_EPOCH_HEIGHT {
7253 break;
7254 }
7255 }
7256 }
7257 }
7258
7259 let client: Client<ServerError, StaticVersion<0, 1>> =
7261 Client::new(format!("http://localhost:{api_port}").parse().unwrap());
7262 client.connect(Some(Duration::from_secs(10))).await;
7263
7264 for i in 3..=TEST_EPOCHS {
7266 let state_query_data_v2 = client
7269 .get::<StateCertQueryDataV2<SeqTypes>>(&format!("availability/state-cert-v2/{i}"))
7270 .send()
7271 .await
7272 .unwrap();
7273 let state_cert_v2 = state_query_data_v2.0.clone();
7274 tracing::info!("state_cert_v2: {state_cert_v2:?}");
7275 assert_eq!(state_cert_v2.epoch.u64(), i);
7276 assert_eq!(
7277 state_cert_v2.light_client_state.block_height,
7278 i * TEST_EPOCH_HEIGHT - 5
7279 );
7280 let block_height = state_cert_v2.light_client_state.block_height;
7281
7282 let header: Header = client
7283 .get(&format!("availability/header/{block_height}"))
7284 .send()
7285 .await
7286 .unwrap();
7287
7288 if header.version() == DRB_AND_HEADER_UPGRADE_VERSION {
7290 let auth_root = state_cert_v2.auth_root;
7291 let header_auth_root = header.auth_root().unwrap();
7292 if auth_root.is_zero() || header_auth_root.is_zero() {
7293 panic!("auth root shouldn't be zero");
7294 }
7295
7296 assert_eq!(auth_root, header_auth_root, "auth root mismatch");
7297 }
7298
7299 let state_query_data_v1 = client
7301 .get::<StateCertQueryDataV1<SeqTypes>>(&format!("availability/state-cert/{i}"))
7302 .send()
7303 .await
7304 .unwrap();
7305
7306 let state_cert_v1 = state_query_data_v1.0.clone();
7307 tracing::info!("state_cert_v1: {state_cert_v1:?}");
7308 assert_eq!(state_query_data_v1, state_query_data_v2.into());
7309 }
7310 }
7311
7312 #[rstest]
7318 #[case(POS_V3)]
7319 #[case(POS_V4)]
7320 #[test_log::test(tokio::test(flavor = "multi_thread"))]
7321 pub(crate) async fn test_state_cert_catchup(#[case] upgrade: Upgrade) {
7322 const EPOCH_HEIGHT: u64 = 10;
7323
7324 let network_config = TestConfigBuilder::default()
7325 .epoch_height(EPOCH_HEIGHT)
7326 .build();
7327
7328 let api_port = reserve_tcp_port().expect("OS should have ephemeral ports available");
7329
7330 tracing::info!("API PORT = {api_port}");
7331 const NUM_NODES: usize = 5;
7332
7333 let storage = join_all((0..NUM_NODES).map(|_| SqlDataSource::create_storage())).await;
7334 let persistence: [_; NUM_NODES] = storage
7335 .iter()
7336 .map(<SqlDataSource as TestableSequencerDataSource>::persistence_options)
7337 .collect::<Vec<_>>()
7338 .try_into()
7339 .unwrap();
7340
7341 let config = TestNetworkConfigBuilder::with_num_nodes()
7342 .api_config(SqlDataSource::options(
7343 &storage[0],
7344 Options::with_port(api_port).light_client(Default::default()),
7345 ))
7346 .network_config(network_config)
7347 .persistences(persistence.clone())
7348 .catchups(std::array::from_fn(|_| {
7349 StatePeers::<StaticVersion<0, 1>>::from_urls(
7350 vec![format!("http://localhost:{api_port}").parse().unwrap()],
7351 Default::default(),
7352 Duration::from_secs(2),
7353 &NoMetrics,
7354 )
7355 }))
7356 .pos_hook(
7357 DelegationConfig::MultipleDelegators,
7358 hotshot_contract_adapter::stake_table::StakeTableContractVersion::V3,
7359 upgrade,
7360 )
7361 .await
7362 .unwrap()
7363 .build();
7364 let state = config.states()[0].clone();
7365 let mut network = TestNetwork::new(config, upgrade).await;
7366
7367 let mut events = network.peers[2].event_stream();
7368 wait_for_epochs(&mut events, EPOCH_HEIGHT, 3).await;
7370
7371 network.peers.remove(0);
7376
7377 let new_storage: hotshot_query_service::data_source::sql::testing::TmpDb =
7378 SqlDataSource::create_storage().await;
7379 let new_persistence: persistence::sql::Options =
7380 <SqlDataSource as TestableSequencerDataSource>::persistence_options(&new_storage);
7381
7382 let node_0_port = reserve_tcp_port().expect("OS should have ephemeral ports available");
7383 tracing::info!("node_0_port {node_0_port}");
7384 let opt = Options::with_port(node_0_port).query_sql(
7385 Query {
7386 peers: vec![format!("http://localhost:{api_port}").parse().unwrap()],
7387 ..Query::test()
7388 },
7389 tmp_options(&new_storage),
7390 );
7391 let node_0 = opt
7392 .clone()
7393 .serve(|metrics, consumer, storage| {
7394 let cfg = network.cfg.clone();
7395 let new_persistence = new_persistence.clone();
7396 let state = state.clone();
7397 async move {
7398 Ok(cfg
7399 .init_node(
7400 1,
7401 state,
7402 new_persistence.clone(),
7403 Some(StatePeers::<StaticVersion<0, 1>>::from_urls(
7404 vec![format!("http://localhost:{api_port}").parse().unwrap()],
7405 Default::default(),
7406 Duration::from_secs(2),
7407 &NoMetrics,
7408 )),
7409 storage,
7410 &*metrics,
7411 test_helpers::STAKE_TABLE_CAPACITY_FOR_TEST,
7412 consumer,
7413 upgrade,
7414 Default::default(),
7415 )
7416 .await)
7417 }
7418 .boxed()
7419 })
7420 .await
7421 .unwrap();
7422
7423 let mut events = node_0.event_stream();
7424 wait_for_epochs(&mut events, EPOCH_HEIGHT, 5).await;
7426
7427 let client: Client<ServerError, StaticVersion<0, 1>> =
7428 Client::new(format!("http://localhost:{node_0_port}").parse().unwrap());
7429 client.connect(Some(Duration::from_secs(60))).await;
7430
7431 for epoch in 3..=5 {
7432 let state_cert = client
7433 .get::<StateCertQueryDataV2<SeqTypes>>(&format!(
7434 "availability/state-cert-v2/{epoch}"
7435 ))
7436 .send()
7437 .await
7438 .unwrap();
7439 assert_eq!(state_cert.0.epoch.u64(), epoch);
7440 }
7441 }
7442
7443 #[test_log::test(tokio::test(flavor = "multi_thread"))]
7444 async fn test_integration_commission_updates() -> anyhow::Result<()> {
7445 const NUM_NODES: usize = 3;
7446 const EPOCH_HEIGHT: u64 = 10;
7447
7448 let versions = POS_V4;
7450
7451 let api_port = reserve_tcp_port().expect("OS should have ephemeral ports available");
7452
7453 let storage = join_all((0..NUM_NODES).map(|_| SqlDataSource::create_storage())).await;
7455 let persistence: [_; NUM_NODES] = storage
7456 .iter()
7457 .map(<SqlDataSource as TestableSequencerDataSource>::persistence_options)
7458 .collect::<Vec<_>>()
7459 .try_into()
7460 .unwrap();
7461
7462 let network_config = TestConfigBuilder::default()
7464 .epoch_height(EPOCH_HEIGHT)
7465 .build();
7466
7467 let config = TestNetworkConfigBuilder::<NUM_NODES, _, _>::with_num_nodes()
7469 .api_config(SqlDataSource::options(
7470 &storage[0],
7471 Options::with_port(api_port),
7472 ))
7473 .network_config(network_config.clone())
7474 .persistences(persistence.clone())
7475 .catchups(std::array::from_fn(|_| {
7476 StatePeers::<SequencerApiVersion>::from_urls(
7477 vec![format!("http://localhost:{api_port}").parse().unwrap()],
7478 Default::default(),
7479 Duration::from_secs(2),
7480 &NoMetrics,
7481 )
7482 }))
7483 .pos_hook(
7484 DelegationConfig::NoSelfDelegation,
7486 StakeTableContractVersion::V1, POS_V4,
7488 )
7489 .await
7490 .unwrap()
7491 .build();
7492
7493 let network = TestNetwork::new(config, versions).await;
7494 let provider = network.cfg.anvil().unwrap();
7495 let deployer_addr = network.cfg.signer().address();
7496 let mut contracts = network.contracts.unwrap();
7497 let st_addr = contracts.address(Contract::StakeTableProxy).unwrap();
7498 upgrade_stake_table_v2(
7499 provider,
7500 L1Client::new(vec![network.cfg.l1_url()])?,
7501 &mut contracts,
7502 deployer_addr,
7503 deployer_addr,
7504 )
7505 .await?;
7506
7507 let mut commissions = vec![];
7508 for (i, (validator, provider)) in
7509 network_config.validator_providers().into_iter().enumerate()
7510 {
7511 let commission = fetch_commission(provider.clone(), st_addr, validator).await?;
7512 let new_commission = match i {
7513 0 => 0u16,
7514 1 => commission.to_evm() + 500u16,
7515 2 => commission.to_evm() - 100u16,
7516 _ => unreachable!(),
7517 }
7518 .try_into()?;
7519 commissions.push((validator, commission, new_commission));
7520 tracing::info!(%validator, %commission, %new_commission, "Update commission");
7521 update_commission(provider, st_addr, new_commission)
7522 .await?
7523 .get_receipt()
7524 .await?;
7525 }
7526
7527 let current_epoch = network.peers[0]
7529 .decided_leaf()
7530 .await
7531 .epoch(EPOCH_HEIGHT)
7532 .unwrap();
7533 let target_epoch = current_epoch.u64() + 3;
7534 println!("target epoch for new stake table: {target_epoch}");
7535 let mut events = network.peers[0].event_stream();
7536 wait_for_epochs(&mut events, EPOCH_HEIGHT, target_epoch).await;
7537
7538 let client: Client<ServerError, SequencerApiVersion> =
7540 Client::new(format!("http://localhost:{api_port}").parse().unwrap());
7541 let validators = client
7542 .get::<AuthenticatedValidatorMap>(&format!("node/validators/{}", target_epoch - 1))
7543 .send()
7544 .await
7545 .expect("validators");
7546 assert!(!validators.is_empty());
7547 for (val, old_comm, _) in commissions.clone() {
7548 assert_eq!(validators.get(&val).unwrap().commission, old_comm.to_evm());
7549 }
7550
7551 let client: Client<ServerError, SequencerApiVersion> =
7553 Client::new(format!("http://localhost:{api_port}").parse().unwrap());
7554 let validators = client
7555 .get::<AuthenticatedValidatorMap>(&format!("node/validators/{target_epoch}"))
7556 .send()
7557 .await
7558 .expect("validators");
7559 assert!(!validators.is_empty());
7560 for (val, _, new_comm) in commissions.clone() {
7561 assert_eq!(validators.get(&val).unwrap().commission, new_comm.to_evm());
7562 }
7563
7564 let last_block_with_old_commissions = EPOCH_HEIGHT * (target_epoch - 1);
7565 let block_with_new_commissions = EPOCH_HEIGHT * target_epoch;
7566 let mut new_amounts = vec![];
7567 for (val, ..) in commissions {
7568 let before = client
7569 .get::<Option<RewardAmount>>(&format!(
7570 "reward-state-v2/reward-balance/{last_block_with_old_commissions}/{val}"
7571 ))
7572 .send()
7573 .await?
7574 .unwrap();
7575 let after = client
7576 .get::<Option<RewardAmount>>(&format!(
7577 "reward-state-v2/reward-balance/{block_with_new_commissions}/{val}"
7578 ))
7579 .send()
7580 .await?
7581 .unwrap();
7582 new_amounts.push(after - before);
7583 }
7584
7585 let tolerance = U256::from(10 * EPOCH_HEIGHT).into();
7586 assert!(new_amounts[0] < tolerance);
7588
7589 assert!(new_amounts[1] + new_amounts[2] > tolerance);
7591
7592 Ok(())
7593 }
7594
7595 #[test_log::test(tokio::test(flavor = "multi_thread"))]
7599 async fn test_integration_update_fast_finality_network_config() -> anyhow::Result<()> {
7600 const NUM_NODES: usize = 3;
7601 const EPOCH_HEIGHT: u64 = 10;
7602
7603 let versions = POS_V4;
7604 let api_port = reserve_tcp_port().expect("OS should have ephemeral ports available");
7605
7606 let storage = join_all((0..NUM_NODES).map(|_| SqlDataSource::create_storage())).await;
7607 let persistence: [_; NUM_NODES] = storage
7608 .iter()
7609 .map(<SqlDataSource as TestableSequencerDataSource>::persistence_options)
7610 .collect::<Vec<_>>()
7611 .try_into()
7612 .unwrap();
7613
7614 let network_config = TestConfigBuilder::default()
7615 .epoch_height(EPOCH_HEIGHT)
7616 .build();
7617
7618 let config = TestNetworkConfigBuilder::<NUM_NODES, _, _>::with_num_nodes()
7619 .api_config(SqlDataSource::options(
7620 &storage[0],
7621 Options::with_port(api_port),
7622 ))
7623 .network_config(network_config.clone())
7624 .persistences(persistence.clone())
7625 .catchups(std::array::from_fn(|_| {
7626 StatePeers::<SequencerApiVersion>::from_urls(
7627 vec![format!("http://localhost:{api_port}").parse().unwrap()],
7628 Default::default(),
7629 Duration::from_secs(2),
7630 &NoMetrics,
7631 )
7632 }))
7633 .pos_hook(
7634 DelegationConfig::MultipleDelegators,
7635 StakeTableContractVersion::V2,
7636 POS_V4,
7637 )
7638 .await
7639 .unwrap()
7640 .build();
7641
7642 let network = TestNetwork::new(config, versions).await;
7643 let provider = network.cfg.anvil().unwrap();
7644 let mut contracts = network.contracts.unwrap();
7645 let st_addr = contracts.address(Contract::StakeTableProxy).unwrap();
7646
7647 upgrade_stake_table_v3(provider, &mut contracts).await?;
7648
7649 let (validator, validator_provider) = network_config
7650 .validator_providers()
7651 .into_iter()
7652 .next()
7653 .unwrap();
7654 let x25519_key = x25519::Keypair::generate().unwrap().public_key();
7655 let p2p_addr: NetAddr = "127.0.0.1:9000".parse().unwrap();
7656 update_network_config(validator_provider, st_addr, x25519_key, p2p_addr.clone())
7657 .await?
7658 .get_receipt()
7659 .await?;
7660
7661 let current_epoch = network.peers[0]
7662 .decided_leaf()
7663 .await
7664 .epoch(EPOCH_HEIGHT)
7665 .unwrap();
7666 let target_epoch = current_epoch.u64() + 3;
7667 let mut events = network.peers[0].event_stream();
7668 wait_for_epochs(&mut events, EPOCH_HEIGHT, target_epoch).await;
7669
7670 let client: Client<ServerError, SequencerApiVersion> =
7671 Client::new(format!("http://localhost:{api_port}").parse().unwrap());
7672 let validators = client
7673 .get::<AuthenticatedValidatorMap>(&format!("node/validators/{target_epoch}"))
7674 .send()
7675 .await
7676 .expect("validators");
7677 let v = validators.get(&validator).expect("validator present");
7678 assert_eq!(v.x25519_key, Some(x25519_key));
7679 assert_eq!(v.p2p_addr, Some(p2p_addr));
7680
7681 Ok(())
7682 }
7683
7684 async fn compare_endpoints(
7685 http: &reqwest::Client,
7686 api_port: u16,
7687 axum_port: u16,
7688 path: &str,
7689 ) -> anyhow::Result<()> {
7690 let tide: serde_json::Value = http
7691 .get(format!("http://localhost:{api_port}/v1/{path}"))
7692 .send()
7693 .await?
7694 .json()
7695 .await?;
7696 let axum: serde_json::Value = http
7697 .get(format!("http://localhost:{axum_port}/v1/{path}"))
7698 .send()
7699 .await?
7700 .json()
7701 .await?;
7702 assert_eq!(tide, axum, "v1/{path}: tide and axum v1 responses differ");
7703 Ok(())
7704 }
7705
7706 async fn compare_error_endpoints(
7708 http: &reqwest::Client,
7709 api_port: u16,
7710 axum_port: u16,
7711 path: &str,
7712 expected_status: u16,
7713 ) -> anyhow::Result<()> {
7714 let tide_status = http
7715 .get(format!("http://localhost:{api_port}/v1/{path}"))
7716 .send()
7717 .await?
7718 .status()
7719 .as_u16();
7720 let axum_status = http
7721 .get(format!("http://localhost:{axum_port}/v1/{path}"))
7722 .send()
7723 .await?
7724 .status()
7725 .as_u16();
7726 assert_eq!(
7727 tide_status, expected_status,
7728 "v1/{path}: tide should return {expected_status}, got {tide_status}"
7729 );
7730 assert_eq!(
7731 axum_status, expected_status,
7732 "v1/{path}: axum should return {expected_status}, got {axum_status}"
7733 );
7734 Ok(())
7735 }
7736
7737 async fn compare_ws_endpoints(api_port: u16, axum_port: u16, path: &str) -> anyhow::Result<()> {
7740 use std::{collections::HashSet, time::Duration};
7741
7742 use futures::StreamExt as _;
7743 use tokio::time::timeout;
7744 use tokio_tungstenite::{connect_async, tungstenite::Message};
7745
7746 async fn collect_messages(port: u16, path: &str) -> anyhow::Result<Vec<serde_json::Value>> {
7747 let url = format!("ws://localhost:{port}/v1/{path}");
7748 let (mut ws, _) = connect_async(&url).await?;
7749 let mut messages = Vec::new();
7750 while messages.len() < 10 {
7751 match timeout(Duration::from_millis(500), ws.next()).await {
7752 Ok(Some(Ok(Message::Text(text)))) => {
7753 if let Ok(v) = serde_json::from_str::<serde_json::Value>(&text) {
7754 messages.push(v);
7755 }
7756 },
7757 _ => break,
7758 }
7759 }
7760 Ok(messages)
7761 }
7762
7763 let (tide_msgs, axum_msgs) = tokio::join!(
7764 collect_messages(api_port, path),
7765 collect_messages(axum_port, path)
7766 );
7767 let tide_msgs = tide_msgs?;
7768 let axum_msgs = axum_msgs?;
7769
7770 let tide_set: HashSet<String> = tide_msgs.iter().map(|v| v.to_string()).collect();
7771 let common = axum_msgs
7772 .iter()
7773 .filter(|v| tide_set.contains(&v.to_string()))
7774 .count();
7775
7776 assert!(
7777 common >= 2,
7778 "v1/{path}: expected ≥2 messages in common between tide ({} msgs) and axum ({} msgs), \
7779 got {common}",
7780 tide_msgs.len(),
7781 axum_msgs.len(),
7782 );
7783 Ok(())
7784 }
7785
7786 #[rstest]
7787 #[case(POS_V3)]
7788 #[case(POS_V4)]
7789 #[test_log::test]
7790 fn test_reward_proof_endpoint(#[case] upgrade: Upgrade) {
7791 let test = async move {
7792 const EPOCH_HEIGHT: u64 = 10;
7793 const NUM_NODES: usize = 5;
7794
7795 let network_config = TestConfigBuilder::default()
7796 .epoch_height(EPOCH_HEIGHT)
7797 .build();
7798
7799 let api_port = reserve_tcp_port().expect("OS should have ephemeral ports available");
7800 let axum_port = reserve_tcp_port().expect("OS should have ephemeral ports available");
7801 println!("API PORT = {api_port}");
7802 println!("AXUM PORT = {axum_port}");
7803
7804 let storage = join_all((0..NUM_NODES).map(|_| SqlDataSource::create_storage())).await;
7805 let persistence: [_; NUM_NODES] = storage
7806 .iter()
7807 .map(<SqlDataSource as TestableSequencerDataSource>::persistence_options)
7808 .collect::<Vec<_>>()
7809 .try_into()
7810 .unwrap();
7811
7812 let mut api_opts = Options::with_port(api_port).catchup(Default::default());
7813 api_opts.http.axum_port = Some(axum_port);
7814
7815 let config = TestNetworkConfigBuilder::with_num_nodes()
7816 .api_config(SqlDataSource::options(&storage[0], api_opts))
7817 .network_config(network_config.clone())
7818 .persistences(persistence.clone())
7819 .catchups(std::array::from_fn(|_| {
7820 StatePeers::<StaticVersion<0, 1>>::from_urls(
7821 vec![format!("http://localhost:{api_port}").parse().unwrap()],
7822 Default::default(),
7823 Duration::from_secs(2),
7824 &NoMetrics,
7825 )
7826 }))
7827 .pos_hook(
7828 DelegationConfig::MultipleDelegators,
7829 hotshot_contract_adapter::stake_table::StakeTableContractVersion::V3,
7830 upgrade,
7831 )
7832 .await
7833 .unwrap()
7834 .build();
7835
7836 let mut network = TestNetwork::new(config, upgrade).await;
7837
7838 let mut events = network.server.event_stream();
7840 wait_for_epochs(&mut events, EPOCH_HEIGHT, 4).await;
7841
7842 let url = format!("http://localhost:{api_port}").parse().unwrap();
7843 let client: Client<ServerError, StaticVersion<0, 1>> = Client::new(url);
7844
7845 let validated_state = network.server.decided_state().await.unwrap();
7846 let decided_leaf = network.server.decided_leaf().await;
7847 let height = decided_leaf.height();
7848
7849 if upgrade.base == EPOCH_VERSION {
7851 wait_until_block_height(&client, "reward-state/block-height", height).await;
7853
7854 network.stop_consensus().await;
7855
7856 for (address, _) in validated_state.reward_merkle_tree_v1.iter() {
7857 let (_, expected_proof) = validated_state
7858 .reward_merkle_tree_v1
7859 .lookup(*address)
7860 .expect_ok()
7861 .unwrap();
7862
7863 let res = client
7864 .get::<RewardAccountQueryDataV1>(&format!(
7865 "reward-state/proof/{height}/{address}"
7866 ))
7867 .send()
7868 .await
7869 .unwrap();
7870
7871 match res.proof.proof {
7872 RewardMerkleProofV1::Presence(p) => {
7873 assert_eq!(
7874 p, expected_proof,
7875 "Proof mismatch for V1 at {height}, addr={address}"
7876 );
7877 },
7878 other => panic!(
7879 "Expected Present proof for V1 at {height}, addr={address}, got \
7880 {other:?}"
7881 ),
7882 }
7883 }
7884 } else {
7885 let avail_ns = NamespaceId::from(42_u32);
7894 let avail_tx = Transaction::new(avail_ns, vec![1, 2, 3]);
7895 network
7896 .server
7897 .submit_transaction(avail_tx.clone())
7898 .await
7899 .unwrap();
7900 let (avail_block, _) = wait_for_decide_on_handle(&mut events, &avail_tx).await;
7901
7902 let avail_tx2 = Transaction::new(avail_ns, vec![4, 5, 6]);
7905 network
7906 .server
7907 .submit_transaction(avail_tx2.clone())
7908 .await
7909 .unwrap();
7910 wait_for_decide_on_handle(&mut events, &avail_tx2).await;
7911
7912 wait_until_block_height(&client, "reward-state-v2/block-height", height).await;
7913 wait_until_block_height(&client, "node/block-height", avail_block).await;
7915
7916 network.stop_consensus().await;
7917
7918 let http = reqwest::Client::new();
7919
7920 for (address, _) in validated_state.reward_merkle_tree_v2.iter() {
7921 let (_, expected_proof) = validated_state
7922 .reward_merkle_tree_v2
7923 .lookup(*address)
7924 .expect_ok()
7925 .unwrap();
7926
7927 let res = client
7928 .get::<RewardAccountQueryDataV2>(&format!(
7929 "reward-state-v2/proof/{height}/{address}"
7930 ))
7931 .send()
7932 .await
7933 .unwrap();
7934
7935 match res.proof.proof.clone() {
7936 RewardMerkleProofV2::Presence(p) => {
7937 assert_eq!(
7938 p, expected_proof,
7939 "Proof mismatch for V2 at {height}, addr={address}"
7940 );
7941 },
7942 other => panic!(
7943 "Expected Present proof for V2 at {height}, addr={address}, got \
7944 {other:?}"
7945 ),
7946 }
7947
7948 let reward_claim_input = client
7949 .get::<RewardClaimInput>(&format!(
7950 "reward-state-v2/reward-claim-input/{height}/{address}"
7951 ))
7952 .send()
7953 .await
7954 .unwrap();
7955
7956 assert_eq!(reward_claim_input, res.to_reward_claim_input()?);
7957
7958 compare_endpoints(
7961 &http,
7962 api_port,
7963 axum_port,
7964 &format!("reward-state-v2/proof/{height}/{address}"),
7965 )
7966 .await?;
7967 compare_endpoints(
7968 &http,
7969 api_port,
7970 axum_port,
7971 &format!("reward-state-v2/reward-claim-input/{height}/{address}"),
7972 )
7973 .await?;
7974 compare_endpoints(
7975 &http,
7976 api_port,
7977 axum_port,
7978 &format!("reward-state-v2/reward-balance/{height}/{address}"),
7979 )
7980 .await?;
7981 compare_endpoints(
7982 &http,
7983 api_port,
7984 axum_port,
7985 &format!("reward-state-v2/proof/latest/{address}"),
7986 )
7987 .await?;
7988 compare_endpoints(
7989 &http,
7990 api_port,
7991 axum_port,
7992 &format!("reward-state-v2/reward-balance/latest/{address}"),
7993 )
7994 .await?;
7995 }
7996
7997 compare_endpoints(
7998 &http,
7999 api_port,
8000 axum_port,
8001 &format!("reward-state-v2/reward-amounts/{height}/0/1000"),
8002 )
8003 .await?;
8004 compare_endpoints(
8005 &http,
8006 api_port,
8007 axum_port,
8008 &format!("reward-state-v2/reward-merkle-tree-v2/{height}"),
8009 )
8010 .await?;
8011
8012 compare_endpoints(
8016 &http,
8017 api_port,
8018 axum_port,
8019 &format!("availability/block/{avail_block}/namespace/{avail_ns}"),
8020 )
8021 .await?;
8022
8023 let avail_header: Header = client
8025 .get(&format!("availability/header/{avail_block}"))
8026 .send()
8027 .await
8028 .unwrap();
8029 compare_endpoints(
8030 &http,
8031 api_port,
8032 axum_port,
8033 &format!(
8034 "availability/block/hash/{}/namespace/{avail_ns}",
8035 avail_header.commit()
8036 ),
8037 )
8038 .await?;
8039 compare_endpoints(
8040 &http,
8041 api_port,
8042 axum_port,
8043 &format!(
8044 "availability/block/payload-hash/{}/namespace/{avail_ns}",
8045 avail_header.payload_commitment()
8046 ),
8047 )
8048 .await?;
8049
8050 compare_endpoints(
8052 &http,
8053 api_port,
8054 axum_port,
8055 &format!(
8056 "availability/block/{avail_block}/{}/namespace/{avail_ns}",
8057 avail_block + 1
8058 ),
8059 )
8060 .await?;
8061
8062 compare_endpoints(&http, api_port, axum_port, "availability/state-cert/1").await?;
8064 compare_endpoints(&http, api_port, axum_port, "availability/state-cert-v2/1")
8065 .await?;
8066
8067 let avail_leaf: LeafQueryData<SeqTypes> = client
8069 .get(&format!("availability/leaf/{avail_block}"))
8070 .send()
8071 .await
8072 .unwrap();
8073 let leaf_hash = avail_leaf.hash();
8074 let block_hash = avail_header.commit();
8075 let payload_hash = avail_header.payload_commitment();
8076
8077 compare_endpoints(
8079 &http,
8080 api_port,
8081 axum_port,
8082 &format!("availability/leaf/{avail_block}"),
8083 )
8084 .await?;
8085 compare_endpoints(
8086 &http,
8087 api_port,
8088 axum_port,
8089 &format!("availability/leaf/hash/{leaf_hash}"),
8090 )
8091 .await?;
8092 compare_endpoints(
8093 &http,
8094 api_port,
8095 axum_port,
8096 &format!("availability/leaf/{avail_block}/{}", avail_block + 1),
8097 )
8098 .await?;
8099
8100 compare_endpoints(
8102 &http,
8103 api_port,
8104 axum_port,
8105 &format!("availability/header/{avail_block}"),
8106 )
8107 .await?;
8108 compare_endpoints(
8109 &http,
8110 api_port,
8111 axum_port,
8112 &format!("availability/header/hash/{block_hash}"),
8113 )
8114 .await?;
8115 compare_endpoints(
8116 &http,
8117 api_port,
8118 axum_port,
8119 &format!("availability/header/payload-hash/{payload_hash}"),
8120 )
8121 .await?;
8122 compare_endpoints(
8123 &http,
8124 api_port,
8125 axum_port,
8126 &format!("availability/header/{avail_block}/{}", avail_block + 1),
8127 )
8128 .await?;
8129
8130 compare_endpoints(
8132 &http,
8133 api_port,
8134 axum_port,
8135 &format!("availability/block/{avail_block}"),
8136 )
8137 .await?;
8138 compare_endpoints(
8139 &http,
8140 api_port,
8141 axum_port,
8142 &format!("availability/block/hash/{block_hash}"),
8143 )
8144 .await?;
8145 compare_endpoints(
8146 &http,
8147 api_port,
8148 axum_port,
8149 &format!("availability/block/payload-hash/{payload_hash}"),
8150 )
8151 .await?;
8152 compare_endpoints(
8153 &http,
8154 api_port,
8155 axum_port,
8156 &format!("availability/block/{avail_block}/{}", avail_block + 1),
8157 )
8158 .await?;
8159
8160 compare_endpoints(
8162 &http,
8163 api_port,
8164 axum_port,
8165 &format!("availability/payload/{avail_block}"),
8166 )
8167 .await?;
8168 compare_endpoints(
8169 &http,
8170 api_port,
8171 axum_port,
8172 &format!("availability/payload/hash/{payload_hash}"),
8173 )
8174 .await?;
8175 compare_endpoints(
8176 &http,
8177 api_port,
8178 axum_port,
8179 &format!("availability/payload/block-hash/{block_hash}"),
8180 )
8181 .await?;
8182 compare_endpoints(
8183 &http,
8184 api_port,
8185 axum_port,
8186 &format!("availability/payload/{avail_block}/{}", avail_block + 1),
8187 )
8188 .await?;
8189
8190 compare_endpoints(
8192 &http,
8193 api_port,
8194 axum_port,
8195 &format!("availability/vid/common/{avail_block}"),
8196 )
8197 .await?;
8198 compare_endpoints(
8199 &http,
8200 api_port,
8201 axum_port,
8202 &format!("availability/vid/common/hash/{block_hash}"),
8203 )
8204 .await?;
8205 compare_endpoints(
8206 &http,
8207 api_port,
8208 axum_port,
8209 &format!("availability/vid/common/payload-hash/{payload_hash}"),
8210 )
8211 .await?;
8212 compare_endpoints(
8213 &http,
8214 api_port,
8215 axum_port,
8216 &format!("availability/vid/common/{avail_block}/{}", avail_block + 1),
8217 )
8218 .await?;
8219
8220 let tx_hash = avail_tx.commit();
8222 compare_endpoints(
8223 &http,
8224 api_port,
8225 axum_port,
8226 &format!("availability/transaction/{avail_block}/0/noproof"),
8227 )
8228 .await?;
8229 compare_endpoints(
8230 &http,
8231 api_port,
8232 axum_port,
8233 &format!("availability/transaction/hash/{tx_hash}/noproof"),
8234 )
8235 .await?;
8236 compare_endpoints(
8237 &http,
8238 api_port,
8239 axum_port,
8240 &format!("availability/transaction/{avail_block}/0/proof"),
8241 )
8242 .await?;
8243 compare_endpoints(
8244 &http,
8245 api_port,
8246 axum_port,
8247 &format!("availability/transaction/hash/{tx_hash}/proof"),
8248 )
8249 .await?;
8250 compare_endpoints(
8251 &http,
8252 api_port,
8253 axum_port,
8254 &format!("availability/transaction/{avail_block}/0"),
8255 )
8256 .await?;
8257 compare_endpoints(
8258 &http,
8259 api_port,
8260 axum_port,
8261 &format!("availability/transaction/hash/{tx_hash}"),
8262 )
8263 .await?;
8264
8265 compare_endpoints(
8267 &http,
8268 api_port,
8269 axum_port,
8270 &format!("availability/block/summary/{avail_block}"),
8271 )
8272 .await?;
8273 compare_endpoints(
8274 &http,
8275 api_port,
8276 axum_port,
8277 &format!(
8278 "availability/block/summaries/{avail_block}/{}",
8279 avail_block + 1
8280 ),
8281 )
8282 .await?;
8283
8284 compare_endpoints(&http, api_port, axum_port, "availability/limits").await?;
8286
8287 compare_endpoints(
8289 &http,
8290 api_port,
8291 axum_port,
8292 &format!("availability/cert2/{avail_block}"),
8293 )
8294 .await?;
8295
8296 let ws_start = avail_block.saturating_sub(10);
8305 compare_ws_endpoints(
8306 api_port,
8307 axum_port,
8308 &format!("availability/stream/leaves/{ws_start}"),
8309 )
8310 .await?;
8311 compare_ws_endpoints(
8312 api_port,
8313 axum_port,
8314 &format!("availability/stream/headers/{ws_start}"),
8315 )
8316 .await?;
8317 compare_ws_endpoints(
8318 api_port,
8319 axum_port,
8320 &format!("availability/stream/blocks/{ws_start}"),
8321 )
8322 .await?;
8323 compare_ws_endpoints(
8324 api_port,
8325 axum_port,
8326 &format!("availability/stream/payloads/{ws_start}"),
8327 )
8328 .await?;
8329 compare_ws_endpoints(
8330 api_port,
8331 axum_port,
8332 &format!("availability/stream/vid/common/{ws_start}"),
8333 )
8334 .await?;
8335 compare_ws_endpoints(
8336 api_port,
8337 axum_port,
8338 &format!("availability/stream/transactions/{ws_start}"),
8339 )
8340 .await?;
8341 compare_ws_endpoints(
8344 api_port,
8345 axum_port,
8346 &format!("availability/stream/transactions/{avail_block}/namespace/{avail_ns}"),
8347 )
8348 .await?;
8349 compare_ws_endpoints(
8350 api_port,
8351 axum_port,
8352 &format!("availability/stream/blocks/{avail_block}/namespace/{avail_ns}"),
8353 )
8354 .await?;
8355
8356 compare_error_endpoints(
8362 &http,
8363 api_port,
8364 axum_port,
8365 "availability/leaf/999999",
8366 404,
8367 )
8368 .await?;
8369
8370 compare_error_endpoints(
8373 &http,
8374 api_port,
8375 axum_port,
8376 &format!("availability/block/{avail_block}/{}", avail_block + 200),
8377 400,
8378 )
8379 .await?;
8380
8381 compare_error_endpoints(
8384 &http,
8385 api_port,
8386 axum_port,
8387 &format!(
8388 "availability/block/{avail_block}/{}/namespace/{avail_ns}",
8389 avail_block + 200
8390 ),
8391 400,
8392 )
8393 .await?;
8394 }
8395
8396 anyhow::Ok(())
8397 };
8398
8399 std::thread::Builder::new()
8403 .stack_size(32 * 1024 * 1024)
8404 .spawn(move || {
8405 tokio::runtime::Builder::new_multi_thread()
8406 .enable_all()
8407 .build()
8408 .unwrap()
8409 .block_on(test)
8410 .unwrap()
8411 })
8412 .unwrap()
8413 .join()
8414 .unwrap()
8415 }
8416
8417 #[test_log::test(tokio::test(flavor = "multi_thread"))]
8418 async fn test_all_validators_endpoint() -> anyhow::Result<()> {
8419 const EPOCH_HEIGHT: u64 = 20;
8420
8421 let network_config = TestConfigBuilder::default()
8422 .epoch_height(EPOCH_HEIGHT)
8423 .build();
8424
8425 let api_port = reserve_tcp_port().expect("OS should have ephemeral ports available");
8426
8427 const NUM_NODES: usize = 5;
8428
8429 let storage = join_all((0..NUM_NODES).map(|_| SqlDataSource::create_storage())).await;
8430 let persistence: [_; NUM_NODES] = storage
8431 .iter()
8432 .map(<SqlDataSource as TestableSequencerDataSource>::persistence_options)
8433 .collect::<Vec<_>>()
8434 .try_into()
8435 .unwrap();
8436
8437 let config = TestNetworkConfigBuilder::with_num_nodes()
8438 .api_config(SqlDataSource::options(
8439 &storage[0],
8440 Options::with_port(api_port),
8441 ))
8442 .network_config(network_config)
8443 .persistences(persistence.clone())
8444 .catchups(std::array::from_fn(|_| {
8445 StatePeers::<StaticVersion<0, 1>>::from_urls(
8446 vec![format!("http://localhost:{api_port}").parse().unwrap()],
8447 Default::default(),
8448 Duration::from_secs(2),
8449 &NoMetrics,
8450 )
8451 }))
8452 .pos_hook(
8453 DelegationConfig::MultipleDelegators,
8454 Default::default(),
8455 POS_V4,
8456 )
8457 .await
8458 .unwrap()
8459 .build();
8460
8461 let network = TestNetwork::new(config, POS_V4).await;
8462 let client: Client<ServerError, SequencerApiVersion> =
8463 Client::new(format!("http://localhost:{api_port}").parse().unwrap());
8464
8465 let err = client
8466 .get::<Vec<RegisteredValidator<PubKey>>>("node/all-validators/1/0/1001")
8467 .header("Accept", "application/json")
8468 .send()
8469 .await
8470 .unwrap_err();
8471
8472 assert_matches!(err, ServerError { status, message} if
8473 status == StatusCode::BAD_REQUEST
8474 && message.contains("Limit cannot be greater than 1000")
8475 );
8476
8477 let mut events = network.peers[0].event_stream();
8479 wait_for_epochs(&mut events, EPOCH_HEIGHT, 3).await;
8480
8481 {
8483 client
8484 .get::<Vec<RegisteredValidator<PubKey>>>("node/all-validators/1/0/100")
8485 .send()
8486 .await
8487 .unwrap()
8488 .is_empty();
8489
8490 client
8491 .get::<Vec<RegisteredValidator<PubKey>>>("node/all-validators/2/0/100")
8492 .send()
8493 .await
8494 .unwrap()
8495 .is_empty();
8496 }
8497
8498 let validators = client
8500 .get::<Vec<RegisteredValidator<PubKey>>>("node/all-validators/3/0/100")
8501 .send()
8502 .await
8503 .expect("validators");
8504
8505 assert!(!validators.is_empty());
8506
8507 Ok(())
8508 }
8509
8510 #[test_log::test(tokio::test(flavor = "multi_thread"))]
8511 async fn test_reward_accounts_catchup_endpoint() -> anyhow::Result<()> {
8512 const EPOCH_HEIGHT: u64 = 10;
8513 const NUM_NODES: usize = 3;
8514
8515 let network_config = TestConfigBuilder::default()
8516 .epoch_height(EPOCH_HEIGHT)
8517 .build();
8518
8519 let api_port = reserve_tcp_port().expect("OS should have ephemeral ports available");
8520 println!("API PORT = {api_port}");
8521
8522 let storage = join_all((0..NUM_NODES).map(|_| SqlDataSource::create_storage())).await;
8523 let persistence: [_; NUM_NODES] = storage
8524 .iter()
8525 .map(<SqlDataSource as TestableSequencerDataSource>::persistence_options)
8526 .collect::<Vec<_>>()
8527 .try_into()
8528 .unwrap();
8529
8530 let config = TestNetworkConfigBuilder::with_num_nodes()
8531 .api_config(SqlDataSource::options(
8532 &storage[0],
8533 Options::with_port(api_port).catchup(Default::default()),
8534 ))
8535 .network_config(network_config)
8536 .persistences(persistence.clone())
8537 .catchups(std::array::from_fn(|_| {
8538 StatePeers::<StaticVersion<0, 1>>::from_urls(
8539 vec![format!("http://localhost:{api_port}").parse().unwrap()],
8540 Default::default(),
8541 Duration::from_secs(2),
8542 &NoMetrics,
8543 )
8544 }))
8545 .pos_hook(
8546 DelegationConfig::MultipleDelegators,
8547 hotshot_contract_adapter::stake_table::StakeTableContractVersion::V3,
8548 POS_V4,
8549 )
8550 .await
8551 .unwrap()
8552 .build();
8553
8554 let mut network = TestNetwork::new(config, POS_V4).await;
8555
8556 let client: Client<ServerError, StaticVersion<0, 1>> =
8557 Client::new(format!("http://localhost:{api_port}").parse().unwrap());
8558
8559 client.connect(None).await;
8560
8561 let mut events = network.server.event_stream();
8562 wait_for_epochs(&mut events, EPOCH_HEIGHT, 3).await;
8563
8564 network.stop_consensus().await;
8565 let height = network.server.decided_leaf().await.height();
8566 wait_until_block_height(&client, "reward-state-v2/block-height", height).await;
8567
8568 let err = client
8569 .get::<Vec<(RewardAccountV2, RewardAmount)>>(&format!(
8570 "reward-state-v2/reward-amounts/{height}/0/10001"
8571 ))
8572 .send()
8573 .await
8574 .unwrap_err();
8575
8576 assert_matches!(err, ServerError { status, .. } if
8577 status == StatusCode::BAD_REQUEST
8578
8579 );
8580
8581 let mut expected: Vec<_> = network
8582 .server
8583 .decided_state()
8584 .await
8585 .unwrap()
8586 .reward_merkle_tree_v2
8587 .iter()
8588 .map(|(addr, amt)| (*addr, *amt))
8589 .collect();
8590 expected.sort_by_key(|(acct, _)| std::cmp::Reverse(*acct));
8592
8593 tracing::info!("expected accounts = {expected:?}");
8594 let limit = expected.len().min(10_000) as u64;
8595 let offset = 0u64;
8596 let expected: Vec<_> = expected.into_iter().take(limit as usize).collect();
8597
8598 let res = client
8599 .get::<Vec<(RewardAccountV2, RewardAmount)>>(&format!(
8600 "reward-state-v2/reward-amounts/{height}/{offset}/{limit}"
8601 ))
8602 .send()
8603 .await
8604 .unwrap();
8605
8606 assert_eq!(res, expected);
8607
8608 Ok(())
8609 }
8610
8611 #[test_log::test(tokio::test(flavor = "multi_thread"))]
8612 async fn test_namespace_query_compat_v0_2() {
8613 test_namespace_query_compat_helper(Upgrade::trivial(FEE_VERSION)).await;
8614 }
8615
8616 #[test_log::test(tokio::test(flavor = "multi_thread"))]
8617 async fn test_namespace_query_compat_v0_3() {
8618 test_namespace_query_compat_helper(Upgrade::trivial(EPOCH_VERSION)).await;
8619 }
8620
8621 async fn test_namespace_query_compat_helper(upgrade: Upgrade) {
8622 const NUM_NODES: usize = 5;
8624
8625 let port = reserve_tcp_port().expect("OS should have ephemeral ports available");
8626 let url: Url = format!("http://localhost:{port}").parse().unwrap();
8627
8628 let test_config = TestConfigBuilder::default().build();
8629 let config = TestNetworkConfigBuilder::<NUM_NODES, _, _>::with_num_nodes()
8630 .api_config(Options::from(options::Http {
8631 port,
8632 max_connections: None,
8633 axum_port: None,
8634 tonic_port: None,
8635 }))
8636 .catchups(std::array::from_fn(|_| {
8637 StatePeers::<SequencerApiVersion>::from_urls(
8638 vec![url.clone()],
8639 Default::default(),
8640 Duration::from_secs(2),
8641 &NoMetrics,
8642 )
8643 }))
8644 .network_config(test_config)
8645 .build();
8646
8647 let mut network = TestNetwork::new(config, upgrade).await;
8648 let mut events = network.server.event_stream();
8649
8650 let ns = NamespaceId::from(10_000u64);
8652 let tx = Transaction::new(ns, vec![1, 2, 3]);
8653 network.server.submit_transaction(tx.clone()).await.unwrap();
8654 let block = wait_for_decide_on_handle(&mut events, &tx).await.0;
8655
8656 let client: Client<ServerError, StaticVersion<0, 1>> = Client::new(url);
8658 client.connect(None).await;
8659
8660 let (header, common): (Header, VidCommonQueryData<SeqTypes>) = try_join!(
8661 client.get(&format!("availability/header/{block}")).send(),
8662 client
8663 .get(&format!("availability/vid/common/{block}"))
8664 .send()
8665 )
8666 .unwrap();
8667 let version = header.version();
8668
8669 for api_ver in ["/v1", ""] {
8672 tracing::info!("test namespace API version: {api_ver}");
8673
8674 let ns_proof: NamespaceProofQueryData = client
8675 .get(&format!(
8676 "{api_ver}/availability/block/{block}/namespace/{ns}"
8677 ))
8678 .send()
8679 .await
8680 .unwrap();
8681 let proof = ns_proof.proof.as_ref().unwrap();
8682 if version < EPOCH_VERSION {
8683 assert!(matches!(proof, NsProof::V0(..)));
8684 } else {
8685 assert!(matches!(proof, NsProof::V1(..)));
8686 }
8687 let (txs, ns_from_proof) = proof
8688 .verify(
8689 header.ns_table(),
8690 &header.payload_commitment(),
8691 common.common(),
8692 )
8693 .unwrap();
8694 assert_eq!(ns_from_proof, ns);
8695 assert_eq!(txs, ns_proof.transactions);
8696 assert_eq!(txs, std::slice::from_ref(&tx));
8697
8698 let ns_proofs: Vec<NamespaceProofQueryData> = client
8700 .get(&format!(
8701 "{api_ver}/availability/block/{}/{}/namespace/{ns}",
8702 block,
8703 block + 1
8704 ))
8705 .send()
8706 .await
8707 .unwrap();
8708 assert_eq!(&ns_proofs, std::slice::from_ref(&ns_proof));
8709
8710 let ns_proof: NamespaceProofQueryData = client
8712 .get(&format!(
8713 "{api_ver}/availability/block/{}/namespace/{ns}",
8714 block - 1
8715 ))
8716 .send()
8717 .await
8718 .unwrap();
8719 assert_eq!(ns_proof.proof, None);
8720 assert_eq!(ns_proof.transactions, vec![]);
8721
8722 let mut proofs = client
8724 .socket(&format!(
8725 "{api_ver}/availability/stream/blocks/0/namespace/{ns}"
8726 ))
8727 .subscribe()
8728 .await
8729 .unwrap();
8730 for i in 0.. {
8731 tracing::info!(i, "stream proof");
8732 let proof: NamespaceProofQueryData = proofs.next().await.unwrap().unwrap();
8733 if proof.proof.is_none() {
8734 tracing::info!("waiting for non-trivial proof from stream");
8735 continue;
8736 }
8737 assert_eq!(&proof.transactions, std::slice::from_ref(&tx));
8738 break;
8739 }
8740 }
8741
8742 tracing::info!("test namespace API version: v0");
8744 if version < EPOCH_VERSION {
8745 let ns_proof: ADVZNamespaceProofQueryData = client
8746 .get(&format!("v0/availability/block/{block}/namespace/{ns}"))
8747 .send()
8748 .await
8749 .unwrap();
8750 let proof = ns_proof.proof.as_ref().unwrap();
8751 let VidCommon::V0(common) = common.common() else {
8752 panic!("wrong VID common version");
8753 };
8754 let (txs, ns_from_proof) = proof
8755 .verify(header.ns_table(), &header.payload_commitment(), common)
8756 .unwrap();
8757 assert_eq!(ns_from_proof, ns);
8758 assert_eq!(txs, ns_proof.transactions);
8759 assert_eq!(&txs, std::slice::from_ref(&tx));
8760
8761 let ns_proofs: Vec<ADVZNamespaceProofQueryData> = client
8763 .get(&format!(
8764 "v0/availability/block/{}/{}/namespace/{ns}",
8765 block,
8766 block + 1
8767 ))
8768 .send()
8769 .await
8770 .unwrap();
8771 assert_eq!(&ns_proofs, std::slice::from_ref(&ns_proof));
8772 } else {
8773 client
8775 .get::<ADVZNamespaceProofQueryData>(&format!(
8776 "v0/availability/block/{block}/namespace/{ns}"
8777 ))
8778 .send()
8779 .await
8780 .unwrap_err();
8781 }
8782
8783 let ns_proof: ADVZNamespaceProofQueryData = client
8785 .get(&format!(
8786 "v0/availability/block/{}/namespace/{ns}",
8787 block - 1
8788 ))
8789 .send()
8790 .await
8791 .unwrap();
8792 assert_eq!(ns_proof.proof, None);
8793 assert_eq!(ns_proof.transactions, vec![]);
8794
8795 let mut proofs = client
8798 .socket(&format!("v0/availability/stream/blocks/0/namespace/{ns}"))
8799 .subscribe()
8800 .await
8801 .unwrap();
8802 for i in 0.. {
8803 tracing::info!(i, "stream proof");
8804 let proof: ADVZNamespaceProofQueryData = match proofs.next().await {
8805 Some(proof) => proof.unwrap(),
8806 None => {
8807 assert!(
8809 version >= EPOCH_VERSION,
8810 "legacy steam ended while still on legacy consensus"
8811 );
8812 break;
8813 },
8814 };
8815 if proof.proof.is_none() {
8816 tracing::info!("waiting for non-trivial proof from stream");
8817 continue;
8818 }
8819 assert_eq!(&proof.transactions, std::slice::from_ref(&tx));
8820 break;
8821 }
8822
8823 network.server.shut_down().await;
8824 }
8825
8826 #[test_log::test(tokio::test(flavor = "multi_thread"))]
8827 async fn test_light_client_completeness() {
8828 const NUM_NODES: usize = 1;
8832 const EPOCH_HEIGHT: u64 = 200;
8833
8834 let upgrade = Upgrade::new(LEGACY_VERSION, EPOCH_VERSION);
8835 let port = reserve_tcp_port().expect("OS should have ephemeral ports available");
8836 let url: Url = format!("http://localhost:{port}").parse().unwrap();
8837
8838 let test_config = TestConfigBuilder::default()
8839 .epoch_height(EPOCH_HEIGHT)
8840 .epoch_start_block(321)
8841 .set_upgrades(upgrade.target)
8842 .await
8843 .build();
8844
8845 let storage = join_all((0..NUM_NODES).map(|_| SqlDataSource::create_storage())).await;
8846 let persistence: [_; NUM_NODES] = storage
8847 .iter()
8848 .map(<SqlDataSource as TestableSequencerDataSource>::persistence_options)
8849 .collect::<Vec<_>>()
8850 .try_into()
8851 .unwrap();
8852
8853 let config = TestNetworkConfigBuilder::<NUM_NODES, _, _>::with_num_nodes()
8854 .api_config(
8855 SqlDataSource::options(&storage[0], Options::with_port(port))
8856 .light_client(Default::default()),
8857 )
8858 .persistences(persistence.clone())
8859 .catchups(std::array::from_fn(|_| {
8860 StatePeers::<SequencerApiVersion>::from_urls(
8861 vec![url.clone()],
8862 Default::default(),
8863 Duration::from_secs(2),
8864 &NoMetrics,
8865 )
8866 }))
8867 .network_config(test_config)
8868 .build();
8869
8870 let mut network = TestNetwork::new(config, upgrade).await;
8871 let client: Client<ServerError, StaticVersion<0, 1>> = Client::new(url);
8872 client.connect(None).await;
8873
8874 let mut actual_leaves = vec![];
8877 let mut actual_blocks = vec![];
8878 let mut leaves = client
8879 .socket("availability/stream/leaves/0")
8880 .subscribe::<LeafQueryData<SeqTypes>>()
8881 .await
8882 .unwrap()
8883 .zip(
8884 client
8885 .socket("availability/stream/blocks/0")
8886 .subscribe::<BlockQueryData<SeqTypes>>()
8887 .await
8888 .unwrap(),
8889 )
8890 .map(|(leaf, block)| {
8891 let leaf = leaf.unwrap();
8892 let block = block.unwrap();
8893 actual_leaves.push(leaf.clone());
8894 actual_blocks.push(block);
8895 leaf
8896 });
8897
8898 let (upgrade_height, first_epoch) = loop {
8900 let leaf: LeafQueryData<SeqTypes> = leaves.next().await.unwrap();
8901 if leaf.header().version() < EPOCH_VERSION {
8902 tracing::info!(version = %leaf.header().version(), height = leaf.header().height(), view = ?leaf.leaf().view_number(), "waiting for epoch upgrade");
8903 continue;
8904 }
8905 break (leaf.height(), leaf.leaf().epoch(EPOCH_HEIGHT).unwrap());
8906 };
8907 tracing::info!(upgrade_height, ?first_epoch, "epochs enabled");
8908
8909 let mut epoch_heights = [0; 2];
8912 for (i, epoch_height) in epoch_heights.iter_mut().enumerate() {
8913 let desired_epoch = first_epoch + (i as u64) + 1;
8914 *epoch_height = loop {
8915 let leaf = leaves.next().await.unwrap();
8916 let epoch = leaf.leaf().epoch(EPOCH_HEIGHT).unwrap();
8917 if epoch > desired_epoch {
8918 tracing::info!(
8919 height = leaf.height(),
8920 ?desired_epoch,
8921 ?epoch,
8922 "changed epoch"
8923 );
8924 break leaf.height();
8925 }
8926 tracing::info!(
8927 ?desired_epoch,
8928 height = leaf.header().height(),
8929 view = ?leaf.leaf().view_number(),
8930 "waiting for epoch change"
8931 );
8932 };
8933 }
8934
8935 let max_block = epoch_heights[1] + 1;
8937 loop {
8938 let leaf = leaves.next().await.unwrap();
8939 if leaf.height() > max_block {
8940 break;
8941 }
8942 tracing::info!(max_block, height = leaf.height(), "waiting for block");
8943 }
8944
8945 network.stop_consensus().await;
8948
8949 let heights =
8952 (0..=1)
8954 .chain(upgrade_height-1..=upgrade_height+1)
8956 .chain(epoch_heights[0]-1..=epoch_heights[0] + 1)
8958 .chain(epoch_heights[1]-1..=max_block);
8960
8961 let quorum = EpochChangeQuorum::new(EPOCH_HEIGHT);
8962 for i in heights {
8963 let leaf = &actual_leaves[i as usize];
8964 let block = &actual_blocks[i as usize];
8965 tracing::info!(i, ?leaf, ?block, "check leaf");
8966
8967 let client = &client;
8969 let proofs = try_join_all(
8970 [
8971 format!("light-client/leaf/{i}"),
8972 format!("light-client/leaf/hash/{}", leaf.hash()),
8973 format!("light-client/leaf/block-hash/{}", leaf.block_hash()),
8974 ]
8975 .into_iter()
8976 .map(|path| async move {
8977 tracing::info!(i, path, "fetch leaf proof");
8978 let proof = client.get::<LeafProof>(&path).send().await?;
8979 Ok::<_, anyhow::Error>((path, proof))
8980 }),
8981 )
8982 .await
8983 .unwrap();
8984
8985 for (path, proof) in proofs {
8987 tracing::info!(i, path, ?proof, "check leaf proof");
8988 assert_eq!(
8989 proof.verify(LeafProofHint::Quorum(&quorum)).await.unwrap(),
8990 *leaf
8991 );
8992 }
8993
8994 let root_height = i + 1;
8996 let root = actual_leaves[root_height as usize].header();
8997 let proofs = try_join_all(
8998 [
8999 format!("light-client/header/{root_height}/{i}"),
9000 format!(
9001 "light-client/header/{root_height}/hash/{}",
9002 leaf.block_hash()
9003 ),
9004 ]
9005 .into_iter()
9006 .map(|path| async move {
9007 tracing::info!(i, path, "get header proof");
9008 let proof = client.get::<HeaderProof>(&path).send().await?;
9009 Ok::<_, anyhow::Error>((path, proof))
9010 }),
9011 )
9012 .await
9013 .unwrap();
9014 for (path, proof) in proofs {
9015 tracing::info!(i, path, ?proof, "check header proof");
9016 assert_eq!(
9017 proof.verify_ref(root.block_merkle_tree_root()).unwrap(),
9018 leaf.header()
9019 );
9020 }
9021
9022 let proof = client
9024 .get::<PayloadProof>(&format!("light-client/payload/{i}"))
9025 .send()
9026 .await
9027 .unwrap();
9028 assert_eq!(proof.verify(leaf.header()).unwrap(), *block.payload());
9029 }
9030
9031 let events: Vec<StakeTableEvent> = client
9033 .get(&format!("light-client/stake-table/{}", first_epoch + 2))
9034 .send()
9035 .await
9036 .unwrap();
9037 let mut state_from_events = StakeTableState::default();
9038 for event in events {
9039 state_from_events.apply_event(event).unwrap().unwrap();
9040 }
9041
9042 assert_eq!(
9043 state_from_events.into_validators(),
9044 network
9045 .server
9046 .consensus_handle()
9047 .storage()
9048 .await
9049 .load_all_validators(first_epoch + 2, 0, 1_000_000)
9050 .await
9051 .unwrap()
9052 .into_iter()
9053 .map(|v| (v.account, v))
9054 .collect::<RegisteredValidatorMap>()
9055 );
9056
9057 let err = client
9059 .get::<Vec<StakeTableEvent>>(&format!("light-client/stake-table/{}", first_epoch + 1))
9060 .send()
9061 .await
9062 .unwrap_err();
9063 assert_eq!(err.status(), StatusCode::BAD_REQUEST);
9064 }
9065
9066 #[test_log::test(tokio::test(flavor = "multi_thread"))]
9068 async fn test_fetch_leaf_returns_exact_height() -> anyhow::Result<()> {
9069 const EPOCH_HEIGHT: u64 = 10;
9070 const NUM_NODES: usize = 5;
9071 const TARGET_HEIGHT: u64 = EPOCH_HEIGHT * 3 + 2;
9072
9073 let network_config = TestConfigBuilder::default()
9074 .epoch_height(EPOCH_HEIGHT)
9075 .build();
9076
9077 let port = reserve_tcp_port().expect("No ports free for query service");
9078
9079 let storage = join_all((0..NUM_NODES).map(|_| SqlDataSource::create_storage())).await;
9080 let persistence: [_; NUM_NODES] = storage
9081 .iter()
9082 .map(<SqlDataSource as TestableSequencerDataSource>::persistence_options)
9083 .collect::<Vec<_>>()
9084 .try_into()
9085 .unwrap();
9086
9087 let catchup_peers = std::array::from_fn(|_| {
9088 StatePeers::<StaticVersion<0, 1>>::from_urls(
9089 vec![format!("http://localhost:{port}").parse().unwrap()],
9090 Default::default(),
9091 Duration::from_secs(2),
9092 &NoMetrics,
9093 )
9094 });
9095
9096 let config = TestNetworkConfigBuilder::with_num_nodes()
9097 .api_config(SqlDataSource::options(
9098 &storage[0],
9099 Options::with_port(port),
9100 ))
9101 .network_config(network_config)
9102 .persistences(persistence)
9103 .catchups(catchup_peers)
9104 .pos_hook(
9105 DelegationConfig::MultipleDelegators,
9106 Default::default(),
9107 POS_V4,
9108 )
9109 .await?
9110 .build();
9111
9112 let network = TestNetwork::new(config, POS_V4).await;
9113
9114 let height_client: Client<ServerError, StaticVersion<0, 1>> =
9116 Client::new(format!("http://localhost:{port}").parse().unwrap());
9117 wait_until_block_height(&height_client, "node/block-height", TARGET_HEIGHT + 5).await;
9118
9119 let coordinator = network.server.node_state().coordinator;
9121 let epoch = EpochNumber::new(epoch_from_block_number(TARGET_HEIGHT, EPOCH_HEIGHT));
9122 let snapshot = coordinator.membership().snapshot(epoch).expect("snapshot");
9123 let stake_table = HSStakeTable::from_iter(snapshot.stake_table());
9124 let success_threshold = snapshot.success_threshold();
9125
9126 let catchup = StatePeers::<StaticVersion<0, 1>>::from_urls(
9128 vec![format!("http://localhost:{port}").parse().unwrap()],
9129 Default::default(),
9130 Duration::from_secs(5),
9131 &NoMetrics,
9132 );
9133
9134 let leaf = catchup
9135 .fetch_leaf(TARGET_HEIGHT, stake_table, success_threshold)
9136 .await?;
9137
9138 assert_eq!(
9139 leaf.height(),
9140 TARGET_HEIGHT,
9141 "fetch_leaf must return the leaf at exactly the requested height"
9142 );
9143
9144 Ok(())
9145 }
9146}