1use std::collections::HashMap;
12
13use alloy::primitives::{Address, U256};
14use anyhow::Context;
15use async_trait::async_trait;
16use espresso_types::{
17 PubKey,
18 v0_3::{ChainConfig, RegisteredValidator},
19};
20
21pub mod fs;
22pub mod no_storage;
23mod persistence_metrics;
24pub mod sql;
25
/// Serializable validator record that has neither an `x25519_key` nor a `p2p_addr` field.
///
/// [`RegisteredValidatorNoX25519::migrate`] turns a value of this type into a
/// [`RegisteredValidator`], copying every field below and setting the two missing fields to
/// `None`.
#[derive(serde::Serialize, serde::Deserialize)]
pub(crate) struct RegisteredValidatorNoX25519 {
    /// Address associated with the validator.
    pub account: Address,
    /// Key copied into `RegisteredValidator::stake_table_key` on migration.
    pub stake_table_key: PubKey,
    /// Key copied into `RegisteredValidator::state_ver_key` on migration.
    pub state_ver_key: hotshot_types::light_client::StateVerKey,
    /// Stake amount recorded for the validator.
    pub stake: U256,
    /// Commission value. NOTE(review): the unit is not visible in this file — confirm.
    pub commission: u16,
    /// Amounts keyed by delegator address.
    /// NOTE(review): presumably the stake delegated by each address — confirm.
    pub delegators: HashMap<Address, U256>,
    /// Authentication flag, carried over unchanged on migration.
    pub authenticated: bool,
}
38
39impl RegisteredValidatorNoX25519 {
40 pub fn migrate(self) -> RegisteredValidator<PubKey> {
41 RegisteredValidator {
42 account: self.account,
43 stake_table_key: self.stake_table_key,
44 state_ver_key: self.state_ver_key,
45 stake: self.stake,
46 commission: self.commission,
47 delegators: self.delegators,
48 authenticated: self.authenticated,
49 x25519_key: None,
50 p2p_addr: None,
51 }
52 }
53}
54
55fn migrate_network_config(
57 mut network_config: serde_json::Value,
58) -> anyhow::Result<serde_json::Value> {
59 let config = network_config
60 .get_mut("config")
61 .context("missing field `config`")?
62 .as_object_mut()
63 .context("`config` must be an object")?;
64
65 if !config.contains_key("builder_urls") {
66 let url = config
71 .remove("builder_url")
72 .context("missing field `builder_url`")?;
73 config.insert("builder_urls".into(), vec![url].into());
74 }
75
76 if !config.contains_key("start_proposing_view") {
82 config.insert("start_proposing_view".into(), 9007199254740991u64.into());
83 }
84 if !config.contains_key("stop_proposing_view") {
85 config.insert("stop_proposing_view".into(), 0.into());
86 }
87 if !config.contains_key("start_voting_view") {
88 config.insert("start_voting_view".into(), 9007199254740991u64.into());
89 }
90 if !config.contains_key("stop_voting_view") {
91 config.insert("stop_voting_view".into(), 0.into());
92 }
93 if !config.contains_key("start_proposing_time") {
94 config.insert("start_proposing_time".into(), 9007199254740991u64.into());
95 }
96 if !config.contains_key("stop_proposing_time") {
97 config.insert("stop_proposing_time".into(), 0.into());
98 }
99 if !config.contains_key("start_voting_time") {
100 config.insert("start_voting_time".into(), 9007199254740991u64.into());
101 }
102 if !config.contains_key("stop_voting_time") {
103 config.insert("stop_voting_time".into(), 0.into());
104 }
105
106 if !config.contains_key("epoch_height") {
109 config.insert("epoch_height".into(), 0.into());
110 }
111
112 if !config.contains_key("drb_difficulty") {
115 config.insert("drb_difficulty".into(), 0.into());
116 }
117 if !config.contains_key("drb_upgrade_difficulty") {
118 config.insert("drb_upgrade_difficulty".into(), 0.into());
119 }
120
121 if !config.contains_key("da_committees") {
123 config.insert("da_committees".into(), serde_json::json!([]));
124 }
125
126 Ok(network_config)
127}
128
/// Storage backends that can persist a [`ChainConfig`].
///
/// Implementors must be `Sized + Send + Sync`.
#[async_trait]
pub trait ChainConfigPersistence: Sized + Send + Sync {
    /// Inserts `chain_config` into the backing store.
    ///
    /// # Errors
    ///
    /// Returns the implementation's error if the insert fails.
    /// NOTE(review): behavior when the same chain config is inserted twice is not visible
    /// here — confirm against the implementations.
    async fn insert_chain_config(&mut self, chain_config: ChainConfig) -> anyhow::Result<()>;
}
133
134#[cfg(test)]
135mod tests {
136 use std::{cmp::max, collections::BTreeMap, marker::PhantomData, sync::Arc, time::Duration};
137
138 use alloy::{
139 network::EthereumWallet,
140 primitives::{Address, U256},
141 providers::{Provider, ProviderBuilder, ext::AnvilApi},
142 };
143 use anyhow::bail;
144 use async_lock::{Mutex, RwLock};
145 use async_trait::async_trait;
146 use committable::{Commitment, Committable};
147 use espresso_contract_deployer::{
148 Contract, Contracts, DEFAULT_EXIT_ESCROW_PERIOD_SECONDS, builder::DeployerArgsBuilder,
149 network_config::light_client_genesis_from_stake_table,
150 };
151 use espresso_types::{
152 Event, L1Client, L1ClientOptions, Leaf, Leaf2, NodeState, PubKey, SeqTypes, ValidatedState,
153 traits::{
154 EventConsumer, EventsPersistenceRead, MembershipPersistence, NullEventConsumer,
155 PersistenceOptions, SequencerPersistence,
156 },
157 v0_3::{AuthenticatedValidator, EventKey, Fetcher, RegisteredValidator, StakeTableEvent},
158 };
159 use futures::{StreamExt, TryStreamExt, future::join_all};
160 use hotshot::{
161 InitializerEpochInfo,
162 types::{BLSPubKey, SignatureKey},
163 };
164 use hotshot_contract_adapter::{
165 sol_types::StakeTableV2::Delegated, stake_table::StakeTableContractVersion,
166 };
167 use hotshot_example_types::node_types::TEST_VERSIONS;
168 use hotshot_query_service::{availability::BlockQueryData, testing::mocks::MOCK_UPGRADE};
169 use hotshot_types::{
170 data::{
171 DaProposal2, EpochNumber, QuorumProposal2, QuorumProposalWrapper, VidCommitment,
172 ViewNumber, ns_table::parse_ns_table, vid_commitment, vid_disperse::AvidMDisperseShare,
173 },
174 event::{EventType, HotShotAction, LeafInfo},
175 light_client::StateKeyPair,
176 message::{Proposal, UpgradeLock, convert_proposal},
177 simple_certificate::{
178 CertificatePair, NextEpochQuorumCertificate2, QuorumCertificate, QuorumCertificate2,
179 UpgradeCertificate,
180 },
181 simple_vote::{NextEpochQuorumData2, QuorumData2, UpgradeProposalData, VersionedVoteData},
182 traits::{EncodeBytes, block_contents::BlockHeader},
183 utils::EpochTransitionIndicator,
184 vid::avidm::{AvidMScheme, init_avidm_param},
185 vote::HasViewNumber,
186 };
187 use indexmap::IndexMap;
188 use staking_cli::demo::{DelegationConfig, StakingTransactions};
189 use surf_disco::Client;
190 use test_utils::reserve_tcp_port;
191 use tide_disco::error::ServerError;
192 use tokio::{spawn, time::sleep};
193 use vbs::version::Version;
194 use versions::{Upgrade, version};
195
196 use crate::{
197 RECENT_STAKE_TABLES_LIMIT, SequencerApiVersion,
198 api::{
199 Options,
200 test_helpers::{STAKE_TABLE_CAPACITY_FOR_TEST, TestNetwork, TestNetworkConfigBuilder},
201 },
202 catchup::NullStateCatchup,
203 testing::{TestConfigBuilder, staking_priv_keys},
204 };
205
    /// A persistence backend that the tests in this module can set up on their own.
    ///
    /// Implementors describe how to create a backing store and how to derive persistence
    /// options from it; [`TestablePersistence::connect`] then opens an instance on that store.
    #[async_trait]
    pub trait TestablePersistence: SequencerPersistence + MembershipPersistence {
        /// Handle to the backing store returned by [`Self::tmp_storage`].
        type Storage: Sync;

        /// Creates the backing store used by a test.
        /// NOTE(review): presumably a fresh, temporary store per call — confirm in the
        /// implementations.
        async fn tmp_storage() -> Self::Storage;
        /// Builds the options from which a `Self` can be created on top of `storage`.
        fn options(storage: &Self::Storage) -> impl PersistenceOptions<Persistence = Self>;

        /// Opens a persistence instance on `storage` by calling `create()` on the result of
        /// [`Self::options`].
        ///
        /// # Panics
        ///
        /// Panics if `create()` returns an error.
        async fn connect(storage: &Self::Storage) -> Self {
            Self::options(storage).create().await.unwrap()
        }
    }
217
    // Test template with one case per persistence backend exercised here: the `sql` and
    // the `fs` implementations. Tests opt in with `#[rstest_reuse::apply(persistence_types)]`
    // and receive the backend as the type parameter `P` through a `PhantomData<P>` argument.
    // The function body is intentionally empty; only the attributes and the signature matter.
    #[rstest_reuse::template]
    #[rstest::rstest]
    #[case(PhantomData::<crate::persistence::sql::Persistence>)]
    #[case(PhantomData::<crate::persistence::fs::Persistence>)]
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    pub fn persistence_types<P: TestablePersistence>(#[case] _p: PhantomData<P>) {}
224
    /// Event consumer that records every event it is handed.
    ///
    /// The event list sits behind an `Arc<RwLock<..>>`, so clones of a collector share the
    /// same list.
    #[derive(Clone, Debug, Default)]
    struct EventCollector {
        /// Events in the order they were received.
        events: Arc<RwLock<Vec<Event>>>,
    }
229
230 impl EventCollector {
231 async fn leaf_chain(&self) -> Vec<LeafInfo<SeqTypes>> {
232 self.events
233 .read()
234 .await
235 .iter()
236 .flat_map(|event| {
237 let EventType::Decide { leaf_chain, .. } = &event.event else {
238 panic!("expected decide event, got {event:?}");
239 };
240 leaf_chain.iter().cloned().rev()
241 })
242 .collect::<Vec<_>>()
243 }
244 }
245
246 #[async_trait]
247 impl EventConsumer for EventCollector {
248 async fn handle_event(&self, event: &Event) -> anyhow::Result<()> {
249 self.events.write().await.push(event.clone());
250 Ok(())
251 }
252 }
253
    /// Event consumer whose `handle_event` always returns an error, used to inject
    /// event-handling failures in tests.
    #[derive(Clone, Copy, Debug)]
    struct FailConsumer;
256
257 #[async_trait]
258 impl EventConsumer for FailConsumer {
259 async fn handle_event(&self, _: &Event) -> anyhow::Result<()> {
260 bail!("mock error injection");
261 }
262 }
263
264 #[rstest_reuse::apply(persistence_types)]
265 pub async fn test_voted_view<P: TestablePersistence>(_p: PhantomData<P>) {
266 let tmp = P::tmp_storage().await;
267 let storage = P::connect(&tmp).await;
268
269 assert_eq!(storage.load_latest_acted_view().await.unwrap(), None);
271
272 let view1 = ViewNumber::genesis();
274 storage
275 .record_action(view1, None, HotShotAction::Vote)
276 .await
277 .unwrap();
278 assert_eq!(
279 storage.load_latest_acted_view().await.unwrap().unwrap(),
280 view1
281 );
282
283 let view2 = view1 + 1;
285 storage
286 .record_action(view2, None, HotShotAction::Vote)
287 .await
288 .unwrap();
289 assert_eq!(
290 storage.load_latest_acted_view().await.unwrap().unwrap(),
291 view2
292 );
293
294 storage
296 .record_action(view1, None, HotShotAction::Vote)
297 .await
298 .unwrap();
299 assert_eq!(
300 storage.load_latest_acted_view().await.unwrap().unwrap(),
301 view2
302 );
303 }
304
305 #[rstest_reuse::apply(persistence_types)]
306 pub async fn test_restart_view<P: TestablePersistence>(_p: PhantomData<P>) {
307 let tmp = P::tmp_storage().await;
308 let storage = P::connect(&tmp).await;
309
310 assert_eq!(storage.load_restart_view().await.unwrap(), None);
312
313 let view1 = ViewNumber::genesis();
315 storage
316 .record_action(view1, None, HotShotAction::Vote)
317 .await
318 .unwrap();
319 assert_eq!(
320 storage.load_restart_view().await.unwrap().unwrap(),
321 view1 + 1
322 );
323
324 let view2 = view1 + 1;
326 storage
327 .record_action(view2, None, HotShotAction::Vote)
328 .await
329 .unwrap();
330 assert_eq!(
331 storage.load_restart_view().await.unwrap().unwrap(),
332 view2 + 1
333 );
334
335 storage
337 .record_action(view1, None, HotShotAction::Vote)
338 .await
339 .unwrap();
340 assert_eq!(
341 storage.load_restart_view().await.unwrap().unwrap(),
342 view2 + 1
343 );
344
345 storage
347 .record_action(view2 + 1, None, HotShotAction::Propose)
348 .await
349 .unwrap();
350 assert_eq!(
351 storage.load_restart_view().await.unwrap().unwrap(),
352 view2 + 1
353 );
354
355 storage
357 .record_action(view2 + 1, None, HotShotAction::TimeoutVote)
358 .await
359 .unwrap();
360 assert_eq!(
361 storage.load_restart_view().await.unwrap().unwrap(),
362 view2 + 1
363 );
364 }
365
366 #[rstest_reuse::apply(persistence_types)]
367 pub async fn test_store_drb_input<P: TestablePersistence>(_p: PhantomData<P>) {
368 use hotshot_types::drb::DrbInput;
369
370 let tmp = P::tmp_storage().await;
371 let storage = P::connect(&tmp).await;
372 let difficulty_level = 10;
373
374 if storage.load_drb_input(10).await.is_ok() {
376 panic!("unexpected nonempty drb_input");
377 }
378
379 let drb_input_1 = DrbInput {
380 epoch: 10,
381 iteration: 10,
382 value: [0u8; 32],
383 difficulty_level,
384 };
385
386 let drb_input_2 = DrbInput {
387 epoch: 10,
388 iteration: 20,
389 value: [0u8; 32],
390 difficulty_level,
391 };
392
393 let drb_input_3 = DrbInput {
394 epoch: 10,
395 iteration: 30,
396 value: [0u8; 32],
397 difficulty_level,
398 };
399
400 let _ = storage.store_drb_input(drb_input_1.clone()).await;
401
402 assert_eq!(storage.load_drb_input(10).await.unwrap(), drb_input_1);
403
404 let _ = storage.store_drb_input(drb_input_3.clone()).await;
405
406 assert_eq!(storage.load_drb_input(10).await.unwrap(), drb_input_3);
408
409 let _ = storage.store_drb_input(drb_input_2.clone()).await;
410
411 assert_eq!(storage.load_drb_input(10).await.unwrap(), drb_input_3);
413 }
414
415 #[rstest_reuse::apply(persistence_types)]
416 pub async fn test_epoch_info<P: TestablePersistence>(_p: PhantomData<P>) {
417 let tmp = P::tmp_storage().await;
418 let storage = P::connect(&tmp).await;
419
420 assert_eq!(storage.load_start_epoch_info().await.unwrap(), Vec::new());
422
423 storage
425 .store_drb_result(EpochNumber::new(1), [1; 32])
426 .await
427 .unwrap();
428 assert_eq!(
429 storage.load_start_epoch_info().await.unwrap(),
430 vec![InitializerEpochInfo::<SeqTypes> {
431 epoch: EpochNumber::new(1),
432 drb_result: [1; 32],
433 block_header: None,
434 }]
435 );
436
437 storage
439 .store_drb_result(EpochNumber::new(2), [3; 32])
440 .await
441 .unwrap();
442 assert_eq!(
443 storage.load_start_epoch_info().await.unwrap(),
444 vec![
445 InitializerEpochInfo::<SeqTypes> {
446 epoch: EpochNumber::new(1),
447 drb_result: [1; 32],
448 block_header: None,
449 },
450 InitializerEpochInfo::<SeqTypes> {
451 epoch: EpochNumber::new(2),
452 drb_result: [3; 32],
453 block_header: None,
454 }
455 ]
456 );
457
458 let instance_state = NodeState::mock();
460 let validated_state = hotshot_types::traits::ValidatedState::genesis(&instance_state).0;
461 let leaf: Leaf2 = Leaf::genesis(&validated_state, &instance_state, MOCK_UPGRADE.base)
462 .await
463 .into();
464 let header = leaf.block_header().clone();
465
466 storage
468 .store_epoch_root(EpochNumber::new(1), header.clone())
469 .await
470 .unwrap();
471 assert_eq!(
472 storage.load_start_epoch_info().await.unwrap(),
473 vec![
474 InitializerEpochInfo::<SeqTypes> {
475 epoch: EpochNumber::new(1),
476 drb_result: [1; 32],
477 block_header: Some(header.clone()),
478 },
479 InitializerEpochInfo::<SeqTypes> {
480 epoch: EpochNumber::new(2),
481 drb_result: [3; 32],
482 block_header: None,
483 }
484 ]
485 );
486
487 let total_epochs = RECENT_STAKE_TABLES_LIMIT + 10;
489 for i in 0..total_epochs {
490 let epoch = EpochNumber::new(i);
491 let drb = [i as u8; 32];
492 storage
493 .store_drb_result(epoch, drb)
494 .await
495 .unwrap_or_else(|_| panic!("Failed to store DRB result for epoch {i}"));
496 }
497
498 let results = storage.load_start_epoch_info().await.unwrap();
499
500 assert_eq!(
502 results.len(),
503 RECENT_STAKE_TABLES_LIMIT as usize,
504 "Should return only the most recent {RECENT_STAKE_TABLES_LIMIT} epochs",
505 );
506
507 for (i, info) in results.iter().enumerate() {
508 let expected_epoch =
509 EpochNumber::new(total_epochs - RECENT_STAKE_TABLES_LIMIT + i as u64);
510 let expected_drb = [(total_epochs - RECENT_STAKE_TABLES_LIMIT + i as u64) as u8; 32];
511 assert_eq!(info.epoch, expected_epoch, "invalid epoch at index {i}",);
512 assert_eq!(info.drb_result, expected_drb, "invalid DRB at index {i}",);
513 assert!(info.block_header.is_none(), "Expected no block header");
514 }
515 }
516
517 fn leaf_info(leaf: Leaf2) -> LeafInfo<SeqTypes> {
518 LeafInfo {
519 leaf,
520 vid_share: None,
521 state: Default::default(),
522 delta: None,
523 state_cert: None,
524 }
525 }
526
    // End-to-end check of appending consensus artifacts and then deciding leaves:
    //   1. append VID shares, DA proposals and quorum proposals for views 0..=3 and check
    //      that each can be loaded back;
    //   2. decide views 0..=2 and check that their artifacts are no longer loadable while
    //      those for view 3 still are;
    //   3. decide view 3 and check that everything has been cleaned up.
    #[rstest_reuse::apply(persistence_types)]
    pub async fn test_append_and_decide<P: TestablePersistence>(_p: PhantomData<P>) {
        let tmp = P::tmp_storage().await;
        let storage = P::connect(&tmp).await;

        // Nothing has been appended yet, so no VID share can be loaded for view 0.
        assert_eq!(
            storage.load_vid_share(ViewNumber::new(0)).await.unwrap(),
            None
        );

        // Genesis leaf; its header and payload are reused for every proposal below.
        let leaf: Leaf2 = Leaf2::genesis(
            &ValidatedState::default(),
            &NodeState::mock(),
            TEST_VERSIONS.test.base,
        )
        .await;
        let leaf_payload = leaf.block_payload().unwrap();
        let leaf_payload_bytes_arc = leaf_payload.encode();

        // Disperse the payload using two weights of 1.
        let avidm_param = init_avidm_param(2).unwrap();
        let weights = vec![1u32; 2];

        let ns_table = parse_ns_table(
            leaf_payload.byte_len().as_usize(),
            &leaf_payload.ns_table().encode(),
        );
        let (payload_commitment, shares) =
            AvidMScheme::ns_disperse(&avidm_param, &weights, &leaf_payload_bytes_arc, ns_table)
                .unwrap();

        let (pubkey, privkey) = BLSPubKey::generated_from_seed_indexed([0; 32], 1);
        let signature = PubKey::sign(&privkey, &[]).unwrap();
        // `vid` and `quorum_proposal` are mutable templates: their view numbers are bumped
        // below to produce the artifacts for views 1..=3.
        let mut vid = AvidMDisperseShare::<SeqTypes> {
            view_number: ViewNumber::new(0),
            payload_commitment,
            share: shares[0].clone(),
            recipient_key: pubkey,
            epoch: Some(EpochNumber::new(0)),
            target_epoch: Some(EpochNumber::new(0)),
            common: avidm_param,
        };
        let mut quorum_proposal = Proposal {
            data: QuorumProposalWrapper::<SeqTypes> {
                proposal: QuorumProposal2::<SeqTypes> {
                    epoch: None,
                    block_header: leaf.block_header().clone(),
                    view_number: ViewNumber::genesis(),
                    justify_qc: QuorumCertificate2::genesis(
                        &ValidatedState::default(),
                        &NodeState::mock(),
                        TEST_VERSIONS.test,
                    )
                    .await,
                    upgrade_certificate: None,
                    view_change_evidence: None,
                    next_drb_result: None,
                    next_epoch_justify_qc: None,
                    state_cert: None,
                },
            },
            signature,
            _pd: Default::default(),
        };

        // --- VID shares for views 0..=3: append each one and read it back. ---
        let vid_share0 = convert_proposal(vid.clone().to_proposal(&privkey).unwrap().clone());

        storage.append_vid(&vid_share0).await.unwrap();

        assert_eq!(
            storage.load_vid_share(ViewNumber::new(0)).await.unwrap(),
            Some(vid_share0.clone())
        );

        vid.view_number = ViewNumber::new(1);

        let vid_share1 = convert_proposal(vid.clone().to_proposal(&privkey).unwrap().clone());
        storage.append_vid(&vid_share1).await.unwrap();

        assert_eq!(
            storage.load_vid_share(vid.view_number()).await.unwrap(),
            Some(vid_share1.clone())
        );

        vid.view_number = ViewNumber::new(2);

        let vid_share2 = convert_proposal(vid.clone().to_proposal(&privkey).unwrap().clone());
        storage.append_vid(&vid_share2).await.unwrap();

        assert_eq!(
            storage.load_vid_share(vid.view_number()).await.unwrap(),
            Some(vid_share2.clone())
        );

        vid.view_number = ViewNumber::new(3);

        let vid_share3 = convert_proposal(vid.clone().to_proposal(&privkey).unwrap().clone());
        storage.append_vid(&vid_share3).await.unwrap();

        assert_eq!(
            storage.load_vid_share(vid.view_number()).await.unwrap(),
            Some(vid_share3.clone())
        );

        // --- DA proposals for views 0..=3: append each one and read it back. ---
        let block_payload_signature = BLSPubKey::sign(&privkey, &leaf_payload_bytes_arc)
            .expect("Failed to sign block payload");

        let da_proposal_inner = DaProposal2::<SeqTypes> {
            encoded_transactions: leaf_payload_bytes_arc.clone(),
            metadata: leaf_payload.ns_table().clone(),
            view_number: ViewNumber::new(0),
            epoch: None,
            epoch_transition_indicator: EpochTransitionIndicator::NotInTransition,
        };

        let da_proposal = Proposal {
            data: da_proposal_inner,
            signature: block_payload_signature,
            _pd: Default::default(),
        };

        // The same commitment is passed alongside every DA proposal appended below.
        let vid_commitment = vid_commitment(
            &leaf_payload_bytes_arc,
            &leaf.block_header().metadata().encode(),
            2,
            TEST_VERSIONS.test.base,
        );

        storage
            .append_da2(&da_proposal, vid_commitment)
            .await
            .unwrap();

        assert_eq!(
            storage.load_da_proposal(ViewNumber::new(0)).await.unwrap(),
            Some(da_proposal.clone())
        );

        let mut da_proposal1 = da_proposal.clone();
        da_proposal1.data.view_number = ViewNumber::new(1);
        storage
            .append_da2(&da_proposal1.clone(), vid_commitment)
            .await
            .unwrap();

        assert_eq!(
            storage
                .load_da_proposal(da_proposal1.data.view_number)
                .await
                .unwrap(),
            Some(da_proposal1.clone())
        );

        let mut da_proposal2 = da_proposal1.clone();
        da_proposal2.data.view_number = ViewNumber::new(2);
        storage
            .append_da2(&da_proposal2.clone(), vid_commitment)
            .await
            .unwrap();

        assert_eq!(
            storage
                .load_da_proposal(da_proposal2.data.view_number)
                .await
                .unwrap(),
            Some(da_proposal2.clone())
        );

        let mut da_proposal3 = da_proposal2.clone();
        da_proposal3.data.view_number = ViewNumber::new(3);
        storage
            .append_da2(&da_proposal3.clone(), vid_commitment)
            .await
            .unwrap();

        assert_eq!(
            storage
                .load_da_proposal(da_proposal3.data.view_number)
                .await
                .unwrap(),
            Some(da_proposal3.clone())
        );

        // --- Quorum proposals for views 0..=3: after each append, the full map of stored
        // proposals is compared against everything appended so far. ---
        let quorum_proposal1 = quorum_proposal.clone();

        storage
            .append_quorum_proposal2(&quorum_proposal1)
            .await
            .unwrap();

        assert_eq!(
            storage.load_quorum_proposals().await.unwrap(),
            BTreeMap::from_iter([(ViewNumber::genesis(), quorum_proposal1.clone())])
        );

        quorum_proposal.data.proposal.view_number = ViewNumber::new(1);
        let quorum_proposal2 = quorum_proposal.clone();
        storage
            .append_quorum_proposal2(&quorum_proposal2)
            .await
            .unwrap();

        assert_eq!(
            storage.load_quorum_proposals().await.unwrap(),
            BTreeMap::from_iter([
                (ViewNumber::genesis(), quorum_proposal1.clone()),
                (ViewNumber::new(1), quorum_proposal2.clone())
            ])
        );

        // From view 2 onwards the justify QC's view number is set to the preceding view.
        quorum_proposal.data.proposal.view_number = ViewNumber::new(2);
        quorum_proposal.data.proposal.justify_qc.view_number = ViewNumber::new(1);
        let quorum_proposal3 = quorum_proposal.clone();
        storage
            .append_quorum_proposal2(&quorum_proposal3)
            .await
            .unwrap();

        assert_eq!(
            storage.load_quorum_proposals().await.unwrap(),
            BTreeMap::from_iter([
                (ViewNumber::genesis(), quorum_proposal1.clone()),
                (ViewNumber::new(1), quorum_proposal2.clone()),
                (ViewNumber::new(2), quorum_proposal3.clone())
            ])
        );

        quorum_proposal.data.proposal.view_number = ViewNumber::new(3);
        quorum_proposal.data.proposal.justify_qc.view_number = ViewNumber::new(2);

        let quorum_proposal4 = quorum_proposal.clone();
        storage
            .append_quorum_proposal2(&quorum_proposal4)
            .await
            .unwrap();

        assert_eq!(
            storage.load_quorum_proposals().await.unwrap(),
            BTreeMap::from_iter([
                (ViewNumber::genesis(), quorum_proposal1.clone()),
                (ViewNumber::new(1), quorum_proposal2.clone()),
                (ViewNumber::new(2), quorum_proposal3.clone()),
                (ViewNumber::new(3), quorum_proposal4.clone())
            ])
        );

        // --- Build the leaf chain and the QC paired with each leaf. ---
        let leaves = [
            Leaf2::from_quorum_proposal(&quorum_proposal1.data),
            Leaf2::from_quorum_proposal(&quorum_proposal2.data),
            Leaf2::from_quorum_proposal(&quorum_proposal3.data),
            Leaf2::from_quorum_proposal(&quorum_proposal4.data),
        ];
        // QC paired with the last leaf: leaf 3's justify QC moved one view forward, with its
        // leaf commitment set to the commitment of the genesis `leaf`.
        let mut final_qc = leaves[3].justify_qc();
        final_qc.view_number += 1;
        final_qc.data.leaf_commit = Committable::commit(&leaf);
        // `qcs[i]` is paired with `leaves[i]`: the justify QC of the next leaf, or `final_qc`
        // for the last one.
        let qcs = [
            leaves[1].justify_qc(),
            leaves[2].justify_qc(),
            leaves[3].justify_qc(),
            final_qc,
        ];

        // Nothing has been decided yet, so the anchor view is still genesis.
        assert_eq!(
            storage.load_anchor_view().await.unwrap(),
            ViewNumber::genesis()
        );

        // --- Decide views 0..=2 (the first three leaves with their QCs). ---
        let consumer = EventCollector::default();
        let leaf_chain = leaves
            .iter()
            .take(3)
            .map(|leaf| leaf_info(leaf.clone()))
            .zip(&qcs)
            .collect::<Vec<_>>();
        tracing::info!(?leaf_chain, "decide view 2");
        storage
            .append_decided_leaves(
                ViewNumber::new(2),
                leaf_chain
                    .iter()
                    .map(|(leaf, qc)| (leaf, CertificatePair::non_epoch_change((*qc).clone()))),
                None,
                &consumer,
            )
            .await
            .unwrap();
        assert_eq!(
            storage.load_anchor_view().await.unwrap(),
            ViewNumber::new(2)
        );

        // DA proposals and VID shares for the decided views must no longer be loadable...
        for i in 0..=2 {
            assert_eq!(
                storage.load_da_proposal(ViewNumber::new(i)).await.unwrap(),
                None
            );

            assert_eq!(
                storage.load_vid_share(ViewNumber::new(i)).await.unwrap(),
                None
            );
        }

        // ...while everything stored for the undecided view 3 must still be there.
        assert_eq!(
            storage.load_da_proposal(ViewNumber::new(3)).await.unwrap(),
            Some(da_proposal3)
        );

        assert_eq!(
            storage.load_vid_share(ViewNumber::new(3)).await.unwrap(),
            Some(convert_proposal(vid_share3.clone()))
        );

        let proposals = storage.load_quorum_proposals().await.unwrap();
        assert_eq!(
            proposals,
            BTreeMap::from_iter([(ViewNumber::new(3), quorum_proposal4)])
        );

        // Compare the leaves delivered to the consumer with the leaves that were decided, and
        // check that each delivered leaf carries a VID share for its own view.
        // Note: `zip` stops at the shorter side, so this loop does not itself check how many
        // leaves were delivered.
        for (leaf, info) in leaves.iter().zip(consumer.leaf_chain().await.iter()) {
            assert_eq!(info.leaf, *leaf);
            let decided_vid_share = info.vid_share.as_ref().unwrap();
            assert_eq!(decided_vid_share.view_number(), leaf.view_number());
        }

        // The anchor is now the newest decided leaf (view 2) together with its QC.
        assert_eq!(
            storage.load_anchor_leaf().await.unwrap(),
            Some((leaves[2].clone(), qcs[2].clone()))
        );
        assert_eq!(
            storage.load_anchor_view().await.unwrap(),
            leaves[2].view_number()
        );

        // --- Decide view 3 on its own, with a fresh consumer. ---
        let consumer = EventCollector::default();
        tracing::info!(leaf = ?leaves[3], qc = ?qcs[3], "decide view 3");
        storage
            .append_decided_leaves(
                ViewNumber::new(3),
                vec![(
                    &leaf_info(leaves[3].clone()),
                    CertificatePair::non_epoch_change(qcs[3].clone()),
                )],
                None,
                &consumer,
            )
            .await
            .unwrap();
        assert_eq!(
            storage.load_anchor_view().await.unwrap(),
            ViewNumber::new(3)
        );

        // Exactly one decide event is expected, for view 3, carrying the single decided leaf
        // and the QC it was decided with.
        let events = consumer.events.read().await;
        assert_eq!(events.len(), 1);
        assert_eq!(events[0].view_number, ViewNumber::new(3));
        let EventType::Decide {
            committing_qc,
            leaf_chain,
            ..
        } = &events[0].event
        else {
            panic!("expected decide event, got {:?}", events[0]);
        };
        assert_eq!(*committing_qc.qc(), qcs[3]);
        assert_eq!(leaf_chain.len(), 1);
        let info = &leaf_chain[0];
        assert_eq!(info.leaf, leaves[3]);

        // With view 3 decided, none of the appended artifacts should remain.
        assert_eq!(
            storage.load_da_proposal(ViewNumber::new(3)).await.unwrap(),
            None
        );

        assert_eq!(
            storage.load_vid_share(ViewNumber::new(3)).await.unwrap(),
            None
        );
        assert_eq!(
            storage.load_quorum_proposals().await.unwrap(),
            BTreeMap::new()
        );
    }
918
919 #[rstest_reuse::apply(persistence_types)]
920 pub async fn test_upgrade_certificate<P: TestablePersistence>(_p: PhantomData<P>) {
921 let tmp = P::tmp_storage().await;
922 let storage = P::connect(&tmp).await;
923
924 assert_eq!(storage.load_upgrade_certificate().await.unwrap(), None);
926
927 let upgrade_data = UpgradeProposalData {
928 old_version: Version { major: 0, minor: 1 },
929 new_version: Version { major: 1, minor: 0 },
930 decide_by: ViewNumber::genesis(),
931 new_version_hash: Default::default(),
932 old_version_last_view: ViewNumber::genesis(),
933 new_version_first_view: ViewNumber::genesis(),
934 };
935
936 let decide_upgrade_certificate = UpgradeCertificate::<SeqTypes>::new(
937 upgrade_data.clone(),
938 upgrade_data.commit(),
939 ViewNumber::genesis(),
940 Default::default(),
941 Default::default(),
942 );
943 let res = storage
944 .store_upgrade_certificate(Some(decide_upgrade_certificate.clone()))
945 .await;
946 assert!(res.is_ok());
947
948 let res = storage.load_upgrade_certificate().await.unwrap();
949 let view_number = res.unwrap().view_number;
950 assert_eq!(view_number, ViewNumber::genesis());
951
952 let new_view_number_for_certificate = ViewNumber::new(50);
953 let mut new_upgrade_certificate = decide_upgrade_certificate.clone();
954 new_upgrade_certificate.view_number = new_view_number_for_certificate;
955
956 let res = storage
957 .store_upgrade_certificate(Some(new_upgrade_certificate.clone()))
958 .await;
959 assert!(res.is_ok());
960
961 let res = storage.load_upgrade_certificate().await.unwrap();
962 let view_number = res.unwrap().view_number;
963 assert_eq!(view_number, new_view_number_for_certificate);
964 }
965
966 #[rstest_reuse::apply(persistence_types)]
967 pub async fn test_next_epoch_quorum_certificate<P: TestablePersistence>(_p: PhantomData<P>) {
968 let tmp = P::tmp_storage().await;
969 let storage = P::connect(&tmp).await;
970
971 assert_eq!(
973 storage.load_next_epoch_quorum_certificate().await.unwrap(),
974 None
975 );
976
977 let upgrade_lock = UpgradeLock::<SeqTypes>::new(TEST_VERSIONS.test);
978
979 let genesis_view = ViewNumber::genesis();
980
981 let leaf = Leaf2::genesis(
982 &ValidatedState::default(),
983 &NodeState::default(),
984 TEST_VERSIONS.test.base,
985 )
986 .await;
987 let data: NextEpochQuorumData2<SeqTypes> = QuorumData2 {
988 leaf_commit: leaf.commit(),
989 epoch: Some(EpochNumber::new(1)),
990 block_number: Some(leaf.height()),
991 }
992 .into();
993
994 let versioned_data =
995 VersionedVoteData::new_infallible(data.clone(), genesis_view, &upgrade_lock);
996
997 let bytes: [u8; 32] = versioned_data.commit().into();
998
999 let next_epoch_qc = NextEpochQuorumCertificate2::new(
1000 data,
1001 Commitment::from_raw(bytes),
1002 genesis_view,
1003 None,
1004 PhantomData,
1005 );
1006
1007 let res = storage
1008 .store_next_epoch_quorum_certificate(next_epoch_qc.clone())
1009 .await;
1010 assert!(res.is_ok());
1011
1012 let res = storage.load_next_epoch_quorum_certificate().await.unwrap();
1013 let view_number = res.unwrap().view_number;
1014 assert_eq!(view_number, ViewNumber::genesis());
1015
1016 let new_view_number_for_qc = ViewNumber::new(50);
1017 let mut new_qc = next_epoch_qc.clone();
1018 new_qc.view_number = new_view_number_for_qc;
1019
1020 let res = storage
1021 .store_next_epoch_quorum_certificate(new_qc.clone())
1022 .await;
1023 assert!(res.is_ok());
1024
1025 let res = storage.load_next_epoch_quorum_certificate().await.unwrap();
1026 let view_number = res.unwrap().view_number;
1027 assert_eq!(view_number, new_view_number_for_qc);
1028 }
1029
1030 #[rstest_reuse::apply(persistence_types)]
1031 pub async fn test_decide_with_failing_event_consumer<P: TestablePersistence>(
1032 _p: PhantomData<P>,
1033 ) {
1034 let tmp = P::tmp_storage().await;
1035 let storage = P::connect(&tmp).await;
1036
1037 let mut chain = vec![];
1039
1040 let leaf: Leaf2 = Leaf::genesis(
1041 &ValidatedState::default(),
1042 &NodeState::mock(),
1043 MOCK_UPGRADE.base,
1044 )
1045 .await
1046 .into();
1047 let leaf_payload = leaf.block_payload().unwrap();
1048 let leaf_payload_bytes_arc = leaf_payload.encode();
1049 let avidm_param = init_avidm_param(2).unwrap();
1050 let weights = vec![1u32; 2];
1051 let ns_table = parse_ns_table(
1052 leaf_payload.byte_len().as_usize(),
1053 &leaf_payload.ns_table().encode(),
1054 );
1055 let (payload_commitment, shares) =
1056 AvidMScheme::ns_disperse(&avidm_param, &weights, &leaf_payload_bytes_arc, ns_table)
1057 .unwrap();
1058
1059 let (pubkey, privkey) = BLSPubKey::generated_from_seed_indexed([0; 32], 1);
1060 let mut vid = AvidMDisperseShare::<SeqTypes> {
1061 view_number: ViewNumber::new(0),
1062 payload_commitment,
1063 share: shares[0].clone(),
1064 recipient_key: pubkey,
1065 epoch: Some(EpochNumber::new(0)),
1066 target_epoch: Some(EpochNumber::new(0)),
1067 common: avidm_param,
1068 }
1069 .to_proposal(&privkey)
1070 .unwrap()
1071 .clone();
1072 let mut quorum_proposal = QuorumProposalWrapper::<SeqTypes> {
1073 proposal: QuorumProposal2::<SeqTypes> {
1074 block_header: leaf.block_header().clone(),
1075 view_number: ViewNumber::genesis(),
1076 justify_qc: QuorumCertificate::genesis(
1077 &ValidatedState::default(),
1078 &NodeState::mock(),
1079 TEST_VERSIONS.test,
1080 )
1081 .await
1082 .to_qc2(),
1083 upgrade_certificate: None,
1084 view_change_evidence: None,
1085 next_drb_result: None,
1086 next_epoch_justify_qc: None,
1087 epoch: None,
1088 state_cert: None,
1089 },
1090 };
1091 let mut qc = QuorumCertificate2::genesis(
1092 &ValidatedState::default(),
1093 &NodeState::mock(),
1094 TEST_VERSIONS.test,
1095 )
1096 .await;
1097
1098 let block_payload_signature = BLSPubKey::sign(&privkey, &leaf_payload_bytes_arc)
1099 .expect("Failed to sign block payload");
1100 let mut da_proposal = Proposal {
1101 data: DaProposal2::<SeqTypes> {
1102 encoded_transactions: leaf_payload_bytes_arc.clone(),
1103 metadata: leaf_payload.ns_table().clone(),
1104 view_number: ViewNumber::new(0),
1105 epoch: Some(EpochNumber::new(0)),
1106 epoch_transition_indicator: EpochTransitionIndicator::NotInTransition,
1107 },
1108 signature: block_payload_signature,
1109 _pd: Default::default(),
1110 };
1111
1112 let vid_commitment = vid_commitment(
1113 &leaf_payload_bytes_arc,
1114 &leaf.block_header().metadata().encode(),
1115 2,
1116 TEST_VERSIONS.test.base,
1117 );
1118
1119 for i in 0..4 {
1120 quorum_proposal.proposal.view_number = ViewNumber::new(i);
1121 let leaf = Leaf2::from_quorum_proposal(&quorum_proposal);
1122 qc.view_number = leaf.view_number();
1123 qc.data.leaf_commit = Committable::commit(&leaf);
1124 vid.data.view_number = leaf.view_number();
1125 da_proposal.data.view_number = leaf.view_number();
1126 chain.push((leaf.clone(), qc.clone(), vid.clone(), da_proposal.clone()));
1127 }
1128
1129 for (_, _, vid, da) in &chain {
1131 tracing::info!(?da, ?vid, "insert proposal");
1132 storage.append_da2(da, vid_commitment).await.unwrap();
1133 storage
1134 .append_vid(&convert_proposal(vid.clone()))
1135 .await
1136 .unwrap();
1137 }
1138
1139 let leaf_chain = chain
1141 .iter()
1142 .take(2)
1143 .map(|(leaf, qc, ..)| (leaf_info(leaf.clone()), qc.clone()))
1144 .collect::<Vec<_>>();
1145 tracing::info!("decide with event handling failure");
1146 storage
1147 .append_decided_leaves(
1148 ViewNumber::new(1),
1149 leaf_chain
1150 .iter()
1151 .map(|(leaf, qc)| (leaf, CertificatePair::non_epoch_change(qc.clone()))),
1152 None,
1153 &FailConsumer,
1154 )
1155 .await
1156 .unwrap();
1157 for i in 0..4 {
1159 tracing::info!(i, "check proposal availability");
1160 assert!(
1161 storage
1162 .load_vid_share(ViewNumber::new(i))
1163 .await
1164 .unwrap()
1165 .is_some()
1166 );
1167 assert!(
1168 storage
1169 .load_da_proposal(ViewNumber::new(i))
1170 .await
1171 .unwrap()
1172 .is_some()
1173 );
1174 }
1175 tracing::info!("check anchor leaf updated");
1176 assert_eq!(
1177 storage
1178 .load_anchor_leaf()
1179 .await
1180 .unwrap()
1181 .unwrap()
1182 .0
1183 .view_number(),
1184 ViewNumber::new(1)
1185 );
1186 assert_eq!(
1187 storage.load_anchor_view().await.unwrap(),
1188 ViewNumber::new(1)
1189 );
1190
1191 let consumer = EventCollector::default();
1194 let leaf_chain = chain
1195 .iter()
1196 .skip(2)
1197 .map(|(leaf, qc, ..)| (leaf_info(leaf.clone()), qc.clone()))
1198 .collect::<Vec<_>>();
1199 tracing::info!("decide successfully");
1200 storage
1201 .append_decided_leaves(
1202 ViewNumber::new(3),
1203 leaf_chain
1204 .iter()
1205 .map(|(leaf, qc)| (leaf, CertificatePair::non_epoch_change(qc.clone()))),
1206 None,
1207 &consumer,
1208 )
1209 .await
1210 .unwrap();
1211 for i in 0..4 {
1213 tracing::info!(i, "check proposal garbage collected");
1214 assert!(
1215 storage
1216 .load_vid_share(ViewNumber::new(i))
1217 .await
1218 .unwrap()
1219 .is_none()
1220 );
1221 assert!(
1222 storage
1223 .load_da_proposal(ViewNumber::new(i))
1224 .await
1225 .unwrap()
1226 .is_none()
1227 );
1228 }
1229 tracing::info!("check anchor leaf updated");
1230 assert_eq!(
1231 storage
1232 .load_anchor_leaf()
1233 .await
1234 .unwrap()
1235 .unwrap()
1236 .0
1237 .view_number(),
1238 ViewNumber::new(3)
1239 );
1240 assert_eq!(
1241 storage.load_anchor_view().await.unwrap(),
1242 ViewNumber::new(3)
1243 );
1244
1245 tracing::info!("check decide event");
1247 let leaf_chain = consumer.leaf_chain().await;
1248 assert_eq!(leaf_chain.len(), 4, "{leaf_chain:#?}");
1249 for ((leaf, ..), info) in chain.iter().zip(leaf_chain.iter()) {
1250 assert_eq!(info.leaf, *leaf);
1251 let decided_vid_share = info.vid_share.as_ref().unwrap();
1252 assert_eq!(decided_vid_share.view_number(), leaf.view_number());
1253 assert!(info.leaf.block_payload().is_some());
1254 }
1255 }
1256
1257 #[rstest_reuse::apply(persistence_types)]
1258 pub async fn test_pruning<P: TestablePersistence>(_p: PhantomData<P>) {
1259 let tmp = P::tmp_storage().await;
1260
1261 let mut options = P::options(&tmp);
1262 options.set_view_retention(1);
1263 let storage = options.create().await.unwrap();
1264
1265 let leaf = Leaf::genesis(
1267 &ValidatedState::default(),
1268 &NodeState::mock(),
1269 MOCK_UPGRADE.base,
1270 )
1271 .await;
1272 let leaf_payload = leaf.block_payload().unwrap();
1273 let leaf_payload_bytes_arc = leaf_payload.encode();
1274 let avidm_param = init_avidm_param(2).unwrap();
1275 let weights = vec![1u32; 2];
1276
1277 let ns_table = parse_ns_table(
1278 leaf_payload.byte_len().as_usize(),
1279 &leaf_payload.ns_table().encode(),
1280 );
1281 let (payload_commitment, shares) =
1282 AvidMScheme::ns_disperse(&avidm_param, &weights, &leaf_payload_bytes_arc, ns_table)
1283 .unwrap();
1284
1285 let (pubkey, privkey) = BLSPubKey::generated_from_seed_indexed([0; 32], 1);
1286 let vid_share = convert_proposal(
1287 AvidMDisperseShare::<SeqTypes> {
1288 view_number: ViewNumber::new(0),
1289 payload_commitment,
1290 share: shares[0].clone(),
1291 recipient_key: pubkey,
1292 epoch: None,
1293 target_epoch: None,
1294 common: avidm_param,
1295 }
1296 .to_proposal(&privkey)
1297 .unwrap()
1298 .clone(),
1299 );
1300
1301 let quorum_proposal = QuorumProposalWrapper::<SeqTypes> {
1302 proposal: QuorumProposal2::<SeqTypes> {
1303 block_header: leaf.block_header().clone(),
1304 view_number: ViewNumber::genesis(),
1305 justify_qc: QuorumCertificate::genesis(
1306 &ValidatedState::default(),
1307 &NodeState::mock(),
1308 TEST_VERSIONS.test,
1309 )
1310 .await
1311 .to_qc2(),
1312 upgrade_certificate: None,
1313 view_change_evidence: None,
1314 next_drb_result: None,
1315 next_epoch_justify_qc: None,
1316 epoch: None,
1317 state_cert: None,
1318 },
1319 };
1320 let quorum_proposal_signature =
1321 BLSPubKey::sign(&privkey, &bincode::serialize(&quorum_proposal).unwrap())
1322 .expect("Failed to sign quorum proposal");
1323 let quorum_proposal = Proposal {
1324 data: quorum_proposal,
1325 signature: quorum_proposal_signature,
1326 _pd: Default::default(),
1327 };
1328
1329 let block_payload_signature = BLSPubKey::sign(&privkey, &leaf_payload_bytes_arc)
1330 .expect("Failed to sign block payload");
1331 let da_proposal = Proposal {
1332 data: DaProposal2::<SeqTypes> {
1333 encoded_transactions: leaf_payload_bytes_arc,
1334 metadata: leaf_payload.ns_table().clone(),
1335 view_number: ViewNumber::new(0),
1336 epoch: None,
1337 epoch_transition_indicator: EpochTransitionIndicator::NotInTransition,
1338 },
1339 signature: block_payload_signature,
1340 _pd: Default::default(),
1341 };
1342
1343 storage
1344 .append_da2(&da_proposal, VidCommitment::V1(payload_commitment))
1345 .await
1346 .unwrap();
1347 storage.append_vid(&vid_share).await.unwrap();
1348 storage
1349 .append_quorum_proposal2(&quorum_proposal)
1350 .await
1351 .unwrap();
1352
1353 storage
1355 .append_decided_leaves(ViewNumber::new(1), [], None, &NullEventConsumer)
1356 .await
1357 .unwrap();
1358
1359 assert_eq!(
1362 storage
1363 .load_da_proposal(ViewNumber::new(0))
1364 .await
1365 .unwrap()
1366 .unwrap(),
1367 da_proposal
1368 );
1369 assert_eq!(
1370 storage
1371 .load_vid_share(ViewNumber::new(0))
1372 .await
1373 .unwrap()
1374 .unwrap(),
1375 vid_share
1376 );
1377 assert_eq!(
1378 storage
1379 .load_quorum_proposal(ViewNumber::new(0))
1380 .await
1381 .unwrap(),
1382 quorum_proposal
1383 );
1384
1385 storage
1387 .append_decided_leaves(ViewNumber::new(2), [], None, &NullEventConsumer)
1388 .await
1389 .unwrap();
1390 assert!(
1391 storage
1392 .load_da_proposal(ViewNumber::new(0))
1393 .await
1394 .unwrap()
1395 .is_none()
1396 );
1397 assert!(
1398 storage
1399 .load_vid_share(ViewNumber::new(0))
1400 .await
1401 .unwrap()
1402 .is_none()
1403 );
1404 assert!(
1405 storage
1406 .load_quorum_proposal(ViewNumber::new(0))
1407 .await
1408 .is_err()
1409 );
1410 }
1411
1412 async fn assert_events_eq<P: TestablePersistence>(
1413 persistence: &P,
1414 block: u64,
1415 stake_table_fetcher: &Fetcher,
1416 l1_client: &L1Client,
1417 stake_table_contract: Address,
1418 ) -> anyhow::Result<()> {
1419 let (stored_l1, events) = persistence.load_events(0, block).await?;
1421 assert!(!events.is_empty());
1422 assert!(stored_l1.is_some());
1423 assert!(events.iter().all(|((l1_block, _), _)| *l1_block <= block));
1424 let contract_events = Fetcher::fetch_events_from_contract(
1426 l1_client.clone(),
1427 stake_table_contract,
1428 None,
1429 block,
1430 )
1431 .await?;
1432 assert_eq!(
1433 contract_events, events,
1434 "Events from contract and persistence do not match"
1435 );
1436
1437 let fetched_events = stake_table_fetcher
1439 .fetch_and_store_stake_table_events(stake_table_contract, block)
1440 .await?;
1441 assert_eq!(fetched_events, events);
1442
1443 Ok(())
1444 }
1445
    // Starts a two-node test network with the PoS hook enabled, lets it produce blocks, and
    // checks at two points in time (via `assert_events_eq`) that the stake table events loaded
    // from persistence match both the events on the stake table contract and the events
    // returned by the membership's fetcher. Runs once per stake table contract version.
    #[rstest_reuse::apply(persistence_types)]
    pub async fn test_stake_table_fetching_from_persistence<P: TestablePersistence>(
        #[values(StakeTableContractVersion::V1, StakeTableContractVersion::V2)]
        stake_table_version: StakeTableContractVersion,
        _p: PhantomData<P>,
    ) -> anyhow::Result<()> {
        let epoch_height = 20;

        let network_config = TestConfigBuilder::default()
            .epoch_height(epoch_height)
            .build();

        let anvil_provider = network_config.anvil().unwrap();

        // Reserve a port for the query service the client connects to below.
        let query_service_port =
            reserve_tcp_port().expect("OS should have ephemeral ports available");
        let query_api_options = Options::with_port(query_service_port);

        const NODE_COUNT: usize = 2;

        // One temporary storage and one set of persistence options per node.
        let storage = join_all((0..NODE_COUNT).map(|_| P::tmp_storage())).await;
        let persistence_options: [_; NODE_COUNT] = storage
            .iter()
            .map(P::options)
            .collect::<Vec<_>>()
            .try_into()
            .unwrap();

        // Persistence handle created from a clone of node 0's options; this is the handle the
        // events are loaded from in `assert_events_eq`.
        // NOTE(review): presumably this reads the same underlying storage node 0 writes to —
        // confirm against the `TestablePersistence` implementations.
        let persistence = persistence_options[0].clone().create().await.unwrap();

        let l1_url = network_config.l1_url();

        // The same upgrade value is passed to the PoS hook and to `TestNetwork::new`.
        let upgrade = Upgrade::trivial(version(0, 3));

        let testnet_config = TestNetworkConfigBuilder::with_num_nodes()
            .api_config(query_api_options)
            .network_config(network_config.clone())
            .persistences(persistence_options.clone())
            .pos_hook(
                DelegationConfig::MultipleDelegators,
                stake_table_version,
                upgrade,
            )
            .await
            .expect("Pos deployment failed")
            .build();

        let test_network = TestNetwork::new(testnet_config, upgrade).await;

        let client: Client<ServerError, SequencerApiVersion> = Client::new(
            format!("http://localhost:{query_service_port}")
                .parse()
                .unwrap(),
        );
        client.connect(None).await;
        tracing::info!(query_service_port, "server running");

        // Block until 40 blocks have been streamed from block 0 (two epochs' worth at an epoch
        // height of 20). The blocks themselves are not inspected.
        let _initial_blocks = client
            .socket("availability/stream/blocks/0")
            .subscribe::<BlockQueryData<SeqTypes>>()
            .await
            .unwrap()
            .take(40)
            .try_collect::<Vec<_>>()
            .await
            .unwrap();
        // Clone the membership coordinator out of the consensus state; the consensus read
        // guard is a temporary of this statement.
        let membership_coordinator = test_network
            .server
            .consensus()
            .read()
            .await
            .membership_coordinator
            .clone();

        let l1_client = L1Client::new(vec![l1_url]).unwrap();
        let node_state = test_network.server.node_state();
        let chain_config = node_state.chain_config;
        let stake_table_contract = chain_config.stake_table_contract.unwrap();

        let current_membership = membership_coordinator.membership();
        {
            // The membership read guard lives only for this block.
            let membership_state = current_membership.read().await;
            let stake_table_fetcher = membership_state.fetcher();

            // First check: compare events up to the current latest L1 block.
            let block1 = anvil_provider
                .get_block_number()
                .await
                .expect("latest l1 block");

            assert_events_eq(
                &persistence,
                block1,
                stake_table_fetcher,
                &l1_client,
                stake_table_contract,
            )
            .await?;
        }
        // Block until 65 blocks have been streamed from block 0, i.e. let the network make
        // further progress before the second check.
        let _epoch_4_blocks = client
            .socket("availability/stream/blocks/0")
            .subscribe::<BlockQueryData<SeqTypes>>()
            .await
            .unwrap()
            .take(65)
            .try_collect::<Vec<_>>()
            .await
            .unwrap();
        let block2 = anvil_provider
            .get_block_number()
            .await
            .expect("latest l1 block");

        {
            // Second check: same comparison against the newer L1 block, under a fresh
            // membership read guard.
            let membership_state = current_membership.read().await;
            let stake_table_fetcher = membership_state.fetcher();

            assert_events_eq(
                &persistence,
                block2,
                stake_table_fetcher,
                &l1_client,
                stake_table_contract,
            )
            .await?;
        }
        Ok(())
    }
1579
    // Deploys the stake table contracts, keeps applying staking transactions to the L1 from a
    // background task, and checks that the fetcher's update loop keeps persisting new stake
    // table events: on each of 10 polls the persisted events must have grown, the L1 block
    // they were read up to must have advanced, and they must equal the events fetched from the
    // contract up to that block. Runs once per stake table contract version.
    #[rstest_reuse::apply(persistence_types)]
    pub async fn test_stake_table_background_fetching<P: TestablePersistence>(
        #[values(StakeTableContractVersion::V1, StakeTableContractVersion::V2)]
        stake_table_version: StakeTableContractVersion,
        _p: PhantomData<P>,
    ) -> anyhow::Result<()> {
        use espresso_types::v0_3::ChainConfig;
        use hotshot_contract_adapter::stake_table::StakeTableContractVersion;

        let blocks_per_epoch = 10;

        let network_config = TestConfigBuilder::<1>::default()
            .epoch_height(blocks_per_epoch)
            .build();

        let anvil_provider = network_config.anvil().unwrap();

        // Light client genesis state derived from the network's HotShot stake table; passed to
        // the contract deployer below.
        let (genesis_state, genesis_stake) = light_client_genesis_from_stake_table(
            &network_config.hotshot_config().hotshot_stake_table(),
            STAKE_TABLE_CAPACITY_FOR_TEST,
        )
        .unwrap();

        // 20 deterministic private keys and 20 state key pairs, generated from fixed seeds.
        let (_, priv_keys): (Vec<_>, Vec<_>) = (0..20)
            .map(|i| <PubKey as SignatureKey>::generated_from_seed_indexed([1; 32], i as u64))
            .unzip();
        let state_key_pairs = (0..20)
            .map(|i| StateKeyPair::generate_from_seed_indexed([2; 32], i as u64))
            .collect::<Vec<_>>();

        let validators = staking_priv_keys(&priv_keys, &state_key_pairs, 20);

        // Provider that signs with the network config's signer; used for deployment and for
        // the staking transactions.
        let deployer = ProviderBuilder::new()
            .wallet(EthereumWallet::from(network_config.signer().clone()))
            .connect_http(network_config.l1_url().clone());

        let mut contracts = Contracts::new();
        let args = DeployerArgsBuilder::default()
            .deployer(deployer.clone())
            .rpc_url(network_config.l1_url().clone())
            .mock_light_client(true)
            .genesis_lc_state(genesis_state)
            .genesis_st_state(genesis_stake)
            .blocks_per_epoch(blocks_per_epoch)
            .epoch_start_block(1)
            // Use the larger of `blocks_per_epoch * 15 + 100` and the default escrow period.
            .exit_escrow_period(U256::from(max(
                blocks_per_epoch * 15 + 100,
                DEFAULT_EXIT_ESCROW_PERIOD_SECONDS,
            )))
            .multisig_pauser(network_config.signer().address())
            .token_name("Espresso".to_string())
            .token_symbol("ESP".to_string())
            .initial_token_supply(U256::from(3590000000u64))
            .ops_timelock_delay(U256::from(0))
            .ops_timelock_admin(network_config.signer().address())
            .ops_timelock_proposers(vec![network_config.signer().address()])
            .ops_timelock_executors(vec![network_config.signer().address()])
            .safe_exit_timelock_delay(U256::from(10))
            .safe_exit_timelock_admin(network_config.signer().address())
            .safe_exit_timelock_proposers(vec![network_config.signer().address()])
            .safe_exit_timelock_executors(vec![network_config.signer().address()])
            .build()
            .unwrap();

        // The deployment entry point depends on the stake table contract version under test.
        match stake_table_version {
            StakeTableContractVersion::V1 => args.deploy_to_stake_table_v1(&mut contracts).await,
            StakeTableContractVersion::V2 => args.deploy_all(&mut contracts).await,
        }
        .expect("contracts deployed");

        let st_addr = contracts
            .address(Contract::StakeTableProxy)
            .expect("StakeTableProxy deployed");
        let l1_url = network_config.l1_url().clone();

        // Plan the staking transactions for the generated validators.
        let mut planned_txns = StakingTransactions::create(
            l1_url.clone(),
            &deployer,
            st_addr,
            validators,
            None,
            DelegationConfig::MultipleDelegators,
        )
        .await
        .expect("stake table setup failed");

        planned_txns
            .apply_prerequisites()
            .await
            .expect("prerequisites failed");

        // Apply a single planned transaction up front; the remaining ones are applied by the
        // background task spawned below.
        planned_txns.apply_one().await.expect("send tx failed");

        // Switch anvil to interval mining with an interval of 1.
        // NOTE(review): presumably the unit is seconds — confirm against the Anvil docs.
        anvil_provider
            .anvil_set_interval_mining(1)
            .await
            .expect("interval mining");

        // Background task: keep applying planned transactions one at a time until `apply_one`
        // returns `None`. The task's handle is not awaited.
        spawn({
            async move {
                {
                    while let Some(receipt) =
                        planned_txns.apply_one().await.expect("send tx failed")
                    {
                        tracing::debug!(?receipt, "transaction finalized");
                    }
                }
            }
        });

        let storage = P::tmp_storage().await;
        let persistence = P::options(&storage).create().await.unwrap();

        // L1 client configured with a 7 second stake table update interval.
        let l1_client = L1ClientOptions {
            stake_table_update_interval: Duration::from_secs(7),
            l1_retry_delay: Duration::from_millis(10),
            l1_events_max_block_range: 10000,
            ..Default::default()
        }
        .connect(vec![l1_url])
        .unwrap();
        l1_client.spawn_tasks().await;

        // The fetcher gets its own clone of the persistence handle; the test keeps
        // `persistence` to read back what the fetcher stored.
        let fetcher = Fetcher::new(
            Arc::new(NullStateCatchup::default()),
            Arc::new(Mutex::new(persistence.clone())),
            l1_client.clone(),
            ChainConfig {
                stake_table_contract: Some(st_addr),
                base_fee: 0.into(),
                ..Default::default()
            },
        );

        // Wait 20 seconds before starting the fetcher's update loop.
        sleep(Duration::from_secs(20)).await;

        fetcher.spawn_update_loop().await;
        let mut prev_l1_block = 0;
        let mut prev_events_len = 0;
        for _i in 0..10 {
            // Sleep 8 seconds per poll, i.e. slightly longer than the 7 second update
            // interval configured above.
            tokio::time::sleep(std::time::Duration::from_secs(8)).await;

            let block = anvil_provider
                .get_block_number()
                .await
                .expect("latest l1 block");

            let (read_offset, persisted_events) = persistence.load_events(0, block).await?;
            let read_offset = read_offset.unwrap();
            // L1 block the persisted events cover: the requested block when the read is
            // complete, otherwise the block reported by persistence.
            let l1_block = match read_offset {
                EventsPersistenceRead::Complete => block,
                EventsPersistenceRead::UntilL1Block(block) => block,
            };

            tracing::info!("{l1_block:?}, persistence events = {persisted_events:?}.");
            // Each poll must observe strictly more events than the previous one...
            assert!(persisted_events.len() > prev_events_len);

            // ...and a strictly greater L1 block.
            assert!(l1_block > prev_l1_block, "events not updated");

            // The persisted events must equal what the contract reports up to `l1_block`.
            let contract_events =
                Fetcher::fetch_events_from_contract(l1_client.clone(), st_addr, None, l1_block)
                    .await?;
            assert_eq!(persisted_events, contract_events);

            prev_l1_block = l1_block;
            prev_events_len = persisted_events.len();
        }

        Ok(())
    }
1759
1760 #[rstest_reuse::apply(persistence_types)]
1761 pub async fn test_membership_persistence<P: TestablePersistence>(
1762 _p: PhantomData<P>,
1763 ) -> anyhow::Result<()> {
1764 let tmp = P::tmp_storage().await;
1765 let mut opt = P::options(&tmp);
1766
1767 let storage = opt.create().await.unwrap();
1768
1769 let validator = AuthenticatedValidator::mock();
1770 let mut st = IndexMap::new();
1771 st.insert(validator.account, validator);
1772
1773 storage
1774 .store_stake(EpochNumber::new(10), st.clone(), None, None)
1775 .await?;
1776
1777 let (table, ..) = storage.load_stake(EpochNumber::new(10)).await?.unwrap();
1778 assert_eq!(st, table);
1779
1780 let val2 = AuthenticatedValidator::mock();
1781 let mut st2 = IndexMap::new();
1782 st2.insert(val2.account, val2);
1783 storage
1784 .store_stake(EpochNumber::new(11), st2.clone(), None, None)
1785 .await?;
1786
1787 let tables = storage.load_latest_stake(4).await?.unwrap();
1788 let mut iter = tables.iter();
1789 assert_eq!(
1790 Some(&(EpochNumber::new(11), (st2.clone(), None), None)),
1791 iter.next()
1792 );
1793 assert_eq!(Some(&(EpochNumber::new(10), (st, None), None)), iter.next());
1794 assert_eq!(None, iter.next());
1795
1796 for i in 0..=20 {
1797 storage
1798 .store_stake(EpochNumber::new(i), st2.clone(), None, None)
1799 .await?;
1800 }
1801
1802 let tables = storage.load_latest_stake(5).await?.unwrap();
1803 let mut iter = tables.iter();
1804 assert_eq!(
1805 Some(&(EpochNumber::new(20), (st2.clone(), None), None)),
1806 iter.next()
1807 );
1808 assert_eq!(
1809 Some(&(EpochNumber::new(19), (st2.clone(), None), None)),
1810 iter.next()
1811 );
1812 assert_eq!(
1813 Some(&(EpochNumber::new(18), (st2.clone(), None), None)),
1814 iter.next()
1815 );
1816 assert_eq!(
1817 Some(&(EpochNumber::new(17), (st2.clone(), None), None)),
1818 iter.next()
1819 );
1820 assert_eq!(
1821 Some(&(EpochNumber::new(16), (st2, None), None)),
1822 iter.next()
1823 );
1824 assert_eq!(None, iter.next());
1825
1826 Ok(())
1827 }
1828
1829 #[rstest_reuse::apply(persistence_types)]
1830 pub async fn test_delete_stake_tables<P: TestablePersistence>(
1831 _p: PhantomData<P>,
1832 ) -> anyhow::Result<()> {
1833 let tmp = P::tmp_storage().await;
1834 let mut opt = P::options(&tmp);
1835 let storage = opt.create().await.unwrap();
1836
1837 let event1 = StakeTableEvent::Delegate(Delegated {
1838 delegator: Address::ZERO,
1839 validator: Address::ZERO,
1840 amount: U256::from(100),
1841 });
1842 let event2 = StakeTableEvent::Delegate(Delegated {
1843 delegator: Address::ZERO,
1844 validator: Address::ZERO,
1845 amount: U256::from(200),
1846 });
1847
1848 let l1_block = 42u64;
1849 let events: Vec<(EventKey, StakeTableEvent)> =
1850 vec![((l1_block, 0), event1), ((l1_block, 1), event2)];
1851
1852 storage.store_events(l1_block, events.clone()).await?;
1853
1854 let (read_offset, loaded_events) = storage.load_events(0_u64, l1_block).await?;
1855 assert!(read_offset.is_some());
1856 assert_eq!(loaded_events.len(), 2);
1857 assert_eq!(loaded_events, events);
1858
1859 let v = RegisteredValidator::mock();
1861 let mut vmap = IndexMap::new();
1862 vmap.insert(v.account, v);
1863 storage
1864 .store_all_validators(EpochNumber::new(1), vmap.clone())
1865 .await?;
1866
1867 let loaded = storage
1868 .load_all_validators(EpochNumber::new(1), 0, 10)
1869 .await?;
1870 assert_eq!(loaded.len(), 1);
1871
1872 storage.delete_stake_tables().await?;
1873
1874 let (read_offset, loaded_events) = storage.load_events(0_u64, l1_block).await?;
1876 assert!(read_offset.is_none());
1877 assert!(loaded_events.is_empty());
1878
1879 let loaded = storage
1881 .load_all_validators(EpochNumber::new(1), 0, 10)
1882 .await
1883 .unwrap_or_default();
1884 assert!(loaded.is_empty());
1885
1886 let event3 = StakeTableEvent::Delegate(Delegated {
1887 delegator: Address::ZERO,
1888 validator: Address::ZERO,
1889 amount: U256::from(300),
1890 });
1891 let new_events: Vec<(EventKey, StakeTableEvent)> = vec![((l1_block, 0), event3)];
1892 storage.store_events(l1_block, new_events.clone()).await?;
1893
1894 let (read_offset, loaded_events) = storage.load_events(0_u64, l1_block).await?;
1895 assert!(read_offset.is_some());
1896 assert_eq!(loaded_events.len(), 1);
1897 assert_eq!(loaded_events, new_events);
1898
1899 Ok(())
1900 }
1901
1902 #[rstest_reuse::apply(persistence_types)]
1903 pub async fn test_store_and_load_all_validators<P: TestablePersistence>(
1904 _p: PhantomData<P>,
1905 ) -> anyhow::Result<()> {
1906 let tmp = P::tmp_storage().await;
1907 let mut opt = P::options(&tmp);
1908 let storage = opt.create().await.unwrap();
1909
1910 let mut vmap1 = IndexMap::new();
1911 for _i in 0..25 {
1912 let v = RegisteredValidator::mock();
1913 vmap1.insert(v.account, v);
1914 }
1915 storage
1916 .store_all_validators(EpochNumber::new(10), vmap1.clone())
1917 .await?;
1918
1919 let mut expected_all: Vec<_> = vmap1.clone().into_values().collect();
1920 expected_all.sort_by_key(|v| v.account);
1921
1922 let loaded_all = storage
1924 .load_all_validators(EpochNumber::new(10), 0, 100)
1925 .await?;
1926 assert_eq!(expected_all, loaded_all);
1928
1929 let loaded_first_10 = storage
1931 .load_all_validators(EpochNumber::new(10), 0, 10)
1932 .await?;
1933
1934 assert_eq!(expected_all[..10], loaded_first_10);
1935
1936 let loaded_next_10 = storage
1938 .load_all_validators(EpochNumber::new(10), 10, 10)
1939 .await?;
1940
1941 assert_eq!(expected_all[10..20], loaded_next_10);
1942
1943 let loaded_last_5 = storage
1945 .load_all_validators(EpochNumber::new(10), 20, 10)
1946 .await?;
1947
1948 assert_eq!(expected_all[20..], loaded_last_5);
1949
1950 let loaded_empty = storage
1952 .load_all_validators(EpochNumber::new(10), 100, 10)
1953 .await?;
1954 assert!(loaded_empty.is_empty());
1955
1956 let validator2 = RegisteredValidator::mock();
1958 let mut vmap2 = IndexMap::new();
1959 vmap2.insert(validator2.account, validator2.clone());
1960
1961 storage
1962 .store_all_validators(EpochNumber::new(11), vmap2.clone())
1963 .await?;
1964
1965 let mut expected_epoch11: Vec<_> = vmap2.clone().into_values().collect();
1966 expected_epoch11.sort_by_key(|v| v.account);
1967
1968 let loaded2 = storage
1969 .load_all_validators(EpochNumber::new(11), 0, 100)
1970 .await?;
1971
1972 assert_eq!(expected_epoch11, loaded2);
1973
1974 let loaded1_again = storage
1976 .load_all_validators(EpochNumber::new(10), 0, 100)
1977 .await?;
1978
1979 assert_eq!(expected_all, loaded1_again);
1980
1981 Ok(())
1982 }
1983
1984 #[rstest_reuse::apply(persistence_types)]
1985 pub async fn test_non_consecutive_decide<P: TestablePersistence>(_p: PhantomData<P>) {
1986 let tmp = P::tmp_storage().await;
1987 let storage = P::connect(&tmp).await;
1988
1989 let genesis_leaf: Leaf2 = Leaf2::genesis(
1990 &ValidatedState::default(),
1991 &NodeState::mock(),
1992 TEST_VERSIONS.test.base,
1993 )
1994 .await;
1995 let mut quorum_proposal = QuorumProposalWrapper::<SeqTypes> {
1996 proposal: QuorumProposal2::<SeqTypes> {
1997 epoch: None,
1998 block_header: genesis_leaf.block_header().clone(),
1999 view_number: genesis_leaf.view_number(),
2000 justify_qc: QuorumCertificate2::genesis(
2001 &ValidatedState::default(),
2002 &NodeState::mock(),
2003 TEST_VERSIONS.test,
2004 )
2005 .await,
2006 upgrade_certificate: None,
2007 view_change_evidence: None,
2008 next_drb_result: None,
2009 next_epoch_justify_qc: None,
2010 state_cert: None,
2011 },
2012 };
2013
2014 let leaf0 = Leaf2::from_quorum_proposal(&quorum_proposal);
2015
2016 quorum_proposal.proposal.view_number = ViewNumber::new(2);
2017 *quorum_proposal.proposal.block_header.height_mut() = 2;
2018 quorum_proposal.proposal.justify_qc.view_number = ViewNumber::new(1);
2019 let leaf2 = Leaf2::from_quorum_proposal(&quorum_proposal);
2020
2021 let mut qc0 = leaf0.justify_qc();
2022 qc0.data.leaf_commit = Committable::commit(&leaf0);
2023
2024 let mut qc2 = leaf2.justify_qc();
2025 qc2.view_number += 1;
2026 qc2.data.leaf_commit = Committable::commit(&leaf2);
2027
2028 let mut deciding_qc = qc2.clone();
2029 deciding_qc.view_number += 1;
2030
2031 storage
2033 .append_decided_leaves(
2034 ViewNumber::new(0),
2035 [(
2036 &leaf_info(leaf0.clone()),
2037 CertificatePair::non_epoch_change(qc0),
2038 )],
2039 None,
2040 &FailConsumer,
2041 )
2042 .await
2043 .unwrap();
2044
2045 let consumer = EventCollector::default();
2049 storage
2050 .append_decided_leaves(
2051 ViewNumber::new(2),
2052 [(
2053 &leaf_info(leaf2.clone()),
2054 CertificatePair::non_epoch_change(qc2),
2055 )],
2056 Some(Arc::new(CertificatePair::non_epoch_change(
2057 deciding_qc.clone(),
2058 ))),
2059 &consumer,
2060 )
2061 .await
2062 .unwrap();
2063
2064 let events = consumer.events.read().await;
2065 assert_eq!(events.len(), 2);
2066
2067 let EventType::Decide {
2068 leaf_chain: leaf_chain0,
2069 deciding_qc: deciding_qc0,
2070 ..
2071 } = &events[0].event
2072 else {
2073 panic!("expected decide event, got {:?}", events[0].event);
2074 };
2075 assert_eq!(leaf_chain0.len(), 1);
2076 assert_eq!(leaf_chain0[0].leaf, leaf0);
2077 assert_eq!(*deciding_qc0, None);
2078
2079 let EventType::Decide {
2080 leaf_chain: leaf_chain2,
2081 deciding_qc: deciding_qc2,
2082 ..
2083 } = &events[1].event
2084 else {
2085 panic!("expected decide event, got {:?}", events[1].event);
2086 };
2087 assert_eq!(leaf_chain2.len(), 1);
2088 assert_eq!(leaf_chain2[0].leaf, leaf2);
2089 assert_eq!(deciding_qc2.as_ref().unwrap().qc(), &deciding_qc);
2090 }
2091}