1use std::collections::HashMap;
12
13use alloy::primitives::{Address, U256};
14use anyhow::Context;
15use async_trait::async_trait;
16use espresso_types::{
17 PubKey,
18 v0_3::{ChainConfig, RegisteredValidator},
19};
20
21pub mod fs;
22pub mod no_storage;
23mod persistence_metrics;
24pub mod sql;
25
26#[derive(serde::Serialize, serde::Deserialize)]
29pub(crate) struct RegisteredValidatorNoX25519 {
30 pub account: Address,
31 pub stake_table_key: PubKey,
32 pub state_ver_key: hotshot_types::light_client::StateVerKey,
33 pub stake: U256,
34 pub commission: u16,
35 pub delegators: HashMap<Address, U256>,
36 pub authenticated: bool,
37}
38
39impl RegisteredValidatorNoX25519 {
40 pub fn migrate(self) -> RegisteredValidator<PubKey> {
41 RegisteredValidator {
42 account: self.account,
43 stake_table_key: self.stake_table_key,
44 state_ver_key: self.state_ver_key,
45 stake: self.stake,
46 commission: self.commission,
47 delegators: self.delegators,
48 authenticated: self.authenticated,
49 x25519_key: None,
50 p2p_addr: None,
51 }
52 }
53}
54
55fn migrate_network_config(
57 mut network_config: serde_json::Value,
58) -> anyhow::Result<serde_json::Value> {
59 let config = network_config
60 .get_mut("config")
61 .context("missing field `config`")?
62 .as_object_mut()
63 .context("`config` must be an object")?;
64
65 if !config.contains_key("builder_urls") {
66 let url = config
71 .remove("builder_url")
72 .context("missing field `builder_url`")?;
73 config.insert("builder_urls".into(), vec![url].into());
74 }
75
76 if !config.contains_key("start_proposing_view") {
82 config.insert("start_proposing_view".into(), 9007199254740991u64.into());
83 }
84 if !config.contains_key("stop_proposing_view") {
85 config.insert("stop_proposing_view".into(), 0.into());
86 }
87 if !config.contains_key("start_voting_view") {
88 config.insert("start_voting_view".into(), 9007199254740991u64.into());
89 }
90 if !config.contains_key("stop_voting_view") {
91 config.insert("stop_voting_view".into(), 0.into());
92 }
93 if !config.contains_key("start_proposing_time") {
94 config.insert("start_proposing_time".into(), 9007199254740991u64.into());
95 }
96 if !config.contains_key("stop_proposing_time") {
97 config.insert("stop_proposing_time".into(), 0.into());
98 }
99 if !config.contains_key("start_voting_time") {
100 config.insert("start_voting_time".into(), 9007199254740991u64.into());
101 }
102 if !config.contains_key("stop_voting_time") {
103 config.insert("stop_voting_time".into(), 0.into());
104 }
105
106 if !config.contains_key("epoch_height") {
109 config.insert("epoch_height".into(), 0.into());
110 }
111
112 if !config.contains_key("drb_difficulty") {
115 config.insert("drb_difficulty".into(), 0.into());
116 }
117 if !config.contains_key("drb_upgrade_difficulty") {
118 config.insert("drb_upgrade_difficulty".into(), 0.into());
119 }
120
121 if !config.contains_key("da_committees") {
123 config.insert("da_committees".into(), serde_json::json!([]));
124 }
125
126 Ok(network_config)
127}
128
129#[async_trait]
130pub trait ChainConfigPersistence: Sized + Send + Sync {
131 async fn insert_chain_config(&mut self, chain_config: ChainConfig) -> anyhow::Result<()>;
132}
133
134#[cfg(test)]
135mod tests {
136 use std::{cmp::max, collections::BTreeMap, marker::PhantomData, sync::Arc, time::Duration};
137
138 use alloy::{
139 network::EthereumWallet,
140 primitives::{Address, U256},
141 providers::{Provider, ProviderBuilder, ext::AnvilApi},
142 };
143 use anyhow::bail;
144 use async_lock::{Mutex, RwLock};
145 use async_trait::async_trait;
146 use committable::{Commitment, Committable};
147 use espresso_contract_deployer::{
148 Contract, Contracts, DEFAULT_EXIT_ESCROW_PERIOD_SECONDS, builder::DeployerArgsBuilder,
149 network_config::light_client_genesis_from_stake_table,
150 };
151 use espresso_types::{
152 Event, L1Client, L1ClientOptions, Leaf, Leaf2, NodeState, PubKey, SeqTypes, ValidatedState,
153 traits::{
154 EventConsumer, EventsPersistenceRead, MembershipPersistence, NullEventConsumer,
155 PersistenceOptions, SequencerPersistence,
156 },
157 v0_3::{AuthenticatedValidator, EventKey, Fetcher, RegisteredValidator, StakeTableEvent},
158 };
159 use futures::{StreamExt, TryStreamExt, future::join_all};
160 use hotshot::{
161 InitializerEpochInfo,
162 types::{BLSPubKey, SignatureKey},
163 };
164 use hotshot_contract_adapter::{
165 sol_types::StakeTableV3::Delegated, stake_table::StakeTableContractVersion,
166 };
167 use hotshot_example_types::node_types::TEST_VERSIONS;
168 use hotshot_query_service::{availability::BlockQueryData, testing::mocks::MOCK_UPGRADE};
169 use hotshot_types::{
170 data::{
171 DaProposal2, EpochNumber, QuorumProposal2, QuorumProposalWrapper, VidCommitment,
172 ViewNumber, ns_table::parse_ns_table, vid_commitment, vid_disperse::AvidMDisperseShare,
173 },
174 event::{EventType, HotShotAction, LeafInfo},
175 light_client::StateKeyPair,
176 message::{Proposal, UpgradeLock, convert_proposal},
177 new_protocol::CoordinatorEvent,
178 simple_certificate::{
179 CertificatePair, NextEpochQuorumCertificate2, QuorumCertificate, QuorumCertificate2,
180 UpgradeCertificate,
181 },
182 simple_vote::{NextEpochQuorumData2, QuorumData2, UpgradeProposalData, VersionedVoteData},
183 traits::{EncodeBytes, block_contents::BlockHeader},
184 utils::EpochTransitionIndicator,
185 vid::avidm::{AvidMScheme, init_avidm_param},
186 vote::HasViewNumber,
187 };
188 use indexmap::IndexMap;
189 use staking_cli::demo::{DelegationConfig, StakingTransactions};
190 use surf_disco::Client;
191 use test_utils::reserve_tcp_port;
192 use tide_disco::error::ServerError;
193 use tokio::{spawn, time::sleep};
194 use vbs::version::Version;
195 use versions::{Upgrade, version};
196
197 use crate::{
198 RECENT_STAKE_TABLES_LIMIT, SequencerApiVersion,
199 api::{
200 Options,
201 test_helpers::{STAKE_TABLE_CAPACITY_FOR_TEST, TestNetwork, TestNetworkConfigBuilder},
202 },
203 catchup::NullStateCatchup,
204 testing::{TestConfigBuilder, staking_priv_keys},
205 };
206
207 #[async_trait]
208 pub trait TestablePersistence: SequencerPersistence + MembershipPersistence {
209 type Storage: Sync;
210
211 async fn tmp_storage() -> Self::Storage;
212 fn options(storage: &Self::Storage) -> impl PersistenceOptions<Persistence = Self>;
213
214 async fn connect(storage: &Self::Storage) -> Self {
215 Self::options(storage).create().await.unwrap()
216 }
217 }
218
219 #[rstest_reuse::template]
220 #[rstest::rstest]
221 #[case(PhantomData::<crate::persistence::sql::Persistence>)]
222 #[case(PhantomData::<crate::persistence::fs::Persistence>)]
223 #[test_log::test(tokio::test(flavor = "multi_thread"))]
224 pub fn persistence_types<P: TestablePersistence>(#[case] _p: PhantomData<P>) {}
225
226 #[derive(Clone, Debug, Default)]
227 struct EventCollector {
228 events: Arc<RwLock<Vec<Event>>>,
229 }
230
231 impl EventCollector {
232 async fn leaf_chain(&self) -> Vec<LeafInfo<SeqTypes>> {
233 self.events
234 .read()
235 .await
236 .iter()
237 .flat_map(|event| {
238 let EventType::Decide { leaf_chain, .. } = &event.event else {
239 panic!("expected decide event, got {event:?}");
240 };
241 leaf_chain.iter().cloned().rev()
242 })
243 .collect::<Vec<_>>()
244 }
245 }
246
247 #[async_trait]
248 impl EventConsumer for EventCollector {
249 async fn handle_event(&self, event: &CoordinatorEvent<SeqTypes>) -> anyhow::Result<()> {
250 if let CoordinatorEvent::LegacyEvent(event) = event {
251 self.events.write().await.push(event.clone());
252 }
253 Ok(())
254 }
255 }
256
257 #[derive(Clone, Copy, Debug)]
258 struct FailConsumer;
259
260 #[async_trait]
261 impl EventConsumer for FailConsumer {
262 async fn handle_event(&self, _: &CoordinatorEvent<SeqTypes>) -> anyhow::Result<()> {
263 bail!("mock error injection");
264 }
265 }
266
267 #[rstest_reuse::apply(persistence_types)]
268 pub async fn test_voted_view<P: TestablePersistence>(_p: PhantomData<P>) {
269 let tmp = P::tmp_storage().await;
270 let storage = P::connect(&tmp).await;
271
272 assert_eq!(storage.load_latest_acted_view().await.unwrap(), None);
274
275 let view1 = ViewNumber::genesis();
277 storage
278 .record_action(view1, None, HotShotAction::Vote)
279 .await
280 .unwrap();
281 assert_eq!(
282 storage.load_latest_acted_view().await.unwrap().unwrap(),
283 view1
284 );
285
286 let view2 = view1 + 1;
288 storage
289 .record_action(view2, None, HotShotAction::Vote)
290 .await
291 .unwrap();
292 assert_eq!(
293 storage.load_latest_acted_view().await.unwrap().unwrap(),
294 view2
295 );
296
297 storage
299 .record_action(view1, None, HotShotAction::Vote)
300 .await
301 .unwrap();
302 assert_eq!(
303 storage.load_latest_acted_view().await.unwrap().unwrap(),
304 view2
305 );
306 }
307
308 #[rstest_reuse::apply(persistence_types)]
309 pub async fn test_restart_view<P: TestablePersistence>(_p: PhantomData<P>) {
310 let tmp = P::tmp_storage().await;
311 let storage = P::connect(&tmp).await;
312
313 assert_eq!(storage.load_restart_view().await.unwrap(), None);
315
316 let view1 = ViewNumber::genesis();
318 storage
319 .record_action(view1, None, HotShotAction::Vote)
320 .await
321 .unwrap();
322 assert_eq!(
323 storage.load_restart_view().await.unwrap().unwrap(),
324 view1 + 1
325 );
326
327 let view2 = view1 + 1;
329 storage
330 .record_action(view2, None, HotShotAction::Vote)
331 .await
332 .unwrap();
333 assert_eq!(
334 storage.load_restart_view().await.unwrap().unwrap(),
335 view2 + 1
336 );
337
338 storage
340 .record_action(view1, None, HotShotAction::Vote)
341 .await
342 .unwrap();
343 assert_eq!(
344 storage.load_restart_view().await.unwrap().unwrap(),
345 view2 + 1
346 );
347
348 storage
350 .record_action(view2 + 1, None, HotShotAction::Propose)
351 .await
352 .unwrap();
353 assert_eq!(
354 storage.load_restart_view().await.unwrap().unwrap(),
355 view2 + 1
356 );
357
358 storage
360 .record_action(view2 + 1, None, HotShotAction::TimeoutVote)
361 .await
362 .unwrap();
363 assert_eq!(
364 storage.load_restart_view().await.unwrap().unwrap(),
365 view2 + 1
366 );
367 }
368
369 #[rstest_reuse::apply(persistence_types)]
370 pub async fn test_store_drb_input<P: TestablePersistence>(_p: PhantomData<P>) {
371 use hotshot_types::drb::DrbInput;
372
373 let tmp = P::tmp_storage().await;
374 let storage = P::connect(&tmp).await;
375 let difficulty_level = 10;
376
377 if storage.load_drb_input(10).await.is_ok() {
379 panic!("unexpected nonempty drb_input");
380 }
381
382 let drb_input_1 = DrbInput {
383 epoch: 10,
384 iteration: 10,
385 value: [0u8; 32],
386 difficulty_level,
387 };
388
389 let drb_input_2 = DrbInput {
390 epoch: 10,
391 iteration: 20,
392 value: [0u8; 32],
393 difficulty_level,
394 };
395
396 let drb_input_3 = DrbInput {
397 epoch: 10,
398 iteration: 30,
399 value: [0u8; 32],
400 difficulty_level,
401 };
402
403 let _ = storage.store_drb_input(drb_input_1.clone()).await;
404
405 assert_eq!(storage.load_drb_input(10).await.unwrap(), drb_input_1);
406
407 let _ = storage.store_drb_input(drb_input_3.clone()).await;
408
409 assert_eq!(storage.load_drb_input(10).await.unwrap(), drb_input_3);
411
412 let _ = storage.store_drb_input(drb_input_2.clone()).await;
413
414 assert_eq!(storage.load_drb_input(10).await.unwrap(), drb_input_3);
416 }
417
418 #[rstest_reuse::apply(persistence_types)]
419 pub async fn test_epoch_info<P: TestablePersistence>(_p: PhantomData<P>) {
420 let tmp = P::tmp_storage().await;
421 let storage = P::connect(&tmp).await;
422
423 assert_eq!(storage.load_start_epoch_info().await.unwrap(), Vec::new());
425
426 storage
428 .store_drb_result(EpochNumber::new(1), [1; 32])
429 .await
430 .unwrap();
431 assert_eq!(
432 storage.load_start_epoch_info().await.unwrap(),
433 vec![InitializerEpochInfo::<SeqTypes> {
434 epoch: EpochNumber::new(1),
435 drb_result: [1; 32],
436 block_header: None,
437 }]
438 );
439
440 storage
442 .store_drb_result(EpochNumber::new(2), [3; 32])
443 .await
444 .unwrap();
445 assert_eq!(
446 storage.load_start_epoch_info().await.unwrap(),
447 vec![
448 InitializerEpochInfo::<SeqTypes> {
449 epoch: EpochNumber::new(1),
450 drb_result: [1; 32],
451 block_header: None,
452 },
453 InitializerEpochInfo::<SeqTypes> {
454 epoch: EpochNumber::new(2),
455 drb_result: [3; 32],
456 block_header: None,
457 }
458 ]
459 );
460
461 let instance_state = NodeState::mock();
463 let validated_state = hotshot_types::traits::ValidatedState::genesis(&instance_state).0;
464 let leaf: Leaf2 = Leaf::genesis(&validated_state, &instance_state, MOCK_UPGRADE.base)
465 .await
466 .into();
467 let header = leaf.block_header().clone();
468
469 storage
471 .store_epoch_root(EpochNumber::new(1), header.clone())
472 .await
473 .unwrap();
474 assert_eq!(
475 storage.load_start_epoch_info().await.unwrap(),
476 vec![
477 InitializerEpochInfo::<SeqTypes> {
478 epoch: EpochNumber::new(1),
479 drb_result: [1; 32],
480 block_header: Some(header.clone()),
481 },
482 InitializerEpochInfo::<SeqTypes> {
483 epoch: EpochNumber::new(2),
484 drb_result: [3; 32],
485 block_header: None,
486 }
487 ]
488 );
489
490 let total_epochs = RECENT_STAKE_TABLES_LIMIT + 10;
492 for i in 0..total_epochs {
493 let epoch = EpochNumber::new(i);
494 let drb = [i as u8; 32];
495 storage
496 .store_drb_result(epoch, drb)
497 .await
498 .unwrap_or_else(|_| panic!("Failed to store DRB result for epoch {i}"));
499 }
500
501 let results = storage.load_start_epoch_info().await.unwrap();
502
503 assert_eq!(
505 results.len(),
506 RECENT_STAKE_TABLES_LIMIT as usize,
507 "Should return only the most recent {RECENT_STAKE_TABLES_LIMIT} epochs",
508 );
509
510 for (i, info) in results.iter().enumerate() {
511 let expected_epoch =
512 EpochNumber::new(total_epochs - RECENT_STAKE_TABLES_LIMIT + i as u64);
513 let expected_drb = [(total_epochs - RECENT_STAKE_TABLES_LIMIT + i as u64) as u8; 32];
514 assert_eq!(info.epoch, expected_epoch, "invalid epoch at index {i}",);
515 assert_eq!(info.drb_result, expected_drb, "invalid DRB at index {i}",);
516 assert!(info.block_header.is_none(), "Expected no block header");
517 }
518 }
519
520 fn leaf_info(leaf: Leaf2) -> LeafInfo<SeqTypes> {
521 LeafInfo {
522 leaf,
523 vid_share: None,
524 state: Default::default(),
525 delta: None,
526 state_cert: None,
527 }
528 }
529
530 #[rstest_reuse::apply(persistence_types)]
531 pub async fn test_append_and_decide<P: TestablePersistence>(_p: PhantomData<P>) {
532 let tmp = P::tmp_storage().await;
533 let storage = P::connect(&tmp).await;
534
535 assert_eq!(
537 storage.load_vid_share(ViewNumber::new(0)).await.unwrap(),
538 None
539 );
540
541 let leaf: Leaf2 = Leaf2::genesis(
542 &ValidatedState::default(),
543 &NodeState::mock(),
544 TEST_VERSIONS.test.base,
545 )
546 .await;
547 let leaf_payload = leaf.block_payload().unwrap();
548 let leaf_payload_bytes_arc = leaf_payload.encode();
549
550 let avidm_param = init_avidm_param(2).unwrap();
551 let weights = vec![1u32; 2];
552
553 let ns_table = parse_ns_table(
554 leaf_payload.byte_len().as_usize(),
555 &leaf_payload.ns_table().encode(),
556 );
557 let (payload_commitment, shares) =
558 AvidMScheme::ns_disperse(&avidm_param, &weights, &leaf_payload_bytes_arc, ns_table)
559 .unwrap();
560
561 let (pubkey, privkey) = BLSPubKey::generated_from_seed_indexed([0; 32], 1);
562 let signature = PubKey::sign(&privkey, &[]).unwrap();
563 let mut vid = AvidMDisperseShare::<SeqTypes> {
564 view_number: ViewNumber::new(0),
565 payload_commitment,
566 share: shares[0].clone(),
567 recipient_key: pubkey,
568 epoch: Some(EpochNumber::new(0)),
569 target_epoch: Some(EpochNumber::new(0)),
570 common: avidm_param,
571 };
572 let mut quorum_proposal = Proposal {
573 data: QuorumProposalWrapper::<SeqTypes> {
574 proposal: QuorumProposal2::<SeqTypes> {
575 epoch: None,
576 block_header: leaf.block_header().clone(),
577 view_number: ViewNumber::genesis(),
578 justify_qc: QuorumCertificate2::genesis(
579 &ValidatedState::default(),
580 &NodeState::mock(),
581 TEST_VERSIONS.test,
582 )
583 .await,
584 upgrade_certificate: None,
585 view_change_evidence: None,
586 next_drb_result: None,
587 next_epoch_justify_qc: None,
588 state_cert: None,
589 },
590 },
591 signature,
592 _pd: Default::default(),
593 };
594
595 let vid_share0 = convert_proposal(vid.clone().to_proposal(&privkey).unwrap().clone());
596
597 storage.append_vid(&vid_share0).await.unwrap();
598
599 assert_eq!(
600 storage.load_vid_share(ViewNumber::new(0)).await.unwrap(),
601 Some(vid_share0.clone())
602 );
603
604 vid.view_number = ViewNumber::new(1);
605
606 let vid_share1 = convert_proposal(vid.clone().to_proposal(&privkey).unwrap().clone());
607 storage.append_vid(&vid_share1).await.unwrap();
608
609 assert_eq!(
610 storage.load_vid_share(vid.view_number()).await.unwrap(),
611 Some(vid_share1.clone())
612 );
613
614 vid.view_number = ViewNumber::new(2);
615
616 let vid_share2 = convert_proposal(vid.clone().to_proposal(&privkey).unwrap().clone());
617 storage.append_vid(&vid_share2).await.unwrap();
618
619 assert_eq!(
620 storage.load_vid_share(vid.view_number()).await.unwrap(),
621 Some(vid_share2.clone())
622 );
623
624 vid.view_number = ViewNumber::new(3);
625
626 let vid_share3 = convert_proposal(vid.clone().to_proposal(&privkey).unwrap().clone());
627 storage.append_vid(&vid_share3).await.unwrap();
628
629 assert_eq!(
630 storage.load_vid_share(vid.view_number()).await.unwrap(),
631 Some(vid_share3.clone())
632 );
633
634 let block_payload_signature = BLSPubKey::sign(&privkey, &leaf_payload_bytes_arc)
635 .expect("Failed to sign block payload");
636
637 let da_proposal_inner = DaProposal2::<SeqTypes> {
638 encoded_transactions: leaf_payload_bytes_arc.clone(),
639 metadata: leaf_payload.ns_table().clone(),
640 view_number: ViewNumber::new(0),
641 epoch: None,
642 epoch_transition_indicator: EpochTransitionIndicator::NotInTransition,
643 };
644
645 let da_proposal = Proposal {
646 data: da_proposal_inner,
647 signature: block_payload_signature,
648 _pd: Default::default(),
649 };
650
651 let vid_commitment = vid_commitment(
652 &leaf_payload_bytes_arc,
653 &leaf.block_header().metadata().encode(),
654 2,
655 TEST_VERSIONS.test.base,
656 );
657
658 storage
659 .append_da2(&da_proposal, vid_commitment)
660 .await
661 .unwrap();
662
663 assert_eq!(
664 storage.load_da_proposal(ViewNumber::new(0)).await.unwrap(),
665 Some(da_proposal.clone())
666 );
667
668 let mut da_proposal1 = da_proposal.clone();
669 da_proposal1.data.view_number = ViewNumber::new(1);
670 storage
671 .append_da2(&da_proposal1.clone(), vid_commitment)
672 .await
673 .unwrap();
674
675 assert_eq!(
676 storage
677 .load_da_proposal(da_proposal1.data.view_number)
678 .await
679 .unwrap(),
680 Some(da_proposal1.clone())
681 );
682
683 let mut da_proposal2 = da_proposal1.clone();
684 da_proposal2.data.view_number = ViewNumber::new(2);
685 storage
686 .append_da2(&da_proposal2.clone(), vid_commitment)
687 .await
688 .unwrap();
689
690 assert_eq!(
691 storage
692 .load_da_proposal(da_proposal2.data.view_number)
693 .await
694 .unwrap(),
695 Some(da_proposal2.clone())
696 );
697
698 let mut da_proposal3 = da_proposal2.clone();
699 da_proposal3.data.view_number = ViewNumber::new(3);
700 storage
701 .append_da2(&da_proposal3.clone(), vid_commitment)
702 .await
703 .unwrap();
704
705 assert_eq!(
706 storage
707 .load_da_proposal(da_proposal3.data.view_number)
708 .await
709 .unwrap(),
710 Some(da_proposal3.clone())
711 );
712
713 let quorum_proposal1 = quorum_proposal.clone();
714
715 storage
716 .append_quorum_proposal2(&quorum_proposal1)
717 .await
718 .unwrap();
719
720 assert_eq!(
721 storage.load_quorum_proposals().await.unwrap(),
722 BTreeMap::from_iter([(ViewNumber::genesis(), quorum_proposal1.clone())])
723 );
724
725 quorum_proposal.data.proposal.view_number = ViewNumber::new(1);
726 let quorum_proposal2 = quorum_proposal.clone();
727 storage
728 .append_quorum_proposal2(&quorum_proposal2)
729 .await
730 .unwrap();
731
732 assert_eq!(
733 storage.load_quorum_proposals().await.unwrap(),
734 BTreeMap::from_iter([
735 (ViewNumber::genesis(), quorum_proposal1.clone()),
736 (ViewNumber::new(1), quorum_proposal2.clone())
737 ])
738 );
739
740 quorum_proposal.data.proposal.view_number = ViewNumber::new(2);
741 quorum_proposal.data.proposal.justify_qc.view_number = ViewNumber::new(1);
742 let quorum_proposal3 = quorum_proposal.clone();
743 storage
744 .append_quorum_proposal2(&quorum_proposal3)
745 .await
746 .unwrap();
747
748 assert_eq!(
749 storage.load_quorum_proposals().await.unwrap(),
750 BTreeMap::from_iter([
751 (ViewNumber::genesis(), quorum_proposal1.clone()),
752 (ViewNumber::new(1), quorum_proposal2.clone()),
753 (ViewNumber::new(2), quorum_proposal3.clone())
754 ])
755 );
756
757 quorum_proposal.data.proposal.view_number = ViewNumber::new(3);
758 quorum_proposal.data.proposal.justify_qc.view_number = ViewNumber::new(2);
759
760 let quorum_proposal4 = quorum_proposal.clone();
762 storage
763 .append_quorum_proposal2(&quorum_proposal4)
764 .await
765 .unwrap();
766
767 assert_eq!(
768 storage.load_quorum_proposals().await.unwrap(),
769 BTreeMap::from_iter([
770 (ViewNumber::genesis(), quorum_proposal1.clone()),
771 (ViewNumber::new(1), quorum_proposal2.clone()),
772 (ViewNumber::new(2), quorum_proposal3.clone()),
773 (ViewNumber::new(3), quorum_proposal4.clone())
774 ])
775 );
776
777 let leaves = [
780 Leaf2::from_quorum_proposal(&quorum_proposal1.data),
781 Leaf2::from_quorum_proposal(&quorum_proposal2.data),
782 Leaf2::from_quorum_proposal(&quorum_proposal3.data),
783 Leaf2::from_quorum_proposal(&quorum_proposal4.data),
784 ];
785 let mut final_qc = leaves[3].justify_qc();
786 final_qc.view_number += 1;
787 final_qc.data.leaf_commit = Committable::commit(&leaf);
788 let qcs = [
789 leaves[1].justify_qc(),
790 leaves[2].justify_qc(),
791 leaves[3].justify_qc(),
792 final_qc,
793 ];
794
795 assert_eq!(
796 storage.load_anchor_view().await.unwrap(),
797 ViewNumber::genesis()
798 );
799
800 let consumer = EventCollector::default();
801 let leaf_chain = leaves
802 .iter()
803 .take(3)
804 .map(|leaf| leaf_info(leaf.clone()))
805 .zip(&qcs)
806 .collect::<Vec<_>>();
807 tracing::info!(?leaf_chain, "decide view 2");
808 storage
809 .append_decided_leaves(
810 ViewNumber::new(2),
811 leaf_chain
812 .iter()
813 .map(|(leaf, qc)| (leaf, CertificatePair::non_epoch_change((*qc).clone()))),
814 None,
815 &consumer,
816 )
817 .await
818 .unwrap();
819 assert_eq!(
820 storage.load_anchor_view().await.unwrap(),
821 ViewNumber::new(2)
822 );
823
824 for i in 0..=2 {
825 assert_eq!(
826 storage.load_da_proposal(ViewNumber::new(i)).await.unwrap(),
827 None
828 );
829
830 assert_eq!(
831 storage.load_vid_share(ViewNumber::new(i)).await.unwrap(),
832 None
833 );
834 }
835
836 assert_eq!(
837 storage.load_da_proposal(ViewNumber::new(3)).await.unwrap(),
838 Some(da_proposal3)
839 );
840
841 assert_eq!(
842 storage.load_vid_share(ViewNumber::new(3)).await.unwrap(),
843 Some(convert_proposal(vid_share3.clone()))
844 );
845
846 let proposals = storage.load_quorum_proposals().await.unwrap();
847 assert_eq!(
848 proposals,
849 BTreeMap::from_iter([(ViewNumber::new(3), quorum_proposal4)])
850 );
851
852 for (leaf, info) in leaves.iter().zip(consumer.leaf_chain().await.iter()) {
854 assert_eq!(info.leaf, *leaf);
855 let decided_vid_share = info.vid_share.as_ref().unwrap();
856 assert_eq!(decided_vid_share.view_number(), leaf.view_number());
857 }
858
859 assert_eq!(
861 storage.load_anchor_leaf().await.unwrap(),
862 Some((leaves[2].clone(), qcs[2].clone()))
863 );
864 assert_eq!(
865 storage.load_anchor_view().await.unwrap(),
866 leaves[2].view_number()
867 );
868
869 let consumer = EventCollector::default();
871 tracing::info!(leaf = ?leaves[3], qc = ?qcs[3], "decide view 3");
872 storage
873 .append_decided_leaves(
874 ViewNumber::new(3),
875 vec![(
876 &leaf_info(leaves[3].clone()),
877 CertificatePair::non_epoch_change(qcs[3].clone()),
878 )],
879 None,
880 &consumer,
881 )
882 .await
883 .unwrap();
884 assert_eq!(
885 storage.load_anchor_view().await.unwrap(),
886 ViewNumber::new(3)
887 );
888
889 let events = consumer.events.read().await;
891 assert_eq!(events.len(), 1);
892 assert_eq!(events[0].view_number, ViewNumber::new(3));
893 let EventType::Decide {
894 committing_qc,
895 leaf_chain,
896 ..
897 } = &events[0].event
898 else {
899 panic!("expected decide event, got {:?}", events[0]);
900 };
901 assert_eq!(*committing_qc.qc(), qcs[3]);
902 assert_eq!(leaf_chain.len(), 1);
903 let info = &leaf_chain[0];
904 assert_eq!(info.leaf, leaves[3]);
905
906 assert_eq!(
908 storage.load_da_proposal(ViewNumber::new(3)).await.unwrap(),
909 None
910 );
911
912 assert_eq!(
913 storage.load_vid_share(ViewNumber::new(3)).await.unwrap(),
914 None
915 );
916 assert_eq!(
917 storage.load_quorum_proposals().await.unwrap(),
918 BTreeMap::new()
919 );
920 }
921
922 #[rstest_reuse::apply(persistence_types)]
923 pub async fn test_upgrade_certificate<P: TestablePersistence>(_p: PhantomData<P>) {
924 let tmp = P::tmp_storage().await;
925 let storage = P::connect(&tmp).await;
926
927 assert_eq!(storage.load_upgrade_certificate().await.unwrap(), None);
929
930 let upgrade_data = UpgradeProposalData {
931 old_version: Version { major: 0, minor: 1 },
932 new_version: Version { major: 1, minor: 0 },
933 decide_by: ViewNumber::genesis(),
934 new_version_hash: Default::default(),
935 old_version_last_view: ViewNumber::genesis(),
936 new_version_first_view: ViewNumber::genesis(),
937 };
938
939 let decide_upgrade_certificate = UpgradeCertificate::<SeqTypes>::new(
940 upgrade_data.clone(),
941 upgrade_data.commit(),
942 ViewNumber::genesis(),
943 Default::default(),
944 Default::default(),
945 );
946 let res = storage
947 .store_upgrade_certificate(Some(decide_upgrade_certificate.clone()))
948 .await;
949 assert!(res.is_ok());
950
951 let res = storage.load_upgrade_certificate().await.unwrap();
952 let view_number = res.unwrap().view_number;
953 assert_eq!(view_number, ViewNumber::genesis());
954
955 let new_view_number_for_certificate = ViewNumber::new(50);
956 let mut new_upgrade_certificate = decide_upgrade_certificate.clone();
957 new_upgrade_certificate.view_number = new_view_number_for_certificate;
958
959 let res = storage
960 .store_upgrade_certificate(Some(new_upgrade_certificate.clone()))
961 .await;
962 assert!(res.is_ok());
963
964 let res = storage.load_upgrade_certificate().await.unwrap();
965 let view_number = res.unwrap().view_number;
966 assert_eq!(view_number, new_view_number_for_certificate);
967 }
968
969 #[rstest_reuse::apply(persistence_types)]
970 pub async fn test_next_epoch_quorum_certificate<P: TestablePersistence>(_p: PhantomData<P>) {
971 let tmp = P::tmp_storage().await;
972 let storage = P::connect(&tmp).await;
973
974 assert_eq!(
976 storage.load_next_epoch_quorum_certificate().await.unwrap(),
977 None
978 );
979
980 let upgrade_lock = UpgradeLock::<SeqTypes>::new(TEST_VERSIONS.test);
981
982 let genesis_view = ViewNumber::genesis();
983
984 let leaf = Leaf2::genesis(
985 &ValidatedState::default(),
986 &NodeState::default(),
987 TEST_VERSIONS.test.base,
988 )
989 .await;
990 let data: NextEpochQuorumData2<SeqTypes> = QuorumData2 {
991 leaf_commit: leaf.commit(),
992 epoch: Some(EpochNumber::new(1)),
993 block_number: Some(leaf.height()),
994 }
995 .into();
996
997 let versioned_data =
998 VersionedVoteData::new_infallible(data.clone(), genesis_view, &upgrade_lock);
999
1000 let bytes: [u8; 32] = versioned_data.commit().into();
1001
1002 let next_epoch_qc = NextEpochQuorumCertificate2::new(
1003 data,
1004 Commitment::from_raw(bytes),
1005 genesis_view,
1006 None,
1007 PhantomData,
1008 );
1009
1010 let res = storage
1011 .store_next_epoch_quorum_certificate(next_epoch_qc.clone())
1012 .await;
1013 assert!(res.is_ok());
1014
1015 let res = storage.load_next_epoch_quorum_certificate().await.unwrap();
1016 let view_number = res.unwrap().view_number;
1017 assert_eq!(view_number, ViewNumber::genesis());
1018
1019 let new_view_number_for_qc = ViewNumber::new(50);
1020 let mut new_qc = next_epoch_qc.clone();
1021 new_qc.view_number = new_view_number_for_qc;
1022
1023 let res = storage
1024 .store_next_epoch_quorum_certificate(new_qc.clone())
1025 .await;
1026 assert!(res.is_ok());
1027
1028 let res = storage.load_next_epoch_quorum_certificate().await.unwrap();
1029 let view_number = res.unwrap().view_number;
1030 assert_eq!(view_number, new_view_number_for_qc);
1031 }
1032
1033 #[rstest_reuse::apply(persistence_types)]
1034 pub async fn test_decide_with_failing_event_consumer<P: TestablePersistence>(
1035 _p: PhantomData<P>,
1036 ) {
1037 let tmp = P::tmp_storage().await;
1038 let storage = P::connect(&tmp).await;
1039
1040 let mut chain = vec![];
1042
1043 let leaf: Leaf2 = Leaf::genesis(
1044 &ValidatedState::default(),
1045 &NodeState::mock(),
1046 MOCK_UPGRADE.base,
1047 )
1048 .await
1049 .into();
1050 let leaf_payload = leaf.block_payload().unwrap();
1051 let leaf_payload_bytes_arc = leaf_payload.encode();
1052 let avidm_param = init_avidm_param(2).unwrap();
1053 let weights = vec![1u32; 2];
1054 let ns_table = parse_ns_table(
1055 leaf_payload.byte_len().as_usize(),
1056 &leaf_payload.ns_table().encode(),
1057 );
1058 let (payload_commitment, shares) =
1059 AvidMScheme::ns_disperse(&avidm_param, &weights, &leaf_payload_bytes_arc, ns_table)
1060 .unwrap();
1061
1062 let (pubkey, privkey) = BLSPubKey::generated_from_seed_indexed([0; 32], 1);
1063 let mut vid = AvidMDisperseShare::<SeqTypes> {
1064 view_number: ViewNumber::new(0),
1065 payload_commitment,
1066 share: shares[0].clone(),
1067 recipient_key: pubkey,
1068 epoch: Some(EpochNumber::new(0)),
1069 target_epoch: Some(EpochNumber::new(0)),
1070 common: avidm_param,
1071 }
1072 .to_proposal(&privkey)
1073 .unwrap()
1074 .clone();
1075 let mut quorum_proposal = QuorumProposalWrapper::<SeqTypes> {
1076 proposal: QuorumProposal2::<SeqTypes> {
1077 block_header: leaf.block_header().clone(),
1078 view_number: ViewNumber::genesis(),
1079 justify_qc: QuorumCertificate::genesis(
1080 &ValidatedState::default(),
1081 &NodeState::mock(),
1082 TEST_VERSIONS.test,
1083 )
1084 .await
1085 .to_qc2(),
1086 upgrade_certificate: None,
1087 view_change_evidence: None,
1088 next_drb_result: None,
1089 next_epoch_justify_qc: None,
1090 epoch: None,
1091 state_cert: None,
1092 },
1093 };
1094 let mut qc = QuorumCertificate2::genesis(
1095 &ValidatedState::default(),
1096 &NodeState::mock(),
1097 TEST_VERSIONS.test,
1098 )
1099 .await;
1100
1101 let block_payload_signature = BLSPubKey::sign(&privkey, &leaf_payload_bytes_arc)
1102 .expect("Failed to sign block payload");
1103 let mut da_proposal = Proposal {
1104 data: DaProposal2::<SeqTypes> {
1105 encoded_transactions: leaf_payload_bytes_arc.clone(),
1106 metadata: leaf_payload.ns_table().clone(),
1107 view_number: ViewNumber::new(0),
1108 epoch: Some(EpochNumber::new(0)),
1109 epoch_transition_indicator: EpochTransitionIndicator::NotInTransition,
1110 },
1111 signature: block_payload_signature,
1112 _pd: Default::default(),
1113 };
1114
1115 let vid_commitment = vid_commitment(
1116 &leaf_payload_bytes_arc,
1117 &leaf.block_header().metadata().encode(),
1118 2,
1119 TEST_VERSIONS.test.base,
1120 );
1121
1122 for i in 0..4 {
1123 quorum_proposal.proposal.view_number = ViewNumber::new(i);
1124 let leaf = Leaf2::from_quorum_proposal(&quorum_proposal);
1125 qc.view_number = leaf.view_number();
1126 qc.data.leaf_commit = Committable::commit(&leaf);
1127 vid.data.view_number = leaf.view_number();
1128 da_proposal.data.view_number = leaf.view_number();
1129 chain.push((leaf.clone(), qc.clone(), vid.clone(), da_proposal.clone()));
1130 }
1131
1132 for (_, _, vid, da) in &chain {
1134 tracing::info!(?da, ?vid, "insert proposal");
1135 storage.append_da2(da, vid_commitment).await.unwrap();
1136 storage
1137 .append_vid(&convert_proposal(vid.clone()))
1138 .await
1139 .unwrap();
1140 }
1141
1142 let leaf_chain = chain
1144 .iter()
1145 .take(2)
1146 .map(|(leaf, qc, ..)| (leaf_info(leaf.clone()), qc.clone()))
1147 .collect::<Vec<_>>();
1148 tracing::info!("decide with event handling failure");
1149 storage
1150 .append_decided_leaves(
1151 ViewNumber::new(1),
1152 leaf_chain
1153 .iter()
1154 .map(|(leaf, qc)| (leaf, CertificatePair::non_epoch_change(qc.clone()))),
1155 None,
1156 &FailConsumer,
1157 )
1158 .await
1159 .unwrap();
1160 for i in 0..4 {
1162 tracing::info!(i, "check proposal availability");
1163 assert!(
1164 storage
1165 .load_vid_share(ViewNumber::new(i))
1166 .await
1167 .unwrap()
1168 .is_some()
1169 );
1170 assert!(
1171 storage
1172 .load_da_proposal(ViewNumber::new(i))
1173 .await
1174 .unwrap()
1175 .is_some()
1176 );
1177 }
1178 tracing::info!("check anchor leaf updated");
1179 assert_eq!(
1180 storage
1181 .load_anchor_leaf()
1182 .await
1183 .unwrap()
1184 .unwrap()
1185 .0
1186 .view_number(),
1187 ViewNumber::new(1)
1188 );
1189 assert_eq!(
1190 storage.load_anchor_view().await.unwrap(),
1191 ViewNumber::new(1)
1192 );
1193
1194 let consumer = EventCollector::default();
1197 let leaf_chain = chain
1198 .iter()
1199 .skip(2)
1200 .map(|(leaf, qc, ..)| (leaf_info(leaf.clone()), qc.clone()))
1201 .collect::<Vec<_>>();
1202 tracing::info!("decide successfully");
1203 storage
1204 .append_decided_leaves(
1205 ViewNumber::new(3),
1206 leaf_chain
1207 .iter()
1208 .map(|(leaf, qc)| (leaf, CertificatePair::non_epoch_change(qc.clone()))),
1209 None,
1210 &consumer,
1211 )
1212 .await
1213 .unwrap();
1214 for i in 0..4 {
1216 tracing::info!(i, "check proposal garbage collected");
1217 assert!(
1218 storage
1219 .load_vid_share(ViewNumber::new(i))
1220 .await
1221 .unwrap()
1222 .is_none()
1223 );
1224 assert!(
1225 storage
1226 .load_da_proposal(ViewNumber::new(i))
1227 .await
1228 .unwrap()
1229 .is_none()
1230 );
1231 }
1232 tracing::info!("check anchor leaf updated");
1233 assert_eq!(
1234 storage
1235 .load_anchor_leaf()
1236 .await
1237 .unwrap()
1238 .unwrap()
1239 .0
1240 .view_number(),
1241 ViewNumber::new(3)
1242 );
1243 assert_eq!(
1244 storage.load_anchor_view().await.unwrap(),
1245 ViewNumber::new(3)
1246 );
1247
1248 tracing::info!("check decide event");
1250 let leaf_chain = consumer.leaf_chain().await;
1251 assert_eq!(leaf_chain.len(), 4, "{leaf_chain:#?}");
1252 for ((leaf, ..), info) in chain.iter().zip(leaf_chain.iter()) {
1253 assert_eq!(info.leaf, *leaf);
1254 let decided_vid_share = info.vid_share.as_ref().unwrap();
1255 assert_eq!(decided_vid_share.view_number(), leaf.view_number());
1256 assert!(info.leaf.block_payload().is_some());
1257 }
1258 }
1259
1260 #[rstest_reuse::apply(persistence_types)]
1261 pub async fn test_pruning<P: TestablePersistence>(_p: PhantomData<P>) {
1262 let tmp = P::tmp_storage().await;
1263
1264 let mut options = P::options(&tmp);
1265 options.set_view_retention(1);
1266 let storage = options.create().await.unwrap();
1267
1268 let leaf = Leaf::genesis(
1270 &ValidatedState::default(),
1271 &NodeState::mock(),
1272 MOCK_UPGRADE.base,
1273 )
1274 .await;
1275 let leaf_payload = leaf.block_payload().unwrap();
1276 let leaf_payload_bytes_arc = leaf_payload.encode();
1277 let avidm_param = init_avidm_param(2).unwrap();
1278 let weights = vec![1u32; 2];
1279
1280 let ns_table = parse_ns_table(
1281 leaf_payload.byte_len().as_usize(),
1282 &leaf_payload.ns_table().encode(),
1283 );
1284 let (payload_commitment, shares) =
1285 AvidMScheme::ns_disperse(&avidm_param, &weights, &leaf_payload_bytes_arc, ns_table)
1286 .unwrap();
1287
1288 let (pubkey, privkey) = BLSPubKey::generated_from_seed_indexed([0; 32], 1);
1289 let vid_share = convert_proposal(
1290 AvidMDisperseShare::<SeqTypes> {
1291 view_number: ViewNumber::new(0),
1292 payload_commitment,
1293 share: shares[0].clone(),
1294 recipient_key: pubkey,
1295 epoch: None,
1296 target_epoch: None,
1297 common: avidm_param,
1298 }
1299 .to_proposal(&privkey)
1300 .unwrap()
1301 .clone(),
1302 );
1303
1304 let quorum_proposal = QuorumProposalWrapper::<SeqTypes> {
1305 proposal: QuorumProposal2::<SeqTypes> {
1306 block_header: leaf.block_header().clone(),
1307 view_number: ViewNumber::genesis(),
1308 justify_qc: QuorumCertificate::genesis(
1309 &ValidatedState::default(),
1310 &NodeState::mock(),
1311 TEST_VERSIONS.test,
1312 )
1313 .await
1314 .to_qc2(),
1315 upgrade_certificate: None,
1316 view_change_evidence: None,
1317 next_drb_result: None,
1318 next_epoch_justify_qc: None,
1319 epoch: None,
1320 state_cert: None,
1321 },
1322 };
1323 let quorum_proposal_signature =
1324 BLSPubKey::sign(&privkey, &bincode::serialize(&quorum_proposal).unwrap())
1325 .expect("Failed to sign quorum proposal");
1326 let quorum_proposal = Proposal {
1327 data: quorum_proposal,
1328 signature: quorum_proposal_signature,
1329 _pd: Default::default(),
1330 };
1331
1332 let block_payload_signature = BLSPubKey::sign(&privkey, &leaf_payload_bytes_arc)
1333 .expect("Failed to sign block payload");
1334 let da_proposal = Proposal {
1335 data: DaProposal2::<SeqTypes> {
1336 encoded_transactions: leaf_payload_bytes_arc,
1337 metadata: leaf_payload.ns_table().clone(),
1338 view_number: ViewNumber::new(0),
1339 epoch: None,
1340 epoch_transition_indicator: EpochTransitionIndicator::NotInTransition,
1341 },
1342 signature: block_payload_signature,
1343 _pd: Default::default(),
1344 };
1345
1346 storage
1347 .append_da2(&da_proposal, VidCommitment::V1(payload_commitment))
1348 .await
1349 .unwrap();
1350 storage.append_vid(&vid_share).await.unwrap();
1351 storage
1352 .append_quorum_proposal2(&quorum_proposal)
1353 .await
1354 .unwrap();
1355
1356 storage
1358 .append_decided_leaves(ViewNumber::new(1), [], None, &NullEventConsumer)
1359 .await
1360 .unwrap();
1361
1362 assert_eq!(
1365 storage
1366 .load_da_proposal(ViewNumber::new(0))
1367 .await
1368 .unwrap()
1369 .unwrap(),
1370 da_proposal
1371 );
1372 assert_eq!(
1373 storage
1374 .load_vid_share(ViewNumber::new(0))
1375 .await
1376 .unwrap()
1377 .unwrap(),
1378 vid_share
1379 );
1380 assert_eq!(
1381 storage
1382 .load_quorum_proposal(ViewNumber::new(0))
1383 .await
1384 .unwrap(),
1385 quorum_proposal
1386 );
1387
1388 storage
1390 .append_decided_leaves(ViewNumber::new(2), [], None, &NullEventConsumer)
1391 .await
1392 .unwrap();
1393 assert!(
1394 storage
1395 .load_da_proposal(ViewNumber::new(0))
1396 .await
1397 .unwrap()
1398 .is_none()
1399 );
1400 assert!(
1401 storage
1402 .load_vid_share(ViewNumber::new(0))
1403 .await
1404 .unwrap()
1405 .is_none()
1406 );
1407 assert!(
1408 storage
1409 .load_quorum_proposal(ViewNumber::new(0))
1410 .await
1411 .is_err()
1412 );
1413 }
1414
1415 async fn assert_events_eq<P: TestablePersistence>(
1416 persistence: &P,
1417 block: u64,
1418 stake_table_fetcher: &Fetcher,
1419 l1_client: &L1Client,
1420 stake_table_contract: Address,
1421 ) -> anyhow::Result<()> {
1422 let (stored_l1, events) = persistence.load_events(0, block).await?;
1424 assert!(!events.is_empty());
1425 assert!(stored_l1.is_some());
1426 assert!(events.iter().all(|((l1_block, _), _)| *l1_block <= block));
1427 let contract_events = Fetcher::fetch_events_from_contract(
1429 l1_client.clone(),
1430 stake_table_contract,
1431 None,
1432 block,
1433 )
1434 .await?;
1435 assert_eq!(
1436 contract_events, events,
1437 "Events from contract and persistence do not match"
1438 );
1439
1440 let fetched_events = stake_table_fetcher
1442 .fetch_and_store_stake_table_events(stake_table_contract, block)
1443 .await?;
1444 assert_eq!(fetched_events, events);
1445
1446 Ok(())
1447 }
1448
1449 #[rstest_reuse::apply(persistence_types)]
1452 pub async fn test_stake_table_fetching_from_persistence<P: TestablePersistence>(
1453 #[values(
1454 StakeTableContractVersion::V1,
1455 StakeTableContractVersion::V2,
1456 StakeTableContractVersion::V3
1457 )]
1458 stake_table_version: StakeTableContractVersion,
1459 _p: PhantomData<P>,
1460 ) -> anyhow::Result<()> {
1461 let epoch_height = 20;
1462
1463 let network_config = TestConfigBuilder::default()
1464 .epoch_height(epoch_height)
1465 .build();
1466
1467 let anvil_provider = network_config.anvil().unwrap();
1468
1469 let query_service_port =
1470 reserve_tcp_port().expect("OS should have ephemeral ports available");
1471 let query_api_options = Options::with_port(query_service_port);
1472
1473 const NODE_COUNT: usize = 2;
1474
1475 let storage = join_all((0..NODE_COUNT).map(|_| P::tmp_storage())).await;
1476 let persistence_options: [_; NODE_COUNT] = storage
1477 .iter()
1478 .map(P::options)
1479 .collect::<Vec<_>>()
1480 .try_into()
1481 .unwrap();
1482
1483 let persistence = persistence_options[0].clone().create().await.unwrap();
1484
1485 let l1_url = network_config.l1_url();
1487
1488 let upgrade = Upgrade::trivial(version(0, 3));
1489
1490 let testnet_config = TestNetworkConfigBuilder::with_num_nodes()
1491 .api_config(query_api_options)
1492 .network_config(network_config.clone())
1493 .persistences(persistence_options.clone())
1494 .pos_hook(
1495 DelegationConfig::MultipleDelegators,
1496 stake_table_version,
1497 upgrade,
1498 )
1499 .await
1500 .expect("Pos deployment failed")
1501 .build();
1502
1503 let test_network = TestNetwork::new(testnet_config, upgrade).await;
1505
1506 let client: Client<ServerError, SequencerApiVersion> = Client::new(
1507 format!("http://localhost:{query_service_port}")
1508 .parse()
1509 .unwrap(),
1510 );
1511 client.connect(None).await;
1512 tracing::info!(query_service_port, "server running");
1513
1514 let _initial_blocks = client
1516 .socket("availability/stream/blocks/0")
1517 .subscribe::<BlockQueryData<SeqTypes>>()
1518 .await
1519 .unwrap()
1520 .take(40)
1521 .try_collect::<Vec<_>>()
1522 .await
1523 .unwrap();
1524 let membership_coordinator = test_network
1526 .server
1527 .consensus_handle()
1528 .membership_coordinator()
1529 .await;
1530
1531 let l1_client = L1Client::new(vec![l1_url]).unwrap();
1532 let node_state = test_network.server.node_state();
1533 let chain_config = node_state.chain_config;
1534 let stake_table_contract = chain_config.stake_table_contract.unwrap();
1535
1536 let current_membership = membership_coordinator.membership();
1537 {
1538 let membership_state = current_membership;
1539 let stake_table_fetcher = membership_state.fetcher();
1540
1541 let block1 = anvil_provider
1542 .get_block_number()
1543 .await
1544 .expect("latest l1 block");
1545
1546 assert_events_eq(
1547 &persistence,
1548 block1,
1549 stake_table_fetcher,
1550 &l1_client,
1551 stake_table_contract,
1552 )
1553 .await?;
1554 }
1555 let _epoch_4_blocks = client
1556 .socket("availability/stream/blocks/0")
1557 .subscribe::<BlockQueryData<SeqTypes>>()
1558 .await
1559 .unwrap()
1560 .take(65)
1561 .try_collect::<Vec<_>>()
1562 .await
1563 .unwrap();
1564 let block2 = anvil_provider
1565 .get_block_number()
1566 .await
1567 .expect("latest l1 block");
1568
1569 {
1570 let membership_state = current_membership;
1571 let stake_table_fetcher = membership_state.fetcher();
1572
1573 assert_events_eq(
1574 &persistence,
1575 block2,
1576 stake_table_fetcher,
1577 &l1_client,
1578 stake_table_contract,
1579 )
1580 .await?;
1581 }
1582 Ok(())
1583 }
1584
1585 #[rstest_reuse::apply(persistence_types)]
1586 pub async fn test_stake_table_background_fetching<P: TestablePersistence>(
1587 #[values(
1588 StakeTableContractVersion::V1,
1589 StakeTableContractVersion::V2,
1590 StakeTableContractVersion::V3
1591 )]
1592 stake_table_version: StakeTableContractVersion,
1593 _p: PhantomData<P>,
1594 ) -> anyhow::Result<()> {
1595 use espresso_types::v0_3::ChainConfig;
1596 use hotshot_contract_adapter::stake_table::StakeTableContractVersion;
1597
1598 let blocks_per_epoch = 10;
1599
1600 let network_config = TestConfigBuilder::<1>::default()
1601 .epoch_height(blocks_per_epoch)
1602 .build();
1603
1604 let anvil_provider = network_config.anvil().unwrap();
1605
1606 let (genesis_state, genesis_stake) = light_client_genesis_from_stake_table(
1607 &network_config.hotshot_config().hotshot_stake_table(),
1608 STAKE_TABLE_CAPACITY_FOR_TEST,
1609 )
1610 .unwrap();
1611
1612 let (_, priv_keys): (Vec<_>, Vec<_>) = (0..20)
1613 .map(|i| <PubKey as SignatureKey>::generated_from_seed_indexed([1; 32], i as u64))
1614 .unzip();
1615 let state_key_pairs = (0..20)
1616 .map(|i| StateKeyPair::generate_from_seed_indexed([2; 32], i as u64))
1617 .collect::<Vec<_>>();
1618
1619 let validators = staking_priv_keys(&priv_keys, &state_key_pairs, 20);
1620
1621 let deployer = ProviderBuilder::new()
1622 .wallet(EthereumWallet::from(network_config.signer().clone()))
1623 .connect_http(network_config.l1_url().clone());
1624
1625 let mut contracts = Contracts::new();
1626 let args = DeployerArgsBuilder::default()
1627 .deployer(deployer.clone())
1628 .rpc_url(network_config.l1_url().clone())
1629 .mock_light_client(true)
1630 .genesis_lc_state(genesis_state)
1631 .genesis_st_state(genesis_stake)
1632 .blocks_per_epoch(blocks_per_epoch)
1633 .epoch_start_block(1)
1634 .exit_escrow_period(U256::from(max(
1635 blocks_per_epoch * 15 + 100,
1636 DEFAULT_EXIT_ESCROW_PERIOD_SECONDS,
1637 )))
1638 .multisig_pauser(network_config.signer().address())
1639 .token_name("Espresso".to_string())
1640 .token_symbol("ESP".to_string())
1641 .initial_token_supply(U256::from(3590000000u64))
1642 .ops_timelock_delay(U256::from(0))
1643 .ops_timelock_admin(network_config.signer().address())
1644 .ops_timelock_proposers(vec![network_config.signer().address()])
1645 .ops_timelock_executors(vec![network_config.signer().address()])
1646 .safe_exit_timelock_delay(U256::from(10))
1647 .safe_exit_timelock_admin(network_config.signer().address())
1648 .safe_exit_timelock_proposers(vec![network_config.signer().address()])
1649 .safe_exit_timelock_executors(vec![network_config.signer().address()])
1650 .build()
1651 .unwrap();
1652
1653 match stake_table_version {
1654 StakeTableContractVersion::V1 => args.deploy_to_stake_table_v1(&mut contracts).await,
1655 StakeTableContractVersion::V2 => args.deploy_to_stake_table_v2(&mut contracts).await,
1656 StakeTableContractVersion::V3 => args.deploy_to_stake_table_v3(&mut contracts).await,
1657 }
1658 .expect("contracts deployed");
1659
1660 let st_addr = contracts
1661 .address(Contract::StakeTableProxy)
1662 .expect("StakeTableProxy deployed");
1663 let l1_url = network_config.l1_url().clone();
1664
1665 let mut planned_txns = StakingTransactions::create(
1666 l1_url.clone(),
1667 &deployer,
1668 st_addr,
1669 validators,
1670 None,
1671 DelegationConfig::MultipleDelegators,
1672 )
1673 .await
1674 .expect("stake table setup failed");
1675
1676 planned_txns
1677 .apply_prerequisites()
1678 .await
1679 .expect("prerequisites failed");
1680
1681 planned_txns.apply_one().await.expect("send tx failed");
1683
1684 anvil_provider
1686 .anvil_set_interval_mining(1)
1687 .await
1688 .expect("interval mining");
1689
1690 spawn({
1694 async move {
1695 {
1696 while let Some(receipt) =
1697 planned_txns.apply_one().await.expect("send tx failed")
1698 {
1699 tracing::debug!(?receipt, "transaction finalized");
1700 }
1701 }
1702 }
1703 });
1704
1705 let storage = P::tmp_storage().await;
1706 let persistence = P::options(&storage).create().await.unwrap();
1707
1708 let l1_client = L1ClientOptions {
1709 stake_table_update_interval: Duration::from_secs(7),
1710 l1_retry_delay: Duration::from_millis(10),
1711 l1_events_max_block_range: 10000,
1712 ..Default::default()
1713 }
1714 .connect(vec![l1_url])
1715 .unwrap();
1716 l1_client.spawn_tasks().await;
1717
1718 let fetcher = Fetcher::new(
1719 Arc::new(NullStateCatchup::default()),
1720 Arc::new(Mutex::new(persistence.clone())),
1721 l1_client.clone(),
1722 ChainConfig {
1723 stake_table_contract: Some(st_addr),
1724 base_fee: 0.into(),
1725 ..Default::default()
1726 },
1727 );
1728
1729 sleep(Duration::from_secs(20)).await;
1731
1732 fetcher.spawn_update_loop().await;
1733 let mut prev_l1_block = 0;
1734 let mut prev_events_len = 0;
1735 for _i in 0..10 {
1736 tokio::time::sleep(std::time::Duration::from_secs(8)).await;
1740
1741 let block = anvil_provider
1742 .get_block_number()
1743 .await
1744 .expect("latest l1 block");
1745
1746 let (read_offset, persisted_events) = persistence.load_events(0, block).await?;
1747 let read_offset = read_offset.unwrap();
1748 let l1_block = match read_offset {
1749 EventsPersistenceRead::Complete => block,
1750 EventsPersistenceRead::UntilL1Block(block) => block,
1751 };
1752
1753 tracing::info!("{l1_block:?}, persistence events = {persisted_events:?}.");
1754 assert!(persisted_events.len() > prev_events_len);
1755
1756 assert!(l1_block > prev_l1_block, "events not updated");
1757
1758 let contract_events =
1759 Fetcher::fetch_events_from_contract(l1_client.clone(), st_addr, None, l1_block)
1760 .await?;
1761 assert_eq!(persisted_events, contract_events);
1762
1763 prev_l1_block = l1_block;
1764 prev_events_len = persisted_events.len();
1765 }
1766
1767 Ok(())
1768 }
1769
1770 #[rstest_reuse::apply(persistence_types)]
1771 pub async fn test_membership_persistence<P: TestablePersistence>(
1772 _p: PhantomData<P>,
1773 ) -> anyhow::Result<()> {
1774 let tmp = P::tmp_storage().await;
1775 let mut opt = P::options(&tmp);
1776
1777 let storage = opt.create().await.unwrap();
1778
1779 let validator = AuthenticatedValidator::mock();
1780 let mut st = IndexMap::new();
1781 st.insert(validator.account, validator);
1782
1783 storage
1784 .store_stake(EpochNumber::new(10), st.clone(), None, None)
1785 .await?;
1786
1787 let (table, ..) = storage.load_stake(EpochNumber::new(10)).await?.unwrap();
1788 assert_eq!(st, table);
1789
1790 let val2 = AuthenticatedValidator::mock();
1791 let mut st2 = IndexMap::new();
1792 st2.insert(val2.account, val2);
1793 storage
1794 .store_stake(EpochNumber::new(11), st2.clone(), None, None)
1795 .await?;
1796
1797 let tables = storage.load_latest_stake(4).await?.unwrap();
1798 let mut iter = tables.iter();
1799 assert_eq!(
1800 Some(&(EpochNumber::new(11), (st2.clone(), None), None)),
1801 iter.next()
1802 );
1803 assert_eq!(Some(&(EpochNumber::new(10), (st, None), None)), iter.next());
1804 assert_eq!(None, iter.next());
1805
1806 for i in 0..=20 {
1807 storage
1808 .store_stake(EpochNumber::new(i), st2.clone(), None, None)
1809 .await?;
1810 }
1811
1812 let tables = storage.load_latest_stake(5).await?.unwrap();
1813 let mut iter = tables.iter();
1814 assert_eq!(
1815 Some(&(EpochNumber::new(20), (st2.clone(), None), None)),
1816 iter.next()
1817 );
1818 assert_eq!(
1819 Some(&(EpochNumber::new(19), (st2.clone(), None), None)),
1820 iter.next()
1821 );
1822 assert_eq!(
1823 Some(&(EpochNumber::new(18), (st2.clone(), None), None)),
1824 iter.next()
1825 );
1826 assert_eq!(
1827 Some(&(EpochNumber::new(17), (st2.clone(), None), None)),
1828 iter.next()
1829 );
1830 assert_eq!(
1831 Some(&(EpochNumber::new(16), (st2, None), None)),
1832 iter.next()
1833 );
1834 assert_eq!(None, iter.next());
1835
1836 Ok(())
1837 }
1838
1839 #[rstest_reuse::apply(persistence_types)]
1840 pub async fn test_delete_stake_tables<P: TestablePersistence>(
1841 _p: PhantomData<P>,
1842 ) -> anyhow::Result<()> {
1843 let tmp = P::tmp_storage().await;
1844 let mut opt = P::options(&tmp);
1845 let storage = opt.create().await.unwrap();
1846
1847 let event1 = StakeTableEvent::Delegate(Delegated {
1848 delegator: Address::ZERO,
1849 validator: Address::ZERO,
1850 amount: U256::from(100),
1851 });
1852 let event2 = StakeTableEvent::Delegate(Delegated {
1853 delegator: Address::ZERO,
1854 validator: Address::ZERO,
1855 amount: U256::from(200),
1856 });
1857
1858 let l1_block = 42u64;
1859 let events: Vec<(EventKey, StakeTableEvent)> =
1860 vec![((l1_block, 0), event1), ((l1_block, 1), event2)];
1861
1862 storage.store_events(l1_block, events.clone()).await?;
1863
1864 let (read_offset, loaded_events) = storage.load_events(0_u64, l1_block).await?;
1865 assert!(read_offset.is_some());
1866 assert_eq!(loaded_events.len(), 2);
1867 assert_eq!(loaded_events, events);
1868
1869 let v = RegisteredValidator::mock();
1871 let mut vmap = IndexMap::new();
1872 vmap.insert(v.account, v);
1873 storage
1874 .store_all_validators(EpochNumber::new(1), vmap.clone())
1875 .await?;
1876
1877 let loaded = storage
1878 .load_all_validators(EpochNumber::new(1), 0, 10)
1879 .await?;
1880 assert_eq!(loaded.len(), 1);
1881
1882 storage.delete_stake_tables().await?;
1883
1884 let (read_offset, loaded_events) = storage.load_events(0_u64, l1_block).await?;
1886 assert!(read_offset.is_none());
1887 assert!(loaded_events.is_empty());
1888
1889 let loaded = storage
1891 .load_all_validators(EpochNumber::new(1), 0, 10)
1892 .await
1893 .unwrap_or_default();
1894 assert!(loaded.is_empty());
1895
1896 let event3 = StakeTableEvent::Delegate(Delegated {
1897 delegator: Address::ZERO,
1898 validator: Address::ZERO,
1899 amount: U256::from(300),
1900 });
1901 let new_events: Vec<(EventKey, StakeTableEvent)> = vec![((l1_block, 0), event3)];
1902 storage.store_events(l1_block, new_events.clone()).await?;
1903
1904 let (read_offset, loaded_events) = storage.load_events(0_u64, l1_block).await?;
1905 assert!(read_offset.is_some());
1906 assert_eq!(loaded_events.len(), 1);
1907 assert_eq!(loaded_events, new_events);
1908
1909 Ok(())
1910 }
1911
1912 #[rstest_reuse::apply(persistence_types)]
1913 pub async fn test_store_and_load_all_validators<P: TestablePersistence>(
1914 _p: PhantomData<P>,
1915 ) -> anyhow::Result<()> {
1916 let tmp = P::tmp_storage().await;
1917 let mut opt = P::options(&tmp);
1918 let storage = opt.create().await.unwrap();
1919
1920 let mut vmap1 = IndexMap::new();
1921 for _i in 0..25 {
1922 let v = RegisteredValidator::mock();
1923 vmap1.insert(v.account, v);
1924 }
1925 storage
1926 .store_all_validators(EpochNumber::new(10), vmap1.clone())
1927 .await?;
1928
1929 let mut expected_all: Vec<_> = vmap1.clone().into_values().collect();
1930 expected_all.sort_by_key(|v| v.account);
1931
1932 let loaded_all = storage
1934 .load_all_validators(EpochNumber::new(10), 0, 100)
1935 .await?;
1936 assert_eq!(expected_all, loaded_all);
1938
1939 let loaded_first_10 = storage
1941 .load_all_validators(EpochNumber::new(10), 0, 10)
1942 .await?;
1943
1944 assert_eq!(expected_all[..10], loaded_first_10);
1945
1946 let loaded_next_10 = storage
1948 .load_all_validators(EpochNumber::new(10), 10, 10)
1949 .await?;
1950
1951 assert_eq!(expected_all[10..20], loaded_next_10);
1952
1953 let loaded_last_5 = storage
1955 .load_all_validators(EpochNumber::new(10), 20, 10)
1956 .await?;
1957
1958 assert_eq!(expected_all[20..], loaded_last_5);
1959
1960 let loaded_empty = storage
1962 .load_all_validators(EpochNumber::new(10), 100, 10)
1963 .await?;
1964 assert!(loaded_empty.is_empty());
1965
1966 let validator2 = RegisteredValidator::mock();
1968 let mut vmap2 = IndexMap::new();
1969 vmap2.insert(validator2.account, validator2.clone());
1970
1971 storage
1972 .store_all_validators(EpochNumber::new(11), vmap2.clone())
1973 .await?;
1974
1975 let mut expected_epoch11: Vec<_> = vmap2.clone().into_values().collect();
1976 expected_epoch11.sort_by_key(|v| v.account);
1977
1978 let loaded2 = storage
1979 .load_all_validators(EpochNumber::new(11), 0, 100)
1980 .await?;
1981
1982 assert_eq!(expected_epoch11, loaded2);
1983
1984 let loaded1_again = storage
1986 .load_all_validators(EpochNumber::new(10), 0, 100)
1987 .await?;
1988
1989 assert_eq!(expected_all, loaded1_again);
1990
1991 Ok(())
1992 }
1993
1994 #[rstest_reuse::apply(persistence_types)]
1995 pub async fn test_non_consecutive_decide<P: TestablePersistence>(_p: PhantomData<P>) {
1996 let tmp = P::tmp_storage().await;
1997 let storage = P::connect(&tmp).await;
1998
1999 let genesis_leaf: Leaf2 = Leaf2::genesis(
2000 &ValidatedState::default(),
2001 &NodeState::mock(),
2002 TEST_VERSIONS.test.base,
2003 )
2004 .await;
2005 let mut quorum_proposal = QuorumProposalWrapper::<SeqTypes> {
2006 proposal: QuorumProposal2::<SeqTypes> {
2007 epoch: None,
2008 block_header: genesis_leaf.block_header().clone(),
2009 view_number: genesis_leaf.view_number(),
2010 justify_qc: QuorumCertificate2::genesis(
2011 &ValidatedState::default(),
2012 &NodeState::mock(),
2013 TEST_VERSIONS.test,
2014 )
2015 .await,
2016 upgrade_certificate: None,
2017 view_change_evidence: None,
2018 next_drb_result: None,
2019 next_epoch_justify_qc: None,
2020 state_cert: None,
2021 },
2022 };
2023
2024 let leaf0 = Leaf2::from_quorum_proposal(&quorum_proposal);
2025
2026 quorum_proposal.proposal.view_number = ViewNumber::new(2);
2027 *quorum_proposal.proposal.block_header.height_mut() = 2;
2028 quorum_proposal.proposal.justify_qc.view_number = ViewNumber::new(1);
2029 let leaf2 = Leaf2::from_quorum_proposal(&quorum_proposal);
2030
2031 let mut qc0 = leaf0.justify_qc();
2032 qc0.data.leaf_commit = Committable::commit(&leaf0);
2033
2034 let mut qc2 = leaf2.justify_qc();
2035 qc2.view_number += 1;
2036 qc2.data.leaf_commit = Committable::commit(&leaf2);
2037
2038 let mut deciding_qc = qc2.clone();
2039 deciding_qc.view_number += 1;
2040
2041 storage
2043 .append_decided_leaves(
2044 ViewNumber::new(0),
2045 [(
2046 &leaf_info(leaf0.clone()),
2047 CertificatePair::non_epoch_change(qc0),
2048 )],
2049 None,
2050 &FailConsumer,
2051 )
2052 .await
2053 .unwrap();
2054
2055 let consumer = EventCollector::default();
2059 storage
2060 .append_decided_leaves(
2061 ViewNumber::new(2),
2062 [(
2063 &leaf_info(leaf2.clone()),
2064 CertificatePair::non_epoch_change(qc2),
2065 )],
2066 Some(Arc::new(CertificatePair::non_epoch_change(
2067 deciding_qc.clone(),
2068 ))),
2069 &consumer,
2070 )
2071 .await
2072 .unwrap();
2073
2074 let events = consumer.events.read().await;
2075 assert_eq!(events.len(), 2);
2076
2077 let EventType::Decide {
2078 leaf_chain: leaf_chain0,
2079 deciding_qc: deciding_qc0,
2080 ..
2081 } = &events[0].event
2082 else {
2083 panic!("expected decide event, got {:?}", events[0].event);
2084 };
2085 assert_eq!(leaf_chain0.len(), 1);
2086 assert_eq!(leaf_chain0[0].leaf, leaf0);
2087 assert_eq!(*deciding_qc0, None);
2088
2089 let EventType::Decide {
2090 leaf_chain: leaf_chain2,
2091 deciding_qc: deciding_qc2,
2092 ..
2093 } = &events[1].event
2094 else {
2095 panic!("expected decide event, got {:?}", events[1].event);
2096 };
2097 assert_eq!(leaf_chain2.len(), 1);
2098 assert_eq!(leaf_chain2[0].leaf, leaf2);
2099 assert_eq!(deciding_qc2.as_ref().unwrap().qc(), &deciding_qc);
2100 }
2101}