1use std::{collections::HashMap, pin::Pin, sync::Arc, time::Duration};
2
3use alloy::primitives::U256;
4use anyhow::{Context, bail, ensure};
5use async_lock::RwLock;
6use async_once_cell::Lazy;
7use async_trait::async_trait;
8use committable::Commitment;
9use data_source::{
10 CatchupDataSource, RequestResponseDataSource, StakeTableDataSource, StakeTableWithEpochNumber,
11 StateCertDataSource, StateCertFetchingDataSource, SubmitDataSource,
12};
13use derivative::Derivative;
14use espresso_types::{
15 AccountQueryData, AuthenticatedValidatorMap, BlockMerkleTree, FeeAccount, FeeMerkleTree, Leaf2,
16 NodeState, PubKey, Transaction,
17 config::PublicNetworkConfig,
18 retain_accounts,
19 traits::EventsPersistenceRead,
20 v0::traits::{SequencerPersistence, StateCatchup},
21 v0_3::{
22 ChainConfig, RegisteredValidator, RewardAccountQueryDataV1, RewardAccountV1, RewardAmount,
23 RewardMerkleTreeV1, StakeTableEvent,
24 },
25 v0_4::{
26 PermittedRewardMerkleTreeV2, RewardAccountQueryDataV2, RewardAccountV2, RewardMerkleTreeV2,
27 },
28};
29use futures::{
30 future::{BoxFuture, Future, FutureExt},
31 stream::BoxStream,
32};
33use hotshot_contract_adapter::sol_types::EspToken;
34use hotshot_events_service::events_source::{
35 EventFilterSet, EventsSource, EventsStreamer, StartupInfo,
36};
37use hotshot_query_service::{availability::VidCommonQueryData, data_source::ExtensibleDataSource};
38use hotshot_types::{
39 PeerConfig,
40 data::{EpochNumber, VidCommitment, VidCommon, VidShare, ViewNumber},
41 event::{Event, LegacyEvent},
42 light_client::LCV3StateSignatureRequestBody,
43 network::NetworkConfig,
44 simple_certificate::LightClientStateUpdateCertificateV2,
45 traits::{election::Membership, network::ConnectedNetwork},
46 utils::epoch_from_block_number,
47 vid::avidm::{AvidMScheme, init_avidm_param},
48 vote::HasViewNumber,
49};
50use itertools::Itertools;
51use jf_merkle_tree_compat::MerkleTreeScheme;
52use moka::future::Cache;
53use rand::Rng;
54use request_response::RequestType;
55use serde::{Deserialize, Serialize};
56use tokio::time::timeout;
57use vbs::version::Version;
58
59use self::data_source::{HotShotConfigDataSource, NodeStateDataSource, StateSignatureDataSource};
60use crate::{
61 SeqTypes, SequencerApiVersion, SequencerContext,
62 api::data_source::TokenDataSource,
63 catchup::{
64 CatchupStorage, add_fee_accounts_to_state, add_v1_reward_accounts_to_state,
65 add_v2_reward_accounts_to_state,
66 },
67 context::Consensus,
68 request_response::{
69 data_source::{retain_v1_reward_accounts, retain_v2_reward_accounts},
70 request::{Request, Response},
71 },
72 state_cert::{StateCertFetchError, validate_state_cert},
73 state_signature::StateSigner,
74};
75
76pub mod data_source;
77pub mod endpoints;
78pub mod fs;
79pub mod light_client;
80pub mod options;
81pub mod sql;
82pub mod unlock_schedule;
83mod update;
84
85pub use options::Options;
86
87pub type BlocksFrontier = <BlockMerkleTree as MerkleTreeScheme>::MembershipProof;
88
89type BoxLazy<T> = Pin<Arc<Lazy<T, BoxFuture<'static, T>>>>;
90
91#[derive(Derivative)]
92#[derivative(Clone(bound = ""), Debug(bound = ""))]
93struct ApiState<N: ConnectedNetwork<PubKey>, P: SequencerPersistence> {
94 #[derivative(Debug = "ignore")]
99 sequencer_context: BoxLazy<SequencerContext<N, P>>,
100
101 token_supply: Cache<(), U256>,
103}
104
105impl<N: ConnectedNetwork<PubKey>, P: SequencerPersistence> ApiState<N, P> {
106 fn new(context_init: impl Future<Output = SequencerContext<N, P>> + Send + 'static) -> Self {
107 Self {
108 sequencer_context: Arc::pin(Lazy::from_future(context_init.boxed())),
109 token_supply: Cache::builder()
110 .max_capacity(1)
111 .time_to_live(Duration::from_secs(3600))
112 .build(),
113 }
114 }
115
116 async fn state_signer(&self) -> Arc<RwLock<StateSigner<SequencerApiVersion>>> {
117 self.sequencer_context
118 .as_ref()
119 .get()
120 .await
121 .get_ref()
122 .state_signer()
123 }
124
125 async fn event_streamer(&self) -> Arc<RwLock<EventsStreamer<SeqTypes>>> {
126 self.sequencer_context
127 .as_ref()
128 .get()
129 .await
130 .get_ref()
131 .event_streamer()
132 }
133
134 async fn consensus(&self) -> Arc<RwLock<Consensus<N, P>>> {
135 self.sequencer_context
136 .as_ref()
137 .get()
138 .await
139 .get_ref()
140 .consensus()
141 }
142
143 async fn network_config(&self) -> NetworkConfig<SeqTypes> {
144 self.sequencer_context
145 .as_ref()
146 .get()
147 .await
148 .get_ref()
149 .network_config()
150 }
151}
152
153type StorageState<N, P, D> = ExtensibleDataSource<D, ApiState<N, P>>;
154
155#[async_trait]
156impl<N: ConnectedNetwork<PubKey>, P: SequencerPersistence> EventsSource<SeqTypes>
157 for ApiState<N, P>
158{
159 type EventStream = BoxStream<'static, Arc<Event<SeqTypes>>>;
160 type LegacyEventStream = BoxStream<'static, Arc<LegacyEvent<SeqTypes>>>;
161
162 async fn get_event_stream(
163 &self,
164 _filter: Option<EventFilterSet<SeqTypes>>,
165 ) -> Self::EventStream {
166 self.event_streamer()
167 .await
168 .read()
169 .await
170 .get_event_stream(None)
171 .await
172 }
173
174 async fn get_legacy_event_stream(
175 &self,
176 _filter: Option<EventFilterSet<SeqTypes>>,
177 ) -> Self::LegacyEventStream {
178 self.event_streamer()
179 .await
180 .read()
181 .await
182 .get_legacy_event_stream(None)
183 .await
184 }
185
186 async fn get_startup_info(&self) -> StartupInfo<SeqTypes> {
187 self.event_streamer()
188 .await
189 .read()
190 .await
191 .get_startup_info()
192 .await
193 }
194}
195
196impl<N: ConnectedNetwork<PubKey>, D: Send + Sync, P: SequencerPersistence> TokenDataSource<SeqTypes>
197 for StorageState<N, P, D>
198{
199 async fn get_initial_supply_l1(&self) -> anyhow::Result<U256> {
200 self.as_ref().get_initial_supply_l1().await
201 }
202
203 async fn get_total_supply_l1(&self) -> anyhow::Result<U256> {
204 self.as_ref().get_total_supply_l1().await
205 }
206
207 async fn get_decided_header(&self) -> espresso_types::Header {
208 self.as_ref().get_decided_header().await
209 }
210}
211
212impl<N: ConnectedNetwork<PubKey>, D: Send + Sync, P: SequencerPersistence> SubmitDataSource<N, P>
213 for StorageState<N, P, D>
214{
215 async fn submit(&self, tx: Transaction) -> anyhow::Result<()> {
216 self.as_ref().submit(tx).await
217 }
218}
219
220impl<N: ConnectedNetwork<PubKey>, D: Sync, P: SequencerPersistence> StakeTableDataSource<SeqTypes>
221 for StorageState<N, P, D>
222{
223 async fn get_stake_table(
225 &self,
226 epoch: Option<EpochNumber>,
227 ) -> anyhow::Result<Vec<PeerConfig<SeqTypes>>> {
228 self.as_ref().get_stake_table(epoch).await
229 }
230
231 async fn get_stake_table_current(&self) -> anyhow::Result<StakeTableWithEpochNumber<SeqTypes>> {
233 self.as_ref().get_stake_table_current().await
234 }
235
236 async fn get_da_stake_table(
238 &self,
239 epoch: Option<EpochNumber>,
240 ) -> anyhow::Result<Vec<PeerConfig<SeqTypes>>> {
241 self.as_ref().get_da_stake_table(epoch).await
242 }
243
244 async fn get_da_stake_table_current(
246 &self,
247 ) -> anyhow::Result<StakeTableWithEpochNumber<SeqTypes>> {
248 self.as_ref().get_da_stake_table_current().await
249 }
250
251 async fn get_validators(
253 &self,
254 epoch: EpochNumber,
255 ) -> anyhow::Result<AuthenticatedValidatorMap> {
256 self.as_ref().get_validators(epoch).await
257 }
258
259 async fn get_block_reward(
260 &self,
261 epoch: Option<EpochNumber>,
262 ) -> anyhow::Result<Option<RewardAmount>> {
263 self.as_ref().get_block_reward(epoch).await
264 }
265 async fn current_proposal_participation(&self) -> HashMap<PubKey, f64> {
267 self.as_ref().current_proposal_participation().await
268 }
269 async fn proposal_participation(&self, epoch: EpochNumber) -> HashMap<PubKey, f64> {
271 self.as_ref().proposal_participation(epoch).await
272 }
273 async fn current_vote_participation(&self) -> HashMap<PubKey, f64> {
275 self.as_ref().current_vote_participation().await
276 }
277 async fn vote_participation(&self, epoch: EpochNumber) -> HashMap<PubKey, f64> {
279 self.as_ref().vote_participation(epoch).await
280 }
281
282 async fn get_all_validators(
283 &self,
284 epoch: EpochNumber,
285 offset: u64,
286 limit: u64,
287 ) -> anyhow::Result<Vec<RegisteredValidator<PubKey>>> {
288 self.as_ref().get_all_validators(epoch, offset, limit).await
289 }
290
291 async fn stake_table_events(
292 &self,
293 from_l1_block: u64,
294 to_l1_block: u64,
295 ) -> anyhow::Result<Vec<StakeTableEvent>> {
296 self.as_ref()
297 .stake_table_events(from_l1_block, to_l1_block)
298 .await
299 }
300}
301
302impl<N: ConnectedNetwork<PubKey>, P: SequencerPersistence> TokenDataSource<SeqTypes>
303 for ApiState<N, P>
304{
305 async fn get_initial_supply_l1(&self) -> anyhow::Result<U256> {
306 let node_state = self.sequencer_context.as_ref().get().await.node_state();
307 let fetcher = node_state
308 .coordinator
309 .membership()
310 .read()
311 .await
312 .fetcher()
313 .clone();
314 let cached = *fetcher.initial_supply.read().await;
315 match cached {
316 Some(supply) => Ok(supply),
317 None => Ok(fetcher.fetch_and_update_initial_supply().await?),
318 }
319 }
320
321 async fn get_total_supply_l1(&self) -> anyhow::Result<U256> {
322 match self.token_supply.get(&()).await {
323 Some(supply) => Ok(supply),
324 None => {
325 let node_state = self.sequencer_context.as_ref().get().await.node_state();
326 let token_contract_address = node_state.token_contract_address().await?;
327
328 let provider = node_state.l1_client.provider;
329
330 let token = EspToken::new(token_contract_address, provider.clone());
331
332 let supply = token
333 .totalSupply()
334 .call()
335 .await
336 .context("Failed to retrieve totalSupply from the contract")?;
337
338 self.token_supply.insert((), supply).await;
339
340 Ok(supply)
341 },
342 }
343 }
344
345 async fn get_decided_header(&self) -> espresso_types::Header {
346 self.consensus()
347 .await
348 .read()
349 .await
350 .decided_leaf()
351 .await
352 .block_header()
353 .clone()
354 }
355}
356
357impl<N: ConnectedNetwork<PubKey>, P: SequencerPersistence> StakeTableDataSource<SeqTypes>
358 for ApiState<N, P>
359{
360 async fn get_stake_table(
362 &self,
363 epoch: Option<EpochNumber>,
364 ) -> anyhow::Result<Vec<PeerConfig<SeqTypes>>> {
365 let highest_epoch = self
366 .consensus()
367 .await
368 .read()
369 .await
370 .cur_epoch()
371 .await
372 .map(|e| e + 1);
373 if epoch > highest_epoch {
374 return Err(anyhow::anyhow!(
375 "requested stake table for epoch {epoch:?} is beyond the current epoch + 1 \
376 {highest_epoch:?}"
377 ));
378 }
379 let mem = self
380 .consensus()
381 .await
382 .read()
383 .await
384 .membership_coordinator
385 .stake_table_for_epoch(epoch)
386 .await?;
387
388 Ok(mem.stake_table().await.0)
389 }
390
391 async fn get_stake_table_current(&self) -> anyhow::Result<StakeTableWithEpochNumber<SeqTypes>> {
393 let epoch = self.consensus().await.read().await.cur_epoch().await;
394
395 Ok(StakeTableWithEpochNumber {
396 epoch,
397 stake_table: self.get_stake_table(epoch).await?,
398 })
399 }
400
401 async fn get_da_stake_table(
403 &self,
404 epoch: Option<EpochNumber>,
405 ) -> anyhow::Result<Vec<PeerConfig<SeqTypes>>> {
406 Ok(self
407 .consensus()
408 .await
409 .read()
410 .await
411 .membership_coordinator
412 .membership()
413 .read()
414 .await
415 .da_stake_table(epoch)
416 .0)
417 }
418
419 async fn get_da_stake_table_current(
421 &self,
422 ) -> anyhow::Result<StakeTableWithEpochNumber<SeqTypes>> {
423 let epoch = self.consensus().await.read().await.cur_epoch().await;
424
425 Ok(StakeTableWithEpochNumber {
426 epoch,
427 stake_table: self.get_da_stake_table(epoch).await?,
428 })
429 }
430
431 async fn get_block_reward(
432 &self,
433 epoch: Option<EpochNumber>,
434 ) -> anyhow::Result<Option<RewardAmount>> {
435 let coordinator = self
436 .consensus()
437 .await
438 .read()
439 .await
440 .membership_coordinator
441 .clone();
442
443 let membership = coordinator.membership().read().await;
444 let block_reward = match epoch {
445 None => membership.fixed_block_reward(),
446 Some(e) => membership.epoch_block_reward(e),
447 };
448
449 Ok(block_reward)
450 }
451
452 async fn get_validators(
454 &self,
455 epoch: EpochNumber,
456 ) -> anyhow::Result<AuthenticatedValidatorMap> {
457 let mem = self
458 .consensus()
459 .await
460 .read()
461 .await
462 .membership_coordinator
463 .membership_for_epoch(Some(epoch))
464 .await
465 .context("membership not found")?;
466
467 let membership = mem.coordinator.membership().read().await;
468 membership.active_validators(&epoch)
469 }
470
471 async fn current_proposal_participation(&self) -> HashMap<PubKey, f64> {
473 self.consensus()
474 .await
475 .read()
476 .await
477 .consensus()
478 .read()
479 .await
480 .current_proposal_participation()
481 }
482
483 async fn proposal_participation(&self, epoch: EpochNumber) -> HashMap<PubKey, f64> {
485 self.consensus()
486 .await
487 .read()
488 .await
489 .consensus()
490 .read()
491 .await
492 .proposal_participation(epoch)
493 }
494
495 async fn current_vote_participation(&self) -> HashMap<PubKey, f64> {
497 self.consensus()
498 .await
499 .read()
500 .await
501 .consensus()
502 .read()
503 .await
504 .current_vote_participation()
505 }
506
507 async fn vote_participation(&self, epoch: EpochNumber) -> HashMap<PubKey, f64> {
509 self.consensus()
510 .await
511 .read()
512 .await
513 .consensus()
514 .read()
515 .await
516 .vote_participation(Some(epoch))
517 }
518
519 async fn get_all_validators(
520 &self,
521 epoch: EpochNumber,
522 offset: u64,
523 limit: u64,
524 ) -> anyhow::Result<Vec<RegisteredValidator<PubKey>>> {
525 let handle = self.consensus().await;
526 let handle_read = handle.read().await;
527 let storage = handle_read.storage();
528 storage.load_all_validators(epoch, offset, limit).await
529 }
530
531 async fn stake_table_events(
532 &self,
533 from_l1_block: u64,
534 to_l1_block: u64,
535 ) -> anyhow::Result<Vec<StakeTableEvent>> {
536 let handle = self.consensus().await;
537 let handle_read = handle.read().await;
538 let storage = handle_read.storage();
539 let (status, events) = storage.load_events(from_l1_block, to_l1_block).await?;
540 ensure!(
541 status == Some(EventsPersistenceRead::Complete),
542 "some events in range [{from_l1_block}, {to_l1_block}] are not available ({status:?})"
543 );
544 Ok(events.into_iter().map(|(_, event)| event).collect())
545 }
546}
547
548impl<N: ConnectedNetwork<PubKey>, D: Sync, P: SequencerPersistence>
549 RequestResponseDataSource<SeqTypes> for StorageState<N, P, D>
550{
551 async fn request_vid_shares(
552 &self,
553 block_number: u64,
554 vid_common_data: VidCommonQueryData<SeqTypes>,
555 timeout_duration: Duration,
556 ) -> BoxFuture<'static, anyhow::Result<Vec<VidShare>>> {
557 self.as_ref()
558 .request_vid_shares(block_number, vid_common_data, timeout_duration)
559 .await
560 }
561}
562
563#[async_trait]
564impl<N: ConnectedNetwork<PubKey>, D: Sync, P: SequencerPersistence>
565 StateCertFetchingDataSource<SeqTypes> for StorageState<N, P, D>
566{
567 async fn request_state_cert(
568 &self,
569 epoch: u64,
570 timeout: Duration,
571 ) -> Result<LightClientStateUpdateCertificateV2<SeqTypes>, StateCertFetchError> {
572 self.as_ref().request_state_cert(epoch, timeout).await
573 }
574}
575
576impl<N: ConnectedNetwork<PubKey>, P: SequencerPersistence> RequestResponseDataSource<SeqTypes>
577 for ApiState<N, P>
578{
579 async fn request_vid_shares(
580 &self,
581 block_number: u64,
582 vid_common_data: VidCommonQueryData<SeqTypes>,
583 duration: Duration,
584 ) -> BoxFuture<'static, anyhow::Result<Vec<VidShare>>> {
585 let request_response_protocol = self
587 .sequencer_context
588 .as_ref()
589 .get()
590 .await
591 .request_response_protocol
592 .clone();
593
594 async move {
595 let total_weight = match vid_common_data.common() {
597 VidCommon::V0(_) => {
598 return Err(anyhow::anyhow!(
600 "V0 total weight calculation not supported yet"
601 ));
602 },
603 VidCommon::V1(v1) => v1.total_weights,
604 VidCommon::V2(v2) => v2.param.total_weights,
605 };
606
607 let avidm_param = init_avidm_param(total_weight)
609 .with_context(|| "failed to initialize avidm param")?;
610
611 let VidCommitment::V1(local_payload_hash) = vid_common_data.payload_hash() else {
613 bail!("V0 share verification not supported yet");
614 };
615
616 let request_id = rand::thread_rng().r#gen();
618
619 let received_shares = Arc::new(parking_lot::Mutex::new(Vec::new()));
621 let received_shares_clone = received_shares.clone();
622 let request_result: anyhow::Result<_, _> = timeout(
623 duration,
624 request_response_protocol.request_indefinitely::<_, _, _>(
625 Request::VidShare(block_number, request_id),
626 RequestType::Broadcast,
627 move |_request, response| {
628 let avidm_param = avidm_param.clone();
629 let received_shares = received_shares_clone.clone();
630 async move {
631 let Response::VidShare(VidShare::V1(received_share)) = response else {
633 bail!("V0 share verification not supported yet");
634 };
635
636 let Ok(Ok(_)) = AvidMScheme::verify_share(
638 &avidm_param,
639 &local_payload_hash,
640 &received_share,
641 ) else {
642 bail!("share verification failed");
643 };
644
645 received_shares.lock().push(received_share);
647
648 bail!("waiting for more shares");
649
650 #[allow(unreachable_code)]
651 Ok(())
652 }
653 },
654 ),
655 )
656 .await;
657
658 match request_result {
660 Err(_) => {
661 Ok(received_shares
663 .lock()
664 .clone()
665 .into_iter()
666 .map(VidShare::V1)
667 .collect())
668 },
669
670 Ok(Err(e)) => Err(e).with_context(|| "failed to request vid shares"),
672
673 Ok(Ok(_)) => bail!("this should not be possible"),
675 }
676 }
677 .boxed()
678 }
679}
680
681#[async_trait]
682impl<N: ConnectedNetwork<PubKey>, P: SequencerPersistence> StateCertFetchingDataSource<SeqTypes>
683 for ApiState<N, P>
684{
685 async fn request_state_cert(
686 &self,
687 epoch: u64,
688 timeout: Duration,
689 ) -> Result<LightClientStateUpdateCertificateV2<SeqTypes>, StateCertFetchError> {
690 tracing::info!("fetching state certificate for epoch={epoch}");
691 let consensus = self.consensus().await;
692 let consensus_read = consensus.read().await;
693
694 let current_epoch = consensus_read.cur_epoch().await;
695
696 let highest_epoch = current_epoch.map(|e| e.u64() + 1);
699
700 if Some(epoch) > highest_epoch {
701 return Err(StateCertFetchError::Other(anyhow::anyhow!(
702 "requested state certificate for epoch {epoch} is beyond the highest possible \
703 epoch {highest_epoch:?}"
704 )));
705 }
706
707 let coordinator = consensus_read.membership_coordinator.clone();
709 if let Err(err) = coordinator
710 .stake_table_for_epoch(Some(EpochNumber::new(epoch)))
711 .await
712 {
713 tracing::warn!(
714 "Failed to get membership for epoch {epoch}: {err:#}. Waiting for catchup"
715 );
716
717 coordinator
718 .wait_for_catchup(EpochNumber::new(epoch))
719 .await
720 .map_err(|e| {
721 StateCertFetchError::Other(
722 anyhow::Error::new(e)
723 .context(format!("failed to catch up for stake table epoch={epoch}")),
724 )
725 })?;
726 }
727
728 let membership = coordinator
729 .stake_table_for_epoch(Some(EpochNumber::new(epoch)))
730 .await
731 .map_err(|e| {
732 StateCertFetchError::Other(
733 anyhow::Error::new(e)
734 .context(format!("failed to get stake table for epoch={epoch}")),
735 )
736 })?;
737
738 let stake_table = membership.stake_table().await;
739
740 drop(consensus_read);
741 drop(consensus);
742
743 let state_catchup = self
744 .sequencer_context
745 .as_ref()
746 .get()
747 .await
748 .node_state()
749 .state_catchup
750 .clone();
751
752 let result = tokio::time::timeout(timeout, state_catchup.fetch_state_cert(epoch)).await;
753
754 match result {
755 Err(_) => Err(StateCertFetchError::FetchError(anyhow::anyhow!(
756 "timeout while fetching state cert for epoch {epoch}"
757 ))),
758 Ok(Ok(cert)) => {
759 validate_state_cert(&cert, &stake_table).map_err(|e| {
761 StateCertFetchError::ValidationError(e.context(format!(
762 "state certificate validation failed for epoch={epoch}"
763 )))
764 })?;
765
766 tracing::info!("fetched and validated state certificate for epoch {epoch}");
767 Ok(cert)
768 },
769 Ok(Err(e)) => Err(StateCertFetchError::FetchError(
770 e.context(format!("failed to fetch state cert for epoch {epoch}")),
771 )),
772 }
773 }
774}
775
776#[async_trait]
778impl<N: ConnectedNetwork<PubKey>, D: Sync, P: SequencerPersistence> StateCertDataSource
779 for StorageState<N, P, D>
780{
781 async fn get_state_cert_by_epoch(
782 &self,
783 epoch: u64,
784 ) -> anyhow::Result<Option<LightClientStateUpdateCertificateV2<SeqTypes>>> {
785 self.as_ref().get_state_cert_by_epoch(epoch).await
786 }
787
788 async fn insert_state_cert(
789 &self,
790 epoch: u64,
791 cert: LightClientStateUpdateCertificateV2<SeqTypes>,
792 ) -> anyhow::Result<()> {
793 self.as_ref().insert_state_cert(epoch, cert).await
794 }
795}
796
797#[async_trait]
798impl<N: ConnectedNetwork<PubKey>, P: SequencerPersistence> StateCertDataSource for ApiState<N, P> {
799 async fn get_state_cert_by_epoch(
800 &self,
801 epoch: u64,
802 ) -> anyhow::Result<Option<LightClientStateUpdateCertificateV2<SeqTypes>>> {
803 let consensus = self.consensus().await;
804 let consensus_lock = consensus.read().await;
805 let persistence = consensus_lock.storage();
806
807 persistence.get_state_cert_by_epoch(epoch).await
808 }
809
810 async fn insert_state_cert(
811 &self,
812 epoch: u64,
813 cert: LightClientStateUpdateCertificateV2<SeqTypes>,
814 ) -> anyhow::Result<()> {
815 let consensus = self.consensus().await;
816 let consensus_lock = consensus.read().await;
817 let persistence = consensus_lock.storage();
818
819 persistence.insert_state_cert(epoch, cert).await
820 }
821}
822
823impl<N: ConnectedNetwork<PubKey>, P: SequencerPersistence> SubmitDataSource<N, P>
824 for ApiState<N, P>
825{
826 async fn submit(&self, tx: Transaction) -> anyhow::Result<()> {
827 let handle = self.consensus().await;
828
829 let consensus_read_lock = handle.read().await;
830
831 let cf = consensus_read_lock
835 .decided_state()
836 .await
837 .chain_config
838 .resolve();
839
840 let cf = match cf {
844 Some(cf) => cf,
845 None => self.node_state().await.chain_config,
846 };
847
848 let max_block_size: u64 = cf.max_block_size.into();
849 let txn_size = tx.payload().len() as u64;
850
851 if txn_size > max_block_size {
853 bail!("transaction size ({txn_size}) is greater than max_block_size ({max_block_size})")
854 }
855
856 consensus_read_lock.submit_transaction(tx).await?;
857 Ok(())
858 }
859}
860
861impl<N, P, D> NodeStateDataSource for StorageState<N, P, D>
862where
863 N: ConnectedNetwork<PubKey>,
864 P: SequencerPersistence,
865 D: Sync,
866{
867 async fn node_state(&self) -> NodeState {
868 self.as_ref().node_state().await
869 }
870}
871
872impl<N: ConnectedNetwork<PubKey>, P: SequencerPersistence, D: CatchupStorage + Send + Sync>
873 data_source::DatabaseMetadataSource for StorageState<N, P, D>
874where
875 N: ConnectedNetwork<PubKey>,
876 P: SequencerPersistence,
877 D: data_source::DatabaseMetadataSource + Send + Sync,
878{
879 async fn get_table_sizes(&self) -> anyhow::Result<Vec<data_source::TableSize>> {
880 self.inner().get_table_sizes().await
881 }
882}
883
884impl<N: ConnectedNetwork<PubKey>, P: SequencerPersistence, D: CatchupStorage + Send + Sync>
885 CatchupDataSource for StorageState<N, P, D>
886{
887 #[tracing::instrument(skip(self, instance))]
888 async fn get_accounts(
889 &self,
890 instance: &NodeState,
891 height: u64,
892 view: ViewNumber,
893 accounts: &[FeeAccount],
894 ) -> anyhow::Result<FeeMerkleTree> {
895 match self
897 .as_ref()
898 .get_accounts(instance, height, view, accounts)
899 .await
900 {
901 Ok(accounts) => return Ok(accounts),
902 Err(err) => {
903 tracing::info!("accounts not in memory, trying storage: {err:#}");
904 },
905 }
906
907 let (tree, leaf) = self
909 .inner()
910 .get_accounts(instance, height, view, accounts)
911 .await
912 .context("accounts not in memory, and could not fetch from storage")?;
913 let consensus = self
917 .as_ref()
918 .consensus()
919 .await
920 .read()
921 .await
922 .consensus()
923 .clone();
924 if let Err(err) =
925 add_fee_accounts_to_state::<N, P>(&consensus, &view, accounts, &tree, leaf).await
926 {
927 tracing::warn!(?view, "cannot update fetched account state: {err:#}");
928 }
929 tracing::info!(?view, "updated with fetched account state");
930
931 Ok(tree)
932 }
933
934 #[tracing::instrument(skip(self, instance))]
935 async fn get_frontier(
936 &self,
937 instance: &NodeState,
938 height: u64,
939 view: ViewNumber,
940 ) -> anyhow::Result<BlocksFrontier> {
941 match self.as_ref().get_frontier(instance, height, view).await {
943 Ok(frontier) => return Ok(frontier),
944 Err(err) => {
945 tracing::info!("frontier is not in memory, trying storage: {err:#}");
946 },
947 }
948
949 self.inner().get_frontier(instance, height, view).await
951 }
952
953 async fn get_chain_config(
954 &self,
955 commitment: Commitment<ChainConfig>,
956 ) -> anyhow::Result<ChainConfig> {
957 match self.as_ref().get_chain_config(commitment).await {
959 Ok(cf) => return Ok(cf),
960 Err(err) => {
961 tracing::info!("chain config is not in memory, trying storage: {err:#}");
962 },
963 }
964
965 self.inner().get_chain_config(commitment).await
967 }
968 async fn get_leaf_chain(&self, height: u64) -> anyhow::Result<Vec<Leaf2>> {
969 match self.as_ref().get_leaf_chain(height).await {
971 Ok(cf) => return Ok(cf),
972 Err(err) => {
973 tracing::info!("leaf chain is not in memory, trying storage: {err:#}");
974 },
975 }
976
977 self.inner().get_leaf_chain(height).await
979 }
980
981 #[tracing::instrument(skip(self, instance))]
982 async fn get_reward_accounts_v2(
983 &self,
984 instance: &NodeState,
985 height: u64,
986 view: ViewNumber,
987 accounts: &[RewardAccountV2],
988 ) -> anyhow::Result<RewardMerkleTreeV2> {
989 match self
991 .as_ref()
992 .get_reward_accounts_v2(instance, height, view, accounts)
993 .await
994 {
995 Ok(accounts) => return Ok(accounts),
996 Err(err) => {
997 tracing::info!("reward accounts not in memory, trying storage: {err:#}");
998 },
999 }
1000
1001 let (tree, leaf) = self
1003 .inner()
1004 .get_reward_accounts_v2(instance, height, view, accounts)
1005 .await
1006 .context("accounts not in memory, and could not fetch from storage")?;
1007
1008 let consensus = self
1011 .as_ref()
1012 .consensus()
1013 .await
1014 .read()
1015 .await
1016 .consensus()
1017 .clone();
1018 if let Err(err) =
1019 add_v2_reward_accounts_to_state::<N, P>(&consensus, &view, accounts, &tree, leaf).await
1020 {
1021 tracing::warn!(?view, "cannot update fetched account state: {err:#}");
1022 }
1023 tracing::info!(?view, "updated with fetched account state");
1024
1025 Ok(tree)
1026 }
1027
1028 #[tracing::instrument(skip(self, instance))]
1029 async fn get_reward_accounts_v1(
1030 &self,
1031 instance: &NodeState,
1032 height: u64,
1033 view: ViewNumber,
1034 accounts: &[RewardAccountV1],
1035 ) -> anyhow::Result<RewardMerkleTreeV1> {
1036 match self
1038 .as_ref()
1039 .get_reward_accounts_v1(instance, height, view, accounts)
1040 .await
1041 {
1042 Ok(accounts) => return Ok(accounts),
1043 Err(err) => {
1044 tracing::info!("reward accounts not in memory, trying storage: {err:#}");
1045 },
1046 }
1047
1048 let (tree, leaf) = self
1050 .inner()
1051 .get_reward_accounts_v1(instance, height, view, accounts)
1052 .await
1053 .context("accounts not in memory, and could not fetch from storage")?;
1054
1055 let consensus = self
1058 .as_ref()
1059 .consensus()
1060 .await
1061 .read()
1062 .await
1063 .consensus()
1064 .clone();
1065 if let Err(err) =
1066 add_v1_reward_accounts_to_state::<N, P>(&consensus, &view, accounts, &tree, leaf).await
1067 {
1068 tracing::warn!(?view, "cannot update fetched account state: {err:#}");
1069 }
1070 tracing::info!(?view, "updated with fetched account state");
1071
1072 Ok(tree)
1073 }
1074
1075 async fn get_reward_merkle_tree_v2(
1076 &self,
1077 height: u64,
1078 view: ViewNumber,
1079 ) -> anyhow::Result<Vec<u8>> {
1080 self.as_ref().get_reward_merkle_tree_v2(height, view).await
1081 }
1082
1083 #[tracing::instrument(skip(self))]
1084 async fn get_state_cert(
1085 &self,
1086 epoch: u64,
1087 ) -> anyhow::Result<LightClientStateUpdateCertificateV2<SeqTypes>> {
1088 let consensus = self.as_ref().consensus().await;
1089 let consensus_lock = consensus.read().await;
1090 let persistence = consensus_lock.storage();
1091
1092 persistence
1093 .get_state_cert_by_epoch(epoch)
1094 .await?
1095 .context(format!("state cert for epoch {epoch} not found"))
1096 }
1097}
1098
1099impl<N, P> NodeStateDataSource for ApiState<N, P>
1100where
1101 N: ConnectedNetwork<PubKey>,
1102 P: SequencerPersistence,
1103{
1104 async fn node_state(&self) -> NodeState {
1105 self.sequencer_context.as_ref().get().await.node_state()
1106 }
1107}
1108
1109impl<N: ConnectedNetwork<PubKey>, P: SequencerPersistence> CatchupDataSource for ApiState<N, P> {
1110 #[tracing::instrument(skip(self, _instance))]
1111 async fn get_accounts(
1112 &self,
1113 _instance: &NodeState,
1114 height: u64,
1115 view: ViewNumber,
1116 accounts: &[FeeAccount],
1117 ) -> anyhow::Result<FeeMerkleTree> {
1118 let state = self
1119 .consensus()
1120 .await
1121 .read()
1122 .await
1123 .state(view)
1124 .await
1125 .context(format!(
1126 "state not available for height {height}, view {view}"
1127 ))?;
1128 retain_accounts(&state.fee_merkle_tree, accounts.iter().copied())
1129 }
1130
1131 #[tracing::instrument(skip(self, _instance))]
1132 async fn get_frontier(
1133 &self,
1134 _instance: &NodeState,
1135 height: u64,
1136 view: ViewNumber,
1137 ) -> anyhow::Result<BlocksFrontier> {
1138 let state = self
1139 .consensus()
1140 .await
1141 .read()
1142 .await
1143 .state(view)
1144 .await
1145 .context(format!(
1146 "state not available for height {height}, view {view}"
1147 ))?;
1148 let tree = &state.block_merkle_tree;
1149 let frontier = tree.lookup(tree.num_leaves() - 1).expect_ok()?.1;
1150 Ok(frontier)
1151 }
1152
1153 async fn get_chain_config(
1154 &self,
1155 commitment: Commitment<ChainConfig>,
1156 ) -> anyhow::Result<ChainConfig> {
1157 let state = self.consensus().await.read().await.decided_state().await;
1158 let chain_config = state.chain_config;
1159
1160 if chain_config.commit() == commitment {
1161 chain_config.resolve().context("chain config found")
1162 } else {
1163 bail!("chain config not found")
1164 }
1165 }
1166
1167 async fn get_leaf_chain(&self, height: u64) -> anyhow::Result<Vec<Leaf2>> {
1168 let mut leaves = self
1169 .consensus()
1170 .await
1171 .read()
1172 .await
1173 .consensus()
1174 .read()
1175 .await
1176 .undecided_leaves();
1177 leaves.sort_by_key(|l| l.view_number());
1178 let (position, mut last_leaf) = leaves
1179 .iter()
1180 .find_position(|l| l.height() == height)
1181 .context(format!("leaf chain not available for {height}"))?;
1182 let mut chain = vec![last_leaf.clone()];
1183 for leaf in leaves.iter().skip(position + 1) {
1184 if leaf.justify_qc().view_number() == last_leaf.view_number() {
1185 chain.push(leaf.clone());
1186 } else {
1187 continue;
1188 }
1189 if leaf.view_number() == last_leaf.view_number() + 1 {
1190 last_leaf = leaf;
1192 break;
1193 }
1194 last_leaf = leaf;
1195 }
1196 for leaf in leaves
1198 .iter()
1199 .skip_while(|l| l.view_number() <= last_leaf.view_number())
1200 {
1201 if leaf.justify_qc().view_number() == last_leaf.view_number() {
1202 chain.push(leaf.clone());
1203 return Ok(chain);
1204 }
1205 }
1206 bail!(format!("leaf chain not available for {height}"))
1207 }
1208
1209 #[tracing::instrument(skip(self, _instance))]
1210 async fn get_reward_accounts_v2(
1211 &self,
1212 _instance: &NodeState,
1213 height: u64,
1214 view: ViewNumber,
1215 accounts: &[RewardAccountV2],
1216 ) -> anyhow::Result<RewardMerkleTreeV2> {
1217 let state = self
1218 .consensus()
1219 .await
1220 .read()
1221 .await
1222 .state(view)
1223 .await
1224 .context(format!(
1225 "state not available for height {height}, view {view}"
1226 ))?;
1227
1228 retain_v2_reward_accounts(&state.reward_merkle_tree_v2, accounts.iter().copied())
1229 }
1230
1231 #[tracing::instrument(skip(self, _instance))]
1232 async fn get_reward_accounts_v1(
1233 &self,
1234 _instance: &NodeState,
1235 height: u64,
1236 view: ViewNumber,
1237 accounts: &[RewardAccountV1],
1238 ) -> anyhow::Result<RewardMerkleTreeV1> {
1239 let state = self
1240 .consensus()
1241 .await
1242 .read()
1243 .await
1244 .state(view)
1245 .await
1246 .context(format!(
1247 "state not available for height {height}, view {view}"
1248 ))?;
1249
1250 retain_v1_reward_accounts(&state.reward_merkle_tree_v1, accounts.iter().copied())
1251 }
1252
1253 async fn get_reward_merkle_tree_v2(
1254 &self,
1255 height: u64,
1256 view: ViewNumber,
1257 ) -> anyhow::Result<Vec<u8>> {
1258 let state = self
1259 .consensus()
1260 .await
1261 .read()
1262 .await
1263 .state(view)
1264 .await
1265 .context(format!(
1266 "state not available for height {height}, view {view}"
1267 ))?;
1268
1269 let merkle_tree_bytes = bincode::serialize(&TryInto::<RewardMerkleTreeV2Data>::try_into(
1270 &state.reward_merkle_tree_v2,
1271 )?)
1272 .context("Merkle tree serialization failed; this should never happen.")?;
1273
1274 Ok(merkle_tree_bytes)
1275 }
1276
1277 async fn get_state_cert(
1278 &self,
1279 epoch: u64,
1280 ) -> anyhow::Result<LightClientStateUpdateCertificateV2<SeqTypes>> {
1281 self.get_state_cert_by_epoch(epoch)
1282 .await?
1283 .context(format!("state cert not found for epoch {epoch}"))
1284 }
1285}
1286
1287impl<N: ConnectedNetwork<PubKey>, D: Sync, P: SequencerPersistence> HotShotConfigDataSource
1288 for StorageState<N, P, D>
1289{
1290 async fn get_config(&self) -> PublicNetworkConfig {
1291 self.as_ref().network_config().await.into()
1292 }
1293}
1294
1295impl<N: ConnectedNetwork<PubKey>, P: SequencerPersistence> HotShotConfigDataSource
1296 for ApiState<N, P>
1297{
1298 async fn get_config(&self) -> PublicNetworkConfig {
1299 self.network_config().await.into()
1300 }
1301}
1302
1303#[async_trait]
1304impl<N: ConnectedNetwork<PubKey>, D: Sync, P: SequencerPersistence> StateSignatureDataSource<N>
1305 for StorageState<N, P, D>
1306{
1307 async fn get_state_signature(&self, height: u64) -> Option<LCV3StateSignatureRequestBody> {
1308 self.as_ref().get_state_signature(height).await
1309 }
1310}
1311
1312#[async_trait]
1313impl<N: ConnectedNetwork<PubKey>, P: SequencerPersistence> StateSignatureDataSource<N>
1314 for ApiState<N, P>
1315{
1316 async fn get_state_signature(&self, height: u64) -> Option<LCV3StateSignatureRequestBody> {
1317 self.state_signer()
1318 .await
1319 .read()
1320 .await
1321 .get_state_signature(height)
1322 .await
1323 }
1324}
1325
1326#[derive(Serialize, Deserialize, Debug, Clone)]
1327pub(crate) struct RewardMerkleTreeV2Data {
1329 pub balances: Vec<(RewardAccountV2, RewardAmount)>,
1330}
1331
1332impl TryInto<RewardMerkleTreeV2Data> for &RewardMerkleTreeV2 {
1333 type Error = anyhow::Error;
1334 fn try_into(self) -> anyhow::Result<RewardMerkleTreeV2Data> {
1336 let num_leaves = self.num_leaves();
1337
1338 let balances: Vec<_> = self
1339 .iter()
1340 .map(|(account, balance)| (*account, *balance))
1341 .collect();
1342
1343 if balances.len() as u64 == num_leaves {
1344 Ok(RewardMerkleTreeV2Data { balances })
1345 } else {
1346 tracing::error!(
1347 "Attempted to serialize an incomplete RewardMerkleTreeV2. This is not a fatal \
1348 error, but it should never happen and indicates that something may be seriously \
1349 wrong. Balances length: {}, num_leaves: {}.",
1350 balances.len(),
1351 num_leaves
1352 );
1353 bail!(
1354 "Failed to convert RewardMerkleTreeV2 into key-value pairs. Some accounts are \
1355 missing."
1356 );
1357 }
1358 }
1359}
1360
1361pub(crate) trait RewardMerkleTreeDataSource: Send + Sync + Clone + 'static {
1362 fn load_v1_reward_account_proof(
1363 &self,
1364 _height: u64,
1365 _account: RewardAccountV1,
1366 ) -> impl Send + Future<Output = anyhow::Result<RewardAccountQueryDataV1>>;
1367
1368 fn save_and_gc_reward_tree_v2(
1369 &self,
1370 node_state: &NodeState,
1371 height: u64,
1372 version: Version,
1373 merkle_tree: &RewardMerkleTreeV2,
1374 ) -> impl Send + Future<Output = anyhow::Result<()>> {
1375 async move {
1376 let serialization =
1377 bincode::serialize(&TryInto::<RewardMerkleTreeV2Data>::try_into(merkle_tree)?)
1378 .context("Merkle tree serialization failed")?;
1379 self.persist_tree(height, serialization).await?;
1380
1381 if cfg!(any(test, feature = "testing")) {
1383 return Ok(());
1384 }
1385
1386 let finalized_hotshot_height = match node_state.finalized_hotshot_height().await {
1387 Ok(h) => h,
1388 Err(err) => {
1389 tracing::warn!("failed to get finalized hotshot height: {err:#}");
1390 return Ok(());
1391 },
1392 };
1393
1394 let mut gc_height = height.min(finalized_hotshot_height);
1396
1397 if version >= versions::EPOCH_REWARD_VERSION
1399 && let Some(epoch_height) = node_state.epoch_height
1400 {
1401 let current_epoch = epoch_from_block_number(height, epoch_height);
1402 let gc_epoch = current_epoch.saturating_sub(4);
1403 gc_height = gc_height.min(gc_epoch * epoch_height);
1404 }
1405 if let Err(err) = self.garbage_collect(gc_height).await {
1406 tracing::info!(gc_height, "failed to garbage collect: {err:#}");
1407 }
1408
1409 Ok(())
1410 }
1411 }
1412
1413 fn persist_reward_proofs(
1414 &self,
1415 node_state: &NodeState,
1416 height: u64,
1417 version: Version,
1418 ) -> impl Send + Future<Output = anyhow::Result<()>>;
1419
1420 fn load_reward_merkle_tree_v2(
1421 &self,
1422 height: u64,
1423 ) -> impl Send + Future<Output = anyhow::Result<PermittedRewardMerkleTreeV2>> {
1424 async move {
1425 let tree_bytes = self.load_tree(height).await?;
1426
1427 let tree_data = bincode::deserialize::<RewardMerkleTreeV2Data>(&tree_bytes).context(
1428 "Failed to deserialize RewardMerkleTreeV2 for height {height} from storage; this \
1429 should never happen.",
1430 )?;
1431
1432 PermittedRewardMerkleTreeV2::try_from_kv_set(tree_data.balances)
1433 .await
1434 .context("Failed to reconstruct reward merkle tree from storage")
1435 }
1436 }
1437
1438 fn load_reward_account_proof_v2(
1439 &self,
1440 _height: u64,
1441 _account: RewardAccountV2,
1442 ) -> impl Send + Future<Output = anyhow::Result<RewardAccountQueryDataV2>> {
1443 async {
1444 bail!("load_reward_account_proof_v2 is not supported for this data source");
1445 }
1446 }
1447
1448 fn load_latest_reward_account_proof_v2(
1449 &self,
1450 account: RewardAccountV2,
1451 ) -> impl Send + Future<Output = anyhow::Result<RewardAccountQueryDataV2>> {
1452 async move {
1453 let serialized_account = bincode::serialize(&account).context(
1454 "Failed to serialize RewardAccountV2 for lookup; this should never happen.",
1455 )?;
1456 let proof_bytes = self.load_latest_proof(serialized_account).await?;
1457
1458 bincode::deserialize::<RewardAccountQueryDataV2>(&proof_bytes).context(
1459 "Failed to deserialize RewardAccountQueryDataV2 for account {account} from \
1460 storage; this should never happen.",
1461 )
1462 }
1463 }
1464
1465 fn persist_tree(
1466 &self,
1467 height: u64,
1468 merkle_tree: Vec<u8>,
1469 ) -> impl Send + Future<Output = anyhow::Result<()>>;
1470
1471 fn load_tree(&self, height: u64) -> impl Send + Future<Output = anyhow::Result<Vec<u8>>>;
1472
1473 fn persist_proofs(
1474 &self,
1475 height: u64,
1476 proofs: impl Iterator<Item = (Vec<u8>, Vec<u8>)> + Send,
1477 ) -> impl Send + Future<Output = anyhow::Result<()>>;
1478
1479 fn load_proof(
1480 &self,
1481 height: u64,
1482 account: Vec<u8>,
1483 epoch_height: u64,
1484 ) -> impl Send + Future<Output = anyhow::Result<Vec<u8>>>;
1485
1486 fn load_latest_proof(
1487 &self,
1488 account: Vec<u8>,
1489 ) -> impl Send + Future<Output = anyhow::Result<Vec<u8>>>;
1490
1491 fn proof_exists(&self, height: u64) -> impl Send + Future<Output = bool>;
1492
1493 fn garbage_collect(&self, height: u64) -> impl Send + Future<Output = anyhow::Result<()>>;
1495}
1496
1497impl RewardMerkleTreeDataSource for hotshot_query_service::data_source::MetricsDataSource {
1498 fn load_v1_reward_account_proof(
1499 &self,
1500 _height: u64,
1501 _account: RewardAccountV1,
1502 ) -> impl Send + Future<Output = anyhow::Result<RewardAccountQueryDataV1>> {
1503 async {
1504 bail!("reward merklized state is not supported for this data source");
1505 }
1506 }
1507
1508 fn persist_reward_proofs(
1509 &self,
1510 _node_state: &NodeState,
1511 _height: u64,
1512 _version: Version,
1513 ) -> impl Send + Future<Output = anyhow::Result<()>> {
1514 async {
1515 bail!("reward merklized state is not supported for this data source");
1516 }
1517 }
1518
1519 fn persist_tree(
1520 &self,
1521 _height: u64,
1522 _merkle_tree: Vec<u8>,
1523 ) -> impl Send + Future<Output = anyhow::Result<()>> {
1524 async move {
1525 bail!("reward merklized state is not supported for this data source");
1526 }
1527 }
1528
1529 fn load_tree(&self, _height: u64) -> impl Send + Future<Output = anyhow::Result<Vec<u8>>> {
1530 async move {
1531 bail!("reward merklized state is not supported for this data source");
1532 }
1533 }
1534
1535 fn garbage_collect(&self, _height: u64) -> impl Send + Future<Output = anyhow::Result<()>> {
1536 async move {
1537 bail!("reward merklized state is not supported for this data source");
1538 }
1539 }
1540
1541 fn persist_proofs(
1542 &self,
1543 _height: u64,
1544 _proofs: impl Iterator<Item = (Vec<u8>, Vec<u8>)> + Send,
1545 ) -> impl Send + Future<Output = anyhow::Result<()>> {
1546 async move {
1547 bail!("reward merklized state is not supported for this data source");
1548 }
1549 }
1550
1551 fn load_proof(
1552 &self,
1553 _height: u64,
1554 _account: Vec<u8>,
1555 _epoch_height: u64,
1556 ) -> impl Send + Future<Output = anyhow::Result<Vec<u8>>> {
1557 async move {
1558 bail!("reward merklized state is not supported for this data source");
1559 }
1560 }
1561
1562 fn load_latest_proof(
1563 &self,
1564 _account: Vec<u8>,
1565 ) -> impl Send + Future<Output = anyhow::Result<Vec<u8>>> {
1566 async move {
1567 bail!("reward merklized state is not supported for this data source");
1568 }
1569 }
1570
1571 fn proof_exists(&self, _height: u64) -> impl Send + Future<Output = bool> {
1572 async move { false }
1573 }
1574}
1575
1576impl<T, S> RewardMerkleTreeDataSource
1577 for hotshot_query_service::data_source::ExtensibleDataSource<T, S>
1578where
1579 T: RewardMerkleTreeDataSource,
1580 S: Send + Sync + Clone + NodeStateDataSource + 'static,
1581{
1582 async fn load_v1_reward_account_proof(
1583 &self,
1584 height: u64,
1585 account: RewardAccountV1,
1586 ) -> anyhow::Result<RewardAccountQueryDataV1> {
1587 self.inner()
1588 .load_v1_reward_account_proof(height, account)
1589 .await
1590 }
1591
1592 async fn load_reward_account_proof_v2(
1593 &self,
1594 height: u64,
1595 account: RewardAccountV2,
1596 ) -> anyhow::Result<RewardAccountQueryDataV2> {
1597 let epoch_height = self
1598 .as_ref()
1599 .node_state()
1600 .await
1601 .epoch_height
1602 .context("epoch height not found")?;
1603 let serialized_account = bincode::serialize(&account)
1604 .context("Failed to serialize RewardAccountV2 for lookup; this should never happen.")?;
1605 let proof_bytes = self
1606 .inner()
1607 .load_proof(height, serialized_account, epoch_height)
1608 .await?;
1609
1610 bincode::deserialize::<RewardAccountQueryDataV2>(&proof_bytes)
1611 .context("Failed to deserialize RewardAccountQueryDataV2 from storage")
1612 }
1613
1614 fn persist_tree(
1615 &self,
1616 height: u64,
1617 merkle_tree: Vec<u8>,
1618 ) -> impl Send + Future<Output = anyhow::Result<()>> {
1619 async move { self.inner().persist_tree(height, merkle_tree).await }
1620 }
1621
1622 fn load_tree(&self, height: u64) -> impl Send + Future<Output = anyhow::Result<Vec<u8>>> {
1623 async move { self.inner().load_tree(height).await }
1624 }
1625
1626 fn garbage_collect(&self, height: u64) -> impl Send + Future<Output = anyhow::Result<()>> {
1627 async move { self.inner().garbage_collect(height).await }
1628 }
1629
1630 fn persist_proofs(
1631 &self,
1632 height: u64,
1633 proofs: impl Iterator<Item = (Vec<u8>, Vec<u8>)> + Send,
1634 ) -> impl Send + Future<Output = anyhow::Result<()>> {
1635 async move { self.inner().persist_proofs(height, proofs).await }
1636 }
1637
1638 fn load_proof(
1639 &self,
1640 height: u64,
1641 account: Vec<u8>,
1642 epoch_height: u64,
1643 ) -> impl Send + Future<Output = anyhow::Result<Vec<u8>>> {
1644 async move { self.inner().load_proof(height, account, epoch_height).await }
1645 }
1646
1647 fn load_latest_proof(
1648 &self,
1649 account: Vec<u8>,
1650 ) -> impl Send + Future<Output = anyhow::Result<Vec<u8>>> {
1651 async move { self.inner().load_latest_proof(account).await }
1652 }
1653
1654 fn proof_exists(&self, height: u64) -> impl Send + Future<Output = bool> {
1655 async move { self.inner().proof_exists(height).await }
1656 }
1657
1658 fn persist_reward_proofs(
1659 &self,
1660 node_state: &NodeState,
1661 height: u64,
1662 version: Version,
1663 ) -> impl Send + Future<Output = anyhow::Result<()>> {
1664 async move {
1665 self.inner()
1666 .persist_reward_proofs(node_state, height, version)
1667 .await
1668 }
1669 }
1670}
1671
1672#[cfg(any(test, feature = "testing"))]
1673pub mod test_helpers {
1674 use std::{cmp::max, time::Duration};
1675
1676 use alloy::{
1677 network::EthereumWallet,
1678 primitives::{Address, U256},
1679 providers::{ProviderBuilder, ext::AnvilApi},
1680 };
1681 use committable::Committable;
1682 use espresso_contract_deployer::{
1683 Contract, Contracts, DEFAULT_EXIT_ESCROW_PERIOD_SECONDS, builder::DeployerArgsBuilder,
1684 network_config::light_client_genesis_from_stake_table,
1685 };
1686 use espresso_types::{
1687 MOCK_SEQUENCER_VERSIONS, NamespaceId, ValidatedState,
1688 v0::traits::{NullEventConsumer, PersistenceOptions, StateCatchup},
1689 };
1690 use futures::{
1691 future::{FutureExt, join_all},
1692 stream::StreamExt,
1693 };
1694 use hotshot::types::{Event, EventType};
1695 use hotshot_contract_adapter::stake_table::StakeTableContractVersion;
1696 use hotshot_types::{
1697 event::LeafInfo, light_client::LCV3StateSignatureRequestBody, traits::metrics::NoMetrics,
1698 };
1699 use itertools::izip;
1700 use jf_merkle_tree_compat::{MerkleCommitment, MerkleTreeScheme};
1701 use staking_cli::demo::{DelegationConfig, StakingTransactions};
1702 use surf_disco::Client;
1703 use tempfile::TempDir;
1704 use test_utils::reserve_tcp_port;
1705 use tide_disco::{Api, App, Error, StatusCode, error::ServerError};
1706 use tokio::{spawn, task::JoinHandle, time::sleep};
1707 use url::Url;
1708 use vbs::version::{StaticVersion, StaticVersionType};
1709 use versions::{EPOCH_VERSION, Upgrade};
1710
1711 use super::*;
1712 use crate::{
1713 catchup::NullStateCatchup,
1714 network,
1715 persistence::no_storage,
1716 testing::{TestConfig, TestConfigBuilder, run_legacy_builder, wait_for_decide_on_handle},
1717 };
1718
1719 pub const STAKE_TABLE_CAPACITY_FOR_TEST: usize = 10;
1720
1721 pub struct TestNetwork<P: PersistenceOptions, const NUM_NODES: usize> {
1722 pub server: SequencerContext<network::Memory, P::Persistence>,
1723 pub peers: Vec<SequencerContext<network::Memory, P::Persistence>>,
1724 pub cfg: TestConfig<{ NUM_NODES }>,
1725 pub temp_dir: Option<TempDir>,
1727 pub contracts: Option<Contracts>,
1728 }
1729
1730 pub struct TestNetworkConfig<const NUM_NODES: usize, P, C>
1731 where
1732 P: PersistenceOptions,
1733 C: StateCatchup + 'static,
1734 {
1735 state: [ValidatedState; NUM_NODES],
1736 persistence: [P; NUM_NODES],
1737 catchup: [C; NUM_NODES],
1738 network_config: TestConfig<{ NUM_NODES }>,
1739 api_config: Options,
1740 contracts: Option<Contracts>,
1741 }
1742
1743 impl<const NUM_NODES: usize, P, C> TestNetworkConfig<{ NUM_NODES }, P, C>
1744 where
1745 P: PersistenceOptions,
1746 C: StateCatchup + 'static,
1747 {
1748 pub fn states(&self) -> [ValidatedState; NUM_NODES] {
1749 self.state.clone()
1750 }
1751 }
1752
1753 #[derive(Clone)]
1754 pub struct TestNetworkConfigBuilder<const NUM_NODES: usize, P, C>
1755 where
1756 P: PersistenceOptions,
1757 C: StateCatchup + 'static,
1758 {
1759 state: [ValidatedState; NUM_NODES],
1760 persistence: Option<[P; NUM_NODES]>,
1761 catchup: Option<[C; NUM_NODES]>,
1762 api_config: Option<Options>,
1763 network_config: Option<TestConfig<{ NUM_NODES }>>,
1764 contracts: Option<Contracts>,
1765 initial_token_supply: Option<U256>,
1766 }
1767
1768 impl Default for TestNetworkConfigBuilder<5, no_storage::Options, NullStateCatchup> {
1769 fn default() -> Self {
1770 TestNetworkConfigBuilder {
1771 state: std::array::from_fn(|_| ValidatedState::default()),
1772 persistence: Some([no_storage::Options; 5]),
1773 catchup: Some(std::array::from_fn(|_| NullStateCatchup::default())),
1774 network_config: None,
1775 api_config: None,
1776 contracts: None,
1777 initial_token_supply: None,
1778 }
1779 }
1780 }
1781
1782 impl<const NUM_NODES: usize>
1783 TestNetworkConfigBuilder<{ NUM_NODES }, no_storage::Options, NullStateCatchup>
1784 {
1785 pub fn with_num_nodes()
1786 -> TestNetworkConfigBuilder<{ NUM_NODES }, no_storage::Options, NullStateCatchup> {
1787 TestNetworkConfigBuilder {
1788 state: std::array::from_fn(|_| ValidatedState::default()),
1789 persistence: Some([no_storage::Options; { NUM_NODES }]),
1790 catchup: Some(std::array::from_fn(|_| NullStateCatchup::default())),
1791 network_config: None,
1792 api_config: None,
1793 contracts: None,
1794 initial_token_supply: None,
1795 }
1796 }
1797 }
1798
1799 impl<const NUM_NODES: usize, P, C> TestNetworkConfigBuilder<{ NUM_NODES }, P, C>
1800 where
1801 P: PersistenceOptions,
1802 C: StateCatchup + 'static,
1803 {
1804 pub fn states(mut self, state: [ValidatedState; NUM_NODES]) -> Self {
1805 self.state = state;
1806 self
1807 }
1808
1809 pub fn initial_token_supply(mut self, supply: U256) -> Self {
1810 self.initial_token_supply = Some(supply);
1811 self
1812 }
1813
1814 pub fn persistences<NP: PersistenceOptions>(
1815 self,
1816 persistence: [NP; NUM_NODES],
1817 ) -> TestNetworkConfigBuilder<{ NUM_NODES }, NP, C> {
1818 TestNetworkConfigBuilder {
1819 state: self.state,
1820 catchup: self.catchup,
1821 network_config: self.network_config,
1822 api_config: self.api_config,
1823 persistence: Some(persistence),
1824 contracts: self.contracts,
1825 initial_token_supply: self.initial_token_supply,
1826 }
1827 }
1828
1829 pub fn api_config(mut self, api_config: Options) -> Self {
1830 self.api_config = Some(api_config);
1831 self
1832 }
1833
1834 pub fn catchups<NC: StateCatchup + 'static>(
1835 self,
1836 catchup: [NC; NUM_NODES],
1837 ) -> TestNetworkConfigBuilder<{ NUM_NODES }, P, NC> {
1838 TestNetworkConfigBuilder {
1839 state: self.state,
1840 catchup: Some(catchup),
1841 network_config: self.network_config,
1842 api_config: self.api_config,
1843 persistence: self.persistence,
1844 contracts: self.contracts,
1845 initial_token_supply: self.initial_token_supply,
1846 }
1847 }
1848
1849 pub fn network_config(mut self, network_config: TestConfig<{ NUM_NODES }>) -> Self {
1850 self.network_config = Some(network_config);
1851 self
1852 }
1853
1854 pub fn contracts(mut self, contracts: Contracts) -> Self {
1855 self.contracts = Some(contracts);
1856 self
1857 }
1858
1859 pub async fn pos_hook(
1862 self,
1863 delegation_config: DelegationConfig,
1864 stake_table_version: StakeTableContractVersion,
1865 upgrade: Upgrade,
1866 ) -> anyhow::Result<Self> {
1867 if upgrade.base < EPOCH_VERSION && upgrade.target < EPOCH_VERSION {
1868 panic!("given version does not require pos deployment");
1869 };
1870
1871 let network_config = self
1872 .network_config
1873 .as_ref()
1874 .expect("network_config is required");
1875
1876 let l1_url = network_config.l1_url();
1877 let signer = network_config.signer();
1878 let deployer = ProviderBuilder::new()
1879 .wallet(EthereumWallet::from(signer.clone()))
1880 .connect_http(l1_url.clone());
1881
1882 let blocks_per_epoch = network_config.hotshot_config().epoch_height;
1883 let epoch_start_block = network_config.hotshot_config().epoch_start_block;
1884 let (genesis_state, genesis_stake) = light_client_genesis_from_stake_table(
1885 &network_config.hotshot_config().hotshot_stake_table(),
1886 STAKE_TABLE_CAPACITY_FOR_TEST,
1887 )
1888 .unwrap();
1889
1890 let mut contracts = Contracts::new();
1891 let args = DeployerArgsBuilder::default()
1892 .deployer(deployer.clone())
1893 .rpc_url(l1_url.clone())
1894 .mock_light_client(true)
1895 .genesis_lc_state(genesis_state)
1896 .genesis_st_state(genesis_stake)
1897 .blocks_per_epoch(blocks_per_epoch)
1898 .epoch_start_block(epoch_start_block)
1899 .exit_escrow_period(U256::from(max(
1900 blocks_per_epoch * 15 + 100,
1901 DEFAULT_EXIT_ESCROW_PERIOD_SECONDS,
1902 )))
1903 .multisig_pauser(signer.address())
1904 .token_name("Espresso".to_string())
1905 .token_symbol("ESP".to_string())
1906 .initial_token_supply(self.initial_token_supply.unwrap_or(U256::from(100000u64)))
1907 .ops_timelock_delay(U256::from(0))
1908 .ops_timelock_admin(signer.address())
1909 .ops_timelock_proposers(vec![signer.address()])
1910 .ops_timelock_executors(vec![signer.address()])
1911 .safe_exit_timelock_delay(U256::from(10))
1912 .safe_exit_timelock_admin(signer.address())
1913 .safe_exit_timelock_proposers(vec![signer.address()])
1914 .safe_exit_timelock_executors(vec![signer.address()])
1915 .build()
1916 .unwrap();
1917
1918 match stake_table_version {
1919 StakeTableContractVersion::V1 => {
1920 args.deploy_to_stake_table_v1(&mut contracts).await
1921 },
1922 StakeTableContractVersion::V2 => args.deploy_all(&mut contracts).await,
1923 }
1924 .context("failed to deploy contracts")?;
1925
1926 let stake_table_address = contracts
1927 .address(Contract::StakeTableProxy)
1928 .expect("StakeTableProxy address not found");
1929
1930 StakingTransactions::create(
1931 l1_url.clone(),
1932 &deployer,
1933 stake_table_address,
1934 network_config.staking_priv_keys(),
1935 None,
1936 delegation_config,
1937 )
1938 .await
1939 .expect("stake table setup failed")
1940 .apply_all()
1941 .await
1942 .expect("send all txns failed");
1943
1944 if let Some(anvil) = network_config.anvil() {
1949 anvil
1950 .anvil_set_interval_mining(1)
1951 .await
1952 .expect("interval mining");
1953 }
1954
1955 let state = self.state[0].clone();
1959 let chain_config = if let Some(cf) = state.chain_config.resolve() {
1960 ChainConfig {
1961 base_fee: 0.into(),
1962 stake_table_contract: Some(stake_table_address),
1963 ..cf
1964 }
1965 } else {
1966 ChainConfig {
1967 base_fee: 0.into(),
1968 stake_table_contract: Some(stake_table_address),
1969 ..Default::default()
1970 }
1971 };
1972
1973 let state = ValidatedState {
1974 chain_config: chain_config.into(),
1975 ..state
1976 };
1977 Ok(self
1978 .states(std::array::from_fn(|_| state.clone()))
1979 .contracts(contracts))
1980 }
1981
1982 pub fn build(self) -> TestNetworkConfig<{ NUM_NODES }, P, C> {
1983 TestNetworkConfig {
1984 state: self.state,
1985 persistence: self.persistence.unwrap(),
1986 catchup: self.catchup.unwrap(),
1987 network_config: self.network_config.unwrap(),
1988 api_config: self.api_config.unwrap(),
1989 contracts: self.contracts,
1990 }
1991 }
1992 }
1993
1994 impl<P: PersistenceOptions, const NUM_NODES: usize> TestNetwork<P, { NUM_NODES }> {
1995 pub async fn new<C: StateCatchup + 'static>(
1996 cfg: TestNetworkConfig<{ NUM_NODES }, P, C>,
1997 upgrade: versions::Upgrade,
1998 ) -> Self {
1999 let mut cfg = cfg;
2000 let mut builder_tasks = Vec::new();
2001
2002 let chain_config = cfg.state[0].chain_config.resolve();
2003 if chain_config.is_none() {
2004 tracing::warn!("Chain config is not set, using default max_block_size");
2005 }
2006 let (task, builder_url) = run_legacy_builder::<{ NUM_NODES }>(
2007 cfg.network_config.builder_port(),
2008 chain_config.map(|c| *c.max_block_size),
2009 )
2010 .await;
2011 builder_tasks.push(task);
2012 cfg.network_config
2013 .set_builder_urls(vec1::vec1![builder_url.clone()]);
2014
2015 let mut opt = cfg.api_config.clone();
2017 let temp_dir = if opt.storage_fs.is_none() && opt.storage_sql.is_none() {
2018 let temp_dir = tempfile::tempdir().unwrap();
2019 opt = opt.query_fs(
2020 Default::default(),
2021 crate::persistence::fs::Options::new(temp_dir.path().to_path_buf()),
2022 );
2023 Some(temp_dir)
2024 } else {
2025 None
2026 };
2027
2028 let mut nodes = join_all(
2029 izip!(cfg.state, cfg.persistence, cfg.catchup)
2030 .enumerate()
2031 .map(|(i, (state, persistence, state_peers))| {
2032 let opt = opt.clone();
2033 let cfg = &cfg.network_config;
2034 let upgrades_map = cfg.upgrades();
2035 async move {
2036 if i == 0 {
2037 opt.serve(|metrics, consumer, storage| {
2038 let cfg = cfg.clone();
2039 async move {
2040 Ok(cfg
2041 .init_node(
2042 0,
2043 state,
2044 persistence,
2045 Some(state_peers),
2046 storage,
2047 &*metrics,
2048 STAKE_TABLE_CAPACITY_FOR_TEST,
2049 consumer,
2050 upgrade,
2051 upgrades_map,
2052 )
2053 .await)
2054 }
2055 .boxed()
2056 })
2057 .await
2058 .unwrap()
2059 } else {
2060 cfg.init_node(
2061 i,
2062 state,
2063 persistence,
2064 Some(state_peers),
2065 None,
2066 &NoMetrics,
2067 STAKE_TABLE_CAPACITY_FOR_TEST,
2068 NullEventConsumer,
2069 upgrade,
2070 upgrades_map,
2071 )
2072 .await
2073 }
2074 }
2075 }),
2076 )
2077 .await;
2078
2079 let handle_0 = &nodes[0];
2080
2081 for builder_task in builder_tasks {
2083 builder_task.start(Box::new(handle_0.event_stream().await));
2084 }
2085
2086 for ctx in &nodes {
2087 ctx.start_consensus().await;
2088 }
2089
2090 let server = nodes.remove(0);
2091 let peers = nodes;
2092
2093 Self {
2094 server,
2095 peers,
2096 cfg: cfg.network_config,
2097 temp_dir,
2098 contracts: cfg.contracts,
2099 }
2100 }
2101
2102 pub async fn stop_consensus(&mut self) {
2103 self.server.shutdown_consensus().await;
2104
2105 for ctx in &mut self.peers {
2106 ctx.shutdown_consensus().await;
2107 }
2108 }
2109 }
2110
2111 pub async fn status_test_helper(opt: impl FnOnce(Options) -> Options) {
2119 let port = reserve_tcp_port().expect("OS should have ephemeral ports available");
2120 let url = format!("http://localhost:{port}").parse().unwrap();
2121 let client: Client<ServerError, StaticVersion<0, 1>> = Client::new(url);
2122
2123 let options = opt(Options::with_port(port));
2124 let network_config = TestConfigBuilder::default().build();
2125 let config = TestNetworkConfigBuilder::default()
2126 .api_config(options)
2127 .network_config(network_config)
2128 .build();
2129 let _network = TestNetwork::new(config, MOCK_SEQUENCER_VERSIONS).await;
2130 client.connect(None).await;
2131
2132 while client
2136 .get::<u64>("status/block-height")
2137 .send()
2138 .await
2139 .unwrap()
2140 <= 1
2141 {
2142 sleep(Duration::from_secs(1)).await;
2143 }
2144 let success_rate = client
2145 .get::<f64>("status/success-rate")
2146 .send()
2147 .await
2148 .unwrap();
2149 assert!(success_rate.is_finite(), "{success_rate}");
2152 assert!(success_rate > 0.0, "{success_rate}");
2154 }
2155
2156 pub async fn submit_test_helper(opt: impl FnOnce(Options) -> Options) {
2164 let txn = Transaction::new(NamespaceId::from(1_u32), vec![1, 2, 3, 4]);
2165
2166 let port = reserve_tcp_port().expect("OS should have ephemeral ports available");
2167
2168 let url = format!("http://localhost:{port}").parse().unwrap();
2169 let client: Client<ServerError, StaticVersion<0, 1>> = Client::new(url);
2170
2171 let options = opt(Options::with_port(port).submit(Default::default()));
2172 let network_config = TestConfigBuilder::default().build();
2173 let config = TestNetworkConfigBuilder::default()
2174 .api_config(options)
2175 .network_config(network_config)
2176 .build();
2177 let network = TestNetwork::new(config, MOCK_SEQUENCER_VERSIONS).await;
2178 let mut events = network.server.event_stream().await;
2179
2180 client.connect(None).await;
2181
2182 let hash = client
2183 .post("submit/submit")
2184 .body_json(&txn)
2185 .unwrap()
2186 .send()
2187 .await
2188 .unwrap();
2189 assert_eq!(txn.commit(), hash);
2190
2191 wait_for_decide_on_handle(&mut events, &txn).await;
2193 }
2194
2195 pub async fn state_signature_test_helper(opt: impl FnOnce(Options) -> Options) {
2197 let port = reserve_tcp_port().expect("OS should have ephemeral ports available");
2198
2199 let url = format!("http://localhost:{port}").parse().unwrap();
2200
2201 let client: Client<ServerError, StaticVersion<0, 1>> = Client::new(url);
2202
2203 let options = opt(Options::with_port(port));
2204 let network_config = TestConfigBuilder::default().build();
2205 let config = TestNetworkConfigBuilder::default()
2206 .api_config(options)
2207 .network_config(network_config)
2208 .build();
2209 let network = TestNetwork::new(config, MOCK_SEQUENCER_VERSIONS).await;
2210
2211 let mut height: u64;
2212 loop {
2215 height = network.server.decided_leaf().await.height();
2216 sleep(std::time::Duration::from_secs(1)).await;
2217 if height >= 2 {
2218 break;
2219 }
2220 }
2221 client
2223 .get::<LCV3StateSignatureRequestBody>(&format!("state-signature/block/{height}"))
2224 .send()
2225 .await
2226 .unwrap();
2227 }
2228
2229 pub async fn catchup_test_helper(opt: impl FnOnce(Options) -> Options) {
2237 let port = reserve_tcp_port().expect("OS should have ephemeral ports available");
2238 let url = format!("http://localhost:{port}").parse().unwrap();
2239 let client: Client<ServerError, StaticVersion<0, 1>> = Client::new(url);
2240
2241 let options = opt(Options::with_port(port));
2242 let network_config = TestConfigBuilder::default().build();
2243 let config = TestNetworkConfigBuilder::default()
2244 .api_config(options)
2245 .network_config(network_config)
2246 .build();
2247 let network = TestNetwork::new(config, MOCK_SEQUENCER_VERSIONS).await;
2248 client.connect(None).await;
2249
2250 let mut events = network.server.event_stream().await;
2252 loop {
2253 if let Event {
2254 event: EventType::Decide { leaf_chain, .. },
2255 ..
2256 } = events.next().await.unwrap()
2257 && leaf_chain
2258 .iter()
2259 .any(|LeafInfo { leaf, .. }| leaf.block_header().height() > 2)
2260 {
2261 break;
2262 }
2263 }
2264
2265 {
2268 network.server.shutdown_consensus().await;
2269 }
2270
2271 let leaf = network.server.decided_leaf().await;
2273 let height = leaf.height() + 1;
2274 let view = leaf.view_number() + 1;
2275 let res = client
2276 .get::<AccountQueryData>(&format!(
2277 "catchup/{height}/{}/account/{:x}",
2278 view.u64(),
2279 Address::default()
2280 ))
2281 .send()
2282 .await
2283 .unwrap();
2284 assert_eq!(res.balance, U256::ZERO);
2285 assert_eq!(
2286 res.proof
2287 .verify(
2288 &network
2289 .server
2290 .state(view)
2291 .await
2292 .unwrap()
2293 .fee_merkle_tree
2294 .commitment()
2295 )
2296 .unwrap(),
2297 U256::ZERO,
2298 );
2299
2300 let res = client
2302 .get::<BlocksFrontier>(&format!("catchup/{height}/{}/blocks", view.u64()))
2303 .send()
2304 .await
2305 .unwrap();
2306 let root = &network
2307 .server
2308 .state(view)
2309 .await
2310 .unwrap()
2311 .block_merkle_tree
2312 .commitment();
2313 BlockMerkleTree::verify(root, root.size() - 1, res)
2314 .unwrap()
2315 .unwrap();
2316 }
2317
2318 pub async fn spawn_dishonest_peer_catchup_api() -> anyhow::Result<(Url, JoinHandle<()>)> {
2319 let toml = toml::from_str::<toml::Value>(include_str!("../api/catchup.toml")).unwrap();
2320 let mut api =
2321 Api::<(), hotshot_query_service::Error, SequencerApiVersion>::new(toml).unwrap();
2322
2323 api.get("account", |_req, _state: &()| {
2324 async move {
2325 Result::<AccountQueryData, _>::Err(hotshot_query_service::Error::catch_all(
2326 StatusCode::BAD_REQUEST,
2327 "no account found".to_string(),
2328 ))
2329 }
2330 .boxed()
2331 })?
2332 .get("blocks", |_req, _state| {
2333 async move {
2334 Result::<BlocksFrontier, _>::Err(hotshot_query_service::Error::catch_all(
2335 StatusCode::BAD_REQUEST,
2336 "no block found".to_string(),
2337 ))
2338 }
2339 .boxed()
2340 })?
2341 .get("chainconfig", |_req, _state| {
2342 async move {
2343 Result::<ChainConfig, _>::Ok(ChainConfig {
2344 max_block_size: 300.into(),
2345 base_fee: 1.into(),
2346 fee_recipient: "0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48"
2347 .parse()
2348 .unwrap(),
2349 ..Default::default()
2350 })
2351 }
2352 .boxed()
2353 })?
2354 .get("leafchain", |_req, _state| {
2355 async move {
2356 Result::<Vec<Leaf2>, _>::Err(hotshot_query_service::Error::catch_all(
2357 StatusCode::BAD_REQUEST,
2358 "No leafchain found".to_string(),
2359 ))
2360 }
2361 .boxed()
2362 })?;
2363
2364 let mut app = App::<_, hotshot_query_service::Error>::with_state(());
2365 app.with_version(env!("CARGO_PKG_VERSION").parse().unwrap());
2366
2367 app.register_module::<_, _>("catchup", api).unwrap();
2368
2369 let port = reserve_tcp_port().expect("OS should have ephemeral ports available");
2370 let url: Url = Url::parse(&format!("http://localhost:{port}")).unwrap();
2371
2372 let handle = spawn({
2373 let url = url.clone();
2374 async move {
2375 let _ = app.serve(url, SequencerApiVersion::instance()).await;
2376 }
2377 });
2378
2379 Ok((url, handle))
2380 }
2381}
2382
2383#[cfg(test)]
2384mod api_tests {
2385 use std::{fmt::Debug, marker::PhantomData};
2386
2387 use committable::Committable;
2388 use data_source::testing::TestableSequencerDataSource;
2389 use espresso_types::{
2390 Header, Leaf2, MOCK_SEQUENCER_VERSIONS, NamespaceId, NamespaceProofQueryData,
2391 ValidatedState,
2392 traits::{EventConsumer, PersistenceOptions},
2393 };
2394 use futures::{future, stream::StreamExt};
2395 use hotshot_example_types::node_types::TEST_VERSIONS;
2396 use hotshot_query_service::availability::{
2397 AvailabilityDataSource, BlockQueryData, VidCommonQueryData,
2398 };
2399 use hotshot_types::{
2400 data::{
2401 DaProposal2, EpochNumber, QuorumProposal2, QuorumProposalWrapper, VidCommitment,
2402 VidDisperseShare, ns_table::parse_ns_table, vid_disperse::AvidMDisperseShare,
2403 },
2404 event::LeafInfo,
2405 message::Proposal,
2406 simple_certificate::{CertificatePair, QuorumCertificate2},
2407 traits::{EncodeBytes, signature_key::SignatureKey},
2408 utils::EpochTransitionIndicator,
2409 vid::avidm::{AvidMScheme, init_avidm_param},
2410 };
2411 use surf_disco::Client;
2412 use test_helpers::{
2413 TestNetwork, TestNetworkConfigBuilder, catchup_test_helper, state_signature_test_helper,
2414 status_test_helper, submit_test_helper,
2415 };
2416 use test_utils::reserve_tcp_port;
2417 use tide_disco::error::ServerError;
2418 use vbs::version::StaticVersion;
2419
2420 use super::{update::ApiEventConsumer, *};
2421 use crate::{
2422 network,
2423 persistence::no_storage::NoStorage,
2424 testing::{TestConfigBuilder, wait_for_decide_on_handle},
2425 };
2426
2427 #[rstest_reuse::template]
2428 #[rstest::rstest]
2429 #[case(PhantomData::<crate::api::sql::DataSource>)]
2430 #[case(PhantomData::<crate::api::fs::DataSource>)]
2431 #[test_log::test(tokio::test(flavor = "multi_thread"))]
2432 pub fn testable_sequencer_data_source<D: TestableSequencerDataSource>(
2433 #[case] _d: PhantomData<D>,
2434 ) {
2435 }
2436
2437 #[rstest_reuse::apply(testable_sequencer_data_source)]
2438 pub(crate) async fn submit_test_with_query_module<D: TestableSequencerDataSource>(
2439 _d: PhantomData<D>,
2440 ) {
2441 let storage = D::create_storage().await;
2442 submit_test_helper(|opt| D::options(&storage, opt)).await
2443 }
2444
2445 #[rstest_reuse::apply(testable_sequencer_data_source)]
2446 pub(crate) async fn status_test_with_query_module<D: TestableSequencerDataSource>(
2447 _d: PhantomData<D>,
2448 ) {
2449 let storage = D::create_storage().await;
2450 status_test_helper(|opt| D::options(&storage, opt)).await
2451 }
2452
2453 #[rstest_reuse::apply(testable_sequencer_data_source)]
2454 pub(crate) async fn state_signature_test_with_query_module<D: TestableSequencerDataSource>(
2455 _d: PhantomData<D>,
2456 ) {
2457 let storage = D::create_storage().await;
2458 state_signature_test_helper(|opt| D::options(&storage, opt)).await
2459 }
2460
2461 #[rstest_reuse::apply(testable_sequencer_data_source)]
2462 pub(crate) async fn test_namespace_query<D: TestableSequencerDataSource>(_d: PhantomData<D>) {
2463 let ns_id = NamespaceId::from(42_u32);
2465 let txn = Transaction::new(ns_id, vec![1, 2, 3, 4]);
2466
2467 let port = reserve_tcp_port().expect("OS should have ephemeral ports available");
2469 let storage = D::create_storage().await;
2470 let network_config = TestConfigBuilder::default().build();
2471 let config = TestNetworkConfigBuilder::default()
2472 .api_config(D::options(&storage, Options::with_port(port)).submit(Default::default()))
2473 .network_config(network_config)
2474 .build();
2475 let network = TestNetwork::new(config, MOCK_SEQUENCER_VERSIONS).await;
2476 let mut events = network.server.event_stream().await;
2477
2478 let client: Client<ServerError, StaticVersion<0, 1>> =
2480 Client::new(format!("http://localhost:{port}").parse().unwrap());
2481 client.connect(None).await;
2482
2483 let hash = client
2484 .post("submit/submit")
2485 .body_json(&txn)
2486 .unwrap()
2487 .send()
2488 .await
2489 .unwrap();
2490 assert_eq!(txn.commit(), hash);
2491
2492 let block_height = wait_for_decide_on_handle(&mut events, &txn).await.0 as usize;
2494 tracing::info!(block_height, "transaction sequenced");
2495
2496 let txn2 = Transaction::new(ns_id, vec![5, 6, 7, 8]);
2498 client
2499 .post::<Commitment<Transaction>>("submit/submit")
2500 .body_json(&txn2)
2501 .unwrap()
2502 .send()
2503 .await
2504 .unwrap();
2505 let block_height2 = wait_for_decide_on_handle(&mut events, &txn2).await.0 as usize;
2506 tracing::info!(block_height2, "transaction sequenced");
2507
2508 client
2510 .socket(&format!("availability/stream/blocks/{block_height2}"))
2511 .subscribe::<BlockQueryData<SeqTypes>>()
2512 .await
2513 .unwrap()
2514 .next()
2515 .await
2516 .unwrap()
2517 .unwrap();
2518
2519 let mut found_txn = false;
2520 let mut found_empty_block = false;
2521 for block_num in 0..=block_height {
2522 let header: Header = client
2523 .get(&format!("availability/header/{block_num}"))
2524 .send()
2525 .await
2526 .unwrap();
2527 let ns_query_res: NamespaceProofQueryData = client
2528 .get(&format!("availability/block/{block_num}/namespace/{ns_id}"))
2529 .send()
2530 .await
2531 .unwrap();
2532
2533 assert_eq!(
2535 ns_query_res,
2536 client
2537 .get(&format!(
2538 "availability/block/hash/{}/namespace/{ns_id}",
2539 header.commit()
2540 ))
2541 .send()
2542 .await
2543 .unwrap()
2544 );
2545 assert_eq!(
2546 ns_query_res,
2547 client
2548 .get(&format!(
2549 "availability/block/payload-hash/{}/namespace/{ns_id}",
2550 header.payload_commitment()
2551 ))
2552 .send()
2553 .await
2554 .unwrap()
2555 );
2556
2557 if let Some(ns_proof) = ns_query_res.proof {
2559 let vid_common: VidCommonQueryData<SeqTypes> = client
2560 .get(&format!("availability/vid/common/{block_num}"))
2561 .send()
2562 .await
2563 .unwrap();
2564 ns_proof
2565 .verify(
2566 header.ns_table(),
2567 &header.payload_commitment(),
2568 vid_common.common(),
2569 )
2570 .unwrap();
2571 } else {
2572 assert!(header.ns_table().find_ns_id(&ns_id).is_none());
2574 assert!(ns_query_res.transactions.is_empty());
2575 }
2576
2577 found_empty_block = found_empty_block || ns_query_res.transactions.is_empty();
2578
2579 for txn in ns_query_res.transactions {
2580 if txn.commit() == hash {
2581 found_txn = true;
2583 }
2584 }
2585 }
2586 assert!(found_txn);
2587 assert!(found_empty_block);
2588
2589 let ns_proofs: Vec<NamespaceProofQueryData> = client
2591 .get(&format!(
2592 "availability/block/{block_height}/{}/namespace/{ns_id}",
2593 block_height2 + 1
2594 ))
2595 .send()
2596 .await
2597 .unwrap();
2598 assert_eq!(ns_proofs.len(), block_height2 + 1 - block_height);
2599 assert_eq!(&ns_proofs[0].transactions, std::slice::from_ref(&txn));
2600 assert_eq!(
2601 &ns_proofs[ns_proofs.len() - 1].transactions,
2602 std::slice::from_ref(&txn2)
2603 );
2604 for proof in &ns_proofs[1..ns_proofs.len() - 1] {
2605 assert_eq!(proof.transactions, &[]);
2606 }
2607 }
2608
2609 #[rstest_reuse::apply(testable_sequencer_data_source)]
2610 pub(crate) async fn catchup_test_with_query_module<D: TestableSequencerDataSource>(
2611 _d: PhantomData<D>,
2612 ) {
2613 let storage = D::create_storage().await;
2614 catchup_test_helper(|opt| D::options(&storage, opt)).await
2615 }
2616
2617 #[rstest_reuse::apply(testable_sequencer_data_source)]
2618 pub async fn test_non_consecutive_decide_with_failing_event_consumer<D>(_d: PhantomData<D>)
2619 where
2620 D: TestableSequencerDataSource + Debug + 'static,
2621 {
2622 #[derive(Clone, Copy, Debug)]
2623 struct FailConsumer;
2624
2625 #[async_trait]
2626 impl EventConsumer for FailConsumer {
2627 async fn handle_event(&self, _: &Event<SeqTypes>) -> anyhow::Result<()> {
2628 bail!("mock error injection");
2629 }
2630 }
2631
2632 let (pubkey, privkey) = PubKey::generated_from_seed_indexed([0; 32], 1);
2633
2634 let storage = D::create_storage().await;
2635 let persistence = D::persistence_options(&storage).create().await.unwrap();
2636 let data_source: Arc<StorageState<network::Memory, NoStorage, _>> =
2637 Arc::new(StorageState::new(
2638 D::create(D::persistence_options(&storage), Default::default(), false)
2639 .await
2640 .unwrap(),
2641 ApiState::new(future::pending()),
2642 ));
2643
2644 let mut chain1 = vec![];
2646
2647 let genesis = Leaf2::genesis(
2648 &Default::default(),
2649 &NodeState::mock(),
2650 TEST_VERSIONS.test.base,
2651 )
2652 .await;
2653 let payload = genesis.block_payload().unwrap();
2654 let payload_bytes_arc = payload.encode();
2655
2656 let avidm_param = init_avidm_param(2).unwrap();
2657 let weights = vec![1u32; 2];
2658
2659 let ns_table = parse_ns_table(payload.byte_len().as_usize(), &payload.ns_table().encode());
2660 let (payload_commitment, shares) =
2661 AvidMScheme::ns_disperse(&avidm_param, &weights, &payload_bytes_arc, ns_table).unwrap();
2662
2663 let mut quorum_proposal = QuorumProposalWrapper::<SeqTypes> {
2664 proposal: QuorumProposal2::<SeqTypes> {
2665 block_header: genesis.block_header().clone(),
2666 view_number: ViewNumber::genesis(),
2667 justify_qc: QuorumCertificate2::genesis(
2668 &ValidatedState::default(),
2669 &NodeState::mock(),
2670 MOCK_SEQUENCER_VERSIONS,
2671 )
2672 .await,
2673 upgrade_certificate: None,
2674 view_change_evidence: None,
2675 next_drb_result: None,
2676 next_epoch_justify_qc: None,
2677 epoch: None,
2678 state_cert: None,
2679 },
2680 };
2681 let mut qc = QuorumCertificate2::genesis(
2682 &ValidatedState::default(),
2683 &NodeState::mock(),
2684 MOCK_SEQUENCER_VERSIONS,
2685 )
2686 .await;
2687
2688 let mut justify_qc = qc.clone();
2689 for i in 0..5 {
2690 *quorum_proposal.proposal.block_header.height_mut() = i;
2691 quorum_proposal.proposal.view_number = ViewNumber::new(i);
2692 quorum_proposal.proposal.justify_qc = justify_qc;
2693 let leaf = Leaf2::from_quorum_proposal(&quorum_proposal);
2694 qc.view_number = leaf.view_number();
2695 qc.data.leaf_commit = Committable::commit(&leaf);
2696 justify_qc = qc.clone();
2697 chain1.push((leaf.clone(), CertificatePair::non_epoch_change(qc.clone())));
2698
2699 let quorum_proposal_signature =
2701 PubKey::sign(&privkey, &bincode::serialize(&quorum_proposal).unwrap())
2702 .expect("Failed to sign quorum_proposal");
2703 persistence
2704 .append_quorum_proposal2(&Proposal {
2705 data: quorum_proposal.clone(),
2706 signature: quorum_proposal_signature,
2707 _pd: Default::default(),
2708 })
2709 .await
2710 .unwrap();
2711
2712 let share: VidDisperseShare<SeqTypes> = AvidMDisperseShare {
2714 view_number: leaf.view_number(),
2715 payload_commitment,
2716 share: shares[0].clone(),
2717 recipient_key: pubkey,
2718 epoch: Some(EpochNumber::new(0)),
2719 target_epoch: Some(EpochNumber::new(0)),
2720 common: avidm_param.clone(),
2721 }
2722 .into();
2723
2724 persistence
2725 .append_vid(&share.to_proposal(&privkey).unwrap())
2726 .await
2727 .unwrap();
2728
2729 let block_payload_signature =
2731 PubKey::sign(&privkey, &payload_bytes_arc).expect("Failed to sign block payload");
2732 let da_proposal_inner = DaProposal2::<SeqTypes> {
2733 encoded_transactions: payload_bytes_arc.clone(),
2734 metadata: payload.ns_table().clone(),
2735 view_number: leaf.view_number(),
2736 epoch: Some(EpochNumber::new(0)),
2737 epoch_transition_indicator: EpochTransitionIndicator::NotInTransition,
2738 };
2739 let da_proposal = Proposal {
2740 data: da_proposal_inner,
2741 signature: block_payload_signature,
2742 _pd: Default::default(),
2743 };
2744 persistence
2745 .append_da2(&da_proposal, VidCommitment::V1(payload_commitment))
2746 .await
2747 .unwrap();
2748 }
2749 let mut chain2 = chain1.split_off(2);
2751 chain2.remove(0);
2753
2754 let leaf_chain = chain1
2756 .iter()
2757 .map(|(leaf, qc)| (leaf_info(leaf.clone()), qc.clone()))
2758 .collect::<Vec<_>>();
2759 tracing::info!("decide with event handling failure");
2760 persistence
2761 .append_decided_leaves(
2762 ViewNumber::new(1),
2763 leaf_chain.iter().map(|(leaf, qc)| (leaf, qc.clone())),
2764 None,
2765 &FailConsumer,
2766 )
2767 .await
2768 .unwrap();
2769
2770 let consumer = ApiEventConsumer::from(data_source.clone());
2773 let leaf_chain = chain2
2774 .iter()
2775 .map(|(leaf, qc)| (leaf_info(leaf.clone()), qc.clone()))
2776 .collect::<Vec<_>>();
2777 tracing::info!("decide successfully");
2778 persistence
2779 .append_decided_leaves(
2780 ViewNumber::new(4),
2781 leaf_chain.iter().map(|(leaf, qc)| (leaf, qc.clone())),
2782 None,
2783 &consumer,
2784 )
2785 .await
2786 .unwrap();
2787
2788 for (leaf, cert) in chain1.iter().chain(&chain2) {
2791 tracing::info!(height = leaf.height(), "check archive");
2792 let qd = data_source.get_leaf(leaf.height() as usize).await.await;
2793 let stored_leaf: Leaf2 = qd.leaf().clone();
2794 let stored_qc = qd.qc().clone();
2795 assert_eq!(&stored_leaf, leaf);
2796 assert_eq!(&stored_qc, cert.qc());
2797
2798 data_source
2799 .get_block(leaf.height() as usize)
2800 .await
2801 .try_resolve()
2802 .ok()
2803 .unwrap();
2804 data_source
2805 .get_vid_common(leaf.height() as usize)
2806 .await
2807 .try_resolve()
2808 .ok()
2809 .unwrap();
2810
2811 assert!(
2813 persistence
2814 .load_da_proposal(leaf.view_number())
2815 .await
2816 .unwrap()
2817 .is_none()
2818 );
2819 assert!(
2820 persistence
2821 .load_vid_share(leaf.view_number())
2822 .await
2823 .unwrap()
2824 .is_none()
2825 );
2826 assert!(
2827 persistence
2828 .load_quorum_proposal(leaf.view_number())
2829 .await
2830 .is_err()
2831 );
2832 }
2833
2834 assert!(
2836 persistence
2837 .load_da_proposal(ViewNumber::new(2))
2838 .await
2839 .unwrap()
2840 .is_some()
2841 );
2842 assert!(
2843 persistence
2844 .load_vid_share(ViewNumber::new(2))
2845 .await
2846 .unwrap()
2847 .is_some()
2848 );
2849 persistence
2850 .load_quorum_proposal(ViewNumber::new(2))
2851 .await
2852 .unwrap();
2853 }
2854
2855 #[rstest_reuse::apply(testable_sequencer_data_source)]
2856 pub async fn test_decide_missing_data<D>(_d: PhantomData<D>)
2857 where
2858 D: TestableSequencerDataSource + Debug + 'static,
2859 {
2860 use ark_serialize::CanonicalDeserialize;
2861
2862 let storage = D::create_storage().await;
2863 let persistence = D::persistence_options(&storage).create().await.unwrap();
2864 let data_source: Arc<StorageState<network::Memory, NoStorage, _>> =
2865 Arc::new(StorageState::new(
2866 D::create(D::persistence_options(&storage), Default::default(), false)
2867 .await
2868 .unwrap(),
2869 ApiState::new(future::pending()),
2870 ));
2871 let consumer = ApiEventConsumer::from(data_source.clone());
2872
2873 let mut qc = QuorumCertificate2::genesis(
2874 &ValidatedState::default(),
2875 &NodeState::mock(),
2876 MOCK_SEQUENCER_VERSIONS,
2877 )
2878 .await;
2879 let leaf = Leaf2::genesis(
2880 &ValidatedState::default(),
2881 &NodeState::mock(),
2882 TEST_VERSIONS.test.base,
2883 )
2884 .await;
2885
2886 tracing::info!(?leaf, ?qc, "decide genesis leaf");
2890 persistence
2891 .append_decided_leaves(
2892 leaf.view_number(),
2893 [(
2894 &leaf_info(leaf.clone()),
2895 CertificatePair::non_epoch_change(qc.clone()),
2896 )],
2897 None,
2898 &consumer,
2899 )
2900 .await
2901 .unwrap();
2902
2903 let mut block_header = leaf.block_header().clone();
2907 *block_header.height_mut() += 1;
2908 *block_header.payload_commitment_mut() = VidCommitment::V1(
2909 CanonicalDeserialize::deserialize_uncompressed_unchecked([1u8; 32].as_slice()).unwrap(),
2910 );
2911 let qp = QuorumProposalWrapper {
2912 proposal: QuorumProposal2 {
2913 block_header,
2914 view_number: leaf.view_number() + 1,
2915 justify_qc: qc.clone(),
2916 upgrade_certificate: None,
2917 view_change_evidence: None,
2918 next_drb_result: None,
2919 next_epoch_justify_qc: None,
2920 epoch: None,
2921 state_cert: None,
2922 },
2923 };
2924
2925 let leaf = Leaf2::from_quorum_proposal(&qp);
2926 qc.view_number = leaf.view_number();
2927 qc.data.leaf_commit = Committable::commit(&leaf);
2928
2929 tracing::info!(?leaf, ?qc, "append leaf 1");
2931 persistence
2932 .append_decided_leaves(
2933 leaf.view_number(),
2934 [(
2935 &leaf_info(leaf.clone()),
2936 CertificatePair::non_epoch_change(qc),
2937 )],
2938 None,
2939 &consumer,
2940 )
2941 .await
2942 .unwrap();
2943
2944 assert_eq!(leaf, data_source.get_leaf(1).await.await.leaf().clone());
2946 assert!(data_source.get_vid_common(1).await.is_pending());
2947 assert!(data_source.get_block(1).await.is_pending());
2948 }
2949
2950 fn leaf_info(leaf: Leaf2) -> LeafInfo<SeqTypes> {
2951 LeafInfo {
2952 leaf,
2953 vid_share: None,
2954 state: Default::default(),
2955 delta: None,
2956 state_cert: None,
2957 }
2958 }
2959}
2960
2961#[cfg(test)]
2962mod test {
2963 use std::{
2964 collections::{HashMap, HashSet},
2965 time::Duration,
2966 };
2967
2968 use ::light_client::{
2969 consensus::{
2970 header::HeaderProof,
2971 leaf::{LeafProof, LeafProofHint},
2972 payload::PayloadProof,
2973 },
2974 testing::{EpochChangeQuorum, LEGACY_VERSION},
2975 };
2976 use alloy::{
2977 eips::BlockId,
2978 network::EthereumWallet,
2979 primitives::{Address, U256},
2980 providers::ProviderBuilder,
2981 };
2982 use async_lock::Mutex;
2983 use committable::{Commitment, Committable};
2984 use espresso_contract_deployer::{
2985 Contract, Contracts, builder::DeployerArgsBuilder,
2986 network_config::light_client_genesis_from_stake_table, upgrade_stake_table_v2,
2987 };
2988 use espresso_types::{
2989 ADVZNamespaceProofQueryData, FeeAmount, Header, L1Client, L1ClientOptions,
2990 MOCK_SEQUENCER_VERSIONS, NamespaceId, NamespaceProofQueryData, NsProof,
2991 RegisteredValidatorMap, RewardDistributor, StakeTableState, StateCertQueryDataV1,
2992 StateCertQueryDataV2, ValidatedState, ValidatorLeaderCounts,
2993 config::PublicHotShotConfig,
2994 traits::{MembershipPersistence, NullEventConsumer, PersistenceOptions},
2995 v0_3::{COMMISSION_BASIS_POINTS, Fetcher, RewardAmount, RewardMerkleProofV1},
2996 v0_4::{RewardAccountV2, RewardMerkleProofV2},
2997 validators_from_l1_events,
2998 };
2999 use futures::{
3000 future::{self, join_all, try_join_all},
3001 stream::{StreamExt, TryStreamExt},
3002 try_join,
3003 };
3004 use hotshot::types::EventType;
3005 use hotshot_contract_adapter::{
3006 reward::RewardClaimInput,
3007 sol_types::{EspToken, StakeTableV2},
3008 stake_table::StakeTableContractVersion,
3009 };
3010 use hotshot_query_service::{
3011 availability::{
3012 BlockQueryData, BlockSummaryQueryData, LeafQueryData, TransactionQueryData,
3013 VidCommonQueryData,
3014 },
3015 data_source::{
3016 VersionedDataSource,
3017 sql::Config,
3018 storage::{SqlStorage, StorageConnectionType},
3019 },
3020 explorer::TransactionSummariesResponse,
3021 types::HeightIndexed,
3022 };
3023 use hotshot_types::{
3024 ValidatorConfig,
3025 data::EpochNumber,
3026 event::LeafInfo,
3027 traits::{block_contents::BlockHeader, election::Membership, metrics::NoMetrics},
3028 utils::epoch_from_block_number,
3029 };
3030 use jf_merkle_tree_compat::{
3031 MerkleTreeScheme,
3032 prelude::{MerkleProof, Sha3Node},
3033 };
3034 use pretty_assertions::assert_matches;
3035 use rand::seq::SliceRandom;
3036 use rstest::rstest;
3037 use staking_cli::{demo::DelegationConfig, fetch_commission, update_commission};
3038 use surf_disco::Client;
3039 use test_helpers::{
3040 TestNetwork, TestNetworkConfigBuilder, catchup_test_helper, state_signature_test_helper,
3041 status_test_helper, submit_test_helper,
3042 };
3043 use test_utils::reserve_tcp_port;
3044 use tide_disco::{
3045 Error, StatusCode, Url, app::AppHealth, error::ServerError, healthcheck::HealthStatus,
3046 };
3047 use tokio::time::sleep;
3048 use vbs::version::StaticVersion;
3049 use versions::{DRB_AND_HEADER_UPGRADE_VERSION, EPOCH_VERSION, FEE_VERSION, Upgrade, version};
3050
3051 use self::{
3052 data_source::testing::TestableSequencerDataSource, options::HotshotEvents,
3053 sql::DataSource as SqlDataSource,
3054 };
3055 use super::*;
3056
3057 async fn wait_until_block_height(
3058 client: &Client<ServerError, StaticVersion<0, 1>>,
3059 endpoint: &str,
3060 height: u64,
3061 ) {
3062 for _retry in 0.. {
3063 let bh = client
3064 .get::<u64>(endpoint)
3065 .send()
3066 .await
3067 .expect("block height not found");
3068
3069 if bh >= height {
3070 return;
3071 }
3072 sleep(Duration::from_secs(3)).await;
3073 }
3074 }
3075 use crate::{
3076 api::{
3077 options::Query,
3078 sql::{impl_testable_data_source::tmp_options, reconstruct_state},
3079 test_helpers::STAKE_TABLE_CAPACITY_FOR_TEST,
3080 },
3081 catchup::{NullStateCatchup, StatePeers},
3082 persistence,
3083 persistence::no_storage,
3084 testing::{TestConfig, TestConfigBuilder, wait_for_decide_on_handle, wait_for_epochs},
3085 };
3086
3087 const POS_V3: Upgrade = Upgrade::trivial(version(0, 3));
3088 const POS_V4: Upgrade = Upgrade::trivial(version(0, 4));
3089
3090 #[test_log::test(tokio::test(flavor = "multi_thread"))]
3091 async fn test_healthcheck() {
3092 let port = reserve_tcp_port().expect("OS should have ephemeral ports available");
3093 let url = format!("http://localhost:{port}").parse().unwrap();
3094 let client: Client<ServerError, StaticVersion<0, 1>> = Client::new(url);
3095 let options = Options::with_port(port);
3096 let network_config = TestConfigBuilder::default().build();
3097 let config = TestNetworkConfigBuilder::<5, _, NullStateCatchup>::default()
3098 .api_config(options)
3099 .network_config(network_config)
3100 .build();
3101 let _network = TestNetwork::new(config, MOCK_SEQUENCER_VERSIONS).await;
3102
3103 client.connect(None).await;
3104 let health = client.get::<AppHealth>("healthcheck").send().await.unwrap();
3105 assert_eq!(health.status, HealthStatus::Available);
3106 }
3107
3108 #[test_log::test(tokio::test(flavor = "multi_thread"))]
3109 async fn status_test_without_query_module() {
3110 status_test_helper(|opt| opt).await
3111 }
3112
3113 #[test_log::test(tokio::test(flavor = "multi_thread"))]
3114 async fn submit_test_without_query_module() {
3115 submit_test_helper(|opt| opt).await
3116 }
3117
3118 #[test_log::test(tokio::test(flavor = "multi_thread"))]
3119 async fn state_signature_test_without_query_module() {
3120 state_signature_test_helper(|opt| opt).await
3121 }
3122
3123 #[test_log::test(tokio::test(flavor = "multi_thread"))]
3124 async fn catchup_test_without_query_module() {
3125 catchup_test_helper(|opt| opt).await
3126 }
3127
3128 #[test_log::test(tokio::test(flavor = "multi_thread"))]
3129 async fn test_leaf_only_data_source() {
3130 let port = reserve_tcp_port().expect("OS should have ephemeral ports available");
3131
3132 let storage = SqlDataSource::create_storage().await;
3133 let options =
3134 SqlDataSource::leaf_only_ds_options(&storage, Options::with_port(port)).unwrap();
3135
3136 let network_config = TestConfigBuilder::default().build();
3137 let config = TestNetworkConfigBuilder::default()
3138 .api_config(options)
3139 .network_config(network_config)
3140 .build();
3141 let _network = TestNetwork::new(config, MOCK_SEQUENCER_VERSIONS).await;
3142 let url = format!("http://localhost:{port}").parse().unwrap();
3143 let client: Client<ServerError, SequencerApiVersion> = Client::new(url);
3144
3145 tracing::info!("waiting for blocks");
3146 client.connect(Some(Duration::from_secs(15))).await;
3147 let account = TestConfig::<5>::builder_key().fee_account();
3150
3151 let _headers = client
3152 .socket("availability/stream/headers/0")
3153 .subscribe::<Header>()
3154 .await
3155 .unwrap()
3156 .take(10)
3157 .try_collect::<Vec<_>>()
3158 .await
3159 .unwrap();
3160
3161 for i in 1..5 {
3162 let leaf = client
3163 .get::<LeafQueryData<SeqTypes>>(&format!("availability/leaf/{i}"))
3164 .send()
3165 .await
3166 .unwrap();
3167
3168 assert_eq!(leaf.height(), i);
3169
3170 let header = client
3171 .get::<Header>(&format!("availability/header/{i}"))
3172 .send()
3173 .await
3174 .unwrap();
3175
3176 assert_eq!(header.height(), i);
3177
3178 let vid = client
3179 .get::<VidCommonQueryData<SeqTypes>>(&format!("availability/vid/common/{i}"))
3180 .send()
3181 .await
3182 .unwrap();
3183
3184 assert_eq!(vid.height(), i);
3185
3186 client
3187 .get::<MerkleProof<Commitment<Header>, u64, Sha3Node, 3>>(&format!(
3188 "block-state/{i}/{}",
3189 i - 1
3190 ))
3191 .send()
3192 .await
3193 .unwrap();
3194
3195 client
3196 .get::<MerkleProof<FeeAmount, FeeAccount, Sha3Node, 256>>(&format!(
3197 "fee-state/{}/{}",
3198 i + 1,
3199 account
3200 ))
3201 .send()
3202 .await
3203 .unwrap();
3204 }
3205
3206 client
3209 .get::<BlockQueryData<SeqTypes>>("availability/block/1")
3210 .send()
3211 .await
3212 .unwrap_err();
3213 }
3214
3215 async fn run_catchup_test(url_suffix: &str) {
3216 let port = reserve_tcp_port().expect("OS should have ephemeral ports available");
3218 const NUM_NODES: usize = 5;
3219
3220 let url: url::Url = format!("http://localhost:{port}{url_suffix}")
3221 .parse()
3222 .unwrap();
3223
3224 let config = TestNetworkConfigBuilder::<NUM_NODES, _, _>::with_num_nodes()
3225 .api_config(Options::with_port(port))
3226 .network_config(TestConfigBuilder::default().build())
3227 .catchups(std::array::from_fn(|_| {
3228 StatePeers::<StaticVersion<0, 1>>::from_urls(
3229 vec![url.clone()],
3230 Default::default(),
3231 Duration::from_secs(2),
3232 &NoMetrics,
3233 )
3234 }))
3235 .build();
3236 let mut network = TestNetwork::new(config, MOCK_SEQUENCER_VERSIONS).await;
3237
3238 let mut events = network.peers[0].event_stream().await;
3240 loop {
3241 let event = events.next().await.unwrap();
3242 let EventType::Decide { leaf_chain, .. } = event.event else {
3243 continue;
3244 };
3245 if leaf_chain[0].leaf.height() > 0 {
3246 break;
3247 }
3248 }
3249
3250 tracing::info!("shutting down node");
3255 network.peers.remove(0);
3256
3257 network
3259 .server
3260 .event_stream()
3261 .await
3262 .filter(|event| future::ready(matches!(event.event, EventType::Decide { .. })))
3263 .take(3)
3264 .collect::<Vec<_>>()
3265 .await;
3266
3267 tracing::info!("restarting node");
3268 let node = network
3269 .cfg
3270 .init_node(
3271 1,
3272 ValidatedState::default(),
3273 no_storage::Options,
3274 Some(StatePeers::<StaticVersion<0, 1>>::from_urls(
3275 vec![url],
3276 Default::default(),
3277 Duration::from_secs(2),
3278 &NoMetrics,
3279 )),
3280 None,
3281 &NoMetrics,
3282 test_helpers::STAKE_TABLE_CAPACITY_FOR_TEST,
3283 NullEventConsumer,
3284 MOCK_SEQUENCER_VERSIONS,
3285 Default::default(),
3286 )
3287 .await;
3288 let mut events = node.event_stream().await;
3289
3290 let mut proposers = [false; NUM_NODES];
3293 loop {
3294 let event = events.next().await.unwrap();
3295 let EventType::Decide { leaf_chain, .. } = event.event else {
3296 continue;
3297 };
3298 for LeafInfo { leaf, .. } in leaf_chain.iter().rev() {
3299 let height = leaf.height();
3300 let leaf_builder = (leaf.view_number().u64() as usize) % NUM_NODES;
3301 if height == 0 {
3302 continue;
3303 }
3304
3305 tracing::info!(
3306 "waiting for blocks from {proposers:?}, block {height} is from {leaf_builder}",
3307 );
3308 proposers[leaf_builder] = true;
3309 }
3310
3311 if proposers.iter().all(|has_proposed| *has_proposed) {
3312 break;
3313 }
3314 }
3315 }
3316
3317 #[test_log::test(tokio::test(flavor = "multi_thread"))]
3318 async fn test_catchup() {
3319 run_catchup_test("").await;
3320 }
3321
3322 #[test_log::test(tokio::test(flavor = "multi_thread"))]
3323 async fn test_catchup_v0() {
3324 run_catchup_test("/v0").await;
3325 }
3326
3327 #[test_log::test(tokio::test(flavor = "multi_thread"))]
3328 async fn test_catchup_v1() {
3329 run_catchup_test("/v1").await;
3330 }
3331
3332 #[test_log::test(tokio::test(flavor = "multi_thread"))]
3333 async fn test_catchup_no_state_peers() {
3334 let port = reserve_tcp_port().expect("OS should have ephemeral ports available");
3336 const NUM_NODES: usize = 5;
3337 let config = TestNetworkConfigBuilder::<NUM_NODES, _, _>::with_num_nodes()
3338 .api_config(Options::with_port(port))
3339 .network_config(TestConfigBuilder::default().build())
3340 .build();
3341 let mut network = TestNetwork::new(config, MOCK_SEQUENCER_VERSIONS).await;
3342
3343 let mut events = network.peers[0].event_stream().await;
3345 loop {
3346 let event = events.next().await.unwrap();
3347 let EventType::Decide { leaf_chain, .. } = event.event else {
3348 continue;
3349 };
3350 if leaf_chain[0].leaf.height() > 0 {
3351 break;
3352 }
3353 }
3354
3355 tracing::info!("shutting down node");
3360 network.peers.remove(0);
3361
3362 network
3364 .server
3365 .event_stream()
3366 .await
3367 .filter(|event| future::ready(matches!(event.event, EventType::Decide { .. })))
3368 .take(3)
3369 .collect::<Vec<_>>()
3370 .await;
3371
3372 tracing::info!("restarting node");
3373 let node = network
3374 .cfg
3375 .init_node(
3376 1,
3377 ValidatedState::default(),
3378 no_storage::Options,
3379 None::<NullStateCatchup>,
3380 None,
3381 &NoMetrics,
3382 test_helpers::STAKE_TABLE_CAPACITY_FOR_TEST,
3383 NullEventConsumer,
3384 MOCK_SEQUENCER_VERSIONS,
3385 Default::default(),
3386 )
3387 .await;
3388 let mut events = node.event_stream().await;
3389
3390 let mut proposers = [false; NUM_NODES];
3393 loop {
3394 let event = events.next().await.unwrap();
3395 let EventType::Decide { leaf_chain, .. } = event.event else {
3396 continue;
3397 };
3398 for LeafInfo { leaf, .. } in leaf_chain.iter().rev() {
3399 let height = leaf.height();
3400 let leaf_builder = (leaf.view_number().u64() as usize) % NUM_NODES;
3401 if height == 0 {
3402 continue;
3403 }
3404
3405 tracing::info!(
3406 "waiting for blocks from {proposers:?}, block {height} is from {leaf_builder}",
3407 );
3408 proposers[leaf_builder] = true;
3409 }
3410
3411 if proposers.iter().all(|has_proposed| *has_proposed) {
3412 break;
3413 }
3414 }
3415 }
3416
3417 #[ignore]
3418 #[test_log::test(tokio::test(flavor = "multi_thread"))]
3419 async fn test_catchup_epochs_no_state_peers() {
3420 let port = reserve_tcp_port().expect("OS should have ephemeral ports available");
3422 const EPOCH_HEIGHT: u64 = 5;
3423 let network_config = TestConfigBuilder::default()
3424 .epoch_height(EPOCH_HEIGHT)
3425 .build();
3426 const NUM_NODES: usize = 5;
3427 let config = TestNetworkConfigBuilder::<NUM_NODES, _, _>::with_num_nodes()
3428 .api_config(Options::with_port(port))
3429 .network_config(network_config)
3430 .build();
3431 let mut network = TestNetwork::new(config, Upgrade::trivial(EPOCH_VERSION)).await;
3432
3433 let mut events = network.peers[0].event_stream().await;
3435 loop {
3436 let event = events.next().await.unwrap();
3437 let EventType::Decide { leaf_chain, .. } = event.event else {
3438 continue;
3439 };
3440 tracing::error!("got decide height {}", leaf_chain[0].leaf.height());
3441
3442 if leaf_chain[0].leaf.height() > EPOCH_HEIGHT * 3 {
3443 tracing::error!("decided past one epoch");
3444 break;
3445 }
3446 }
3447
3448 tracing::info!("shutting down node");
3453 network.peers.remove(0);
3454
3455 network
3457 .server
3458 .event_stream()
3459 .await
3460 .filter(|event| future::ready(matches!(event.event, EventType::Decide { .. })))
3461 .take(3)
3462 .collect::<Vec<_>>()
3463 .await;
3464
3465 tracing::error!("restarting node");
3466 let node = network
3467 .cfg
3468 .init_node(
3469 1,
3470 ValidatedState::default(),
3471 no_storage::Options,
3472 None::<NullStateCatchup>,
3473 None,
3474 &NoMetrics,
3475 test_helpers::STAKE_TABLE_CAPACITY_FOR_TEST,
3476 NullEventConsumer,
3477 MOCK_SEQUENCER_VERSIONS,
3478 Default::default(),
3479 )
3480 .await;
3481 let mut events = node.event_stream().await;
3482
3483 let mut proposers = [false; NUM_NODES];
3486 loop {
3487 let event = events.next().await.unwrap();
3488 let EventType::Decide { leaf_chain, .. } = event.event else {
3489 continue;
3490 };
3491 for LeafInfo { leaf, .. } in leaf_chain.iter().rev() {
3492 let height = leaf.height();
3493 let leaf_builder = (leaf.view_number().u64() as usize) % NUM_NODES;
3494 if height == 0 {
3495 continue;
3496 }
3497
3498 tracing::info!(
3499 "waiting for blocks from {proposers:?}, block {height} is from {leaf_builder}",
3500 );
3501 proposers[leaf_builder] = true;
3502 }
3503
3504 if proposers.iter().all(|has_proposed| *has_proposed) {
3505 break;
3506 }
3507 }
3508 }
3509
3510 #[test_log::test(tokio::test(flavor = "multi_thread"))]
3511 async fn test_chain_config_from_instance() {
3512 let port = reserve_tcp_port().expect("OS should have ephemeral ports available");
3518
3519 let chain_config: ChainConfig = ChainConfig::default();
3520
3521 let state = ValidatedState {
3522 chain_config: chain_config.commit().into(),
3523 ..Default::default()
3524 };
3525
3526 let states = std::array::from_fn(|_| state.clone());
3527
3528 let config = TestNetworkConfigBuilder::default()
3529 .api_config(Options::with_port(port))
3530 .states(states)
3531 .catchups(std::array::from_fn(|_| {
3532 StatePeers::<StaticVersion<0, 1>>::from_urls(
3533 vec![format!("http://localhost:{port}").parse().unwrap()],
3534 Default::default(),
3535 Duration::from_secs(2),
3536 &NoMetrics,
3537 )
3538 }))
3539 .network_config(TestConfigBuilder::default().build())
3540 .build();
3541
3542 let mut network = TestNetwork::new(config, MOCK_SEQUENCER_VERSIONS).await;
3543
3544 network
3546 .server
3547 .event_stream()
3548 .await
3549 .filter(|event| future::ready(matches!(event.event, EventType::Decide { .. })))
3550 .take(3)
3551 .collect::<Vec<_>>()
3552 .await;
3553
3554 for peer in &network.peers {
3555 let state = peer.consensus().read().await.decided_state().await;
3556
3557 assert_eq!(state.chain_config.resolve().unwrap(), chain_config)
3558 }
3559
3560 network.server.shut_down().await;
3561 drop(network);
3562 }
3563
3564 #[test_log::test(tokio::test(flavor = "multi_thread"))]
3565 async fn test_chain_config_catchup() {
3566 let port = reserve_tcp_port().expect("OS should have ephemeral ports available");
3572
3573 let cf = ChainConfig {
3574 max_block_size: 300.into(),
3575 base_fee: 1.into(),
3576 ..Default::default()
3577 };
3578
3579 let state1 = ValidatedState {
3581 chain_config: cf.commit().into(),
3582 ..Default::default()
3583 };
3584
3585 let state2 = ValidatedState {
3587 chain_config: cf.into(),
3588 ..Default::default()
3589 };
3590
3591 let mut states = std::array::from_fn(|_| state1.clone());
3592 states[0] = state2;
3595
3596 const NUM_NODES: usize = 5;
3597 let config = TestNetworkConfigBuilder::<NUM_NODES, _, _>::with_num_nodes()
3598 .api_config(Options::from(options::Http {
3599 port,
3600 max_connections: None,
3601 }))
3602 .states(states)
3603 .catchups(std::array::from_fn(|_| {
3604 StatePeers::<StaticVersion<0, 1>>::from_urls(
3605 vec![format!("http://localhost:{port}").parse().unwrap()],
3606 Default::default(),
3607 Duration::from_secs(2),
3608 &NoMetrics,
3609 )
3610 }))
3611 .network_config(TestConfigBuilder::default().build())
3612 .build();
3613
3614 let mut network = TestNetwork::new(config, MOCK_SEQUENCER_VERSIONS).await;
3615
3616 network
3618 .server
3619 .event_stream()
3620 .await
3621 .filter(|event| future::ready(matches!(event.event, EventType::Decide { .. })))
3622 .take(3)
3623 .collect::<Vec<_>>()
3624 .await;
3625
3626 for peer in &network.peers {
3627 let state = peer.consensus().read().await.decided_state().await;
3628
3629 assert_eq!(state.chain_config.resolve().unwrap(), cf)
3630 }
3631
3632 network.server.shut_down().await;
3633 drop(network);
3634 }
3635
3636 #[test_log::test(tokio::test(flavor = "multi_thread"))]
3637 async fn test_pos_upgrade_view_based() {
3638 test_upgrade_helper(Upgrade::new(FEE_VERSION, EPOCH_VERSION)).await;
3639 }
3640
3641 #[test_log::test(tokio::test(flavor = "multi_thread"))]
3642 async fn test_epoch_reward_upgrade() {
3643 test_upgrade_helper(Upgrade::new(
3644 versions::DRB_AND_HEADER_UPGRADE_VERSION,
3645 versions::EPOCH_REWARD_VERSION,
3646 ))
3647 .await;
3648 }
3649
3650 async fn test_upgrade_helper(upgrade: Upgrade) {
3651 let wait_extra_views = 10;
3654 const NUM_NODES: usize = 5;
3656 let port = reserve_tcp_port().expect("OS should have ephemeral ports available");
3657 let epoch_start_block = if upgrade.base >= versions::EPOCH_VERSION {
3658 0
3659 } else {
3660 321
3661 };
3662
3663 let test_config = TestConfigBuilder::default()
3664 .epoch_height(200)
3665 .epoch_start_block(epoch_start_block)
3666 .set_upgrades(upgrade.target)
3667 .await
3668 .build();
3669
3670 let chain_config_genesis = ValidatedState::default().chain_config.resolve().unwrap();
3671 let chain_config_upgrade = test_config.get_upgrade_map().chain_config(upgrade.target);
3672 assert_ne!(chain_config_genesis, chain_config_upgrade);
3673 tracing::debug!(?chain_config_genesis, ?chain_config_upgrade);
3674
3675 let storage = join_all((0..NUM_NODES).map(|_| SqlDataSource::create_storage())).await;
3676 let persistence: [_; NUM_NODES] = storage
3677 .iter()
3678 .map(<SqlDataSource as TestableSequencerDataSource>::persistence_options)
3679 .collect::<Vec<_>>()
3680 .try_into()
3681 .unwrap();
3682
3683 let mut builder = TestNetworkConfigBuilder::<NUM_NODES, _, _>::with_num_nodes()
3684 .api_config(SqlDataSource::options(
3685 &storage[0],
3686 Options::with_port(port),
3687 ))
3688 .persistences(persistence)
3689 .catchups(std::array::from_fn(|_| {
3690 StatePeers::<SequencerApiVersion>::from_urls(
3691 vec![format!("http://localhost:{port}").parse().unwrap()],
3692 Default::default(),
3693 Duration::from_secs(2),
3694 &NoMetrics,
3695 )
3696 }))
3697 .network_config(test_config);
3698
3699 if upgrade.base >= versions::EPOCH_VERSION {
3702 let state = ValidatedState {
3703 chain_config: chain_config_upgrade.into(),
3704 ..Default::default()
3705 };
3706 builder = builder.states(std::array::from_fn(|_| state.clone()));
3707 }
3708
3709 let config = builder.build();
3710
3711 let mut network = TestNetwork::new(config, upgrade).await;
3712 let mut events = network.server.event_stream().await;
3713
3714 let target = upgrade.target;
3715
3716 let upgrade = loop {
3720 let event = events.next().await.unwrap();
3721 match event.event {
3722 EventType::UpgradeProposal { proposal, .. } => {
3723 tracing::info!(?proposal, "proposal");
3724 let upgrade = proposal.data.upgrade_proposal;
3725 let new_version = upgrade.new_version;
3726 tracing::info!(?new_version, "upgrade proposal new version");
3727 assert_eq!(new_version, target);
3728 break upgrade;
3729 },
3730 _ => continue,
3731 }
3732 };
3733
3734 let wanted_view = upgrade.new_version_first_view + wait_extra_views;
3735 loop {
3737 let event = events.next().await.unwrap();
3738 let view_number = event.view_number;
3739
3740 tracing::debug!(?view_number, ?upgrade.new_version_first_view, "upgrade_new_view");
3741 if view_number > wanted_view {
3742 tracing::info!(?view_number, ?upgrade.new_version_first_view, "passed upgrade view");
3743 let states = join_all(
3744 network
3745 .peers
3746 .iter()
3747 .map(|peer| async { peer.consensus().read().await.decided_state().await }),
3748 )
3749 .await;
3750 let leaves = join_all(
3751 network
3752 .peers
3753 .iter()
3754 .map(|peer| async { peer.consensus().read().await.decided_leaf().await }),
3755 )
3756 .await;
3757 let configs: Vec<ChainConfig> = states
3758 .iter()
3759 .map(|state| state.chain_config.resolve().unwrap())
3760 .collect();
3761
3762 tracing::info!(?leaves, ?configs, "post upgrade state");
3763 for config in configs {
3764 assert_eq!(config, chain_config_upgrade);
3765 }
3766 for leaf in leaves {
3767 assert_eq!(leaf.block_header().version(), target);
3768 }
3769 break;
3770 }
3771 sleep(Duration::from_millis(200)).await;
3772 }
3773
3774 network.server.shut_down().await;
3775 }
3776
3777 #[test_log::test(tokio::test(flavor = "multi_thread"))]
3778 pub(crate) async fn test_restart() {
3779 const NUM_NODES: usize = 5;
3780 let storage = join_all((0..NUM_NODES).map(|_| SqlDataSource::create_storage())).await;
3782 let persistence: [_; NUM_NODES] = storage
3783 .iter()
3784 .map(<SqlDataSource as TestableSequencerDataSource>::persistence_options)
3785 .collect::<Vec<_>>()
3786 .try_into()
3787 .unwrap();
3788 let port = reserve_tcp_port().expect("OS should have ephemeral ports available");
3789 let config = TestNetworkConfigBuilder::default()
3790 .api_config(SqlDataSource::options(
3791 &storage[0],
3792 Options::with_port(port),
3793 ))
3794 .persistences(persistence.clone())
3795 .network_config(TestConfigBuilder::default().build())
3796 .build();
3797 let mut network = TestNetwork::new(config, MOCK_SEQUENCER_VERSIONS).await;
3798
3799 let client: Client<ServerError, SequencerApiVersion> =
3801 Client::new(format!("http://localhost:{port}").parse().unwrap());
3802 client.connect(None).await;
3803 tracing::info!(port, "server running");
3804
3805 client
3807 .socket("availability/stream/blocks/0")
3808 .subscribe::<BlockQueryData<SeqTypes>>()
3809 .await
3810 .unwrap()
3811 .take(3)
3812 .collect::<Vec<_>>()
3813 .await;
3814
3815 tracing::info!("shutting down nodes");
3817 network.stop_consensus().await;
3818
3819 let height = client
3821 .get::<usize>("status/block-height")
3822 .send()
3823 .await
3824 .unwrap();
3825 tracing::info!("decided {height} blocks before shutting down");
3826
3827 let chain: Vec<LeafQueryData<SeqTypes>> = client
3829 .socket("availability/stream/leaves/0")
3830 .subscribe()
3831 .await
3832 .unwrap()
3833 .take(height)
3834 .try_collect()
3835 .await
3836 .unwrap();
3837 let decided_view = chain.last().unwrap().leaf().view_number();
3838
3839 let state = network.server.decided_state().await;
3842 tracing::info!(?decided_view, ?state, "consensus state");
3843
3844 drop(network);
3846
3847 let port = reserve_tcp_port().expect("OS should have ephemeral ports available");
3849
3850 let config = TestNetworkConfigBuilder::default()
3851 .api_config(SqlDataSource::options(
3852 &storage[0],
3853 Options::with_port(port),
3854 ))
3855 .persistences(persistence)
3856 .catchups(std::array::from_fn(|_| {
3857 StatePeers::<StaticVersion<0, 1>>::from_urls(
3861 vec![format!("http://localhost:{port}").parse().unwrap()],
3862 Default::default(),
3863 Duration::from_secs(2),
3864 &NoMetrics,
3865 )
3866 }))
3867 .network_config(TestConfigBuilder::default().build())
3868 .build();
3869 let _network = TestNetwork::new(config, MOCK_SEQUENCER_VERSIONS).await;
3870 let client: Client<ServerError, StaticVersion<0, 1>> =
3871 Client::new(format!("http://localhost:{port}").parse().unwrap());
3872 client.connect(None).await;
3873 tracing::info!(port, "server running");
3874
3875 tracing::info!("waiting for decide, height {height}");
3877 let new_leaf: LeafQueryData<SeqTypes> = client
3878 .socket(&format!("availability/stream/leaves/{height}"))
3879 .subscribe()
3880 .await
3881 .unwrap()
3882 .next()
3883 .await
3884 .unwrap()
3885 .unwrap();
3886 assert_eq!(new_leaf.height(), height as u64);
3887 assert_eq!(
3888 new_leaf.leaf().parent_commitment(),
3889 chain[height - 1].hash()
3890 );
3891
3892 let new_chain: Vec<LeafQueryData<SeqTypes>> = client
3894 .socket("availability/stream/leaves/0")
3895 .subscribe()
3896 .await
3897 .unwrap()
3898 .take(height)
3899 .try_collect()
3900 .await
3901 .unwrap();
3902 assert_eq!(chain, new_chain);
3903 }
3904
3905 #[test_log::test(tokio::test(flavor = "multi_thread"))]
3906 async fn test_fetch_config() {
3907 let port = reserve_tcp_port().expect("OS should have ephemeral ports available");
3908 let url: surf_disco::Url = format!("http://localhost:{port}").parse().unwrap();
3909 let client: Client<ServerError, StaticVersion<0, 1>> = Client::new(url.clone());
3910
3911 let options = Options::with_port(port).config(Default::default());
3912 let network_config = TestConfigBuilder::default().build();
3913 let config = TestNetworkConfigBuilder::default()
3914 .api_config(options)
3915 .network_config(network_config)
3916 .build();
3917 let network = TestNetwork::new(config, MOCK_SEQUENCER_VERSIONS).await;
3918 client.connect(None).await;
3919
3920 let peers = StatePeers::<StaticVersion<0, 1>>::from_urls(
3923 vec!["https://notarealnode.network".parse().unwrap(), url],
3924 Default::default(),
3925 Duration::from_secs(2),
3926 &NoMetrics,
3927 );
3928
3929 let validator =
3931 ValidatorConfig::generated_from_seed_indexed([0; 32], 1, U256::from(1), false);
3932 let config = peers.fetch_config(validator.clone()).await.unwrap();
3933
3934 assert_eq!(config.node_index, 1);
3936
3937 pretty_assertions::assert_eq!(
3940 serde_json::to_value(PublicHotShotConfig::from(config.config)).unwrap(),
3941 serde_json::to_value(PublicHotShotConfig::from(
3942 network.cfg.hotshot_config().clone()
3943 ))
3944 .unwrap()
3945 );
3946 }
3947
3948 async fn run_hotshot_event_streaming_test(url_suffix: &str) {
3949 let query_service_port =
3950 reserve_tcp_port().expect("OS should have ephemeral ports available");
3951
3952 let url = format!("http://localhost:{query_service_port}{url_suffix}")
3953 .parse()
3954 .unwrap();
3955
3956 let client: Client<ServerError, SequencerApiVersion> = Client::new(url);
3957
3958 let options = Options::with_port(query_service_port).hotshot_events(HotshotEvents);
3959
3960 let network_config = TestConfigBuilder::default().build();
3961 let config = TestNetworkConfigBuilder::default()
3962 .api_config(options)
3963 .network_config(network_config)
3964 .build();
3965 let _network = TestNetwork::new(config, MOCK_SEQUENCER_VERSIONS).await;
3966
3967 let mut subscribed_events = client
3968 .socket("hotshot-events/events")
3969 .subscribe::<Event<SeqTypes>>()
3970 .await
3971 .unwrap();
3972
3973 let total_count = 5;
3974 let mut receive_count = 0;
3976 loop {
3977 let event = subscribed_events.next().await.unwrap();
3978 tracing::info!("Received event in hotshot event streaming Client 1: {event:?}");
3979 receive_count += 1;
3980 if receive_count > total_count {
3981 tracing::info!("Client Received at least desired events, exiting loop");
3982 break;
3983 }
3984 }
3985 assert_eq!(receive_count, total_count + 1);
3986 }
3987
3988 #[test_log::test(tokio::test(flavor = "multi_thread"))]
3989 async fn test_hotshot_event_streaming_v0() {
3990 run_hotshot_event_streaming_test("/v0").await;
3991 }
3992
3993 #[test_log::test(tokio::test(flavor = "multi_thread"))]
3994 async fn test_hotshot_event_streaming_v1() {
3995 run_hotshot_event_streaming_test("/v1").await;
3996 }
3997
3998 #[test_log::test(tokio::test(flavor = "multi_thread"))]
3999 async fn test_hotshot_event_streaming() {
4000 run_hotshot_event_streaming_test("").await;
4001 }
4002
4003 #[test_log::test(tokio::test(flavor = "multi_thread"))]
4006 async fn test_hotshot_event_streaming_epoch_progression() {
4007 let epoch_height = 35;
4008 let wanted_epochs = 4;
4009
4010 let network_config = TestConfigBuilder::default()
4011 .epoch_height(epoch_height)
4012 .build();
4013
4014 let query_service_port =
4015 reserve_tcp_port().expect("OS should have ephemeral ports available");
4016
4017 let hotshot_url = format!("http://localhost:{query_service_port}")
4018 .parse()
4019 .unwrap();
4020
4021 let client: Client<ServerError, SequencerApiVersion> = Client::new(hotshot_url);
4022 let options = Options::with_port(query_service_port).hotshot_events(HotshotEvents);
4023
4024 let config = TestNetworkConfigBuilder::default()
4025 .api_config(options)
4026 .network_config(network_config.clone())
4027 .pos_hook(
4028 DelegationConfig::VariableAmounts,
4029 Default::default(),
4030 POS_V3,
4031 )
4032 .await
4033 .expect("Pos Deployment")
4034 .build();
4035
4036 let _network = TestNetwork::new(config, POS_V3).await;
4037
4038 let mut subscribed_events = client
4039 .socket("hotshot-events/events")
4040 .subscribe::<Event<SeqTypes>>()
4041 .await
4042 .unwrap();
4043
4044 let wanted_views = epoch_height * wanted_epochs;
4045
4046 let mut views = HashSet::new();
4047 let mut epochs = HashSet::new();
4048 for _ in 0..=600 {
4049 let event = subscribed_events.next().await.unwrap();
4050 let event = event.unwrap();
4051 let view_number = event.view_number;
4052 views.insert(view_number.u64());
4053
4054 if let hotshot::types::EventType::Decide { committing_qc, .. } = event.event {
4055 assert!(committing_qc.epoch().is_some(), "epochs are live");
4056 assert!(committing_qc.block_number().is_some());
4057
4058 let epoch = committing_qc.epoch().unwrap().u64();
4059 epochs.insert(epoch);
4060
4061 tracing::debug!(
4062 "Got decide: epoch: {:?}, block: {:?} ",
4063 epoch,
4064 committing_qc.block_number()
4065 );
4066
4067 let expected_epoch =
4068 epoch_from_block_number(committing_qc.block_number().unwrap(), epoch_height);
4069 tracing::debug!("expected epoch: {expected_epoch}, qc epoch: {epoch}");
4070
4071 assert_eq!(expected_epoch, epoch);
4072 }
4073 if views.contains(&wanted_views) {
4074 tracing::info!("Client Received at least desired views, exiting loop");
4075 break;
4076 }
4077 }
4078
4079 assert!(views.contains(&wanted_views), "Views are not progressing");
4081 assert!(
4082 epochs.contains(&wanted_epochs),
4083 "Epochs are not progressing"
4084 );
4085 }
4086
4087 #[test_log::test(tokio::test(flavor = "multi_thread"))]
4088 async fn test_pos_rewards_basic() -> anyhow::Result<()> {
4089 let epoch_height = 20;
4096
4097 let network_config = TestConfigBuilder::default()
4098 .epoch_height(epoch_height)
4099 .build();
4100
4101 let api_port = reserve_tcp_port().expect("OS should have ephemeral ports available");
4102
4103 const NUM_NODES: usize = 1;
4104 let storage = join_all((0..NUM_NODES).map(|_| SqlDataSource::create_storage())).await;
4106 let persistence: [_; NUM_NODES] = storage
4107 .iter()
4108 .map(<SqlDataSource as TestableSequencerDataSource>::persistence_options)
4109 .collect::<Vec<_>>()
4110 .try_into()
4111 .unwrap();
4112
4113 let config = TestNetworkConfigBuilder::with_num_nodes()
4114 .api_config(SqlDataSource::options(
4115 &storage[0],
4116 Options::with_port(api_port),
4117 ))
4118 .network_config(network_config.clone())
4119 .persistences(persistence.clone())
4120 .catchups(std::array::from_fn(|_| {
4121 StatePeers::<StaticVersion<0, 1>>::from_urls(
4122 vec![format!("http://localhost:{api_port}").parse().unwrap()],
4123 Default::default(),
4124 Duration::from_secs(2),
4125 &NoMetrics,
4126 )
4127 }))
4128 .pos_hook(
4129 DelegationConfig::VariableAmounts,
4130 Default::default(),
4131 POS_V4,
4132 )
4133 .await
4134 .unwrap()
4135 .build();
4136
4137 let network = TestNetwork::new(config, POS_V4).await;
4138 let client: Client<ServerError, SequencerApiVersion> =
4139 Client::new(format!("http://localhost:{api_port}").parse().unwrap());
4140
4141 let _blocks = client
4146 .socket("availability/stream/blocks/0")
4147 .subscribe::<BlockQueryData<SeqTypes>>()
4148 .await
4149 .unwrap()
4150 .take(65)
4151 .try_collect::<Vec<_>>()
4152 .await
4153 .unwrap();
4154
4155 let staking_priv_keys = network_config.staking_priv_keys();
4156 let account = staking_priv_keys[0].0.clone();
4157 let address = account.address();
4158
4159 let block_height = 60;
4160
4161 let node_state = network.server.node_state();
4162 let membership = node_state.coordinator.membership().read().await;
4163 let expected_amount = U256::from(20)
4164 * (membership
4165 .epoch_block_reward(3.into())
4166 .expect("block reward is not None"))
4167 .0;
4168 drop(membership);
4169
4170 let amount = client
4172 .get::<Option<RewardAmount>>(&format!(
4173 "reward-state/reward-balance/{block_height}/{address}"
4174 ))
4175 .send()
4176 .await
4177 .unwrap()
4178 .unwrap();
4179
4180 tracing::info!("amount={amount:?}");
4181
4182 assert_eq!(amount.0, expected_amount, "reward amount don't match");
4183
4184 Ok(())
4185 }
4186
4187 #[test_log::test(tokio::test(flavor = "multi_thread"))]
4188 async fn test_cumulative_pos_rewards() -> anyhow::Result<()> {
4189 let epoch_height = 20;
4195
4196 let network_config = TestConfigBuilder::default()
4197 .epoch_height(epoch_height)
4198 .build();
4199
4200 let api_port = reserve_tcp_port().expect("OS should have ephemeral ports available");
4201
4202 const NUM_NODES: usize = 5;
4203 let storage = join_all((0..NUM_NODES).map(|_| SqlDataSource::create_storage())).await;
4205 let persistence: [_; NUM_NODES] = storage
4206 .iter()
4207 .map(<SqlDataSource as TestableSequencerDataSource>::persistence_options)
4208 .collect::<Vec<_>>()
4209 .try_into()
4210 .unwrap();
4211
4212 let config = TestNetworkConfigBuilder::with_num_nodes()
4213 .api_config(SqlDataSource::options(
4214 &storage[0],
4215 Options::with_port(api_port),
4216 ))
4217 .network_config(network_config)
4218 .persistences(persistence.clone())
4219 .catchups(std::array::from_fn(|_| {
4220 StatePeers::<StaticVersion<0, 1>>::from_urls(
4221 vec![format!("http://localhost:{api_port}").parse().unwrap()],
4222 Default::default(),
4223 Duration::from_secs(2),
4224 &NoMetrics,
4225 )
4226 }))
4227 .pos_hook(
4228 DelegationConfig::MultipleDelegators,
4229 Default::default(),
4230 POS_V4,
4231 )
4232 .await
4233 .unwrap()
4234 .build();
4235
4236 let network = TestNetwork::new(config, POS_V4).await;
4237 let node_state = network.server.node_state();
4238 let client: Client<ServerError, SequencerApiVersion> =
4239 Client::new(format!("http://localhost:{api_port}").parse().unwrap());
4240
4241 let _blocks = client
4243 .socket("availability/stream/blocks/0")
4244 .subscribe::<BlockQueryData<SeqTypes>>()
4245 .await
4246 .unwrap()
4247 .take(75)
4248 .try_collect::<Vec<_>>()
4249 .await
4250 .unwrap();
4251
4252 let validators = client
4256 .get::<AuthenticatedValidatorMap>("node/validators/3")
4257 .send()
4258 .await
4259 .expect("failed to get validator");
4260
4261 let mut addresses = HashSet::new();
4265 for v in validators.values() {
4266 addresses.insert(v.account);
4267 addresses.extend(v.clone().delegators.keys().collect::<Vec<_>>());
4268 }
4269 let validators = client
4271 .get::<AuthenticatedValidatorMap>("node/validators/4")
4272 .send()
4273 .await
4274 .expect("failed to get validator");
4275 for v in validators.values() {
4276 addresses.insert(v.account);
4277 addresses.extend(v.clone().delegators.keys().collect::<Vec<_>>());
4278 }
4279
4280 let mut prev_cumulative_amount = U256::ZERO;
4281 for block in 41..=67 {
4283 let membership = node_state.coordinator.membership().read().await;
4284 let block_reward = membership
4285 .epoch_block_reward(epoch_from_block_number(block, epoch_height).into())
4286 .expect("block reward is not None");
4287 drop(membership);
4288
4289 let mut cumulative_amount = U256::ZERO;
4290 for address in addresses.clone() {
4291 let amount = client
4292 .get::<Option<RewardAmount>>(&format!(
4293 "reward-state/reward-balance/{block}/{address}"
4294 ))
4295 .send()
4296 .await
4297 .ok()
4298 .flatten();
4299
4300 if let Some(amount) = amount {
4301 tracing::info!("address={address}, amount={amount}");
4302 cumulative_amount += amount.0;
4303 };
4304 }
4305
4306 assert_eq!(cumulative_amount - prev_cumulative_amount, block_reward.0);
4308 tracing::info!("cumulative_amount is correct for block={block}");
4309 prev_cumulative_amount = cumulative_amount;
4310 }
4311
4312 Ok(())
4313 }
4314
4315 #[test_log::test(tokio::test(flavor = "multi_thread"))]
4316 async fn test_stake_table_duplicate_events_from_contract() -> anyhow::Result<()> {
4317 let epoch_height = 20;
4321
4322 let network_config = TestConfigBuilder::default()
4323 .epoch_height(epoch_height)
4324 .build();
4325
4326 let api_port = reserve_tcp_port().expect("OS should have ephemeral ports available");
4327
4328 const NUM_NODES: usize = 5;
4329 let storage = join_all((0..NUM_NODES).map(|_| SqlDataSource::create_storage())).await;
4331 let persistence: [_; NUM_NODES] = storage
4332 .iter()
4333 .map(<SqlDataSource as TestableSequencerDataSource>::persistence_options)
4334 .collect::<Vec<_>>()
4335 .try_into()
4336 .unwrap();
4337
4338 let l1_url = network_config.l1_url();
4339 let config = TestNetworkConfigBuilder::with_num_nodes()
4340 .api_config(SqlDataSource::options(
4341 &storage[0],
4342 Options::with_port(api_port),
4343 ))
4344 .network_config(network_config)
4345 .persistences(persistence.clone())
4346 .catchups(std::array::from_fn(|_| {
4347 StatePeers::<StaticVersion<0, 1>>::from_urls(
4348 vec![format!("http://localhost:{api_port}").parse().unwrap()],
4349 Default::default(),
4350 Duration::from_secs(2),
4351 &NoMetrics,
4352 )
4353 }))
4354 .pos_hook(
4355 DelegationConfig::MultipleDelegators,
4356 Default::default(),
4357 POS_V3,
4358 )
4359 .await
4360 .unwrap()
4361 .build();
4362
4363 let network = TestNetwork::new(config, POS_V3).await;
4364
4365 let mut prev_st = None;
4366 let state = network.server.decided_state().await;
4367 let chain_config = state.chain_config.resolve().expect("resolve chain config");
4368 let stake_table = chain_config.stake_table_contract.unwrap();
4369
4370 let l1_client = L1ClientOptions::default()
4371 .connect(vec![l1_url])
4372 .expect("failed to connect to l1");
4373
4374 let client: Client<ServerError, SequencerApiVersion> =
4375 Client::new(format!("http://localhost:{api_port}").parse().unwrap());
4376
4377 let mut headers = client
4378 .socket("availability/stream/headers/0")
4379 .subscribe::<Header>()
4380 .await
4381 .unwrap();
4382
4383 let mut target_bh = 0;
4384 while let Some(header) = headers.next().await {
4385 let header = header.unwrap();
4386 println!("got header with height {}", header.height());
4387 if header.height() == 0 {
4388 continue;
4389 }
4390 let l1_block = header.l1_finalized().expect("l1 block not found");
4391
4392 let sorted_events = Fetcher::fetch_events_from_contract(
4393 l1_client.clone(),
4394 stake_table,
4395 None,
4396 l1_block.number(),
4397 )
4398 .await?;
4399
4400 let mut sorted_dedup_removed = sorted_events.clone();
4401 sorted_dedup_removed.dedup();
4402
4403 assert_eq!(
4404 sorted_events.len(),
4405 sorted_dedup_removed.len(),
4406 "duplicates found"
4407 );
4408
4409 let stake_table =
4411 validators_from_l1_events(sorted_events.into_iter().map(|(_, e)| e)).unwrap();
4412 if let Some(prev_st) = prev_st {
4413 assert_eq!(stake_table, prev_st);
4414 }
4415
4416 prev_st = Some(stake_table);
4417
4418 if target_bh == 100 {
4419 break;
4420 }
4421
4422 target_bh = header.height();
4423 }
4424
4425 Ok(())
4426 }
4427
4428 #[test_log::test(tokio::test(flavor = "multi_thread"))]
4429 async fn test_rewards_v4() -> anyhow::Result<()> {
4430 const EPOCH_HEIGHT: u64 = 20;
4440
4441 let network_config = TestConfigBuilder::default()
4442 .epoch_height(EPOCH_HEIGHT)
4443 .build();
4444
4445 let api_port = reserve_tcp_port().expect("OS should have ephemeral ports available");
4446
4447 const NUM_NODES: usize = 5;
4448
4449 let storage = join_all((0..NUM_NODES).map(|_| SqlDataSource::create_storage())).await;
4450 let persistence: [_; NUM_NODES] = storage
4451 .iter()
4452 .map(<SqlDataSource as TestableSequencerDataSource>::persistence_options)
4453 .collect::<Vec<_>>()
4454 .try_into()
4455 .unwrap();
4456
4457 let config = TestNetworkConfigBuilder::with_num_nodes()
4458 .api_config(SqlDataSource::options(
4459 &storage[0],
4460 Options::with_port(api_port),
4461 ))
4462 .network_config(network_config)
4463 .persistences(persistence.clone())
4464 .catchups(std::array::from_fn(|_| {
4465 StatePeers::<StaticVersion<0, 1>>::from_urls(
4466 vec![format!("http://localhost:{api_port}").parse().unwrap()],
4467 Default::default(),
4468 Duration::from_secs(2),
4469 &NoMetrics,
4470 )
4471 }))
4472 .pos_hook(
4473 DelegationConfig::MultipleDelegators,
4474 Default::default(),
4475 POS_V4,
4476 )
4477 .await
4478 .unwrap()
4479 .build();
4480
4481 let network = TestNetwork::new(config, POS_V4).await;
4482 let client: Client<ServerError, SequencerApiVersion> =
4483 Client::new(format!("http://localhost:{api_port}").parse().unwrap());
4484
4485 let mut events = network.peers[0].event_stream().await;
4487 while let Some(event) = events.next().await {
4488 if let EventType::Decide { leaf_chain, .. } = event.event {
4489 let height = leaf_chain[0].leaf.height();
4490 tracing::info!("Node 0 decided at height: {height}");
4491 if height > EPOCH_HEIGHT * 3 {
4492 break;
4493 }
4494 }
4495 }
4496
4497 {
4499 client
4500 .get::<AuthenticatedValidatorMap>("node/validators/1")
4501 .send()
4502 .await
4503 .unwrap()
4504 .is_empty();
4505
4506 client
4507 .get::<AuthenticatedValidatorMap>("node/validators/2")
4508 .send()
4509 .await
4510 .unwrap()
4511 .is_empty();
4512 }
4513
4514 let validators = client
4516 .get::<AuthenticatedValidatorMap>("node/validators/3")
4517 .send()
4518 .await
4519 .expect("validators");
4520
4521 assert!(!validators.is_empty());
4522
4523 let mut addresses = HashSet::new();
4525 for v in validators.values() {
4526 addresses.insert(v.account);
4527 addresses.extend(v.clone().delegators.keys().collect::<Vec<_>>());
4528 }
4529
4530 let mut leaves = client
4531 .socket("availability/stream/leaves/0")
4532 .subscribe::<LeafQueryData<SeqTypes>>()
4533 .await
4534 .unwrap();
4535
4536 let node_state = network.server.node_state();
4537 let coordinator = node_state.coordinator;
4538
4539 let membership = coordinator.membership().read().await;
4540
4541 while let Some(leaf) = leaves.next().await {
4543 let leaf = leaf.unwrap();
4544 let header = leaf.header();
4545 assert_eq!(header.total_reward_distributed().unwrap().0, U256::ZERO);
4546
4547 let epoch_number =
4548 EpochNumber::new(epoch_from_block_number(leaf.height(), EPOCH_HEIGHT));
4549
4550 assert!(membership.epoch_block_reward(epoch_number).is_none());
4551
4552 let height = header.height();
4553 for address in addresses.clone() {
4554 let amount = client
4555 .get::<Option<RewardAmount>>(&format!(
4556 "reward-state-v2/reward-balance/{height}/{address}"
4557 ))
4558 .send()
4559 .await
4560 .ok()
4561 .flatten();
4562 assert!(amount.is_none(), "amount is not none for block {height}")
4563 }
4564
4565 if leaf.height() == EPOCH_HEIGHT * 2 {
4566 break;
4567 }
4568 }
4569
4570 drop(membership);
4571
4572 let mut rewards_map = HashMap::new();
4573 let mut total_distributed = U256::ZERO;
4574 let mut epoch_rewards = HashMap::<EpochNumber, U256>::new();
4575
4576 while let Some(leaf) = leaves.next().await {
4577 let leaf = leaf.unwrap();
4578
4579 let header = leaf.header();
4580 let distributed = header
4581 .total_reward_distributed()
4582 .expect("rewards distributed is none");
4583
4584 let block = leaf.height();
4585 tracing::info!("verify rewards for block={block:?}");
4586 let membership = coordinator.membership().read().await;
4587 let epoch_number =
4588 EpochNumber::new(epoch_from_block_number(leaf.height(), EPOCH_HEIGHT));
4589
4590 let block_reward = membership.epoch_block_reward(epoch_number).unwrap();
4591 let leader = membership
4592 .leader(leaf.leaf().view_number(), Some(epoch_number))
4593 .expect("leader");
4594 let leader_eth_address = membership.address(&epoch_number, leader).expect("address");
4595
4596 drop(membership);
4597
4598 let validators = client
4599 .get::<AuthenticatedValidatorMap>(&format!("node/validators/{epoch_number}"))
4600 .send()
4601 .await
4602 .expect("validators");
4603
4604 let leader_validator = validators
4605 .get(&leader_eth_address)
4606 .expect("leader not found");
4607
4608 let distributor =
4609 RewardDistributor::new(leader_validator.clone(), block_reward, distributed);
4610 for validator in validators.values() {
4612 let delegator_stake_sum: U256 = validator.delegators.values().cloned().sum();
4613
4614 assert_eq!(delegator_stake_sum, validator.stake);
4615 }
4616
4617 let computed_rewards = distributor.compute_rewards().expect("reward computation");
4618
4619 let total_reward = block_reward.0;
4621 let leader_commission_basis_points = U256::from(leader_validator.commission);
4622 let calculated_leader_commission_reward = leader_commission_basis_points
4623 .checked_mul(total_reward)
4624 .context("overflow")?
4625 .checked_div(U256::from(COMMISSION_BASIS_POINTS))
4626 .context("overflow")?;
4627
4628 assert!(
4629 computed_rewards.leader_commission().0 - calculated_leader_commission_reward
4630 <= U256::from(10_u64)
4631 );
4632
4633 let leader_commission = *computed_rewards.leader_commission();
4635 for (address, amount) in computed_rewards.delegators().clone() {
4636 rewards_map
4637 .entry(address)
4638 .and_modify(|entry| *entry += amount)
4639 .or_insert(amount);
4640 }
4641
4642 rewards_map
4644 .entry(leader_eth_address)
4645 .and_modify(|entry| *entry += leader_commission)
4646 .or_insert(leader_commission);
4647
4648 for (address, calculated_amount) in rewards_map.iter() {
4650 let mut attempt = 0;
4651 let amount_from_api = loop {
4652 let result = client
4653 .get::<Option<RewardAmount>>(&format!(
4654 "reward-state-v2/reward-balance/{block}/{address}"
4655 ))
4656 .send()
4657 .await
4658 .ok()
4659 .flatten();
4660
4661 if let Some(amount) = result {
4662 break amount;
4663 }
4664
4665 attempt += 1;
4666 if attempt >= 3 {
4667 panic!(
4668 "Failed to fetch reward amount for address {address} after 3 retries"
4669 );
4670 }
4671
4672 sleep(Duration::from_secs(2)).await;
4673 };
4674
4675 assert_eq!(amount_from_api, *calculated_amount);
4676 }
4677
4678 total_distributed += block_reward.0;
4680 assert_eq!(
4681 header.total_reward_distributed().unwrap().0,
4682 total_distributed
4683 );
4684
4685 epoch_rewards
4687 .entry(epoch_number)
4688 .and_modify(|r| assert_eq!(*r, block_reward.0))
4689 .or_insert(block_reward.0);
4690
4691 if leaf.height() == EPOCH_HEIGHT * 5 {
4693 break;
4694 }
4695 }
4696
4697 Ok(())
4698 }
4699
4700 #[test_log::test(tokio::test(flavor = "multi_thread"))]
4701 async fn test_epoch_reward_distribution_basic() -> anyhow::Result<()> {
4702 const EPOCH_HEIGHT: u64 = 10;
4703 const NUM_NODES: usize = 5;
4704
4705 const V6: Upgrade = Upgrade::trivial(version(0, 6));
4706
4707 let network_config = TestConfigBuilder::default()
4708 .epoch_height(EPOCH_HEIGHT)
4709 .build();
4710
4711 let api_port = reserve_tcp_port().expect("No ports free for query service");
4712
4713 let storage = join_all((0..NUM_NODES).map(|_| SqlDataSource::create_storage())).await;
4714 let persistence: [_; NUM_NODES] = storage
4715 .iter()
4716 .map(<SqlDataSource as TestableSequencerDataSource>::persistence_options)
4717 .collect::<Vec<_>>()
4718 .try_into()
4719 .unwrap();
4720
4721 let config = TestNetworkConfigBuilder::with_num_nodes()
4722 .api_config(SqlDataSource::options(
4723 &storage[0],
4724 Options::with_port(api_port),
4725 ))
4726 .network_config(network_config)
4727 .persistences(persistence.clone())
4728 .catchups(std::array::from_fn(|_| {
4729 StatePeers::<StaticVersion<0, 1>>::from_urls(
4730 vec![format!("http://localhost:{api_port}").parse().unwrap()],
4731 Default::default(),
4732 Duration::from_secs(2),
4733 &NoMetrics,
4734 )
4735 }))
4736 .pos_hook(DelegationConfig::MultipleDelegators, Default::default(), V6)
4737 .await
4738 .unwrap()
4739 .build();
4740
4741 let _network = TestNetwork::new(config, V6).await;
4742 let client: Client<ServerError, SequencerApiVersion> =
4743 Client::new(format!("http://localhost:{api_port}").parse().unwrap());
4744
4745 let height_client: Client<ServerError, StaticVersion<0, 1>> =
4747 Client::new(format!("http://localhost:{api_port}").parse().unwrap());
4748 wait_until_block_height(&height_client, "node/block-height", EPOCH_HEIGHT * 5).await;
4749
4750 let mut leaves = client
4751 .socket("availability/stream/leaves/0")
4752 .subscribe::<LeafQueryData<SeqTypes>>()
4753 .await
4754 .unwrap();
4755
4756 while let Some(leaf) = leaves.next().await {
4758 let leaf = leaf.unwrap();
4759 let header = leaf.header();
4760 let height = header.height();
4761
4762 let total_distributed = header.total_reward_distributed().unwrap();
4763 assert_eq!(
4764 total_distributed.0,
4765 U256::ZERO,
4766 "epochs 1-3 should have no rewards, height={height}"
4767 );
4768
4769 if height == EPOCH_HEIGHT * 3 {
4770 break;
4771 }
4772 }
4773
4774 while let Some(leaf) = leaves.next().await {
4775 let leaf = leaf.unwrap();
4776 let header = leaf.header();
4777 let height = header.height();
4778
4779 if height == EPOCH_HEIGHT * 4 {
4780 let total_distributed = header.total_reward_distributed().unwrap();
4781 assert!(total_distributed.0 > U256::ZERO,);
4782 break;
4783 }
4784 }
4785
4786 while let Some(leaf) = leaves.next().await {
4787 let leaf = leaf.unwrap();
4788 let header = leaf.header();
4789 let height = header.height();
4790
4791 if height == EPOCH_HEIGHT * 5 {
4792 let total_distributed = header.total_reward_distributed().unwrap();
4793 assert!(total_distributed.0 > U256::ZERO,);
4794 break;
4795 }
4796 }
4797
4798 Ok(())
4799 }
4800
4801 #[test_log::test(tokio::test(flavor = "multi_thread"))]
4802 async fn test_epoch_reward_total_distributed_rewards() -> anyhow::Result<()> {
4803 const EPOCH_HEIGHT: u64 = 10;
4808 const NUM_NODES: usize = 5;
4809
4810 const V6: Upgrade = Upgrade::trivial(version(0, 6));
4811
4812 let network_config = TestConfigBuilder::default()
4813 .epoch_height(EPOCH_HEIGHT)
4814 .build();
4815
4816 let api_port = reserve_tcp_port().expect("No ports free for query service");
4817
4818 let storage = join_all((0..NUM_NODES).map(|_| SqlDataSource::create_storage())).await;
4819 let persistence: [_; NUM_NODES] = storage
4820 .iter()
4821 .map(<SqlDataSource as TestableSequencerDataSource>::persistence_options)
4822 .collect::<Vec<_>>()
4823 .try_into()
4824 .unwrap();
4825
4826 let config = TestNetworkConfigBuilder::with_num_nodes()
4827 .api_config(SqlDataSource::options(
4828 &storage[0],
4829 Options::with_port(api_port),
4830 ))
4831 .network_config(network_config)
4832 .persistences(persistence.clone())
4833 .catchups(std::array::from_fn(|_| {
4834 StatePeers::<StaticVersion<0, 1>>::from_urls(
4835 vec![format!("http://localhost:{api_port}").parse().unwrap()],
4836 Default::default(),
4837 Duration::from_secs(2),
4838 &NoMetrics,
4839 )
4840 }))
4841 .pos_hook(DelegationConfig::MultipleDelegators, Default::default(), V6)
4842 .await
4843 .unwrap()
4844 .build();
4845
4846 let _network = TestNetwork::new(config, V6).await;
4847 let client: Client<ServerError, SequencerApiVersion> =
4848 Client::new(format!("http://localhost:{api_port}").parse().unwrap());
4849
4850 let height_client: Client<ServerError, StaticVersion<0, 1>> =
4851 Client::new(format!("http://localhost:{api_port}").parse().unwrap());
4852 wait_until_block_height(&height_client, "node/block-height", EPOCH_HEIGHT * 5).await;
4853
4854 let mut leaves = client
4855 .socket("availability/stream/leaves/0")
4856 .subscribe::<LeafQueryData<SeqTypes>>()
4857 .await
4858 .unwrap();
4859
4860 while let Some(leaf) = leaves.next().await {
4861 let leaf = leaf.unwrap();
4862 let header = leaf.header();
4863 let height = header.height();
4864
4865 let total_distributed = header.total_reward_distributed().unwrap();
4866 assert_eq!(total_distributed.0, U256::ZERO,);
4867
4868 if height == EPOCH_HEIGHT * 3 {
4869 break;
4870 }
4871 }
4872
4873 while let Some(leaf) = leaves.next().await {
4874 let leaf = leaf.unwrap();
4875 let header = leaf.header();
4876 let height = header.height();
4877
4878 let total_distributed = header.total_reward_distributed().unwrap();
4879
4880 if height < EPOCH_HEIGHT * 4 {
4881 assert_eq!(total_distributed.0, U256::ZERO,);
4882 } else {
4883 assert!(total_distributed.0 > U256::ZERO,);
4884 break;
4885 }
4886 }
4887
4888 let epoch4_last_reward = {
4889 let header = client
4890 .get::<Header>(&format!("availability/header/{}", EPOCH_HEIGHT * 4))
4891 .send()
4892 .await
4893 .unwrap();
4894 header.total_reward_distributed().unwrap()
4895 };
4896
4897 assert!(
4898 epoch4_last_reward.0 > U256::ZERO,
4899 "epoch 4 last block should have positive rewards"
4900 );
4901
4902 while let Some(leaf) = leaves.next().await {
4903 let leaf = leaf.unwrap();
4904 let header = leaf.header();
4905 let height = header.height();
4906
4907 let total_distributed = header.total_reward_distributed().unwrap();
4908
4909 if height < EPOCH_HEIGHT * 5 {
4910 assert_eq!(total_distributed, epoch4_last_reward,);
4911 } else {
4912 assert!(total_distributed.0 > epoch4_last_reward.0,);
4913 break;
4914 }
4915 }
4916
4917 Ok(())
4918 }
4919
4920 #[test_log::test(tokio::test(flavor = "multi_thread"))]
4923 async fn test_reward_state_v2_epoch_distribution() -> anyhow::Result<()> {
4924 const EPOCH_HEIGHT: u64 = 10;
4925 const NUM_NODES: usize = 5;
4926 const NUM_EPOCHS: u64 = 6;
4927 const V6: Upgrade = Upgrade::trivial(version(0, 6));
4928
4929 let network_config = TestConfigBuilder::default()
4930 .epoch_height(EPOCH_HEIGHT)
4931 .build();
4932
4933 let api_port = reserve_tcp_port().expect("No ports free for query service");
4934
4935 let storage = join_all((0..NUM_NODES).map(|_| SqlDataSource::create_storage())).await;
4936 let persistence: [_; NUM_NODES] = storage
4937 .iter()
4938 .map(<SqlDataSource as TestableSequencerDataSource>::persistence_options)
4939 .collect::<Vec<_>>()
4940 .try_into()
4941 .unwrap();
4942
4943 let config = TestNetworkConfigBuilder::with_num_nodes()
4944 .api_config(SqlDataSource::options(
4945 &storage[0],
4946 Options::with_port(api_port),
4947 ))
4948 .network_config(network_config)
4949 .persistences(persistence.clone())
4950 .catchups(std::array::from_fn(|_| {
4951 StatePeers::<StaticVersion<0, 1>>::from_urls(
4952 vec![format!("http://localhost:{api_port}").parse().unwrap()],
4953 Default::default(),
4954 Duration::from_secs(2),
4955 &NoMetrics,
4956 )
4957 }))
4958 .pos_hook(DelegationConfig::MultipleDelegators, Default::default(), V6)
4959 .await
4960 .unwrap()
4961 .build();
4962
4963 let network = TestNetwork::new(config, V6).await;
4964 let client: Client<ServerError, SequencerApiVersion> =
4965 Client::new(format!("http://localhost:{api_port}").parse().unwrap());
4966
4967 let node_state = network.server.node_state();
4968 let coordinator = node_state.coordinator;
4969
4970 let mut expected_total_distributed = U256::ZERO;
4971
4972 let mut leaves = client
4973 .socket("availability/stream/leaves/0")
4974 .subscribe::<LeafQueryData<SeqTypes>>()
4975 .await
4976 .unwrap();
4977
4978 while let Some(leaf) = leaves.next().await {
4979 let leaf = leaf.unwrap();
4980 let header = leaf.header();
4981 let height = header.height();
4982
4983 let epoch = epoch_from_block_number(height, EPOCH_HEIGHT);
4984
4985 let is_epoch_last_block = height % EPOCH_HEIGHT == 0;
4986
4987 if epoch <= 3 {
4988 continue;
4989 }
4990
4991 let header_total_distributed = header
4992 .total_reward_distributed()
4993 .expect("total_reward_distributed should exist");
4994
4995 if is_epoch_last_block {
4996 let prev_epoch = epoch - 1;
4997 let prev_epoch_number = EpochNumber::new(prev_epoch);
4998 let membership = coordinator.membership().read().await;
4999 let prev_block_reward = membership
5000 .epoch_block_reward(prev_epoch_number)
5001 .expect("epoch block reward should exist");
5002 drop(membership);
5003
5004 let epoch_total = prev_block_reward.0 * U256::from(EPOCH_HEIGHT);
5005 expected_total_distributed += epoch_total;
5006 }
5007
5008 assert_eq!(
5009 header_total_distributed.0, expected_total_distributed,
5010 "total_reward_distributed mismatch at height {height}"
5011 );
5012
5013 if height >= NUM_EPOCHS * EPOCH_HEIGHT {
5014 break;
5015 }
5016 }
5017
5018 Ok(())
5019 }
5020
5021 #[test_log::test(tokio::test(flavor = "multi_thread"))]
5023 async fn test_epoch_leader_counts() -> anyhow::Result<()> {
5024 const EPOCH_HEIGHT: u64 = 10;
5025 const NUM_NODES: usize = 5;
5026 const NUM_EPOCHS: u64 = 6;
5027 const V6: Upgrade = Upgrade::trivial(version(0, 6));
5028
5029 let network_config = TestConfigBuilder::default()
5030 .epoch_height(EPOCH_HEIGHT)
5031 .build();
5032
5033 let api_port = reserve_tcp_port().expect("No ports free for query service");
5034
5035 let storage = join_all((0..NUM_NODES).map(|_| SqlDataSource::create_storage())).await;
5036 let persistence: [_; NUM_NODES] = storage
5037 .iter()
5038 .map(<SqlDataSource as TestableSequencerDataSource>::persistence_options)
5039 .collect::<Vec<_>>()
5040 .try_into()
5041 .unwrap();
5042
5043 let config = TestNetworkConfigBuilder::with_num_nodes()
5044 .api_config(SqlDataSource::options(
5045 &storage[0],
5046 Options::with_port(api_port),
5047 ))
5048 .network_config(network_config)
5049 .persistences(persistence.clone())
5050 .catchups(std::array::from_fn(|_| {
5051 StatePeers::<StaticVersion<0, 1>>::from_urls(
5052 vec![format!("http://localhost:{api_port}").parse().unwrap()],
5053 Default::default(),
5054 Duration::from_secs(2),
5055 &NoMetrics,
5056 )
5057 }))
5058 .pos_hook(DelegationConfig::MultipleDelegators, Default::default(), V6)
5059 .await
5060 .unwrap()
5061 .build();
5062
5063 let network = TestNetwork::new(config, V6).await;
5064 let client: Client<ServerError, SequencerApiVersion> =
5065 Client::new(format!("http://localhost:{api_port}").parse().unwrap());
5066
5067 let node_state = network.server.node_state();
5068 let coordinator = node_state.coordinator;
5069
5070 let mut expected_counts: HashMap<Address, u16> = HashMap::new();
5072
5073 let mut leaves = client
5074 .socket("availability/stream/leaves/0")
5075 .subscribe::<LeafQueryData<SeqTypes>>()
5076 .await
5077 .unwrap();
5078
5079 while let Some(leaf) = leaves.next().await {
5080 let leaf = leaf.unwrap();
5081 let header = leaf.header();
5082 let height = header.height();
5083 let epoch = epoch_from_block_number(height, EPOCH_HEIGHT);
5084 let epoch_number = EpochNumber::new(epoch);
5085
5086 if epoch <= 2 {
5087 continue;
5088 }
5089
5090 let header_leader_counts = header
5091 .leader_counts()
5092 .expect("V6 header must have leader_counts");
5093
5094 let is_epoch_start = (height - 1) % EPOCH_HEIGHT == 0;
5096 if is_epoch_start {
5097 expected_counts.clear();
5098 }
5099
5100 let view_number = leaf.leaf().view_number();
5102 let membership = coordinator.membership().read().await;
5103 let leader = membership
5104 .leader(view_number, Some(epoch_number))
5105 .expect("leader should exist");
5106 let leader_address = membership
5107 .address(&epoch_number, leader)
5108 .expect("leader should have an address");
5109
5110 let validator_leader_counts =
5111 ValidatorLeaderCounts::new(&membership, &epoch_number, *header_leader_counts)
5112 .expect("ValidatorLeaderCounts should build from header leader_counts");
5113 drop(membership);
5114
5115 *expected_counts.entry(leader_address).or_insert(0) += 1;
5116
5117 let header_counts: HashMap<Address, u16> = validator_leader_counts
5118 .active_leaders()
5119 .map(|(v, count)| (v.account, count))
5120 .collect();
5121
5122 assert_eq!(
5123 header_counts, expected_counts,
5124 "leader_counts mismatch at height {height} (epoch {epoch})"
5125 );
5126
5127 if height % EPOCH_HEIGHT == 0 {
5128 let total: u16 = expected_counts.values().sum();
5129 assert_eq!(
5130 total, EPOCH_HEIGHT as u16,
5131 "total leader_counts at epoch boundary should equal EPOCH_HEIGHT at height \
5132 {height}"
5133 );
5134 }
5135
5136 if height >= NUM_EPOCHS * EPOCH_HEIGHT {
5137 break;
5138 }
5139 }
5140
5141 Ok(())
5142 }
5143
5144 #[rstest]
5145 #[case(POS_V3)]
5146 #[case(POS_V4)]
5147 #[test_log::test(tokio::test(flavor = "multi_thread"))]
5148 async fn test_node_stake_table_api(#[case] upgrade: Upgrade) {
5149 let epoch_height = 20;
5150
5151 let network_config = TestConfigBuilder::default()
5152 .epoch_height(epoch_height)
5153 .build();
5154
5155 let api_port = reserve_tcp_port().expect("OS should have ephemeral ports available");
5156
5157 const NUM_NODES: usize = 2;
5158 let storage = join_all((0..NUM_NODES).map(|_| SqlDataSource::create_storage())).await;
5160 let persistence: [_; NUM_NODES] = storage
5161 .iter()
5162 .map(<SqlDataSource as TestableSequencerDataSource>::persistence_options)
5163 .collect::<Vec<_>>()
5164 .try_into()
5165 .unwrap();
5166
5167 let config = TestNetworkConfigBuilder::with_num_nodes()
5168 .api_config(SqlDataSource::options(
5169 &storage[0],
5170 Options::with_port(api_port),
5171 ))
5172 .network_config(network_config)
5173 .persistences(persistence.clone())
5174 .catchups(std::array::from_fn(|_| {
5175 StatePeers::<StaticVersion<0, 1>>::from_urls(
5176 vec![format!("http://localhost:{api_port}").parse().unwrap()],
5177 Default::default(),
5178 Duration::from_secs(2),
5179 &NoMetrics,
5180 )
5181 }))
5182 .pos_hook(
5183 DelegationConfig::MultipleDelegators,
5184 Default::default(),
5185 upgrade,
5186 )
5187 .await
5188 .unwrap()
5189 .build();
5190
5191 let _network = TestNetwork::new(config, upgrade).await;
5192
5193 let client: Client<ServerError, SequencerApiVersion> =
5194 Client::new(format!("http://localhost:{api_port}").parse().unwrap());
5195
5196 let _blocks = client
5198 .socket("availability/stream/blocks/0")
5199 .subscribe::<BlockQueryData<SeqTypes>>()
5200 .await
5201 .unwrap()
5202 .take(40)
5203 .try_collect::<Vec<_>>()
5204 .await
5205 .unwrap();
5206
5207 for i in 1..=3 {
5208 let _st = client
5209 .get::<Vec<PeerConfig<SeqTypes>>>(&format!("node/stake-table/{}", i as u64))
5210 .send()
5211 .await
5212 .expect("failed to get stake table");
5213 }
5214
5215 let _st = client
5216 .get::<StakeTableWithEpochNumber<SeqTypes>>("node/stake-table/current")
5217 .send()
5218 .await
5219 .expect("failed to get stake table");
5220 }
5221
5222 #[rstest]
5223 #[case(POS_V3)]
5224 #[case(POS_V4)]
5225 #[test_log::test(tokio::test(flavor = "multi_thread"))]
5226 async fn test_epoch_stake_table_catchup(#[case] upgrade: Upgrade) {
5227 const EPOCH_HEIGHT: u64 = 10;
5228 const NUM_NODES: usize = 6;
5229
5230 let port = reserve_tcp_port().expect("OS should have ephemeral ports available");
5231
5232 let network_config = TestConfigBuilder::default()
5233 .epoch_height(EPOCH_HEIGHT)
5234 .build();
5235
5236 let storage = join_all((0..NUM_NODES).map(|_| SqlDataSource::create_storage())).await;
5238
5239 let persistence_options: [_; NUM_NODES] = storage
5240 .iter()
5241 .map(<SqlDataSource as TestableSequencerDataSource>::persistence_options)
5242 .collect::<Vec<_>>()
5243 .try_into()
5244 .unwrap();
5245
5246 let catchup_peers = std::array::from_fn(|_| {
5248 StatePeers::<StaticVersion<0, 1>>::from_urls(
5249 vec![format!("http://localhost:{port}").parse().unwrap()],
5250 Default::default(),
5251 Duration::from_secs(2),
5252 &NoMetrics,
5253 )
5254 });
5255 let config = TestNetworkConfigBuilder::<NUM_NODES, _, _>::with_num_nodes()
5256 .api_config(SqlDataSource::options(
5257 &storage[0],
5258 Options::with_port(port),
5259 ))
5260 .network_config(network_config)
5261 .persistences(persistence_options.clone())
5262 .catchups(catchup_peers)
5263 .pos_hook(
5264 DelegationConfig::MultipleDelegators,
5265 Default::default(),
5266 upgrade,
5267 )
5268 .await
5269 .unwrap()
5270 .build();
5271
5272 let state = config.states()[0].clone();
5273 let mut network = TestNetwork::new(config, upgrade).await;
5274
5275 let mut events = network.peers[0].event_stream().await;
5277 while let Some(event) = events.next().await {
5278 if let EventType::Decide { leaf_chain, .. } = event.event {
5279 let height = leaf_chain[0].leaf.height();
5280 tracing::info!("Node 0 decided at height: {height}");
5281 if height > EPOCH_HEIGHT * 3 {
5282 break;
5283 }
5284 }
5285 }
5286
5287 tracing::info!("Shutting down peer 0");
5289 network.peers.remove(0);
5290
5291 let mut events = network.server.event_stream().await;
5293 while let Some(event) = events.next().await {
5294 if let EventType::Decide { leaf_chain, .. } = event.event {
5295 let height = leaf_chain[0].leaf.height();
5296 if height > EPOCH_HEIGHT * 7 {
5297 break;
5298 }
5299 }
5300 }
5301
5302 let storage = SqlDataSource::create_storage().await;
5304 let options = <SqlDataSource as TestableSequencerDataSource>::persistence_options(&storage);
5305 tracing::info!("Restarting peer 0");
5306 let node = network
5307 .cfg
5308 .init_node(
5309 1,
5310 state,
5311 options,
5312 Some(StatePeers::<StaticVersion<0, 1>>::from_urls(
5313 vec![format!("http://localhost:{port}").parse().unwrap()],
5314 Default::default(),
5315 Duration::from_secs(2),
5316 &NoMetrics,
5317 )),
5318 None,
5319 &NoMetrics,
5320 test_helpers::STAKE_TABLE_CAPACITY_FOR_TEST,
5321 NullEventConsumer,
5322 upgrade,
5323 Default::default(),
5324 )
5325 .await;
5326
5327 let coordinator = node.node_state().coordinator;
5328 let server_node_state = network.server.node_state();
5329 let server_coordinator = server_node_state.coordinator;
5330 for epoch_num in 1..=7 {
5332 let epoch = EpochNumber::new(epoch_num);
5333 let membership_for_epoch = coordinator.membership_for_epoch(Some(epoch)).await;
5334 if membership_for_epoch.is_err() {
5335 coordinator.wait_for_catchup(epoch).await.unwrap();
5336 }
5337
5338 println!("have stake table for epoch = {epoch_num}");
5339
5340 let node_stake_table = coordinator
5341 .membership()
5342 .read()
5343 .await
5344 .stake_table(Some(epoch));
5345 let stake_table = server_coordinator
5346 .membership()
5347 .read()
5348 .await
5349 .stake_table(Some(epoch));
5350 println!("asserting stake table for epoch = {epoch_num}");
5351
5352 assert_eq!(
5353 node_stake_table, stake_table,
5354 "Stake table mismatch for epoch {epoch_num}",
5355 );
5356 }
5357 }
5358
5359 #[rstest]
5360 #[case(POS_V3)]
5361 #[case(POS_V4)]
5362 #[test_log::test(tokio::test(flavor = "multi_thread"))]
5363 async fn test_epoch_stake_table_catchup_stress(#[case] upgrade: Upgrade) {
5364 const EPOCH_HEIGHT: u64 = 10;
5365 const NUM_NODES: usize = 6;
5366
5367 let port = reserve_tcp_port().expect("OS should have ephemeral ports available");
5368
5369 let network_config = TestConfigBuilder::default()
5370 .epoch_height(EPOCH_HEIGHT)
5371 .build();
5372
5373 let storage = join_all((0..NUM_NODES).map(|_| SqlDataSource::create_storage())).await;
5375
5376 let persistence_options: [_; NUM_NODES] = storage
5377 .iter()
5378 .map(<SqlDataSource as TestableSequencerDataSource>::persistence_options)
5379 .collect::<Vec<_>>()
5380 .try_into()
5381 .unwrap();
5382
5383 let catchup_peers = std::array::from_fn(|_| {
5385 StatePeers::<StaticVersion<0, 1>>::from_urls(
5386 vec![format!("http://localhost:{port}").parse().unwrap()],
5387 Default::default(),
5388 Duration::from_secs(2),
5389 &NoMetrics,
5390 )
5391 });
5392 let config = TestNetworkConfigBuilder::<NUM_NODES, _, _>::with_num_nodes()
5393 .api_config(SqlDataSource::options(
5394 &storage[0],
5395 Options::with_port(port),
5396 ))
5397 .network_config(network_config)
5398 .persistences(persistence_options.clone())
5399 .catchups(catchup_peers)
5400 .pos_hook(
5401 DelegationConfig::MultipleDelegators,
5402 Default::default(),
5403 upgrade,
5404 )
5405 .await
5406 .unwrap()
5407 .build();
5408
5409 let state = config.states()[0].clone();
5410 let mut network = TestNetwork::new(config, upgrade).await;
5411
5412 let mut events = network.peers[0].event_stream().await;
5414 while let Some(event) = events.next().await {
5415 if let EventType::Decide { leaf_chain, .. } = event.event {
5416 let height = leaf_chain[0].leaf.height();
5417 tracing::info!("Node 0 decided at height: {height}");
5418 if height > EPOCH_HEIGHT * 3 {
5419 break;
5420 }
5421 }
5422 }
5423
5424 tracing::info!("Shutting down peer 0");
5426 network.peers.remove(0);
5427
5428 let mut events = network.server.event_stream().await;
5430 while let Some(event) = events.next().await {
5431 if let EventType::Decide { leaf_chain, .. } = event.event {
5432 let height = leaf_chain[0].leaf.height();
5433 tracing::info!("Server decided at height: {height}");
5434 if height > EPOCH_HEIGHT * 7 {
5436 break;
5437 }
5438 }
5439 }
5440
5441 let storage = SqlDataSource::create_storage().await;
5443 let options = <SqlDataSource as TestableSequencerDataSource>::persistence_options(&storage);
5444
5445 tracing::info!("Restarting peer 0");
5446 let node = network
5447 .cfg
5448 .init_node(
5449 1,
5450 state,
5451 options,
5452 Some(StatePeers::<StaticVersion<0, 1>>::from_urls(
5453 vec![format!("http://localhost:{port}").parse().unwrap()],
5454 Default::default(),
5455 Duration::from_secs(2),
5456 &NoMetrics,
5457 )),
5458 None,
5459 &NoMetrics,
5460 test_helpers::STAKE_TABLE_CAPACITY_FOR_TEST,
5461 NullEventConsumer,
5462 upgrade,
5463 Default::default(),
5464 )
5465 .await;
5466
5467 let coordinator = node.node_state().coordinator;
5468
5469 let server_node_state = network.server.node_state();
5470 let server_coordinator = server_node_state.coordinator;
5471
5472 let mut rand_epochs: Vec<_> = (1..=7).collect();
5474 rand_epochs.shuffle(&mut rand::thread_rng());
5475 println!("trigger catchup in this order: {rand_epochs:?}");
5476 for epoch_num in rand_epochs {
5477 let epoch = EpochNumber::new(epoch_num);
5478 let _ = coordinator.membership_for_epoch(Some(epoch)).await;
5479 }
5480
5481 for epoch_num in 1..=7 {
5483 println!("getting stake table for epoch = {epoch_num}");
5484 let epoch = EpochNumber::new(epoch_num);
5485 let _ = coordinator.wait_for_catchup(epoch).await.unwrap();
5486
5487 println!("have stake table for epoch = {epoch_num}");
5488
5489 let node_stake_table = coordinator
5490 .membership()
5491 .read()
5492 .await
5493 .stake_table(Some(epoch));
5494 let stake_table = server_coordinator
5495 .membership()
5496 .read()
5497 .await
5498 .stake_table(Some(epoch));
5499
5500 println!("asserting stake table for epoch = {epoch_num}");
5501
5502 assert_eq!(
5503 node_stake_table, stake_table,
5504 "Stake table mismatch for epoch {epoch_num}",
5505 );
5506 }
5507 }
5508
5509 #[rstest]
5510 #[case(POS_V3)]
5511 #[case(POS_V4)]
5512 #[test_log::test(tokio::test(flavor = "multi_thread"))]
5513 async fn test_merklized_state_catchup_on_restart(
5514 #[case] upgrade: Upgrade,
5515 ) -> anyhow::Result<()> {
5516 const EPOCH_HEIGHT: u64 = 10;
5528
5529 let network_config = TestConfigBuilder::default()
5530 .epoch_height(EPOCH_HEIGHT)
5531 .build();
5532
5533 let api_port = reserve_tcp_port().expect("OS should have ephemeral ports available");
5534
5535 tracing::info!("API PORT = {api_port}");
5536 const NUM_NODES: usize = 5;
5537
5538 let storage = join_all((0..NUM_NODES).map(|_| SqlDataSource::create_storage())).await;
5539 let persistence: [_; NUM_NODES] = storage
5540 .iter()
5541 .map(<SqlDataSource as TestableSequencerDataSource>::persistence_options)
5542 .collect::<Vec<_>>()
5543 .try_into()
5544 .unwrap();
5545
5546 let config = TestNetworkConfigBuilder::with_num_nodes()
5547 .api_config(SqlDataSource::options(
5548 &storage[0],
5549 Options::with_port(api_port).catchup(Default::default()),
5550 ))
5551 .network_config(network_config)
5552 .persistences(persistence.clone())
5553 .catchups(std::array::from_fn(|_| {
5554 StatePeers::<StaticVersion<0, 1>>::from_urls(
5555 vec![format!("http://localhost:{api_port}").parse().unwrap()],
5556 Default::default(),
5557 Duration::from_secs(2),
5558 &NoMetrics,
5559 )
5560 }))
5561 .pos_hook(
5562 DelegationConfig::MultipleDelegators,
5563 hotshot_contract_adapter::stake_table::StakeTableContractVersion::V2,
5564 upgrade,
5565 )
5566 .await
5567 .unwrap()
5568 .build();
5569 let state = config.states()[0].clone();
5570 let mut network = TestNetwork::new(config, upgrade).await;
5571
5572 network.peers[0].shut_down().await;
5577 network.peers.remove(0);
5578 let node_0_storage = &storage[1];
5579 let node_0_persistence = persistence[1].clone();
5580 let node_0_port = reserve_tcp_port().expect("OS should have ephemeral ports available");
5581 tracing::info!("node_0_port {node_0_port}");
5582 let opt = Options::with_port(node_0_port).query_sql(
5584 Query {
5585 peers: vec![format!("http://localhost:{api_port}").parse().unwrap()],
5586 },
5587 tmp_options(node_0_storage),
5588 );
5589
5590 let node_0 = opt
5592 .clone()
5593 .serve(|metrics, consumer, storage| {
5594 let cfg = network.cfg.clone();
5595 let node_0_persistence = node_0_persistence.clone();
5596 let state = state.clone();
5597 async move {
5598 Ok(cfg
5599 .init_node(
5600 1,
5601 state,
5602 node_0_persistence.clone(),
5603 Some(StatePeers::<StaticVersion<0, 1>>::from_urls(
5604 vec![format!("http://localhost:{api_port}").parse().unwrap()],
5605 Default::default(),
5606 Duration::from_secs(2),
5607 &NoMetrics,
5608 )),
5609 storage,
5610 &*metrics,
5611 test_helpers::STAKE_TABLE_CAPACITY_FOR_TEST,
5612 consumer,
5613 upgrade,
5614 Default::default(),
5615 )
5616 .await)
5617 }
5618 .boxed()
5619 })
5620 .await
5621 .unwrap();
5622
5623 let mut events = network.peers[2].event_stream().await;
5624 wait_for_epochs(&mut events, EPOCH_HEIGHT, 1).await;
5626
5627 drop(node_0);
5629
5630 wait_for_epochs(&mut events, EPOCH_HEIGHT, 4).await;
5632
5633 tracing::info!("restarting node");
5635 let node_0 = opt
5636 .serve(|metrics, consumer, storage| {
5637 let cfg = network.cfg.clone();
5638 async move {
5639 Ok(cfg
5640 .init_node(
5641 1,
5642 state,
5643 node_0_persistence,
5644 Some(StatePeers::<StaticVersion<0, 1>>::from_urls(
5645 vec![format!("http://localhost:{api_port}").parse().unwrap()],
5646 Default::default(),
5647 Duration::from_secs(2),
5648 &NoMetrics,
5649 )),
5650 storage,
5651 &*metrics,
5652 test_helpers::STAKE_TABLE_CAPACITY_FOR_TEST,
5653 consumer,
5654 upgrade,
5655 Default::default(),
5656 )
5657 .await)
5658 }
5659 .boxed()
5660 })
5661 .await
5662 .unwrap();
5663
5664 let client: Client<ServerError, SequencerApiVersion> =
5665 Client::new(format!("http://localhost:{node_0_port}").parse().unwrap());
5666 client.connect(None).await;
5667
5668 wait_for_epochs(&mut events, EPOCH_HEIGHT, 6).await;
5669
5670 let epoch_7_block = EPOCH_HEIGHT * 6 + 1;
5671
5672 let mut retries = 0;
5674 loop {
5675 sleep(Duration::from_secs(1)).await;
5676 let state = node_0.decided_state().await;
5677
5678 let leaves = if upgrade.base == EPOCH_VERSION {
5679 state.reward_merkle_tree_v1.num_leaves()
5681 } else {
5682 state.reward_merkle_tree_v2.num_leaves()
5684 };
5685
5686 if leaves > 0 {
5687 tracing::info!("Node's state has reward accounts");
5688 break;
5689 }
5690
5691 retries += 1;
5692 if retries > 120 {
5693 panic!("max retries reached. failed to catchup reward state");
5694 }
5695 }
5696
5697 retries = 0;
5698 loop {
5700 sleep(Duration::from_secs(3)).await;
5701
5702 let bh = client
5703 .get::<u64>("block-state/block-height")
5704 .send()
5705 .await
5706 .expect("block height not found");
5707
5708 tracing::info!("block state: block height={bh}");
5709 if bh > epoch_7_block {
5710 break;
5711 }
5712
5713 retries += 1;
5714 if retries > 30 {
5715 panic!(
5716 "max retries reached. block state block height is less than epoch 7 start \
5717 block"
5718 );
5719 }
5720 }
5721
5722 node_0.shutdown_consensus().await;
5724 let decided_leaf = node_0.decided_leaf().await;
5725 let state = node_0.decided_state().await;
5726 tracing::info!(
5727 height = decided_leaf.height(),
5728 ?decided_leaf,
5729 ?state,
5730 "final state"
5731 );
5732
5733 let height = decided_leaf.height();
5734 let num_leaves = state.block_merkle_tree.num_leaves();
5735 tracing::info!(height, num_leaves, "checking block merkle tree state");
5736 state
5737 .block_merkle_tree
5738 .lookup(height - 1)
5739 .expect_ok()
5740 .unwrap_or_else(|err| {
5741 panic!(
5742 "block state not found ({err:#}):\n{:#?}",
5743 state.block_merkle_tree
5744 )
5745 });
5746
5747 Ok(())
5748 }
5749
5750 #[rstest]
5751 #[case(POS_V3)]
5752 #[case(POS_V4)]
5753 #[test_log::test(tokio::test(flavor = "multi_thread"))]
5754 async fn test_state_reconstruction(#[case] upgrade: Upgrade) -> anyhow::Result<()> {
5755 const EPOCH_HEIGHT: u64 = 10;
5771
5772 let network_config = TestConfigBuilder::default()
5773 .epoch_height(EPOCH_HEIGHT)
5774 .build();
5775
5776 let api_port = reserve_tcp_port().expect("OS should have ephemeral ports available");
5777
5778 tracing::info!("API PORT = {api_port}");
5779 const NUM_NODES: usize = 5;
5780
5781 let storage = join_all((0..NUM_NODES).map(|_| SqlDataSource::create_storage())).await;
5782 let persistence: [_; NUM_NODES] = storage
5783 .iter()
5784 .map(<SqlDataSource as TestableSequencerDataSource>::persistence_options)
5785 .collect::<Vec<_>>()
5786 .try_into()
5787 .unwrap();
5788
5789 let config = TestNetworkConfigBuilder::with_num_nodes()
5790 .api_config(SqlDataSource::options(
5791 &storage[0],
5792 Options::with_port(api_port),
5793 ))
5794 .network_config(network_config)
5795 .persistences(persistence.clone())
5796 .catchups(std::array::from_fn(|_| {
5797 StatePeers::<StaticVersion<0, 1>>::from_urls(
5798 vec![format!("http://localhost:{api_port}").parse().unwrap()],
5799 Default::default(),
5800 Duration::from_secs(2),
5801 &NoMetrics,
5802 )
5803 }))
5804 .pos_hook(
5805 DelegationConfig::MultipleDelegators,
5806 hotshot_contract_adapter::stake_table::StakeTableContractVersion::V2,
5807 upgrade,
5808 )
5809 .await
5810 .unwrap()
5811 .build();
5812 let state = config.states()[0].clone();
5813 let mut network = TestNetwork::new(config, upgrade).await;
5814 network.peers.remove(0);
5819
5820 let node_0_storage = &storage[1];
5821 let node_0_persistence = persistence[1].clone();
5822 let node_0_port = reserve_tcp_port().expect("OS should have ephemeral ports available");
5823 tracing::info!("node_0_port {node_0_port}");
5824 let opt = Options::with_port(node_0_port).query_sql(
5825 Query {
5826 peers: vec![format!("http://localhost:{api_port}").parse().unwrap()],
5827 },
5828 tmp_options(node_0_storage),
5829 );
5830 let node_0 = opt
5831 .clone()
5832 .serve(|metrics, consumer, storage| {
5833 let cfg = network.cfg.clone();
5834 let node_0_persistence = node_0_persistence.clone();
5835 let state = state.clone();
5836 async move {
5837 Ok(cfg
5838 .init_node(
5839 1,
5840 state,
5841 node_0_persistence.clone(),
5842 Some(StatePeers::<StaticVersion<0, 1>>::from_urls(
5843 vec![format!("http://localhost:{api_port}").parse().unwrap()],
5844 Default::default(),
5845 Duration::from_secs(2),
5846 &NoMetrics,
5847 )),
5848 storage,
5849 &*metrics,
5850 test_helpers::STAKE_TABLE_CAPACITY_FOR_TEST,
5851 consumer,
5852 upgrade,
5853 Default::default(),
5854 )
5855 .await)
5856 }
5857 .boxed()
5858 })
5859 .await
5860 .unwrap();
5861
5862 let mut events = network.peers[2].event_stream().await;
5863 wait_for_epochs(&mut events, EPOCH_HEIGHT, 3).await;
5865
5866 tracing::warn!("shutting down node 0");
5867
5868 node_0.shutdown_consensus().await;
5869
5870 let instance = node_0.node_state();
5871 let state = node_0.decided_state().await;
5872 let fee_accounts = state
5873 .fee_merkle_tree
5874 .clone()
5875 .into_iter()
5876 .map(|(acct, _)| acct)
5877 .collect::<Vec<_>>();
5878 let reward_accounts = match upgrade.base {
5879 EPOCH_VERSION => state
5880 .reward_merkle_tree_v1
5881 .clone()
5882 .into_iter()
5883 .map(|(acct, _)| RewardAccountV2::from(acct))
5884 .collect::<Vec<_>>(),
5885 DRB_AND_HEADER_UPGRADE_VERSION => state
5886 .reward_merkle_tree_v2
5887 .clone()
5888 .into_iter()
5889 .map(|(acct, _)| acct)
5890 .collect::<Vec<_>>(),
5891 _ => panic!("invalid version"),
5892 };
5893
5894 let client: Client<ServerError, SequencerApiVersion> =
5895 Client::new(format!("http://localhost:{node_0_port}").parse().unwrap());
5896 client.connect(Some(Duration::from_secs(10))).await;
5897
5898 sleep(Duration::from_secs(3)).await;
5901
5902 tracing::info!("getting node block height");
5903 let node_block_height = client
5904 .get::<u64>("node/block-height")
5905 .send()
5906 .await
5907 .context("getting Espresso block height")
5908 .unwrap();
5909
5910 tracing::info!("node block height={node_block_height}");
5911
5912 let leaf_query_data = client
5913 .get::<LeafQueryData<SeqTypes>>(&format!("availability/leaf/{}", node_block_height - 1))
5914 .send()
5915 .await
5916 .context("error getting leaf")
5917 .unwrap();
5918
5919 tracing::info!("leaf={leaf_query_data:?}");
5920 let leaf = leaf_query_data.leaf();
5921 let to_view = leaf.view_number() + 1;
5922
5923 let ds = SqlStorage::connect(
5924 Config::try_from(&node_0_persistence).unwrap(),
5925 StorageConnectionType::Sequencer,
5926 )
5927 .await
5928 .unwrap();
5929 let mut tx = ds.read().await?;
5930
5931 let (state, leaf) = reconstruct_state(
5932 &instance,
5933 &ds,
5934 &mut tx,
5935 node_block_height - 1,
5936 to_view,
5937 &[],
5938 &[],
5939 )
5940 .await
5941 .unwrap();
5942 assert_eq!(leaf.view_number(), to_view);
5943 assert!(
5944 state
5945 .block_merkle_tree
5946 .lookup(node_block_height - 1)
5947 .expect_ok()
5948 .is_ok(),
5949 "inconsistent block merkle tree"
5950 );
5951
5952 let (state, leaf) = reconstruct_state(
5954 &instance,
5955 &ds,
5956 &mut tx,
5957 node_block_height - 1,
5958 to_view,
5959 &fee_accounts,
5960 &[],
5961 )
5962 .await
5963 .unwrap();
5964
5965 assert_eq!(leaf.view_number(), to_view);
5966 assert!(
5967 state
5968 .block_merkle_tree
5969 .lookup(node_block_height - 1)
5970 .expect_ok()
5971 .is_ok(),
5972 "inconsistent block merkle tree"
5973 );
5974
5975 for account in &fee_accounts {
5976 state.fee_merkle_tree.lookup(account).expect_ok().unwrap();
5977 }
5978
5979 let (state, leaf) = reconstruct_state(
5982 &instance,
5983 &ds,
5984 &mut tx,
5985 node_block_height - 1,
5986 to_view,
5987 &[],
5988 &reward_accounts,
5989 )
5990 .await
5991 .unwrap();
5992
5993 match upgrade.base {
5994 EPOCH_VERSION => {
5995 for account in reward_accounts.clone() {
5996 state
5997 .reward_merkle_tree_v1
5998 .lookup(RewardAccountV1::from(account))
5999 .expect_ok()
6000 .unwrap();
6001 }
6002 },
6003 DRB_AND_HEADER_UPGRADE_VERSION => {
6004 for account in &reward_accounts {
6005 state
6006 .reward_merkle_tree_v2
6007 .lookup(account)
6008 .expect_ok()
6009 .unwrap();
6010 }
6011 },
6012 _ => panic!("invalid version"),
6013 };
6014
6015 assert_eq!(leaf.view_number(), to_view);
6016 assert!(
6017 state
6018 .block_merkle_tree
6019 .lookup(node_block_height - 1)
6020 .expect_ok()
6021 .is_ok(),
6022 "inconsistent block merkle tree"
6023 );
6024 let (state, leaf) = reconstruct_state(
6027 &instance,
6028 &ds,
6029 &mut tx,
6030 node_block_height - 1,
6031 to_view,
6032 &fee_accounts,
6033 &reward_accounts,
6034 )
6035 .await
6036 .unwrap();
6037
6038 assert!(
6039 state
6040 .block_merkle_tree
6041 .lookup(node_block_height - 1)
6042 .expect_ok()
6043 .is_ok(),
6044 "inconsistent block merkle tree"
6045 );
6046 assert_eq!(leaf.view_number(), to_view);
6047
6048 match upgrade.base {
6049 EPOCH_VERSION => {
6050 for account in reward_accounts.clone() {
6051 state
6052 .reward_merkle_tree_v1
6053 .lookup(RewardAccountV1::from(account))
6054 .expect_ok()
6055 .unwrap();
6056 }
6057 },
6058 DRB_AND_HEADER_UPGRADE_VERSION => {
6059 for account in &reward_accounts {
6060 state
6061 .reward_merkle_tree_v2
6062 .lookup(account)
6063 .expect_ok()
6064 .unwrap();
6065 }
6066 },
6067 _ => panic!("invalid version"),
6068 };
6069
6070 for account in &fee_accounts {
6071 state.fee_merkle_tree.lookup(account).expect_ok().unwrap();
6072 }
6073
6074 Ok(())
6075 }
6076
6077 #[rstest]
6078 #[case(POS_V3)]
6079 #[case(POS_V4)]
6080 #[test_log::test(tokio::test(flavor = "multi_thread"))]
6081 async fn test_block_reward_api(#[case] upgrade: Upgrade) -> anyhow::Result<()> {
6082 let epoch_height = 10;
6083
6084 let network_config = TestConfigBuilder::default()
6085 .epoch_height(epoch_height)
6086 .build();
6087
6088 let api_port = reserve_tcp_port().expect("OS should have ephemeral ports available");
6089
6090 const NUM_NODES: usize = 1;
6091 let storage = join_all((0..NUM_NODES).map(|_| SqlDataSource::create_storage())).await;
6093 let persistence: [_; NUM_NODES] = storage
6094 .iter()
6095 .map(<SqlDataSource as TestableSequencerDataSource>::persistence_options)
6096 .collect::<Vec<_>>()
6097 .try_into()
6098 .unwrap();
6099
6100 let config = TestNetworkConfigBuilder::with_num_nodes()
6101 .api_config(SqlDataSource::options(
6102 &storage[0],
6103 Options::with_port(api_port),
6104 ))
6105 .network_config(network_config.clone())
6106 .persistences(persistence.clone())
6107 .catchups(std::array::from_fn(|_| {
6108 StatePeers::<StaticVersion<0, 1>>::from_urls(
6109 vec![format!("http://localhost:{api_port}").parse().unwrap()],
6110 Default::default(),
6111 Duration::from_secs(2),
6112 &NoMetrics,
6113 )
6114 }))
6115 .pos_hook(
6116 DelegationConfig::VariableAmounts,
6117 Default::default(),
6118 upgrade,
6119 )
6120 .await
6121 .unwrap()
6122 .build();
6123
6124 let _network = TestNetwork::new(config, upgrade).await;
6125 let client: Client<ServerError, SequencerApiVersion> =
6126 Client::new(format!("http://localhost:{api_port}").parse().unwrap());
6127
6128 let _blocks = client
6129 .socket("availability/stream/blocks/0")
6130 .subscribe::<BlockQueryData<SeqTypes>>()
6131 .await
6132 .unwrap()
6133 .take(3)
6134 .try_collect::<Vec<_>>()
6135 .await
6136 .unwrap();
6137
6138 let block_reward = client
6139 .get::<Option<RewardAmount>>("node/block-reward")
6140 .send()
6141 .await
6142 .expect("failed to get block reward")
6143 .expect("block reward is None");
6144 tracing::info!("block_reward={block_reward:?}");
6145
6146 assert!(block_reward.0 > U256::ZERO);
6147
6148 Ok(())
6149 }
6150
6151 #[rstest]
6153 #[case(POS_V4, None)]
6154 #[case(POS_V4, Some(1u64))]
6155 #[test_log::test(tokio::test(flavor = "multi_thread"))]
6156 async fn test_token_supply_api(
6157 #[case] upgrade: Upgrade,
6158 #[case] chain_id: Option<u64>,
6159 ) -> anyhow::Result<()> {
6160 use alloy::primitives::utils::parse_ether;
6161 use espresso_types::v0_3::ChainConfig;
6162
6163 let epoch_height = 10;
6164 let network_config = TestConfigBuilder::default()
6165 .epoch_height(epoch_height)
6166 .build();
6167
6168 let api_port = reserve_tcp_port().expect("OS should have ephemeral ports available");
6169
6170 const NUM_NODES: usize = 1;
6171 let storage = join_all((0..NUM_NODES).map(|_| SqlDataSource::create_storage())).await;
6172 let persistence: [_; NUM_NODES] = storage
6173 .iter()
6174 .map(<SqlDataSource as TestableSequencerDataSource>::persistence_options)
6175 .collect::<Vec<_>>()
6176 .try_into()
6177 .unwrap();
6178
6179 let initial_supply_tokens = U256::from(3_590_000_000u64);
6182 let initial_supply_wei = parse_ether("3590000000").unwrap();
6183
6184 let mut builder = TestNetworkConfigBuilder::with_num_nodes()
6185 .api_config(SqlDataSource::options(
6186 &storage[0],
6187 Options::with_port(api_port),
6188 ))
6189 .network_config(network_config.clone())
6190 .persistences(persistence.clone())
6191 .catchups(std::array::from_fn(|_| {
6192 StatePeers::<StaticVersion<0, 1>>::from_urls(
6193 vec![format!("http://localhost:{api_port}").parse().unwrap()],
6194 Default::default(),
6195 Duration::from_secs(2),
6196 &NoMetrics,
6197 )
6198 }))
6199 .initial_token_supply(initial_supply_tokens);
6200
6201 if let Some(id) = chain_id {
6203 let state = ValidatedState {
6204 chain_config: ChainConfig {
6205 chain_id: U256::from(id).into(),
6206 ..Default::default()
6207 }
6208 .into(),
6209 ..Default::default()
6210 };
6211 builder = builder.states(std::array::from_fn(|_| state.clone()));
6212 }
6213
6214 let config = builder
6215 .pos_hook(
6216 DelegationConfig::VariableAmounts,
6217 Default::default(),
6218 upgrade,
6219 )
6220 .await
6221 .unwrap()
6222 .build();
6223
6224 let _network = TestNetwork::new(config, upgrade).await;
6225 let client: Client<ServerError, SequencerApiVersion> =
6226 Client::new(format!("http://localhost:{api_port}").parse().unwrap());
6227
6228 let _blocks = client
6229 .socket("availability/stream/blocks/0")
6230 .subscribe::<BlockQueryData<SeqTypes>>()
6231 .await
6232 .unwrap()
6233 .take(3)
6234 .try_collect::<Vec<_>>()
6235 .await
6236 .unwrap();
6237
6238 let minted: String = client
6239 .get("token/total-minted-supply")
6240 .send()
6241 .await
6242 .expect("total-minted-supply");
6243 let circ_eth: String = client
6244 .get("token/circulating-supply-ethereum")
6245 .send()
6246 .await
6247 .expect("circulating-supply-ethereum");
6248 let circulating: String = client
6249 .get("token/circulating-supply")
6250 .send()
6251 .await
6252 .expect("circulating-supply");
6253 tracing::info!(%minted, %circ_eth, %circulating);
6254
6255 let minted = parse_ether(&minted)?;
6256 let circ_eth = parse_ether(&circ_eth)?;
6257 let circ = parse_ether(&circulating)?;
6258
6259 assert_eq!(minted, initial_supply_wei);
6260 assert!(circ_eth <= minted);
6261 assert!(circ >= circ_eth);
6262 assert!(circ > U256::ZERO);
6263
6264 if chain_id == Some(1) {
6265 assert!(circ_eth < minted);
6268 }
6269
6270 Ok(())
6271 }
6272
6273 #[test_log::test(tokio::test(flavor = "multi_thread"))]
6274 async fn test_scanning_token_contract_initialized_event() -> anyhow::Result<()> {
6275 use espresso_types::v0_3::ChainConfig;
6276
6277 let blocks_per_epoch = 10;
6278
6279 let network_config = TestConfigBuilder::<1>::default()
6280 .epoch_height(blocks_per_epoch)
6281 .build();
6282
6283 let (genesis_state, genesis_stake) = light_client_genesis_from_stake_table(
6284 &network_config.hotshot_config().hotshot_stake_table(),
6285 STAKE_TABLE_CAPACITY_FOR_TEST,
6286 )
6287 .unwrap();
6288
6289 let deployer = ProviderBuilder::new()
6290 .wallet(EthereumWallet::from(network_config.signer().clone()))
6291 .connect_http(network_config.l1_url().clone());
6292
6293 let mut contracts = Contracts::new();
6294 let args = DeployerArgsBuilder::default()
6295 .deployer(deployer.clone())
6296 .rpc_url(network_config.l1_url().clone())
6297 .mock_light_client(true)
6298 .genesis_lc_state(genesis_state)
6299 .genesis_st_state(genesis_stake)
6300 .blocks_per_epoch(blocks_per_epoch)
6301 .epoch_start_block(1)
6302 .multisig_pauser(network_config.signer().address())
6303 .token_name("Espresso".to_string())
6304 .token_symbol("ESP".to_string())
6305 .initial_token_supply(U256::from(3590000000u64))
6306 .ops_timelock_delay(U256::from(0))
6307 .ops_timelock_admin(network_config.signer().address())
6308 .ops_timelock_proposers(vec![network_config.signer().address()])
6309 .ops_timelock_executors(vec![network_config.signer().address()])
6310 .safe_exit_timelock_delay(U256::from(0))
6311 .safe_exit_timelock_admin(network_config.signer().address())
6312 .safe_exit_timelock_proposers(vec![network_config.signer().address()])
6313 .safe_exit_timelock_executors(vec![network_config.signer().address()])
6314 .build()
6315 .unwrap();
6316
6317 args.deploy_all(&mut contracts).await.unwrap();
6318
6319 let st_addr = contracts
6320 .address(Contract::StakeTableProxy)
6321 .expect("StakeTableProxy deployed");
6322
6323 let l1_url = network_config.l1_url().clone();
6324
6325 let storage = SqlDataSource::create_storage().await;
6326 let mut opt = <SqlDataSource as TestableSequencerDataSource>::persistence_options(&storage);
6327 let persistence = opt.create().await.unwrap();
6328
6329 let l1_client = L1ClientOptions {
6330 stake_table_update_interval: Duration::from_secs(7),
6331 l1_retry_delay: Duration::from_millis(10),
6332 l1_events_max_block_range: 10000,
6333 ..Default::default()
6334 }
6335 .connect(vec![l1_url])
6336 .unwrap();
6337 l1_client.spawn_tasks().await;
6338
6339 let fetcher = Fetcher::new(
6340 Arc::new(NullStateCatchup::default()),
6341 Arc::new(Mutex::new(persistence.clone())),
6342 l1_client.clone(),
6343 ChainConfig {
6344 stake_table_contract: Some(st_addr),
6345 base_fee: 0.into(),
6346 ..Default::default()
6347 },
6348 );
6349
6350 let provider = l1_client.provider;
6351 let stake_table = StakeTableV2::new(st_addr, provider.clone());
6352
6353 let stake_table_init_block = stake_table
6354 .initializedAtBlock()
6355 .block(BlockId::finalized())
6356 .call()
6357 .await?
6358 .to::<u64>();
6359
6360 tracing::info!("stake table init block = {stake_table_init_block}");
6361
6362 let token_address = stake_table
6363 .token()
6364 .block(BlockId::finalized())
6365 .call()
6366 .await
6367 .context("Failed to get token address")?;
6368
6369 let token = EspToken::new(token_address, provider.clone());
6370
6371 let init_log = fetcher
6372 .scan_token_contract_initialized_event_log(stake_table_init_block, token.clone())
6373 .await
6374 .unwrap();
6375
6376 let init_block = init_log.block_number.context("missing block number")?;
6377 let init_tx_hash = init_log
6378 .transaction_hash
6379 .context("missing transaction hash")?;
6380
6381 let transfer_logs = token
6382 .Transfer_filter()
6383 .from_block(init_block)
6384 .to_block(init_block)
6385 .query()
6386 .await
6387 .unwrap();
6388
6389 let (mint_transfer, _) = transfer_logs
6390 .iter()
6391 .find(|(transfer, log)| {
6392 log.transaction_hash == Some(init_tx_hash) && transfer.from == Address::ZERO
6393 })
6394 .context("no mint transfer event in init tx")?;
6395
6396 assert!(mint_transfer.value > U256::ZERO);
6397
6398 Ok(())
6399 }
6400
6401 #[test_log::test(tokio::test(flavor = "multi_thread"))]
6402 async fn test_tx_metadata() {
6403 let port = reserve_tcp_port().expect("OS should have ephemeral ports available");
6404
6405 let url = format!("http://localhost:{port}").parse().unwrap();
6406 let client: Client<ServerError, StaticVersion<0, 1>> = Client::new(url);
6407
6408 let storage = SqlDataSource::create_storage().await;
6409 let network_config = TestConfigBuilder::default().build();
6410 let config = TestNetworkConfigBuilder::default()
6411 .api_config(
6412 SqlDataSource::options(&storage, Options::with_port(port))
6413 .submit(Default::default())
6414 .explorer(Default::default()),
6415 )
6416 .network_config(network_config)
6417 .build();
6418 let network = TestNetwork::new(config, MOCK_SEQUENCER_VERSIONS).await;
6419 let mut events = network.server.event_stream().await;
6420
6421 client.connect(None).await;
6422
6423 let namespace_counts = [(101, 1), (102, 2), (103, 3)];
6425 for (ns, count) in &namespace_counts {
6426 for i in 0..*count {
6427 let ns_id = NamespaceId::from(*ns as u64);
6428 let txn = Transaction::new(ns_id, vec![*ns, i]);
6429 client
6430 .post::<()>("submit/submit")
6431 .body_json(&txn)
6432 .unwrap()
6433 .send()
6434 .await
6435 .unwrap();
6436 let (block, _) = wait_for_decide_on_handle(&mut events, &txn).await;
6437
6438 let summary: BlockSummaryQueryData<SeqTypes> = client
6440 .get(&format!("availability/block/summary/{block}"))
6441 .send()
6442 .await
6443 .unwrap();
6444 let ns_info = summary.namespaces();
6445 assert_eq!(ns_info.len(), 1);
6446 assert_eq!(ns_info.keys().copied().collect::<Vec<_>>(), vec![ns_id]);
6447 assert_eq!(ns_info[&ns_id].num_transactions, 1);
6448 assert_eq!(ns_info[&ns_id].size, txn.size_in_block(true));
6449 }
6450 }
6451
6452 for (ns, count) in &namespace_counts {
6454 tracing::info!(ns, "list transactions in namespace");
6455
6456 let ns_id = NamespaceId::from(*ns as u64);
6457 let summaries: TransactionSummariesResponse<SeqTypes> = client
6458 .get(&format!(
6459 "explorer/transactions/latest/{count}/namespace/{ns_id}"
6460 ))
6461 .send()
6462 .await
6463 .unwrap();
6464 let txs = summaries.transaction_summaries;
6465 assert_eq!(txs.len(), *count as usize);
6466
6467 for i in 0..*count {
6469 let summary = &txs[i as usize];
6470 let expected = Transaction::new(ns_id, vec![*ns, count - i - 1]);
6471 assert_eq!(summary.rollups, vec![ns_id]);
6472 assert_eq!(summary.hash, expected.commit());
6473 }
6474 }
6475 }
6476
6477 use std::time::Instant;
6478
6479 use rand::thread_rng;
6480
6481 #[test_log::test(tokio::test(flavor = "multi_thread"))]
6482 async fn test_aggregator_namespace_endpoints() {
6483 let mut rng = thread_rng();
6484
6485 let port = reserve_tcp_port().expect("OS should have ephemeral ports available");
6486
6487 let url = format!("http://localhost:{port}").parse().unwrap();
6488 tracing::info!("Sequencer URL = {url}");
6489 let client: Client<ServerError, StaticVersion<0, 1>> = Client::new(url);
6490
6491 let options = Options::with_port(port).submit(Default::default());
6492 const NUM_NODES: usize = 2;
6493 let storage = join_all((0..NUM_NODES).map(|_| SqlDataSource::create_storage())).await;
6495
6496 let persistence_options: [_; NUM_NODES] = storage
6497 .iter()
6498 .map(<SqlDataSource as TestableSequencerDataSource>::persistence_options)
6499 .collect::<Vec<_>>()
6500 .try_into()
6501 .unwrap();
6502
6503 let network_config = TestConfigBuilder::default().build();
6504
6505 let config = TestNetworkConfigBuilder::<NUM_NODES, _, _>::with_num_nodes()
6506 .api_config(SqlDataSource::options(&storage[0], options))
6507 .network_config(network_config)
6508 .persistences(persistence_options.clone())
6509 .build();
6510 let network = TestNetwork::new(config, MOCK_SEQUENCER_VERSIONS).await;
6511 let mut events = network.server.event_stream().await;
6512 let start = Instant::now();
6513 let mut total_transactions = 0;
6514 let mut tx_heights = Vec::new();
6515 let mut sizes = HashMap::new();
6516 for namespace in 1..=4 {
6519 for _count in 0..namespace {
6520 let payload_len = rng.gen_range(4..=10);
6522 let payload: Vec<u8> = (0..payload_len).map(|_| rng.r#gen()).collect();
6523
6524 let txn = Transaction::new(NamespaceId::from(namespace as u32), payload);
6525
6526 client.connect(None).await;
6527
6528 let hash = client
6529 .post("submit/submit")
6530 .body_json(&txn)
6531 .unwrap()
6532 .send()
6533 .await
6534 .unwrap();
6535 assert_eq!(txn.commit(), hash);
6536
6537 let (height, size) = wait_for_decide_on_handle(&mut events, &txn).await;
6539 tx_heights.push(height);
6540 total_transactions += 1;
6541 *sizes.entry(namespace).or_insert(0) += size;
6542 }
6543 }
6544
6545 let duration = start.elapsed();
6546
6547 println!("Time elapsed to submit transactions: {duration:?}");
6548
6549 let last_tx_height = tx_heights.last().unwrap();
6550 for namespace in 1..=4 {
6551 let count = client
6552 .get::<u64>(&format!("node/transactions/count/namespace/{namespace}"))
6553 .send()
6554 .await
6555 .unwrap();
6556 assert_eq!(
6557 count, namespace as u64,
6558 "Incorrect transaction count for namespace {namespace}: expected {namespace}, got \
6559 {count}"
6560 );
6561
6562 let to_endpoint_count = client
6564 .get::<u64>(&format!(
6565 "node/transactions/count/namespace/{namespace}/{last_tx_height}"
6566 ))
6567 .send()
6568 .await
6569 .unwrap();
6570 assert_eq!(
6571 to_endpoint_count, namespace as u64,
6572 "Incorrect transaction count for range endpoint (to only) for namespace \
6573 {namespace}: expected {namespace}, got {to_endpoint_count}"
6574 );
6575
6576 let from_to_endpoint_count = client
6578 .get::<u64>(&format!(
6579 "node/transactions/count/namespace/{namespace}/0/{last_tx_height}"
6580 ))
6581 .send()
6582 .await
6583 .unwrap();
6584 assert_eq!(
6585 from_to_endpoint_count, namespace as u64,
6586 "Incorrect transaction count for range endpoint (from-to) for namespace \
6587 {namespace}: expected {namespace}, got {from_to_endpoint_count}"
6588 );
6589
6590 let ns_size = client
6591 .get::<usize>(&format!("node/payloads/size/namespace/{namespace}"))
6592 .send()
6593 .await
6594 .unwrap();
6595
6596 let expected_ns_size = *sizes.get(&namespace).unwrap();
6597 assert_eq!(
6598 ns_size, expected_ns_size,
6599 "Incorrect payload size for namespace {namespace}: expected {expected_ns_size}, \
6600 got {ns_size}"
6601 );
6602
6603 let ns_size_to = client
6604 .get::<usize>(&format!(
6605 "node/payloads/size/namespace/{namespace}/{last_tx_height}"
6606 ))
6607 .send()
6608 .await
6609 .unwrap();
6610 assert_eq!(
6611 ns_size_to, expected_ns_size,
6612 "Incorrect payload size for namespace {namespace} up to height {last_tx_height}: \
6613 expected {expected_ns_size}, got {ns_size_to}"
6614 );
6615
6616 let ns_size_from_to = client
6617 .get::<usize>(&format!(
6618 "node/payloads/size/namespace/{namespace}/0/{last_tx_height}"
6619 ))
6620 .send()
6621 .await
6622 .unwrap();
6623 assert_eq!(
6624 ns_size_from_to, expected_ns_size,
6625 "Incorrect payload size for namespace {namespace} from 0 to height \
6626 {last_tx_height}: expected {expected_ns_size}, got {ns_size_from_to}"
6627 );
6628 }
6629
6630 let total_tx_count = client
6631 .get::<u64>("node/transactions/count")
6632 .send()
6633 .await
6634 .unwrap();
6635 assert_eq!(
6636 total_tx_count, total_transactions,
6637 "Incorrect total transaction count: expected {total_transactions}, got \
6638 {total_tx_count}"
6639 );
6640
6641 let total_payload_size = client
6642 .get::<usize>("node/payloads/size")
6643 .send()
6644 .await
6645 .unwrap();
6646
6647 let expected_total_size: usize = sizes.values().copied().sum();
6648 assert_eq!(
6649 total_payload_size, expected_total_size,
6650 "Incorrect total payload size: expected {expected_total_size}, got \
6651 {total_payload_size}"
6652 );
6653 }
6654
6655 #[test_log::test(tokio::test(flavor = "multi_thread"))]
6656 async fn test_stream_transactions_endpoint() {
6657 let mut rng = thread_rng();
6663
6664 let port = reserve_tcp_port().expect("OS should have ephemeral ports available");
6665
6666 let url = format!("http://localhost:{port}").parse().unwrap();
6667 tracing::info!("Sequencer URL = {url}");
6668 let client: Client<ServerError, StaticVersion<0, 1>> = Client::new(url);
6669
6670 let options = Options::with_port(port).submit(Default::default());
6671 const NUM_NODES: usize = 2;
6672 let storage = join_all((0..NUM_NODES).map(|_| SqlDataSource::create_storage())).await;
6674
6675 let persistence_options: [_; NUM_NODES] = storage
6676 .iter()
6677 .map(<SqlDataSource as TestableSequencerDataSource>::persistence_options)
6678 .collect::<Vec<_>>()
6679 .try_into()
6680 .unwrap();
6681
6682 let network_config = TestConfigBuilder::default().build();
6683
6684 let config = TestNetworkConfigBuilder::<NUM_NODES, _, _>::with_num_nodes()
6685 .api_config(SqlDataSource::options(&storage[0], options))
6686 .network_config(network_config)
6687 .persistences(persistence_options.clone())
6688 .build();
6689 let network = TestNetwork::new(config, MOCK_SEQUENCER_VERSIONS).await;
6690 let mut events = network.server.event_stream().await;
6691 let mut all_transactions = HashMap::new();
6692 let mut namespace_tx: HashMap<_, HashSet<_>> = HashMap::new();
6693
6694 for namespace in 1..=4 {
6697 for _count in 0..namespace {
6698 let payload_len = rng.gen_range(4..=10);
6699 let payload: Vec<u8> = (0..payload_len).map(|_| rng.r#gen()).collect();
6700
6701 let txn = Transaction::new(NamespaceId::from(namespace as u32), payload);
6702
6703 client.connect(None).await;
6704
6705 let hash = client
6706 .post("submit/submit")
6707 .body_json(&txn)
6708 .unwrap()
6709 .send()
6710 .await
6711 .unwrap();
6712 assert_eq!(txn.commit(), hash);
6713
6714 wait_for_decide_on_handle(&mut events, &txn).await;
6716 all_transactions.insert(txn.commit(), txn.clone());
6719 namespace_tx.entry(namespace).or_default().insert(txn);
6720 }
6721 }
6722
6723 let mut transactions = client
6724 .socket("availability/stream/transactions/0")
6725 .subscribe::<TransactionQueryData<SeqTypes>>()
6726 .await
6727 .expect("failed to subscribe to transactions endpoint");
6728
6729 let mut count = 0;
6730 while let Some(tx) = transactions.next().await {
6731 let tx = tx.unwrap();
6732 let expected = all_transactions
6733 .get(&tx.transaction().commit())
6734 .expect("txn not found ");
6735 assert_eq!(tx.transaction(), expected, "invalid transaction");
6736 count += 1;
6737
6738 if count == all_transactions.len() {
6739 break;
6740 }
6741 }
6742
6743 for (namespace, expected_ns_txns) in &namespace_tx {
6746 let mut api_namespace_txns = client
6747 .socket(&format!(
6748 "availability/stream/transactions/0/namespace/{namespace}",
6749 ))
6750 .subscribe::<TransactionQueryData<SeqTypes>>()
6751 .await
6752 .unwrap_or_else(|_| {
6753 panic!("failed to subscribe to transactions namespace {namespace}")
6754 });
6755
6756 let mut received = HashSet::new();
6757
6758 while let Some(res) = api_namespace_txns.next().await {
6759 let tx = res.expect("stream error");
6760 received.insert(tx.transaction().clone());
6761
6762 if received.len() == expected_ns_txns.len() {
6763 break;
6764 }
6765 }
6766
6767 assert_eq!(
6768 received, *expected_ns_txns,
6769 "Mismatched transactions for namespace {namespace}"
6770 );
6771 }
6772 }
6773
6774 #[rstest]
6775 #[case(POS_V3)]
6776 #[case(POS_V4)]
6777 #[test_log::test(tokio::test(flavor = "multi_thread"))]
6778 async fn test_v3_and_v4_reward_tree_updates(#[case] upgrade: Upgrade) -> anyhow::Result<()> {
6779 const EPOCH_HEIGHT: u64 = 10;
6789
6790 let network_config = TestConfigBuilder::default()
6791 .epoch_height(EPOCH_HEIGHT)
6792 .build();
6793
6794 let api_port = reserve_tcp_port().expect("OS should have ephemeral ports available");
6795
6796 tracing::info!("API PORT = {api_port}");
6797 const NUM_NODES: usize = 5;
6798
6799 let storage = join_all((0..NUM_NODES).map(|_| SqlDataSource::create_storage())).await;
6800 let persistence: [_; NUM_NODES] = storage
6801 .iter()
6802 .map(<SqlDataSource as TestableSequencerDataSource>::persistence_options)
6803 .collect::<Vec<_>>()
6804 .try_into()
6805 .unwrap();
6806
6807 let config = TestNetworkConfigBuilder::with_num_nodes()
6808 .api_config(SqlDataSource::options(
6809 &storage[0],
6810 Options::with_port(api_port).catchup(Default::default()),
6811 ))
6812 .network_config(network_config)
6813 .persistences(persistence.clone())
6814 .catchups(std::array::from_fn(|_| {
6815 StatePeers::<StaticVersion<0, 1>>::from_urls(
6816 vec![format!("http://localhost:{api_port}").parse().unwrap()],
6817 Default::default(),
6818 Duration::from_secs(2),
6819 &NoMetrics,
6820 )
6821 }))
6822 .pos_hook(
6823 DelegationConfig::MultipleDelegators,
6824 hotshot_contract_adapter::stake_table::StakeTableContractVersion::V2,
6825 upgrade,
6826 )
6827 .await
6828 .unwrap()
6829 .build();
6830 let mut network = TestNetwork::new(config, upgrade).await;
6831
6832 let mut events = network.peers[2].event_stream().await;
6833 wait_for_epochs(&mut events, EPOCH_HEIGHT, 4).await;
6835
6836 let validated_state = network.server.decided_state().await;
6837 if upgrade.base == EPOCH_VERSION {
6838 let v1_tree = &validated_state.reward_merkle_tree_v1;
6839 assert!(v1_tree.num_leaves() > 0, "v1 reward tree tree is empty");
6840 let v2_tree = &validated_state.reward_merkle_tree_v2;
6841 assert!(
6842 v2_tree.num_leaves() == 0,
6843 "v2 reward tree tree is not empty"
6844 );
6845 } else {
6846 let v1_tree = &validated_state.reward_merkle_tree_v1;
6847 assert!(
6848 v1_tree.num_leaves() == 0,
6849 "v1 reward tree tree is not empty"
6850 );
6851 let v2_tree = &validated_state.reward_merkle_tree_v2;
6852 assert!(v2_tree.num_leaves() > 0, "v2 reward tree tree is empty");
6853 }
6854
6855 network.stop_consensus().await;
6856 Ok(())
6857 }
6858
6859 #[rstest]
6860 #[case(POS_V3)]
6861 #[case(POS_V4)]
6862 #[test_log::test(tokio::test(flavor = "multi_thread"))]
6863 pub(crate) async fn test_state_cert_query(#[case] upgrade: Upgrade) {
6864 const TEST_EPOCH_HEIGHT: u64 = 10;
6865 const TEST_EPOCHS: u64 = 5;
6866
6867 let network_config = TestConfigBuilder::default()
6868 .epoch_height(TEST_EPOCH_HEIGHT)
6869 .build();
6870
6871 let api_port = reserve_tcp_port().expect("OS should have ephemeral ports available");
6872
6873 tracing::info!("API PORT = {api_port}");
6874 const NUM_NODES: usize = 2;
6875
6876 let storage = join_all((0..NUM_NODES).map(|_| SqlDataSource::create_storage())).await;
6877 let persistence: [_; NUM_NODES] = storage
6878 .iter()
6879 .map(<SqlDataSource as TestableSequencerDataSource>::persistence_options)
6880 .collect::<Vec<_>>()
6881 .try_into()
6882 .unwrap();
6883
6884 let config = TestNetworkConfigBuilder::with_num_nodes()
6885 .api_config(SqlDataSource::options(
6886 &storage[0],
6887 Options::with_port(api_port).catchup(Default::default()),
6888 ))
6889 .network_config(network_config)
6890 .persistences(persistence.clone())
6891 .catchups(std::array::from_fn(|_| {
6892 StatePeers::<StaticVersion<0, 1>>::from_urls(
6893 vec![format!("http://localhost:{api_port}").parse().unwrap()],
6894 Default::default(),
6895 Duration::from_secs(2),
6896 &NoMetrics,
6897 )
6898 }))
6899 .pos_hook(
6900 DelegationConfig::MultipleDelegators,
6901 hotshot_contract_adapter::stake_table::StakeTableContractVersion::V2,
6902 upgrade,
6903 )
6904 .await
6905 .unwrap()
6906 .build();
6907
6908 let network = TestNetwork::new(config, upgrade).await;
6909 let mut events = network.server.event_stream().await;
6910
6911 loop {
6913 let event = events.next().await.unwrap();
6914 tracing::info!("Received event from handle: {event:?}");
6915
6916 if let hotshot::types::EventType::Decide { leaf_chain, .. } = event.event {
6917 println!(
6918 "Decide event received: {:?}",
6919 leaf_chain.first().unwrap().leaf.height()
6920 );
6921 if let Some(first_leaf) = leaf_chain.first() {
6922 let height = first_leaf.leaf.height();
6923 tracing::info!("Decide event received at height: {height}");
6924
6925 if height >= TEST_EPOCHS * TEST_EPOCH_HEIGHT {
6926 break;
6927 }
6928 }
6929 }
6930 }
6931
6932 let client: Client<ServerError, StaticVersion<0, 1>> =
6934 Client::new(format!("http://localhost:{api_port}").parse().unwrap());
6935 client.connect(Some(Duration::from_secs(10))).await;
6936
6937 for i in 3..=TEST_EPOCHS {
6939 let state_query_data_v2 = client
6942 .get::<StateCertQueryDataV2<SeqTypes>>(&format!("availability/state-cert-v2/{i}"))
6943 .send()
6944 .await
6945 .unwrap();
6946 let state_cert_v2 = state_query_data_v2.0.clone();
6947 tracing::info!("state_cert_v2: {state_cert_v2:?}");
6948 assert_eq!(state_cert_v2.epoch.u64(), i);
6949 assert_eq!(
6950 state_cert_v2.light_client_state.block_height,
6951 i * TEST_EPOCH_HEIGHT - 5
6952 );
6953 let block_height = state_cert_v2.light_client_state.block_height;
6954
6955 let header: Header = client
6956 .get(&format!("availability/header/{block_height}"))
6957 .send()
6958 .await
6959 .unwrap();
6960
6961 if header.version() == DRB_AND_HEADER_UPGRADE_VERSION {
6963 let auth_root = state_cert_v2.auth_root;
6964 let header_auth_root = header.auth_root().unwrap();
6965 if auth_root.is_zero() || header_auth_root.is_zero() {
6966 panic!("auth root shouldn't be zero");
6967 }
6968
6969 assert_eq!(auth_root, header_auth_root, "auth root mismatch");
6970 }
6971
6972 let state_query_data_v1 = client
6974 .get::<StateCertQueryDataV1<SeqTypes>>(&format!("availability/state-cert/{i}"))
6975 .send()
6976 .await
6977 .unwrap();
6978
6979 let state_cert_v1 = state_query_data_v1.0.clone();
6980 tracing::info!("state_cert_v1: {state_cert_v1:?}");
6981 assert_eq!(state_query_data_v1, state_query_data_v2.into());
6982 }
6983 }
6984
6985 #[rstest]
6991 #[case(POS_V3)]
6992 #[case(POS_V4)]
6993 #[test_log::test(tokio::test(flavor = "multi_thread"))]
6994 pub(crate) async fn test_state_cert_catchup(#[case] upgrade: Upgrade) {
6995 const EPOCH_HEIGHT: u64 = 10;
6996
6997 let network_config = TestConfigBuilder::default()
6998 .epoch_height(EPOCH_HEIGHT)
6999 .build();
7000
7001 let api_port = reserve_tcp_port().expect("OS should have ephemeral ports available");
7002
7003 tracing::info!("API PORT = {api_port}");
7004 const NUM_NODES: usize = 5;
7005
7006 let storage = join_all((0..NUM_NODES).map(|_| SqlDataSource::create_storage())).await;
7007 let persistence: [_; NUM_NODES] = storage
7008 .iter()
7009 .map(<SqlDataSource as TestableSequencerDataSource>::persistence_options)
7010 .collect::<Vec<_>>()
7011 .try_into()
7012 .unwrap();
7013
7014 let config = TestNetworkConfigBuilder::with_num_nodes()
7015 .api_config(SqlDataSource::options(
7016 &storage[0],
7017 Options::with_port(api_port),
7018 ))
7019 .network_config(network_config)
7020 .persistences(persistence.clone())
7021 .catchups(std::array::from_fn(|_| {
7022 StatePeers::<StaticVersion<0, 1>>::from_urls(
7023 vec![format!("http://localhost:{api_port}").parse().unwrap()],
7024 Default::default(),
7025 Duration::from_secs(2),
7026 &NoMetrics,
7027 )
7028 }))
7029 .pos_hook(
7030 DelegationConfig::MultipleDelegators,
7031 hotshot_contract_adapter::stake_table::StakeTableContractVersion::V2,
7032 upgrade,
7033 )
7034 .await
7035 .unwrap()
7036 .build();
7037 let state = config.states()[0].clone();
7038 let mut network = TestNetwork::new(config, upgrade).await;
7039
7040 let mut events = network.peers[2].event_stream().await;
7041 wait_for_epochs(&mut events, EPOCH_HEIGHT, 3).await;
7043
7044 network.peers.remove(0);
7049
7050 let new_storage: hotshot_query_service::data_source::sql::testing::TmpDb =
7051 SqlDataSource::create_storage().await;
7052 let new_persistence: persistence::sql::Options =
7053 <SqlDataSource as TestableSequencerDataSource>::persistence_options(&new_storage);
7054
7055 let node_0_port = reserve_tcp_port().expect("OS should have ephemeral ports available");
7056 tracing::info!("node_0_port {node_0_port}");
7057 let opt = Options::with_port(node_0_port).query_sql(
7058 Query {
7059 peers: vec![format!("http://localhost:{api_port}").parse().unwrap()],
7060 },
7061 tmp_options(&new_storage),
7062 );
7063 let node_0 = opt
7064 .clone()
7065 .serve(|metrics, consumer, storage| {
7066 let cfg = network.cfg.clone();
7067 let new_persistence = new_persistence.clone();
7068 let state = state.clone();
7069 async move {
7070 Ok(cfg
7071 .init_node(
7072 1,
7073 state,
7074 new_persistence.clone(),
7075 Some(StatePeers::<StaticVersion<0, 1>>::from_urls(
7076 vec![format!("http://localhost:{api_port}").parse().unwrap()],
7077 Default::default(),
7078 Duration::from_secs(2),
7079 &NoMetrics,
7080 )),
7081 storage,
7082 &*metrics,
7083 test_helpers::STAKE_TABLE_CAPACITY_FOR_TEST,
7084 consumer,
7085 upgrade,
7086 Default::default(),
7087 )
7088 .await)
7089 }
7090 .boxed()
7091 })
7092 .await
7093 .unwrap();
7094
7095 let mut events = node_0.event_stream().await;
7096 wait_for_epochs(&mut events, EPOCH_HEIGHT, 5).await;
7098
7099 let client: Client<ServerError, StaticVersion<0, 1>> =
7100 Client::new(format!("http://localhost:{node_0_port}").parse().unwrap());
7101 client.connect(Some(Duration::from_secs(60))).await;
7102
7103 for epoch in 3..=5 {
7104 let state_cert = client
7105 .get::<StateCertQueryDataV2<SeqTypes>>(&format!(
7106 "availability/state-cert-v2/{epoch}"
7107 ))
7108 .send()
7109 .await
7110 .unwrap();
7111 assert_eq!(state_cert.0.epoch.u64(), epoch);
7112 }
7113 }
7114
7115 #[test_log::test(tokio::test(flavor = "multi_thread"))]
7116 async fn test_integration_commission_updates() -> anyhow::Result<()> {
7117 const NUM_NODES: usize = 3;
7118 const EPOCH_HEIGHT: u64 = 10;
7119
7120 let versions = POS_V4;
7122
7123 let api_port = reserve_tcp_port().expect("OS should have ephemeral ports available");
7124
7125 let storage = join_all((0..NUM_NODES).map(|_| SqlDataSource::create_storage())).await;
7127 let persistence: [_; NUM_NODES] = storage
7128 .iter()
7129 .map(<SqlDataSource as TestableSequencerDataSource>::persistence_options)
7130 .collect::<Vec<_>>()
7131 .try_into()
7132 .unwrap();
7133
7134 let network_config = TestConfigBuilder::default()
7136 .epoch_height(EPOCH_HEIGHT)
7137 .build();
7138
7139 let config = TestNetworkConfigBuilder::<NUM_NODES, _, _>::with_num_nodes()
7141 .api_config(SqlDataSource::options(
7142 &storage[0],
7143 Options::with_port(api_port),
7144 ))
7145 .network_config(network_config.clone())
7146 .persistences(persistence.clone())
7147 .catchups(std::array::from_fn(|_| {
7148 StatePeers::<SequencerApiVersion>::from_urls(
7149 vec![format!("http://localhost:{api_port}").parse().unwrap()],
7150 Default::default(),
7151 Duration::from_secs(2),
7152 &NoMetrics,
7153 )
7154 }))
7155 .pos_hook(
7156 DelegationConfig::NoSelfDelegation,
7158 StakeTableContractVersion::V1, POS_V4,
7160 )
7161 .await
7162 .unwrap()
7163 .build();
7164
7165 let network = TestNetwork::new(config, versions).await;
7166 let provider = network.cfg.anvil().unwrap();
7167 let deployer_addr = network.cfg.signer().address();
7168 let mut contracts = network.contracts.unwrap();
7169 let st_addr = contracts.address(Contract::StakeTableProxy).unwrap();
7170 upgrade_stake_table_v2(
7171 provider,
7172 L1Client::new(vec![network.cfg.l1_url()])?,
7173 &mut contracts,
7174 deployer_addr,
7175 deployer_addr,
7176 )
7177 .await?;
7178
7179 let mut commissions = vec![];
7180 for (i, (validator, provider)) in
7181 network_config.validator_providers().into_iter().enumerate()
7182 {
7183 let commission = fetch_commission(provider.clone(), st_addr, validator).await?;
7184 let new_commission = match i {
7185 0 => 0u16,
7186 1 => commission.to_evm() + 500u16,
7187 2 => commission.to_evm() - 100u16,
7188 _ => unreachable!(),
7189 }
7190 .try_into()?;
7191 commissions.push((validator, commission, new_commission));
7192 tracing::info!(%validator, %commission, %new_commission, "Update commission");
7193 update_commission(provider, st_addr, new_commission)
7194 .await?
7195 .get_receipt()
7196 .await?;
7197 }
7198
7199 let current_epoch = network.peers[0]
7201 .decided_leaf()
7202 .await
7203 .epoch(EPOCH_HEIGHT)
7204 .unwrap();
7205 let target_epoch = current_epoch.u64() + 3;
7206 println!("target epoch for new stake table: {target_epoch}");
7207 let mut events = network.peers[0].event_stream().await;
7208 wait_for_epochs(&mut events, EPOCH_HEIGHT, target_epoch).await;
7209
7210 let client: Client<ServerError, SequencerApiVersion> =
7212 Client::new(format!("http://localhost:{api_port}").parse().unwrap());
7213 let validators = client
7214 .get::<AuthenticatedValidatorMap>(&format!("node/validators/{}", target_epoch - 1))
7215 .send()
7216 .await
7217 .expect("validators");
7218 assert!(!validators.is_empty());
7219 for (val, old_comm, _) in commissions.clone() {
7220 assert_eq!(validators.get(&val).unwrap().commission, old_comm.to_evm());
7221 }
7222
7223 let client: Client<ServerError, SequencerApiVersion> =
7225 Client::new(format!("http://localhost:{api_port}").parse().unwrap());
7226 let validators = client
7227 .get::<AuthenticatedValidatorMap>(&format!("node/validators/{target_epoch}"))
7228 .send()
7229 .await
7230 .expect("validators");
7231 assert!(!validators.is_empty());
7232 for (val, _, new_comm) in commissions.clone() {
7233 assert_eq!(validators.get(&val).unwrap().commission, new_comm.to_evm());
7234 }
7235
7236 let last_block_with_old_commissions = EPOCH_HEIGHT * (target_epoch - 1);
7237 let block_with_new_commissions = EPOCH_HEIGHT * target_epoch;
7238 let mut new_amounts = vec![];
7239 for (val, ..) in commissions {
7240 let before = client
7241 .get::<Option<RewardAmount>>(&format!(
7242 "reward-state-v2/reward-balance/{last_block_with_old_commissions}/{val}"
7243 ))
7244 .send()
7245 .await?
7246 .unwrap();
7247 let after = client
7248 .get::<Option<RewardAmount>>(&format!(
7249 "reward-state-v2/reward-balance/{block_with_new_commissions}/{val}"
7250 ))
7251 .send()
7252 .await?
7253 .unwrap();
7254 new_amounts.push(after - before);
7255 }
7256
7257 let tolerance = U256::from(10 * EPOCH_HEIGHT).into();
7258 assert!(new_amounts[0] < tolerance);
7260
7261 assert!(new_amounts[1] + new_amounts[2] > tolerance);
7263
7264 Ok(())
7265 }
7266
7267 #[rstest]
7268 #[case(POS_V3)]
7269 #[case(POS_V4)]
7270 #[test_log::test(tokio::test(flavor = "multi_thread"))]
7271 async fn test_reward_proof_endpoint(#[case] upgrade: Upgrade) -> anyhow::Result<()> {
7272 const EPOCH_HEIGHT: u64 = 10;
7273 const NUM_NODES: usize = 5;
7274
7275 let network_config = TestConfigBuilder::default()
7276 .epoch_height(EPOCH_HEIGHT)
7277 .build();
7278
7279 let api_port = reserve_tcp_port().expect("OS should have ephemeral ports available");
7280 println!("API PORT = {api_port}");
7281
7282 let storage = join_all((0..NUM_NODES).map(|_| SqlDataSource::create_storage())).await;
7283 let persistence: [_; NUM_NODES] = storage
7284 .iter()
7285 .map(<SqlDataSource as TestableSequencerDataSource>::persistence_options)
7286 .collect::<Vec<_>>()
7287 .try_into()
7288 .unwrap();
7289
7290 let config = TestNetworkConfigBuilder::with_num_nodes()
7291 .api_config(SqlDataSource::options(
7292 &storage[0],
7293 Options::with_port(api_port).catchup(Default::default()),
7294 ))
7295 .network_config(network_config)
7296 .persistences(persistence.clone())
7297 .catchups(std::array::from_fn(|_| {
7298 StatePeers::<StaticVersion<0, 1>>::from_urls(
7299 vec![format!("http://localhost:{api_port}").parse().unwrap()],
7300 Default::default(),
7301 Duration::from_secs(2),
7302 &NoMetrics,
7303 )
7304 }))
7305 .pos_hook(
7306 DelegationConfig::MultipleDelegators,
7307 hotshot_contract_adapter::stake_table::StakeTableContractVersion::V2,
7308 upgrade,
7309 )
7310 .await
7311 .unwrap()
7312 .build();
7313
7314 let mut network = TestNetwork::new(config, upgrade).await;
7315
7316 let mut events = network.server.event_stream().await;
7318 wait_for_epochs(&mut events, EPOCH_HEIGHT, 4).await;
7319
7320 let url = format!("http://localhost:{api_port}").parse().unwrap();
7321 let client: Client<ServerError, StaticVersion<0, 1>> = Client::new(url);
7322
7323 let validated_state = network.server.decided_state().await;
7324 let decided_leaf = network.server.decided_leaf().await;
7325 let height = decided_leaf.height();
7326
7327 if upgrade.base == EPOCH_VERSION {
7329 wait_until_block_height(&client, "reward-state/block-height", height).await;
7331
7332 network.stop_consensus().await;
7333
7334 for (address, _) in validated_state.reward_merkle_tree_v1.iter() {
7335 let (_, expected_proof) = validated_state
7336 .reward_merkle_tree_v1
7337 .lookup(*address)
7338 .expect_ok()
7339 .unwrap();
7340
7341 let res = client
7342 .get::<RewardAccountQueryDataV1>(&format!(
7343 "reward-state/proof/{height}/{address}"
7344 ))
7345 .send()
7346 .await
7347 .unwrap();
7348
7349 match res.proof.proof {
7350 RewardMerkleProofV1::Presence(p) => {
7351 assert_eq!(
7352 p, expected_proof,
7353 "Proof mismatch for V1 at {height}, addr={address}"
7354 );
7355 },
7356 other => panic!(
7357 "Expected Present proof for V1 at {height}, addr={address}, got {other:?}"
7358 ),
7359 }
7360 }
7361 } else {
7362 wait_until_block_height(&client, "reward-state-v2/block-height", height).await;
7364
7365 network.stop_consensus().await;
7366
7367 for (address, _) in validated_state.reward_merkle_tree_v2.iter() {
7368 let (_, expected_proof) = validated_state
7369 .reward_merkle_tree_v2
7370 .lookup(*address)
7371 .expect_ok()
7372 .unwrap();
7373
7374 let res = client
7375 .get::<RewardAccountQueryDataV2>(&format!(
7376 "reward-state-v2/proof/{height}/{address}"
7377 ))
7378 .send()
7379 .await
7380 .unwrap();
7381
7382 match res.proof.proof.clone() {
7383 RewardMerkleProofV2::Presence(p) => {
7384 assert_eq!(
7385 p, expected_proof,
7386 "Proof mismatch for V2 at {height}, addr={address}"
7387 );
7388 },
7389 other => panic!(
7390 "Expected Present proof for V2 at {height}, addr={address}, got {other:?}"
7391 ),
7392 }
7393
7394 let reward_claim_input = client
7395 .get::<RewardClaimInput>(&format!(
7396 "reward-state-v2/reward-claim-input/{height}/{address}"
7397 ))
7398 .send()
7399 .await
7400 .unwrap();
7401
7402 assert_eq!(reward_claim_input, res.to_reward_claim_input()?);
7403 }
7404 }
7405
7406 Ok(())
7407 }
7408
7409 #[test_log::test(tokio::test(flavor = "multi_thread"))]
7410 async fn test_all_validators_endpoint() -> anyhow::Result<()> {
7411 const EPOCH_HEIGHT: u64 = 20;
7412
7413 let network_config = TestConfigBuilder::default()
7414 .epoch_height(EPOCH_HEIGHT)
7415 .build();
7416
7417 let api_port = reserve_tcp_port().expect("OS should have ephemeral ports available");
7418
7419 const NUM_NODES: usize = 5;
7420
7421 let storage = join_all((0..NUM_NODES).map(|_| SqlDataSource::create_storage())).await;
7422 let persistence: [_; NUM_NODES] = storage
7423 .iter()
7424 .map(<SqlDataSource as TestableSequencerDataSource>::persistence_options)
7425 .collect::<Vec<_>>()
7426 .try_into()
7427 .unwrap();
7428
7429 let config = TestNetworkConfigBuilder::with_num_nodes()
7430 .api_config(SqlDataSource::options(
7431 &storage[0],
7432 Options::with_port(api_port),
7433 ))
7434 .network_config(network_config)
7435 .persistences(persistence.clone())
7436 .catchups(std::array::from_fn(|_| {
7437 StatePeers::<StaticVersion<0, 1>>::from_urls(
7438 vec![format!("http://localhost:{api_port}").parse().unwrap()],
7439 Default::default(),
7440 Duration::from_secs(2),
7441 &NoMetrics,
7442 )
7443 }))
7444 .pos_hook(
7445 DelegationConfig::MultipleDelegators,
7446 Default::default(),
7447 POS_V4,
7448 )
7449 .await
7450 .unwrap()
7451 .build();
7452
7453 let network = TestNetwork::new(config, POS_V4).await;
7454 let client: Client<ServerError, SequencerApiVersion> =
7455 Client::new(format!("http://localhost:{api_port}").parse().unwrap());
7456
7457 let err = client
7458 .get::<Vec<RegisteredValidator<PubKey>>>("node/all-validators/1/0/1001")
7459 .header("Accept", "application/json")
7460 .send()
7461 .await
7462 .unwrap_err();
7463
7464 assert_matches!(err, ServerError { status, message} if
7465 status == StatusCode::BAD_REQUEST
7466 && message.contains("Limit cannot be greater than 1000")
7467 );
7468
7469 let mut events = network.peers[0].event_stream().await;
7471 wait_for_epochs(&mut events, EPOCH_HEIGHT, 3).await;
7472
7473 {
7475 client
7476 .get::<Vec<RegisteredValidator<PubKey>>>("node/all-validators/1/0/100")
7477 .send()
7478 .await
7479 .unwrap()
7480 .is_empty();
7481
7482 client
7483 .get::<Vec<RegisteredValidator<PubKey>>>("node/all-validators/2/0/100")
7484 .send()
7485 .await
7486 .unwrap()
7487 .is_empty();
7488 }
7489
7490 let validators = client
7492 .get::<Vec<RegisteredValidator<PubKey>>>("node/all-validators/3/0/100")
7493 .send()
7494 .await
7495 .expect("validators");
7496
7497 assert!(!validators.is_empty());
7498
7499 Ok(())
7500 }
7501
7502 #[test_log::test(tokio::test(flavor = "multi_thread"))]
7503 async fn test_reward_accounts_catchup_endpoint() -> anyhow::Result<()> {
7504 const EPOCH_HEIGHT: u64 = 10;
7505 const NUM_NODES: usize = 3;
7506
7507 let network_config = TestConfigBuilder::default()
7508 .epoch_height(EPOCH_HEIGHT)
7509 .build();
7510
7511 let api_port = reserve_tcp_port().expect("OS should have ephemeral ports available");
7512 println!("API PORT = {api_port}");
7513
7514 let storage = join_all((0..NUM_NODES).map(|_| SqlDataSource::create_storage())).await;
7515 let persistence: [_; NUM_NODES] = storage
7516 .iter()
7517 .map(<SqlDataSource as TestableSequencerDataSource>::persistence_options)
7518 .collect::<Vec<_>>()
7519 .try_into()
7520 .unwrap();
7521
7522 let config = TestNetworkConfigBuilder::with_num_nodes()
7523 .api_config(SqlDataSource::options(
7524 &storage[0],
7525 Options::with_port(api_port).catchup(Default::default()),
7526 ))
7527 .network_config(network_config)
7528 .persistences(persistence.clone())
7529 .catchups(std::array::from_fn(|_| {
7530 StatePeers::<StaticVersion<0, 1>>::from_urls(
7531 vec![format!("http://localhost:{api_port}").parse().unwrap()],
7532 Default::default(),
7533 Duration::from_secs(2),
7534 &NoMetrics,
7535 )
7536 }))
7537 .pos_hook(
7538 DelegationConfig::MultipleDelegators,
7539 hotshot_contract_adapter::stake_table::StakeTableContractVersion::V2,
7540 POS_V4,
7541 )
7542 .await
7543 .unwrap()
7544 .build();
7545
7546 let mut network = TestNetwork::new(config, POS_V4).await;
7547
7548 let client: Client<ServerError, StaticVersion<0, 1>> =
7549 Client::new(format!("http://localhost:{api_port}").parse().unwrap());
7550
7551 client.connect(None).await;
7552
7553 let mut events = network.server.event_stream().await;
7554 wait_for_epochs(&mut events, EPOCH_HEIGHT, 3).await;
7555
7556 network.stop_consensus().await;
7557 let height = network.server.decided_leaf().await.height();
7558 wait_until_block_height(&client, "reward-state-v2/block-height", height).await;
7559
7560 let err = client
7561 .get::<Vec<(RewardAccountV2, RewardAmount)>>(&format!(
7562 "reward-state-v2/reward-amounts/{height}/0/10001"
7563 ))
7564 .send()
7565 .await
7566 .unwrap_err();
7567
7568 assert_matches!(err, ServerError { status, .. } if
7569 status == StatusCode::BAD_REQUEST
7570
7571 );
7572
7573 let mut expected: Vec<_> = network
7574 .server
7575 .decided_state()
7576 .await
7577 .reward_merkle_tree_v2
7578 .iter()
7579 .map(|(addr, amt)| (*addr, *amt))
7580 .collect();
7581 expected.sort_by_key(|(acct, _)| std::cmp::Reverse(*acct));
7583
7584 tracing::info!("expected accounts = {expected:?}");
7585 let limit = expected.len().min(10_000) as u64;
7586 let offset = 0u64;
7587 let expected: Vec<_> = expected.into_iter().take(limit as usize).collect();
7588
7589 let res = client
7590 .get::<Vec<(RewardAccountV2, RewardAmount)>>(&format!(
7591 "reward-state-v2/reward-amounts/{height}/{offset}/{limit}"
7592 ))
7593 .send()
7594 .await
7595 .unwrap();
7596
7597 assert_eq!(res, expected);
7598
7599 Ok(())
7600 }
7601
7602 #[test_log::test(tokio::test(flavor = "multi_thread"))]
7603 async fn test_namespace_query_compat_v0_2() {
7604 test_namespace_query_compat_helper(Upgrade::trivial(FEE_VERSION)).await;
7605 }
7606
7607 #[test_log::test(tokio::test(flavor = "multi_thread"))]
7608 async fn test_namespace_query_compat_v0_3() {
7609 test_namespace_query_compat_helper(Upgrade::trivial(EPOCH_VERSION)).await;
7610 }
7611
7612 async fn test_namespace_query_compat_helper(upgrade: Upgrade) {
7613 const NUM_NODES: usize = 5;
7615
7616 let port = reserve_tcp_port().expect("OS should have ephemeral ports available");
7617 let url: Url = format!("http://localhost:{port}").parse().unwrap();
7618
7619 let test_config = TestConfigBuilder::default().build();
7620 let config = TestNetworkConfigBuilder::<NUM_NODES, _, _>::with_num_nodes()
7621 .api_config(Options::from(options::Http {
7622 port,
7623 max_connections: None,
7624 }))
7625 .catchups(std::array::from_fn(|_| {
7626 StatePeers::<SequencerApiVersion>::from_urls(
7627 vec![url.clone()],
7628 Default::default(),
7629 Duration::from_secs(2),
7630 &NoMetrics,
7631 )
7632 }))
7633 .network_config(test_config)
7634 .build();
7635
7636 let mut network = TestNetwork::new(config, upgrade).await;
7637 let mut events = network.server.event_stream().await;
7638
7639 let ns = NamespaceId::from(10_000u64);
7641 let tx = Transaction::new(ns, vec![1, 2, 3]);
7642 network.server.submit_transaction(tx.clone()).await.unwrap();
7643 let block = wait_for_decide_on_handle(&mut events, &tx).await.0;
7644
7645 let client: Client<ServerError, StaticVersion<0, 1>> = Client::new(url);
7647 client.connect(None).await;
7648
7649 let (header, common): (Header, VidCommonQueryData<SeqTypes>) = try_join!(
7650 client.get(&format!("availability/header/{block}")).send(),
7651 client
7652 .get(&format!("availability/vid/common/{block}"))
7653 .send()
7654 )
7655 .unwrap();
7656 let version = header.version();
7657
7658 for api_ver in ["/v1", ""] {
7661 tracing::info!("test namespace API version: {api_ver}");
7662
7663 let ns_proof: NamespaceProofQueryData = client
7664 .get(&format!(
7665 "{api_ver}/availability/block/{block}/namespace/{ns}"
7666 ))
7667 .send()
7668 .await
7669 .unwrap();
7670 let proof = ns_proof.proof.as_ref().unwrap();
7671 if version < EPOCH_VERSION {
7672 assert!(matches!(proof, NsProof::V0(..)));
7673 } else {
7674 assert!(matches!(proof, NsProof::V1(..)));
7675 }
7676 let (txs, ns_from_proof) = proof
7677 .verify(
7678 header.ns_table(),
7679 &header.payload_commitment(),
7680 common.common(),
7681 )
7682 .unwrap();
7683 assert_eq!(ns_from_proof, ns);
7684 assert_eq!(txs, ns_proof.transactions);
7685 assert_eq!(txs, std::slice::from_ref(&tx));
7686
7687 let ns_proofs: Vec<NamespaceProofQueryData> = client
7689 .get(&format!(
7690 "{api_ver}/availability/block/{}/{}/namespace/{ns}",
7691 block,
7692 block + 1
7693 ))
7694 .send()
7695 .await
7696 .unwrap();
7697 assert_eq!(&ns_proofs, std::slice::from_ref(&ns_proof));
7698
7699 let ns_proof: NamespaceProofQueryData = client
7701 .get(&format!(
7702 "{api_ver}/availability/block/{}/namespace/{ns}",
7703 block - 1
7704 ))
7705 .send()
7706 .await
7707 .unwrap();
7708 assert_eq!(ns_proof.proof, None);
7709 assert_eq!(ns_proof.transactions, vec![]);
7710
7711 let mut proofs = client
7713 .socket(&format!(
7714 "{api_ver}/availability/stream/blocks/0/namespace/{ns}"
7715 ))
7716 .subscribe()
7717 .await
7718 .unwrap();
7719 for i in 0.. {
7720 tracing::info!(i, "stream proof");
7721 let proof: NamespaceProofQueryData = proofs.next().await.unwrap().unwrap();
7722 if proof.proof.is_none() {
7723 tracing::info!("waiting for non-trivial proof from stream");
7724 continue;
7725 }
7726 assert_eq!(&proof.transactions, std::slice::from_ref(&tx));
7727 break;
7728 }
7729 }
7730
7731 tracing::info!("test namespace API version: v0");
7733 if version < EPOCH_VERSION {
7734 let ns_proof: ADVZNamespaceProofQueryData = client
7735 .get(&format!("v0/availability/block/{block}/namespace/{ns}"))
7736 .send()
7737 .await
7738 .unwrap();
7739 let proof = ns_proof.proof.as_ref().unwrap();
7740 let VidCommon::V0(common) = common.common() else {
7741 panic!("wrong VID common version");
7742 };
7743 let (txs, ns_from_proof) = proof
7744 .verify(header.ns_table(), &header.payload_commitment(), common)
7745 .unwrap();
7746 assert_eq!(ns_from_proof, ns);
7747 assert_eq!(txs, ns_proof.transactions);
7748 assert_eq!(&txs, std::slice::from_ref(&tx));
7749
7750 let ns_proofs: Vec<ADVZNamespaceProofQueryData> = client
7752 .get(&format!(
7753 "v0/availability/block/{}/{}/namespace/{ns}",
7754 block,
7755 block + 1
7756 ))
7757 .send()
7758 .await
7759 .unwrap();
7760 assert_eq!(&ns_proofs, std::slice::from_ref(&ns_proof));
7761 } else {
7762 client
7764 .get::<ADVZNamespaceProofQueryData>(&format!(
7765 "v0/availability/block/{block}/namespace/{ns}"
7766 ))
7767 .send()
7768 .await
7769 .unwrap_err();
7770 }
7771
7772 let ns_proof: ADVZNamespaceProofQueryData = client
7774 .get(&format!(
7775 "v0/availability/block/{}/namespace/{ns}",
7776 block - 1
7777 ))
7778 .send()
7779 .await
7780 .unwrap();
7781 assert_eq!(ns_proof.proof, None);
7782 assert_eq!(ns_proof.transactions, vec![]);
7783
7784 let mut proofs = client
7787 .socket(&format!("v0/availability/stream/blocks/0/namespace/{ns}"))
7788 .subscribe()
7789 .await
7790 .unwrap();
7791 for i in 0.. {
7792 tracing::info!(i, "stream proof");
7793 let proof: ADVZNamespaceProofQueryData = match proofs.next().await {
7794 Some(proof) => proof.unwrap(),
7795 None => {
7796 assert!(
7798 version >= EPOCH_VERSION,
7799 "legacy steam ended while still on legacy consensus"
7800 );
7801 break;
7802 },
7803 };
7804 if proof.proof.is_none() {
7805 tracing::info!("waiting for non-trivial proof from stream");
7806 continue;
7807 }
7808 assert_eq!(&proof.transactions, std::slice::from_ref(&tx));
7809 break;
7810 }
7811
7812 network.server.shut_down().await;
7813 }
7814
7815 #[test_log::test(tokio::test(flavor = "multi_thread"))]
7816 async fn test_light_client_completeness() {
7817 const NUM_NODES: usize = 1;
7821 const EPOCH_HEIGHT: u64 = 200;
7822
7823 let upgrade = Upgrade::new(LEGACY_VERSION, EPOCH_VERSION);
7824 let port = reserve_tcp_port().expect("OS should have ephemeral ports available");
7825 let url: Url = format!("http://localhost:{port}").parse().unwrap();
7826
7827 let test_config = TestConfigBuilder::default()
7828 .epoch_height(EPOCH_HEIGHT)
7829 .epoch_start_block(321)
7830 .set_upgrades(upgrade.target)
7831 .await
7832 .build();
7833
7834 let storage = join_all((0..NUM_NODES).map(|_| SqlDataSource::create_storage())).await;
7835 let persistence: [_; NUM_NODES] = storage
7836 .iter()
7837 .map(<SqlDataSource as TestableSequencerDataSource>::persistence_options)
7838 .collect::<Vec<_>>()
7839 .try_into()
7840 .unwrap();
7841
7842 let config = TestNetworkConfigBuilder::<NUM_NODES, _, _>::with_num_nodes()
7843 .api_config(
7844 SqlDataSource::options(&storage[0], Options::with_port(port))
7845 .light_client(Default::default()),
7846 )
7847 .persistences(persistence.clone())
7848 .catchups(std::array::from_fn(|_| {
7849 StatePeers::<SequencerApiVersion>::from_urls(
7850 vec![url.clone()],
7851 Default::default(),
7852 Duration::from_secs(2),
7853 &NoMetrics,
7854 )
7855 }))
7856 .network_config(test_config)
7857 .build();
7858
7859 let mut network = TestNetwork::new(config, upgrade).await;
7860 let client: Client<ServerError, StaticVersion<0, 1>> = Client::new(url);
7861 client.connect(None).await;
7862
7863 let mut actual_leaves = vec![];
7866 let mut actual_blocks = vec![];
7867 let mut leaves = client
7868 .socket("availability/stream/leaves/0")
7869 .subscribe::<LeafQueryData<SeqTypes>>()
7870 .await
7871 .unwrap()
7872 .zip(
7873 client
7874 .socket("availability/stream/blocks/0")
7875 .subscribe::<BlockQueryData<SeqTypes>>()
7876 .await
7877 .unwrap(),
7878 )
7879 .map(|(leaf, block)| {
7880 let leaf = leaf.unwrap();
7881 let block = block.unwrap();
7882 actual_leaves.push(leaf.clone());
7883 actual_blocks.push(block);
7884 leaf
7885 });
7886
7887 let (upgrade_height, first_epoch) = loop {
7889 let leaf: LeafQueryData<SeqTypes> = leaves.next().await.unwrap();
7890 if leaf.header().version() < EPOCH_VERSION {
7891 tracing::info!(version = %leaf.header().version(), height = leaf.header().height(), view = ?leaf.leaf().view_number(), "waiting for epoch upgrade");
7892 continue;
7893 }
7894 break (leaf.height(), leaf.leaf().epoch(EPOCH_HEIGHT).unwrap());
7895 };
7896 tracing::info!(upgrade_height, ?first_epoch, "epochs enabled");
7897
7898 let mut epoch_heights = [0; 2];
7901 for (i, epoch_height) in epoch_heights.iter_mut().enumerate() {
7902 let desired_epoch = first_epoch + (i as u64) + 1;
7903 *epoch_height = loop {
7904 let leaf = leaves.next().await.unwrap();
7905 let epoch = leaf.leaf().epoch(EPOCH_HEIGHT).unwrap();
7906 if epoch > desired_epoch {
7907 tracing::info!(
7908 height = leaf.height(),
7909 ?desired_epoch,
7910 ?epoch,
7911 "changed epoch"
7912 );
7913 break leaf.height();
7914 }
7915 tracing::info!(
7916 ?desired_epoch,
7917 height = leaf.header().height(),
7918 view = ?leaf.leaf().view_number(),
7919 "waiting for epoch change"
7920 );
7921 };
7922 }
7923
7924 let max_block = epoch_heights[1] + 1;
7926 loop {
7927 let leaf = leaves.next().await.unwrap();
7928 if leaf.height() > max_block {
7929 break;
7930 }
7931 tracing::info!(max_block, height = leaf.height(), "waiting for block");
7932 }
7933
7934 network.stop_consensus().await;
7937
7938 let heights =
7941 (0..=1)
7943 .chain(upgrade_height-1..=upgrade_height+1)
7945 .chain(epoch_heights[0]-1..=epoch_heights[0] + 1)
7947 .chain(epoch_heights[1]-1..=max_block);
7949
7950 let quorum = EpochChangeQuorum::new(EPOCH_HEIGHT);
7951 for i in heights {
7952 let leaf = &actual_leaves[i as usize];
7953 let block = &actual_blocks[i as usize];
7954 tracing::info!(i, ?leaf, ?block, "check leaf");
7955
7956 let client = &client;
7958 let proofs = try_join_all(
7959 [
7960 format!("light-client/leaf/{i}"),
7961 format!("light-client/leaf/hash/{}", leaf.hash()),
7962 format!("light-client/leaf/block-hash/{}", leaf.block_hash()),
7963 ]
7964 .into_iter()
7965 .map(|path| async move {
7966 tracing::info!(i, path, "fetch leaf proof");
7967 let proof = client.get::<LeafProof>(&path).send().await?;
7968 Ok::<_, anyhow::Error>((path, proof))
7969 }),
7970 )
7971 .await
7972 .unwrap();
7973
7974 for (path, proof) in proofs {
7976 tracing::info!(i, path, ?proof, "check leaf proof");
7977 assert_eq!(
7978 proof.verify(LeafProofHint::Quorum(&quorum)).await.unwrap(),
7979 *leaf
7980 );
7981 }
7982
7983 let root_height = i + 1;
7985 let root = actual_leaves[root_height as usize].header();
7986 let proofs = try_join_all(
7987 [
7988 format!("light-client/header/{root_height}/{i}"),
7989 format!(
7990 "light-client/header/{root_height}/hash/{}",
7991 leaf.block_hash()
7992 ),
7993 ]
7994 .into_iter()
7995 .map(|path| async move {
7996 tracing::info!(i, path, "get header proof");
7997 let proof = client.get::<HeaderProof>(&path).send().await?;
7998 Ok::<_, anyhow::Error>((path, proof))
7999 }),
8000 )
8001 .await
8002 .unwrap();
8003 for (path, proof) in proofs {
8004 tracing::info!(i, path, ?proof, "check header proof");
8005 assert_eq!(
8006 proof.verify_ref(root.block_merkle_tree_root()).unwrap(),
8007 leaf.header()
8008 );
8009 }
8010
8011 let proof = client
8013 .get::<PayloadProof>(&format!("light-client/payload/{i}"))
8014 .send()
8015 .await
8016 .unwrap();
8017 assert_eq!(proof.verify(leaf.header()).unwrap(), *block.payload());
8018 }
8019
8020 let events: Vec<StakeTableEvent> = client
8022 .get(&format!("light-client/stake-table/{}", first_epoch + 2))
8023 .send()
8024 .await
8025 .unwrap();
8026 let mut state_from_events = StakeTableState::default();
8027 for event in events {
8028 state_from_events.apply_event(event).unwrap().unwrap();
8029 }
8030
8031 assert_eq!(
8032 state_from_events.into_validators(),
8033 network
8034 .server
8035 .consensus()
8036 .read()
8037 .await
8038 .storage()
8039 .load_all_validators(first_epoch + 2, 0, 1_000_000)
8040 .await
8041 .unwrap()
8042 .into_iter()
8043 .map(|v| (v.account, v))
8044 .collect::<RegisteredValidatorMap>()
8045 );
8046
8047 let err = client
8049 .get::<Vec<StakeTableEvent>>(&format!("light-client/stake-table/{}", first_epoch + 1))
8050 .send()
8051 .await
8052 .unwrap_err();
8053 assert_eq!(err.status(), StatusCode::BAD_REQUEST);
8054 }
8055
8056 #[test_log::test(tokio::test(flavor = "multi_thread"))]
8058 async fn test_fetch_leaf_returns_exact_height() -> anyhow::Result<()> {
8059 const EPOCH_HEIGHT: u64 = 10;
8060 const NUM_NODES: usize = 5;
8061 const TARGET_HEIGHT: u64 = EPOCH_HEIGHT * 3 + 2;
8062
8063 let network_config = TestConfigBuilder::default()
8064 .epoch_height(EPOCH_HEIGHT)
8065 .build();
8066
8067 let port = reserve_tcp_port().expect("No ports free for query service");
8068
8069 let storage = join_all((0..NUM_NODES).map(|_| SqlDataSource::create_storage())).await;
8070 let persistence: [_; NUM_NODES] = storage
8071 .iter()
8072 .map(<SqlDataSource as TestableSequencerDataSource>::persistence_options)
8073 .collect::<Vec<_>>()
8074 .try_into()
8075 .unwrap();
8076
8077 let catchup_peers = std::array::from_fn(|_| {
8078 StatePeers::<StaticVersion<0, 1>>::from_urls(
8079 vec![format!("http://localhost:{port}").parse().unwrap()],
8080 Default::default(),
8081 Duration::from_secs(2),
8082 &NoMetrics,
8083 )
8084 });
8085
8086 let config = TestNetworkConfigBuilder::with_num_nodes()
8087 .api_config(SqlDataSource::options(
8088 &storage[0],
8089 Options::with_port(port),
8090 ))
8091 .network_config(network_config)
8092 .persistences(persistence)
8093 .catchups(catchup_peers)
8094 .pos_hook(
8095 DelegationConfig::MultipleDelegators,
8096 Default::default(),
8097 POS_V4,
8098 )
8099 .await?
8100 .build();
8101
8102 let network = TestNetwork::new(config, POS_V4).await;
8103
8104 let height_client: Client<ServerError, StaticVersion<0, 1>> =
8106 Client::new(format!("http://localhost:{port}").parse().unwrap());
8107 wait_until_block_height(&height_client, "node/block-height", TARGET_HEIGHT + 5).await;
8108
8109 let coordinator = network.server.node_state().coordinator;
8111 let epoch = EpochNumber::new(epoch_from_block_number(TARGET_HEIGHT, EPOCH_HEIGHT));
8112 let membership = coordinator.membership().read().await;
8113 let stake_table = membership.stake_table(Some(epoch));
8114 let success_threshold = membership.success_threshold(Some(epoch));
8115 drop(membership);
8116
8117 let catchup = StatePeers::<StaticVersion<0, 1>>::from_urls(
8119 vec![format!("http://localhost:{port}").parse().unwrap()],
8120 Default::default(),
8121 Duration::from_secs(5),
8122 &NoMetrics,
8123 );
8124
8125 let leaf = catchup
8126 .fetch_leaf(TARGET_HEIGHT, stake_table, success_threshold)
8127 .await?;
8128
8129 assert_eq!(
8130 leaf.height(),
8131 TARGET_HEIGHT,
8132 "fetch_leaf must return the leaf at exactly the requested height"
8133 );
8134
8135 Ok(())
8136 }
8137}