espresso_node/
persistence.rs

1//! Sequencer node persistence.
2//!
3//! This module implements the persistence required for a sequencer node to rejoin the network and
4//! resume participating in consensus, in the event that its process crashes or is killed and loses
5//! all in-memory state.
6//!
7//! This is distinct from the query service persistent storage found in the `api` module, which is
8//! an extension that node operators can opt into. This module defines the minimum level of
9//! persistence which is _required_ to run a node.
10
11use std::collections::HashMap;
12
13use alloy::primitives::{Address, U256};
14use anyhow::Context;
15use async_trait::async_trait;
16use espresso_types::{
17    PubKey,
18    v0_3::{ChainConfig, RegisteredValidator},
19};
20
21pub mod fs;
22pub mod no_storage;
23mod persistence_metrics;
24pub mod sql;
25
/// RegisteredValidator without x25519_key/p2p_addr fields.
/// Used for migrating data written before x25519 support was added.
///
/// NOTE(review): this mirrors the pre-x25519 on-disk layout. If it is ever decoded with an
/// order-sensitive format (e.g. bincode), field order must not change — confirm before
/// reordering.
#[derive(serde::Serialize, serde::Deserialize)]
pub(crate) struct RegisteredValidatorNoX25519 {
    // Validator's account address.
    pub account: Address,
    // Consensus signing key registered in the stake table.
    pub stake_table_key: PubKey,
    // Light-client state verification key.
    pub state_ver_key: hotshot_types::light_client::StateVerKey,
    // Total stake attributed to this validator.
    pub stake: U256,
    // Commission value; units/scale not evident here — see `RegisteredValidator`.
    pub commission: u16,
    // Stake delegated to this validator, keyed by delegator address.
    pub delegators: HashMap<Address, U256>,
    // Whether the validator passed authentication at registration time.
    pub authenticated: bool,
}
38
39impl RegisteredValidatorNoX25519 {
40    pub fn migrate(self) -> RegisteredValidator<PubKey> {
41        RegisteredValidator {
42            account: self.account,
43            stake_table_key: self.stake_table_key,
44            state_ver_key: self.state_ver_key,
45            stake: self.stake,
46            commission: self.commission,
47            delegators: self.delegators,
48            authenticated: self.authenticated,
49            x25519_key: None,
50            p2p_addr: None,
51        }
52    }
53}
54
55/// Update a `NetworkConfig` that may have originally been persisted with an old version.
56fn migrate_network_config(
57    mut network_config: serde_json::Value,
58) -> anyhow::Result<serde_json::Value> {
59    let config = network_config
60        .get_mut("config")
61        .context("missing field `config`")?
62        .as_object_mut()
63        .context("`config` must be an object")?;
64
65    if !config.contains_key("builder_urls") {
66        // When multi-builder support was added, the configuration field `builder_url: Url` was
67        // replaced by an array `builder_urls: Vec<Url>`. If the saved config has no `builder_urls`
68        // field, it is older than this change. Populate `builder_urls` with a singleton array
69        // formed from the old value of `builder_url`, and delete the no longer used `builder_url`.
70        let url = config
71            .remove("builder_url")
72            .context("missing field `builder_url`")?;
73        config.insert("builder_urls".into(), vec![url].into());
74    }
75
76    // HotShotConfig was upgraded to include parameters for proposing and voting on upgrades.
77    // Configs which were persisted before this upgrade may be missing these parameters. This
78    // migration initializes them with a default. By default, we use JS MAX_SAFE_INTEGER for the
79    // start parameters so that nodes will never do an upgrade, unless explicitly configured
80    // otherwise.
81    if !config.contains_key("start_proposing_view") {
82        config.insert("start_proposing_view".into(), 9007199254740991u64.into());
83    }
84    if !config.contains_key("stop_proposing_view") {
85        config.insert("stop_proposing_view".into(), 0.into());
86    }
87    if !config.contains_key("start_voting_view") {
88        config.insert("start_voting_view".into(), 9007199254740991u64.into());
89    }
90    if !config.contains_key("stop_voting_view") {
91        config.insert("stop_voting_view".into(), 0.into());
92    }
93    if !config.contains_key("start_proposing_time") {
94        config.insert("start_proposing_time".into(), 9007199254740991u64.into());
95    }
96    if !config.contains_key("stop_proposing_time") {
97        config.insert("stop_proposing_time".into(), 0.into());
98    }
99    if !config.contains_key("start_voting_time") {
100        config.insert("start_voting_time".into(), 9007199254740991u64.into());
101    }
102    if !config.contains_key("stop_voting_time") {
103        config.insert("stop_voting_time".into(), 0.into());
104    }
105
106    // HotShotConfig was upgraded to include an `epoch_height` parameter. Initialize with a default
107    // if missing.
108    if !config.contains_key("epoch_height") {
109        config.insert("epoch_height".into(), 0.into());
110    }
111
112    // HotShotConfig was upgraded to include `drb_difficulty` and `drb_upgrade_difficulty` parameters. Initialize with a default
113    // if missing.
114    if !config.contains_key("drb_difficulty") {
115        config.insert("drb_difficulty".into(), 0.into());
116    }
117    if !config.contains_key("drb_upgrade_difficulty") {
118        config.insert("drb_upgrade_difficulty".into(), 0.into());
119    }
120
121    // HotShotConfig was upgraded to include `da_committeees`. Initialize with an empty `da_committees` if missing.
122    if !config.contains_key("da_committees") {
123        config.insert("da_committees".into(), serde_json::json!([]));
124    }
125
126    Ok(network_config)
127}
128
/// Persistence of [`ChainConfig`] values.
#[async_trait]
pub trait ChainConfigPersistence: Sized + Send + Sync {
    /// Store the given chain config.
    async fn insert_chain_config(&mut self, chain_config: ChainConfig) -> anyhow::Result<()>;
}
133
134#[cfg(test)]
135mod tests {
136    use std::{cmp::max, collections::BTreeMap, marker::PhantomData, sync::Arc, time::Duration};
137
138    use alloy::{
139        network::EthereumWallet,
140        primitives::{Address, U256},
141        providers::{Provider, ProviderBuilder, ext::AnvilApi},
142    };
143    use anyhow::bail;
144    use async_lock::{Mutex, RwLock};
145    use async_trait::async_trait;
146    use committable::{Commitment, Committable};
147    use espresso_contract_deployer::{
148        Contract, Contracts, DEFAULT_EXIT_ESCROW_PERIOD_SECONDS, builder::DeployerArgsBuilder,
149        network_config::light_client_genesis_from_stake_table,
150    };
151    use espresso_types::{
152        Event, L1Client, L1ClientOptions, Leaf, Leaf2, NodeState, PubKey, SeqTypes, ValidatedState,
153        traits::{
154            EventConsumer, EventsPersistenceRead, MembershipPersistence, NullEventConsumer,
155            PersistenceOptions, SequencerPersistence,
156        },
157        v0_3::{AuthenticatedValidator, EventKey, Fetcher, RegisteredValidator, StakeTableEvent},
158    };
159    use futures::{StreamExt, TryStreamExt, future::join_all};
160    use hotshot::{
161        InitializerEpochInfo,
162        types::{BLSPubKey, SignatureKey},
163    };
164    use hotshot_contract_adapter::{
165        sol_types::StakeTableV2::Delegated, stake_table::StakeTableContractVersion,
166    };
167    use hotshot_example_types::node_types::TEST_VERSIONS;
168    use hotshot_query_service::{availability::BlockQueryData, testing::mocks::MOCK_UPGRADE};
169    use hotshot_types::{
170        data::{
171            DaProposal2, EpochNumber, QuorumProposal2, QuorumProposalWrapper, VidCommitment,
172            ViewNumber, ns_table::parse_ns_table, vid_commitment, vid_disperse::AvidMDisperseShare,
173        },
174        event::{EventType, HotShotAction, LeafInfo},
175        light_client::StateKeyPair,
176        message::{Proposal, UpgradeLock, convert_proposal},
177        simple_certificate::{
178            CertificatePair, NextEpochQuorumCertificate2, QuorumCertificate, QuorumCertificate2,
179            UpgradeCertificate,
180        },
181        simple_vote::{NextEpochQuorumData2, QuorumData2, UpgradeProposalData, VersionedVoteData},
182        traits::{EncodeBytes, block_contents::BlockHeader},
183        utils::EpochTransitionIndicator,
184        vid::avidm::{AvidMScheme, init_avidm_param},
185        vote::HasViewNumber,
186    };
187    use indexmap::IndexMap;
188    use staking_cli::demo::{DelegationConfig, StakingTransactions};
189    use surf_disco::Client;
190    use test_utils::reserve_tcp_port;
191    use tide_disco::error::ServerError;
192    use tokio::{spawn, time::sleep};
193    use vbs::version::Version;
194    use versions::{Upgrade, version};
195
196    use crate::{
197        RECENT_STAKE_TABLES_LIMIT, SequencerApiVersion,
198        api::{
199            Options,
200            test_helpers::{STAKE_TABLE_CAPACITY_FOR_TEST, TestNetwork, TestNetworkConfigBuilder},
201        },
202        catchup::NullStateCatchup,
203        testing::{TestConfigBuilder, staking_priv_keys},
204    };
205
    /// Extension trait letting the shared persistence test suite run against any
    /// persistence backend (instantiated for SQL and file-system via `persistence_types`).
    #[async_trait]
    pub trait TestablePersistence: SequencerPersistence + MembershipPersistence {
        /// Handle to backing storage (e.g. temp dir or test database) that must be kept
        /// alive for the duration of a test.
        type Storage: Sync;

        /// Create fresh, empty backing storage for a test.
        async fn tmp_storage() -> Self::Storage;
        /// Build persistence options referring to the given storage.
        fn options(storage: &Self::Storage) -> impl PersistenceOptions<Persistence = Self>;

        /// Open a persistence instance backed by `storage`.
        async fn connect(storage: &Self::Storage) -> Self {
            Self::options(storage).create().await.unwrap()
        }
    }
217
    /// rstest template that instantiates each persistence test case for both the SQL and
    /// file-system persistence backends (applied via `#[rstest_reuse::apply]` below).
    #[rstest_reuse::template]
    #[rstest::rstest]
    #[case(PhantomData::<crate::persistence::sql::Persistence>)]
    #[case(PhantomData::<crate::persistence::fs::Persistence>)]
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    pub fn persistence_types<P: TestablePersistence>(#[case] _p: PhantomData<P>) {}
224
    /// Test event consumer that records every event it receives for later inspection.
    #[derive(Clone, Debug, Default)]
    struct EventCollector {
        // Shared behind Arc<RwLock> so clones of the collector observe the same event log.
        events: Arc<RwLock<Vec<Event>>>,
    }
229
230    impl EventCollector {
231        async fn leaf_chain(&self) -> Vec<LeafInfo<SeqTypes>> {
232            self.events
233                .read()
234                .await
235                .iter()
236                .flat_map(|event| {
237                    let EventType::Decide { leaf_chain, .. } = &event.event else {
238                        panic!("expected decide event, got {event:?}");
239                    };
240                    leaf_chain.iter().cloned().rev()
241                })
242                .collect::<Vec<_>>()
243        }
244    }
245
246    #[async_trait]
247    impl EventConsumer for EventCollector {
248        async fn handle_event(&self, event: &Event) -> anyhow::Result<()> {
249            self.events.write().await.push(event.clone());
250            Ok(())
251        }
252    }
253
    /// Test event consumer that fails on every event, used to exercise error paths.
    #[derive(Clone, Copy, Debug)]
    struct FailConsumer;
256
257    #[async_trait]
258    impl EventConsumer for FailConsumer {
259        async fn handle_event(&self, _: &Event) -> anyhow::Result<()> {
260            bail!("mock error injection");
261        }
262    }
263
    /// Check that `record_action` tracks the latest view this node acted in: the saved
    /// view advances on newer votes and is unchanged by votes for older views.
    #[rstest_reuse::apply(persistence_types)]
    pub async fn test_voted_view<P: TestablePersistence>(_p: PhantomData<P>) {
        let tmp = P::tmp_storage().await;
        let storage = P::connect(&tmp).await;

        // Initially, there is no saved view.
        assert_eq!(storage.load_latest_acted_view().await.unwrap(), None);

        // Store a view.
        let view1 = ViewNumber::genesis();
        storage
            .record_action(view1, None, HotShotAction::Vote)
            .await
            .unwrap();
        assert_eq!(
            storage.load_latest_acted_view().await.unwrap().unwrap(),
            view1
        );

        // Store a newer view, make sure storage gets updated.
        let view2 = view1 + 1;
        storage
            .record_action(view2, None, HotShotAction::Vote)
            .await
            .unwrap();
        assert_eq!(
            storage.load_latest_acted_view().await.unwrap().unwrap(),
            view2
        );

        // Store an old view, make sure storage is unchanged.
        storage
            .record_action(view1, None, HotShotAction::Vote)
            .await
            .unwrap();
        assert_eq!(
            storage.load_latest_acted_view().await.unwrap().unwrap(),
            view2
        );
    }
304
    /// Check that `load_restart_view` returns one view past the latest *vote*, and is not
    /// advanced by votes for older views nor by proposals or timeout votes at higher views.
    #[rstest_reuse::apply(persistence_types)]
    pub async fn test_restart_view<P: TestablePersistence>(_p: PhantomData<P>) {
        let tmp = P::tmp_storage().await;
        let storage = P::connect(&tmp).await;

        // Initially, there is no saved view.
        assert_eq!(storage.load_restart_view().await.unwrap(), None);

        // Store a view.
        let view1 = ViewNumber::genesis();
        storage
            .record_action(view1, None, HotShotAction::Vote)
            .await
            .unwrap();
        // The restart view is one past the voted view.
        assert_eq!(
            storage.load_restart_view().await.unwrap().unwrap(),
            view1 + 1
        );

        // Store a newer view, make sure storage gets updated.
        let view2 = view1 + 1;
        storage
            .record_action(view2, None, HotShotAction::Vote)
            .await
            .unwrap();
        assert_eq!(
            storage.load_restart_view().await.unwrap().unwrap(),
            view2 + 1
        );

        // Store an old view, make sure storage is unchanged.
        storage
            .record_action(view1, None, HotShotAction::Vote)
            .await
            .unwrap();
        assert_eq!(
            storage.load_restart_view().await.unwrap().unwrap(),
            view2 + 1
        );

        // store a higher proposed view, make sure storage is unchanged.
        storage
            .record_action(view2 + 1, None, HotShotAction::Propose)
            .await
            .unwrap();
        assert_eq!(
            storage.load_restart_view().await.unwrap().unwrap(),
            view2 + 1
        );

        // store a higher timeout vote view, make sure storage is unchanged.
        storage
            .record_action(view2 + 1, None, HotShotAction::TimeoutVote)
            .await
            .unwrap();
        assert_eq!(
            storage.load_restart_view().await.unwrap().unwrap(),
            view2 + 1
        );
    }
365
    /// Check `DrbInput` persistence for a single epoch: a stored input can be reloaded, a
    /// later iteration overwrites an earlier one, and an earlier iteration never overwrites
    /// a later one.
    #[rstest_reuse::apply(persistence_types)]
    pub async fn test_store_drb_input<P: TestablePersistence>(_p: PhantomData<P>) {
        use hotshot_types::drb::DrbInput;

        let tmp = P::tmp_storage().await;
        let storage = P::connect(&tmp).await;
        let difficulty_level = 10;

        // Initially, there is no saved info.
        if storage.load_drb_input(10).await.is_ok() {
            panic!("unexpected nonempty drb_input");
        }

        // Three inputs for the same epoch (10), distinguished only by iteration count.
        let drb_input_1 = DrbInput {
            epoch: 10,
            iteration: 10,
            value: [0u8; 32],
            difficulty_level,
        };

        let drb_input_2 = DrbInput {
            epoch: 10,
            iteration: 20,
            value: [0u8; 32],
            difficulty_level,
        };

        let drb_input_3 = DrbInput {
            epoch: 10,
            iteration: 30,
            value: [0u8; 32],
            difficulty_level,
        };

        // NOTE(review): store results are deliberately ignored here; the asserts below
        // detect whether each store took effect.
        let _ = storage.store_drb_input(drb_input_1.clone()).await;

        assert_eq!(storage.load_drb_input(10).await.unwrap(), drb_input_1);

        let _ = storage.store_drb_input(drb_input_3.clone()).await;

        // check that the drb input is overwritten
        assert_eq!(storage.load_drb_input(10).await.unwrap(), drb_input_3);

        let _ = storage.store_drb_input(drb_input_2.clone()).await;

        // check that the drb input is not overwritten by the older value
        assert_eq!(storage.load_drb_input(10).await.unwrap(), drb_input_3);
    }
414
    /// Exercise persistence of epoch startup info: DRB results and epoch-root block headers
    /// are returned by `load_start_epoch_info` in epoch order, and only the most recent
    /// `RECENT_STAKE_TABLES_LIMIT` epochs are returned once more than that are stored.
    #[rstest_reuse::apply(persistence_types)]
    pub async fn test_epoch_info<P: TestablePersistence>(_p: PhantomData<P>) {
        let tmp = P::tmp_storage().await;
        let storage = P::connect(&tmp).await;

        // Initially, there is no saved info.
        assert_eq!(storage.load_start_epoch_info().await.unwrap(), Vec::new());

        // Store a drb result.
        storage
            .store_drb_result(EpochNumber::new(1), [1; 32])
            .await
            .unwrap();
        assert_eq!(
            storage.load_start_epoch_info().await.unwrap(),
            vec![InitializerEpochInfo::<SeqTypes> {
                epoch: EpochNumber::new(1),
                drb_result: [1; 32],
                block_header: None,
            }]
        );

        // Store a second DRB result
        storage
            .store_drb_result(EpochNumber::new(2), [3; 32])
            .await
            .unwrap();
        assert_eq!(
            storage.load_start_epoch_info().await.unwrap(),
            vec![
                InitializerEpochInfo::<SeqTypes> {
                    epoch: EpochNumber::new(1),
                    drb_result: [1; 32],
                    block_header: None,
                },
                InitializerEpochInfo::<SeqTypes> {
                    epoch: EpochNumber::new(2),
                    drb_result: [3; 32],
                    block_header: None,
                }
            ]
        );

        // Make a header
        let instance_state = NodeState::mock();
        let validated_state = hotshot_types::traits::ValidatedState::genesis(&instance_state).0;
        let leaf: Leaf2 = Leaf::genesis(&validated_state, &instance_state, MOCK_UPGRADE.base)
            .await
            .into();
        let header = leaf.block_header().clone();

        // Test storing the header: it should attach to epoch 1's entry only.
        storage
            .store_epoch_root(EpochNumber::new(1), header.clone())
            .await
            .unwrap();
        assert_eq!(
            storage.load_start_epoch_info().await.unwrap(),
            vec![
                InitializerEpochInfo::<SeqTypes> {
                    epoch: EpochNumber::new(1),
                    drb_result: [1; 32],
                    block_header: Some(header.clone()),
                },
                InitializerEpochInfo::<SeqTypes> {
                    epoch: EpochNumber::new(2),
                    drb_result: [3; 32],
                    block_header: None,
                }
            ]
        );

        // Store more than the limit
        let total_epochs = RECENT_STAKE_TABLES_LIMIT + 10;
        for i in 0..total_epochs {
            let epoch = EpochNumber::new(i);
            let drb = [i as u8; 32];
            storage
                .store_drb_result(epoch, drb)
                .await
                .unwrap_or_else(|_| panic!("Failed to store DRB result for epoch {i}"));
        }

        let results = storage.load_start_epoch_info().await.unwrap();

        // Check that only the most recent RECENT_STAKE_TABLES_LIMIT epochs are returned
        assert_eq!(
            results.len(),
            RECENT_STAKE_TABLES_LIMIT as usize,
            "Should return only the most recent {RECENT_STAKE_TABLES_LIMIT} epochs",
        );

        // The surviving entries should be the last RECENT_STAKE_TABLES_LIMIT epochs, in
        // order, with no block headers (these epochs only had DRB results stored).
        for (i, info) in results.iter().enumerate() {
            let expected_epoch =
                EpochNumber::new(total_epochs - RECENT_STAKE_TABLES_LIMIT + i as u64);
            let expected_drb = [(total_epochs - RECENT_STAKE_TABLES_LIMIT + i as u64) as u8; 32];
            assert_eq!(info.epoch, expected_epoch, "invalid epoch at index {i}",);
            assert_eq!(info.drb_result, expected_drb, "invalid DRB at index {i}",);
            assert!(info.block_header.is_none(), "Expected no block header");
        }
    }
516
517    fn leaf_info(leaf: Leaf2) -> LeafInfo<SeqTypes> {
518        LeafInfo {
519            leaf,
520            vid_share: None,
521            state: Default::default(),
522            delta: None,
523            state_cert: None,
524        }
525    }
526
527    #[rstest_reuse::apply(persistence_types)]
528    pub async fn test_append_and_decide<P: TestablePersistence>(_p: PhantomData<P>) {
529        let tmp = P::tmp_storage().await;
530        let storage = P::connect(&tmp).await;
531
532        // Test append VID
533        assert_eq!(
534            storage.load_vid_share(ViewNumber::new(0)).await.unwrap(),
535            None
536        );
537
538        let leaf: Leaf2 = Leaf2::genesis(
539            &ValidatedState::default(),
540            &NodeState::mock(),
541            TEST_VERSIONS.test.base,
542        )
543        .await;
544        let leaf_payload = leaf.block_payload().unwrap();
545        let leaf_payload_bytes_arc = leaf_payload.encode();
546
547        let avidm_param = init_avidm_param(2).unwrap();
548        let weights = vec![1u32; 2];
549
550        let ns_table = parse_ns_table(
551            leaf_payload.byte_len().as_usize(),
552            &leaf_payload.ns_table().encode(),
553        );
554        let (payload_commitment, shares) =
555            AvidMScheme::ns_disperse(&avidm_param, &weights, &leaf_payload_bytes_arc, ns_table)
556                .unwrap();
557
558        let (pubkey, privkey) = BLSPubKey::generated_from_seed_indexed([0; 32], 1);
559        let signature = PubKey::sign(&privkey, &[]).unwrap();
560        let mut vid = AvidMDisperseShare::<SeqTypes> {
561            view_number: ViewNumber::new(0),
562            payload_commitment,
563            share: shares[0].clone(),
564            recipient_key: pubkey,
565            epoch: Some(EpochNumber::new(0)),
566            target_epoch: Some(EpochNumber::new(0)),
567            common: avidm_param,
568        };
569        let mut quorum_proposal = Proposal {
570            data: QuorumProposalWrapper::<SeqTypes> {
571                proposal: QuorumProposal2::<SeqTypes> {
572                    epoch: None,
573                    block_header: leaf.block_header().clone(),
574                    view_number: ViewNumber::genesis(),
575                    justify_qc: QuorumCertificate2::genesis(
576                        &ValidatedState::default(),
577                        &NodeState::mock(),
578                        TEST_VERSIONS.test,
579                    )
580                    .await,
581                    upgrade_certificate: None,
582                    view_change_evidence: None,
583                    next_drb_result: None,
584                    next_epoch_justify_qc: None,
585                    state_cert: None,
586                },
587            },
588            signature,
589            _pd: Default::default(),
590        };
591
592        let vid_share0 = convert_proposal(vid.clone().to_proposal(&privkey).unwrap().clone());
593
594        storage.append_vid(&vid_share0).await.unwrap();
595
596        assert_eq!(
597            storage.load_vid_share(ViewNumber::new(0)).await.unwrap(),
598            Some(vid_share0.clone())
599        );
600
601        vid.view_number = ViewNumber::new(1);
602
603        let vid_share1 = convert_proposal(vid.clone().to_proposal(&privkey).unwrap().clone());
604        storage.append_vid(&vid_share1).await.unwrap();
605
606        assert_eq!(
607            storage.load_vid_share(vid.view_number()).await.unwrap(),
608            Some(vid_share1.clone())
609        );
610
611        vid.view_number = ViewNumber::new(2);
612
613        let vid_share2 = convert_proposal(vid.clone().to_proposal(&privkey).unwrap().clone());
614        storage.append_vid(&vid_share2).await.unwrap();
615
616        assert_eq!(
617            storage.load_vid_share(vid.view_number()).await.unwrap(),
618            Some(vid_share2.clone())
619        );
620
621        vid.view_number = ViewNumber::new(3);
622
623        let vid_share3 = convert_proposal(vid.clone().to_proposal(&privkey).unwrap().clone());
624        storage.append_vid(&vid_share3).await.unwrap();
625
626        assert_eq!(
627            storage.load_vid_share(vid.view_number()).await.unwrap(),
628            Some(vid_share3.clone())
629        );
630
631        let block_payload_signature = BLSPubKey::sign(&privkey, &leaf_payload_bytes_arc)
632            .expect("Failed to sign block payload");
633
634        let da_proposal_inner = DaProposal2::<SeqTypes> {
635            encoded_transactions: leaf_payload_bytes_arc.clone(),
636            metadata: leaf_payload.ns_table().clone(),
637            view_number: ViewNumber::new(0),
638            epoch: None,
639            epoch_transition_indicator: EpochTransitionIndicator::NotInTransition,
640        };
641
642        let da_proposal = Proposal {
643            data: da_proposal_inner,
644            signature: block_payload_signature,
645            _pd: Default::default(),
646        };
647
648        let vid_commitment = vid_commitment(
649            &leaf_payload_bytes_arc,
650            &leaf.block_header().metadata().encode(),
651            2,
652            TEST_VERSIONS.test.base,
653        );
654
655        storage
656            .append_da2(&da_proposal, vid_commitment)
657            .await
658            .unwrap();
659
660        assert_eq!(
661            storage.load_da_proposal(ViewNumber::new(0)).await.unwrap(),
662            Some(da_proposal.clone())
663        );
664
665        let mut da_proposal1 = da_proposal.clone();
666        da_proposal1.data.view_number = ViewNumber::new(1);
667        storage
668            .append_da2(&da_proposal1.clone(), vid_commitment)
669            .await
670            .unwrap();
671
672        assert_eq!(
673            storage
674                .load_da_proposal(da_proposal1.data.view_number)
675                .await
676                .unwrap(),
677            Some(da_proposal1.clone())
678        );
679
680        let mut da_proposal2 = da_proposal1.clone();
681        da_proposal2.data.view_number = ViewNumber::new(2);
682        storage
683            .append_da2(&da_proposal2.clone(), vid_commitment)
684            .await
685            .unwrap();
686
687        assert_eq!(
688            storage
689                .load_da_proposal(da_proposal2.data.view_number)
690                .await
691                .unwrap(),
692            Some(da_proposal2.clone())
693        );
694
695        let mut da_proposal3 = da_proposal2.clone();
696        da_proposal3.data.view_number = ViewNumber::new(3);
697        storage
698            .append_da2(&da_proposal3.clone(), vid_commitment)
699            .await
700            .unwrap();
701
702        assert_eq!(
703            storage
704                .load_da_proposal(da_proposal3.data.view_number)
705                .await
706                .unwrap(),
707            Some(da_proposal3.clone())
708        );
709
710        let quorum_proposal1 = quorum_proposal.clone();
711
712        storage
713            .append_quorum_proposal2(&quorum_proposal1)
714            .await
715            .unwrap();
716
717        assert_eq!(
718            storage.load_quorum_proposals().await.unwrap(),
719            BTreeMap::from_iter([(ViewNumber::genesis(), quorum_proposal1.clone())])
720        );
721
722        quorum_proposal.data.proposal.view_number = ViewNumber::new(1);
723        let quorum_proposal2 = quorum_proposal.clone();
724        storage
725            .append_quorum_proposal2(&quorum_proposal2)
726            .await
727            .unwrap();
728
729        assert_eq!(
730            storage.load_quorum_proposals().await.unwrap(),
731            BTreeMap::from_iter([
732                (ViewNumber::genesis(), quorum_proposal1.clone()),
733                (ViewNumber::new(1), quorum_proposal2.clone())
734            ])
735        );
736
737        quorum_proposal.data.proposal.view_number = ViewNumber::new(2);
738        quorum_proposal.data.proposal.justify_qc.view_number = ViewNumber::new(1);
739        let quorum_proposal3 = quorum_proposal.clone();
740        storage
741            .append_quorum_proposal2(&quorum_proposal3)
742            .await
743            .unwrap();
744
745        assert_eq!(
746            storage.load_quorum_proposals().await.unwrap(),
747            BTreeMap::from_iter([
748                (ViewNumber::genesis(), quorum_proposal1.clone()),
749                (ViewNumber::new(1), quorum_proposal2.clone()),
750                (ViewNumber::new(2), quorum_proposal3.clone())
751            ])
752        );
753
754        quorum_proposal.data.proposal.view_number = ViewNumber::new(3);
755        quorum_proposal.data.proposal.justify_qc.view_number = ViewNumber::new(2);
756
757        // This one should stick around after GC runs.
758        let quorum_proposal4 = quorum_proposal.clone();
759        storage
760            .append_quorum_proposal2(&quorum_proposal4)
761            .await
762            .unwrap();
763
764        assert_eq!(
765            storage.load_quorum_proposals().await.unwrap(),
766            BTreeMap::from_iter([
767                (ViewNumber::genesis(), quorum_proposal1.clone()),
768                (ViewNumber::new(1), quorum_proposal2.clone()),
769                (ViewNumber::new(2), quorum_proposal3.clone()),
770                (ViewNumber::new(3), quorum_proposal4.clone())
771            ])
772        );
773
774        // Test decide and garbage collection. Pass in a leaf chain with no VID shares or payloads,
775        // so we have to fetch the missing data from storage.
776        let leaves = [
777            Leaf2::from_quorum_proposal(&quorum_proposal1.data),
778            Leaf2::from_quorum_proposal(&quorum_proposal2.data),
779            Leaf2::from_quorum_proposal(&quorum_proposal3.data),
780            Leaf2::from_quorum_proposal(&quorum_proposal4.data),
781        ];
782        let mut final_qc = leaves[3].justify_qc();
783        final_qc.view_number += 1;
784        final_qc.data.leaf_commit = Committable::commit(&leaf);
785        let qcs = [
786            leaves[1].justify_qc(),
787            leaves[2].justify_qc(),
788            leaves[3].justify_qc(),
789            final_qc,
790        ];
791
792        assert_eq!(
793            storage.load_anchor_view().await.unwrap(),
794            ViewNumber::genesis()
795        );
796
797        let consumer = EventCollector::default();
798        let leaf_chain = leaves
799            .iter()
800            .take(3)
801            .map(|leaf| leaf_info(leaf.clone()))
802            .zip(&qcs)
803            .collect::<Vec<_>>();
804        tracing::info!(?leaf_chain, "decide view 2");
805        storage
806            .append_decided_leaves(
807                ViewNumber::new(2),
808                leaf_chain
809                    .iter()
810                    .map(|(leaf, qc)| (leaf, CertificatePair::non_epoch_change((*qc).clone()))),
811                None,
812                &consumer,
813            )
814            .await
815            .unwrap();
816        assert_eq!(
817            storage.load_anchor_view().await.unwrap(),
818            ViewNumber::new(2)
819        );
820
821        for i in 0..=2 {
822            assert_eq!(
823                storage.load_da_proposal(ViewNumber::new(i)).await.unwrap(),
824                None
825            );
826
827            assert_eq!(
828                storage.load_vid_share(ViewNumber::new(i)).await.unwrap(),
829                None
830            );
831        }
832
833        assert_eq!(
834            storage.load_da_proposal(ViewNumber::new(3)).await.unwrap(),
835            Some(da_proposal3)
836        );
837
838        assert_eq!(
839            storage.load_vid_share(ViewNumber::new(3)).await.unwrap(),
840            Some(convert_proposal(vid_share3.clone()))
841        );
842
843        let proposals = storage.load_quorum_proposals().await.unwrap();
844        assert_eq!(
845            proposals,
846            BTreeMap::from_iter([(ViewNumber::new(3), quorum_proposal4)])
847        );
848
849        // A decide event should have been processed.
850        for (leaf, info) in leaves.iter().zip(consumer.leaf_chain().await.iter()) {
851            assert_eq!(info.leaf, *leaf);
852            let decided_vid_share = info.vid_share.as_ref().unwrap();
853            assert_eq!(decided_vid_share.view_number(), leaf.view_number());
854        }
855
856        // The decided leaf should not have been garbage collected.
857        assert_eq!(
858            storage.load_anchor_leaf().await.unwrap(),
859            Some((leaves[2].clone(), qcs[2].clone()))
860        );
861        assert_eq!(
862            storage.load_anchor_view().await.unwrap(),
863            leaves[2].view_number()
864        );
865
866        // Process a second decide event.
867        let consumer = EventCollector::default();
868        tracing::info!(leaf = ?leaves[3], qc = ?qcs[3], "decide view 3");
869        storage
870            .append_decided_leaves(
871                ViewNumber::new(3),
872                vec![(
873                    &leaf_info(leaves[3].clone()),
874                    CertificatePair::non_epoch_change(qcs[3].clone()),
875                )],
876                None,
877                &consumer,
878            )
879            .await
880            .unwrap();
881        assert_eq!(
882            storage.load_anchor_view().await.unwrap(),
883            ViewNumber::new(3)
884        );
885
886        // A decide event should have been processed.
887        let events = consumer.events.read().await;
888        assert_eq!(events.len(), 1);
889        assert_eq!(events[0].view_number, ViewNumber::new(3));
890        let EventType::Decide {
891            committing_qc,
892            leaf_chain,
893            ..
894        } = &events[0].event
895        else {
896            panic!("expected decide event, got {:?}", events[0]);
897        };
898        assert_eq!(*committing_qc.qc(), qcs[3]);
899        assert_eq!(leaf_chain.len(), 1);
900        let info = &leaf_chain[0];
901        assert_eq!(info.leaf, leaves[3]);
902
903        // The remaining data should have been GCed.
904        assert_eq!(
905            storage.load_da_proposal(ViewNumber::new(3)).await.unwrap(),
906            None
907        );
908
909        assert_eq!(
910            storage.load_vid_share(ViewNumber::new(3)).await.unwrap(),
911            None
912        );
913        assert_eq!(
914            storage.load_quorum_proposals().await.unwrap(),
915            BTreeMap::new()
916        );
917    }
918
919    #[rstest_reuse::apply(persistence_types)]
920    pub async fn test_upgrade_certificate<P: TestablePersistence>(_p: PhantomData<P>) {
921        let tmp = P::tmp_storage().await;
922        let storage = P::connect(&tmp).await;
923
924        // Test get upgrade certificate
925        assert_eq!(storage.load_upgrade_certificate().await.unwrap(), None);
926
927        let upgrade_data = UpgradeProposalData {
928            old_version: Version { major: 0, minor: 1 },
929            new_version: Version { major: 1, minor: 0 },
930            decide_by: ViewNumber::genesis(),
931            new_version_hash: Default::default(),
932            old_version_last_view: ViewNumber::genesis(),
933            new_version_first_view: ViewNumber::genesis(),
934        };
935
936        let decide_upgrade_certificate = UpgradeCertificate::<SeqTypes>::new(
937            upgrade_data.clone(),
938            upgrade_data.commit(),
939            ViewNumber::genesis(),
940            Default::default(),
941            Default::default(),
942        );
943        let res = storage
944            .store_upgrade_certificate(Some(decide_upgrade_certificate.clone()))
945            .await;
946        assert!(res.is_ok());
947
948        let res = storage.load_upgrade_certificate().await.unwrap();
949        let view_number = res.unwrap().view_number;
950        assert_eq!(view_number, ViewNumber::genesis());
951
952        let new_view_number_for_certificate = ViewNumber::new(50);
953        let mut new_upgrade_certificate = decide_upgrade_certificate.clone();
954        new_upgrade_certificate.view_number = new_view_number_for_certificate;
955
956        let res = storage
957            .store_upgrade_certificate(Some(new_upgrade_certificate.clone()))
958            .await;
959        assert!(res.is_ok());
960
961        let res = storage.load_upgrade_certificate().await.unwrap();
962        let view_number = res.unwrap().view_number;
963        assert_eq!(view_number, new_view_number_for_certificate);
964    }
965
966    #[rstest_reuse::apply(persistence_types)]
967    pub async fn test_next_epoch_quorum_certificate<P: TestablePersistence>(_p: PhantomData<P>) {
968        let tmp = P::tmp_storage().await;
969        let storage = P::connect(&tmp).await;
970
971        //  test that next epoch qc2 does not exist
972        assert_eq!(
973            storage.load_next_epoch_quorum_certificate().await.unwrap(),
974            None
975        );
976
977        let upgrade_lock = UpgradeLock::<SeqTypes>::new(TEST_VERSIONS.test);
978
979        let genesis_view = ViewNumber::genesis();
980
981        let leaf = Leaf2::genesis(
982            &ValidatedState::default(),
983            &NodeState::default(),
984            TEST_VERSIONS.test.base,
985        )
986        .await;
987        let data: NextEpochQuorumData2<SeqTypes> = QuorumData2 {
988            leaf_commit: leaf.commit(),
989            epoch: Some(EpochNumber::new(1)),
990            block_number: Some(leaf.height()),
991        }
992        .into();
993
994        let versioned_data =
995            VersionedVoteData::new_infallible(data.clone(), genesis_view, &upgrade_lock);
996
997        let bytes: [u8; 32] = versioned_data.commit().into();
998
999        let next_epoch_qc = NextEpochQuorumCertificate2::new(
1000            data,
1001            Commitment::from_raw(bytes),
1002            genesis_view,
1003            None,
1004            PhantomData,
1005        );
1006
1007        let res = storage
1008            .store_next_epoch_quorum_certificate(next_epoch_qc.clone())
1009            .await;
1010        assert!(res.is_ok());
1011
1012        let res = storage.load_next_epoch_quorum_certificate().await.unwrap();
1013        let view_number = res.unwrap().view_number;
1014        assert_eq!(view_number, ViewNumber::genesis());
1015
1016        let new_view_number_for_qc = ViewNumber::new(50);
1017        let mut new_qc = next_epoch_qc.clone();
1018        new_qc.view_number = new_view_number_for_qc;
1019
1020        let res = storage
1021            .store_next_epoch_quorum_certificate(new_qc.clone())
1022            .await;
1023        assert!(res.is_ok());
1024
1025        let res = storage.load_next_epoch_quorum_certificate().await.unwrap();
1026        let view_number = res.unwrap().view_number;
1027        assert_eq!(view_number, new_view_number_for_qc);
1028    }
1029
    #[rstest_reuse::apply(persistence_types)]
    pub async fn test_decide_with_failing_event_consumer<P: TestablePersistence>(
        _p: PhantomData<P>,
    ) {
        // Verifies that when the event consumer fails during a decide, the
        // storage layer still advances the anchor but defers garbage collection
        // and event emission until a later, successful decide.
        let tmp = P::tmp_storage().await;
        let storage = P::connect(&tmp).await;

        // Create a short blockchain.
        let mut chain = vec![];

        // Start from the mock genesis leaf; its payload seeds the VID dispersal
        // and DA proposal templates below.
        let leaf: Leaf2 = Leaf::genesis(
            &ValidatedState::default(),
            &NodeState::mock(),
            MOCK_UPGRADE.base,
        )
        .await
        .into();
        let leaf_payload = leaf.block_payload().unwrap();
        let leaf_payload_bytes_arc = leaf_payload.encode();
        // Two-recipient VID scheme with equal weights.
        let avidm_param = init_avidm_param(2).unwrap();
        let weights = vec![1u32; 2];
        let ns_table = parse_ns_table(
            leaf_payload.byte_len().as_usize(),
            &leaf_payload.ns_table().encode(),
        );
        let (payload_commitment, shares) =
            AvidMScheme::ns_disperse(&avidm_param, &weights, &leaf_payload_bytes_arc, ns_table)
                .unwrap();

        let (pubkey, privkey) = BLSPubKey::generated_from_seed_indexed([0; 32], 1);
        // Template VID share proposal; its view number is rewritten per chain
        // entry in the loop below.
        let mut vid = AvidMDisperseShare::<SeqTypes> {
            view_number: ViewNumber::new(0),
            payload_commitment,
            share: shares[0].clone(),
            recipient_key: pubkey,
            epoch: Some(EpochNumber::new(0)),
            target_epoch: Some(EpochNumber::new(0)),
            common: avidm_param,
        }
        .to_proposal(&privkey)
        .unwrap()
        .clone();
        // Template quorum proposal, likewise rewritten per view.
        let mut quorum_proposal = QuorumProposalWrapper::<SeqTypes> {
            proposal: QuorumProposal2::<SeqTypes> {
                block_header: leaf.block_header().clone(),
                view_number: ViewNumber::genesis(),
                justify_qc: QuorumCertificate::genesis(
                    &ValidatedState::default(),
                    &NodeState::mock(),
                    TEST_VERSIONS.test,
                )
                .await
                .to_qc2(),
                upgrade_certificate: None,
                view_change_evidence: None,
                next_drb_result: None,
                next_epoch_justify_qc: None,
                epoch: None,
                state_cert: None,
            },
        };
        let mut qc = QuorumCertificate2::genesis(
            &ValidatedState::default(),
            &NodeState::mock(),
            TEST_VERSIONS.test,
        )
        .await;

        let block_payload_signature = BLSPubKey::sign(&privkey, &leaf_payload_bytes_arc)
            .expect("Failed to sign block payload");
        // Template DA proposal, rewritten per view.
        let mut da_proposal = Proposal {
            data: DaProposal2::<SeqTypes> {
                encoded_transactions: leaf_payload_bytes_arc.clone(),
                metadata: leaf_payload.ns_table().clone(),
                view_number: ViewNumber::new(0),
                epoch: Some(EpochNumber::new(0)),
                epoch_transition_indicator: EpochTransitionIndicator::NotInTransition,
            },
            signature: block_payload_signature,
            _pd: Default::default(),
        };

        let vid_commitment = vid_commitment(
            &leaf_payload_bytes_arc,
            &leaf.block_header().metadata().encode(),
            2,
            TEST_VERSIONS.test.base,
        );

        // Stamp out views 0..=3: each entry reuses the genesis payload but
        // carries its own view number and a QC committing to its own leaf.
        for i in 0..4 {
            quorum_proposal.proposal.view_number = ViewNumber::new(i);
            let leaf = Leaf2::from_quorum_proposal(&quorum_proposal);
            qc.view_number = leaf.view_number();
            qc.data.leaf_commit = Committable::commit(&leaf);
            vid.data.view_number = leaf.view_number();
            da_proposal.data.view_number = leaf.view_number();
            chain.push((leaf.clone(), qc.clone(), vid.clone(), da_proposal.clone()));
        }

        // Add proposals.
        for (_, _, vid, da) in &chain {
            tracing::info!(?da, ?vid, "insert proposal");
            storage.append_da2(da, vid_commitment).await.unwrap();
            storage
                .append_vid(&convert_proposal(vid.clone()))
                .await
                .unwrap();
        }

        // Decide 2 leaves, but fail in event processing.
        let leaf_chain = chain
            .iter()
            .take(2)
            .map(|(leaf, qc, ..)| (leaf_info(leaf.clone()), qc.clone()))
            .collect::<Vec<_>>();
        tracing::info!("decide with event handling failure");
        // `FailConsumer` rejects every event; `append_decided_leaves` itself
        // must still succeed.
        storage
            .append_decided_leaves(
                ViewNumber::new(1),
                leaf_chain
                    .iter()
                    .map(|(leaf, qc)| (leaf, CertificatePair::non_epoch_change(qc.clone()))),
                None,
                &FailConsumer,
            )
            .await
            .unwrap();
        // No garbage collection should have run.
        for i in 0..4 {
            tracing::info!(i, "check proposal availability");
            assert!(
                storage
                    .load_vid_share(ViewNumber::new(i))
                    .await
                    .unwrap()
                    .is_some()
            );
            assert!(
                storage
                    .load_da_proposal(ViewNumber::new(i))
                    .await
                    .unwrap()
                    .is_some()
            );
        }
        // The anchor still advances even though the consumer failed.
        tracing::info!("check anchor leaf updated");
        assert_eq!(
            storage
                .load_anchor_leaf()
                .await
                .unwrap()
                .unwrap()
                .0
                .view_number(),
            ViewNumber::new(1)
        );
        assert_eq!(
            storage.load_anchor_view().await.unwrap(),
            ViewNumber::new(1)
        );

        // Now decide remaining leaves successfully. We should now garbage collect and process a
        // decide event for all the leaves.
        let consumer = EventCollector::default();
        let leaf_chain = chain
            .iter()
            .skip(2)
            .map(|(leaf, qc, ..)| (leaf_info(leaf.clone()), qc.clone()))
            .collect::<Vec<_>>();
        tracing::info!("decide successfully");
        storage
            .append_decided_leaves(
                ViewNumber::new(3),
                leaf_chain
                    .iter()
                    .map(|(leaf, qc)| (leaf, CertificatePair::non_epoch_change(qc.clone()))),
                None,
                &consumer,
            )
            .await
            .unwrap();
        // Garbage collection should have run.
        for i in 0..4 {
            tracing::info!(i, "check proposal garbage collected");
            assert!(
                storage
                    .load_vid_share(ViewNumber::new(i))
                    .await
                    .unwrap()
                    .is_none()
            );
            assert!(
                storage
                    .load_da_proposal(ViewNumber::new(i))
                    .await
                    .unwrap()
                    .is_none()
            );
        }
        tracing::info!("check anchor leaf updated");
        assert_eq!(
            storage
                .load_anchor_leaf()
                .await
                .unwrap()
                .unwrap()
                .0
                .view_number(),
            ViewNumber::new(3)
        );
        assert_eq!(
            storage.load_anchor_view().await.unwrap(),
            ViewNumber::new(3)
        );

        // Check decide event.
        tracing::info!("check decide event");
        // All four leaves — including the two whose events previously failed —
        // should now be delivered to the consumer, each with a VID share whose
        // view matches the leaf and with its block payload attached.
        let leaf_chain = consumer.leaf_chain().await;
        assert_eq!(leaf_chain.len(), 4, "{leaf_chain:#?}");
        for ((leaf, ..), info) in chain.iter().zip(leaf_chain.iter()) {
            assert_eq!(info.leaf, *leaf);
            let decided_vid_share = info.vid_share.as_ref().unwrap();
            assert_eq!(decided_vid_share.view_number(), leaf.view_number());
            assert!(info.leaf.block_payload().is_some());
        }
    }
1256
    #[rstest_reuse::apply(persistence_types)]
    pub async fn test_pruning<P: TestablePersistence>(_p: PhantomData<P>) {
        // Verifies the view-retention GC policy: with a retention of one view,
        // view-0 data survives a decide at view 1 but is pruned once view 2 is
        // decided.
        let tmp = P::tmp_storage().await;

        let mut options = P::options(&tmp);
        options.set_view_retention(1);
        let storage = options.create().await.unwrap();

        // Add some "old" data, from view 0.
        let leaf = Leaf::genesis(
            &ValidatedState::default(),
            &NodeState::mock(),
            MOCK_UPGRADE.base,
        )
        .await;
        let leaf_payload = leaf.block_payload().unwrap();
        let leaf_payload_bytes_arc = leaf_payload.encode();
        // Two-recipient VID scheme with equal weights.
        let avidm_param = init_avidm_param(2).unwrap();
        let weights = vec![1u32; 2];

        let ns_table = parse_ns_table(
            leaf_payload.byte_len().as_usize(),
            &leaf_payload.ns_table().encode(),
        );
        let (payload_commitment, shares) =
            AvidMScheme::ns_disperse(&avidm_param, &weights, &leaf_payload_bytes_arc, ns_table)
                .unwrap();

        let (pubkey, privkey) = BLSPubKey::generated_from_seed_indexed([0; 32], 1);
        // VID share for view 0, signed by the test key.
        let vid_share = convert_proposal(
            AvidMDisperseShare::<SeqTypes> {
                view_number: ViewNumber::new(0),
                payload_commitment,
                share: shares[0].clone(),
                recipient_key: pubkey,
                epoch: None,
                target_epoch: None,
                common: avidm_param,
            }
            .to_proposal(&privkey)
            .unwrap()
            .clone(),
        );

        // Quorum proposal for view 0 (genesis), justified by the genesis QC.
        let quorum_proposal = QuorumProposalWrapper::<SeqTypes> {
            proposal: QuorumProposal2::<SeqTypes> {
                block_header: leaf.block_header().clone(),
                view_number: ViewNumber::genesis(),
                justify_qc: QuorumCertificate::genesis(
                    &ValidatedState::default(),
                    &NodeState::mock(),
                    TEST_VERSIONS.test,
                )
                .await
                .to_qc2(),
                upgrade_certificate: None,
                view_change_evidence: None,
                next_drb_result: None,
                next_epoch_justify_qc: None,
                epoch: None,
                state_cert: None,
            },
        };
        let quorum_proposal_signature =
            BLSPubKey::sign(&privkey, &bincode::serialize(&quorum_proposal).unwrap())
                .expect("Failed to sign quorum proposal");
        let quorum_proposal = Proposal {
            data: quorum_proposal,
            signature: quorum_proposal_signature,
            _pd: Default::default(),
        };

        let block_payload_signature = BLSPubKey::sign(&privkey, &leaf_payload_bytes_arc)
            .expect("Failed to sign block payload");
        // DA proposal for view 0 carrying the genesis payload.
        let da_proposal = Proposal {
            data: DaProposal2::<SeqTypes> {
                encoded_transactions: leaf_payload_bytes_arc,
                metadata: leaf_payload.ns_table().clone(),
                view_number: ViewNumber::new(0),
                epoch: None,
                epoch_transition_indicator: EpochTransitionIndicator::NotInTransition,
            },
            signature: block_payload_signature,
            _pd: Default::default(),
        };

        // Persist all three artifacts for view 0.
        storage
            .append_da2(&da_proposal, VidCommitment::V1(payload_commitment))
            .await
            .unwrap();
        storage.append_vid(&vid_share).await.unwrap();
        storage
            .append_quorum_proposal2(&quorum_proposal)
            .await
            .unwrap();

        // Decide a newer view, view 1.
        storage
            .append_decided_leaves(ViewNumber::new(1), [], None, &NullEventConsumer)
            .await
            .unwrap();

        // The old data is not more than the retention period (1 view) old, so it should not be
        // GCed.
        assert_eq!(
            storage
                .load_da_proposal(ViewNumber::new(0))
                .await
                .unwrap()
                .unwrap(),
            da_proposal
        );
        assert_eq!(
            storage
                .load_vid_share(ViewNumber::new(0))
                .await
                .unwrap()
                .unwrap(),
            vid_share
        );
        assert_eq!(
            storage
                .load_quorum_proposal(ViewNumber::new(0))
                .await
                .unwrap(),
            quorum_proposal
        );

        // Decide an even newer view, triggering GC of the old data.
        storage
            .append_decided_leaves(ViewNumber::new(2), [], None, &NullEventConsumer)
            .await
            .unwrap();
        assert!(
            storage
                .load_da_proposal(ViewNumber::new(0))
                .await
                .unwrap()
                .is_none()
        );
        assert!(
            storage
                .load_vid_share(ViewNumber::new(0))
                .await
                .unwrap()
                .is_none()
        );
        // Unlike DA proposals and VID shares, a missing quorum proposal is
        // reported as an error rather than `None`.
        assert!(
            storage
                .load_quorum_proposal(ViewNumber::new(0))
                .await
                .is_err()
        );
    }
1411
1412    async fn assert_events_eq<P: TestablePersistence>(
1413        persistence: &P,
1414        block: u64,
1415        stake_table_fetcher: &Fetcher,
1416        l1_client: &L1Client,
1417        stake_table_contract: Address,
1418    ) -> anyhow::Result<()> {
1419        // Load persisted events
1420        let (stored_l1, events) = persistence.load_events(0, block).await?;
1421        assert!(!events.is_empty());
1422        assert!(stored_l1.is_some());
1423        assert!(events.iter().all(|((l1_block, _), _)| *l1_block <= block));
1424        // Fetch events directly from the contract and compare with persisted data
1425        let contract_events = Fetcher::fetch_events_from_contract(
1426            l1_client.clone(),
1427            stake_table_contract,
1428            None,
1429            block,
1430        )
1431        .await?;
1432        assert_eq!(
1433            contract_events, events,
1434            "Events from contract and persistence do not match"
1435        );
1436
1437        // Fetch events from stake table fetcher and compare with persisted data
1438        let fetched_events = stake_table_fetcher
1439            .fetch_and_store_stake_table_events(stake_table_contract, block)
1440            .await?;
1441        assert_eq!(fetched_events, events);
1442
1443        Ok(())
1444    }
1445
    // Test validating stake table event fetching from persistence, ensuring that
    // persisted data matches the on-chain events and that the event fetcher works correctly.
    #[rstest_reuse::apply(persistence_types)]
    pub async fn test_stake_table_fetching_from_persistence<P: TestablePersistence>(
        #[values(StakeTableContractVersion::V1, StakeTableContractVersion::V2)]
        stake_table_version: StakeTableContractVersion,
        _p: PhantomData<P>,
    ) -> anyhow::Result<()> {
        // Spins up a real test network with a deployed stake table contract and
        // checks (via `assert_events_eq`) that events persisted by the node
        // match the contract and fetcher views, at two points in time.
        let epoch_height = 20;

        let network_config = TestConfigBuilder::default()
            .epoch_height(epoch_height)
            .build();

        let anvil_provider = network_config.anvil().unwrap();

        let query_service_port =
            reserve_tcp_port().expect("OS should have ephemeral ports available");
        let query_api_options = Options::with_port(query_service_port);

        const NODE_COUNT: usize = 2;

        // One temporary storage + persistence option set per node.
        let storage = join_all((0..NODE_COUNT).map(|_| P::tmp_storage())).await;
        let persistence_options: [_; NODE_COUNT] = storage
            .iter()
            .map(P::options)
            .collect::<Vec<_>>()
            .try_into()
            .unwrap();

        // Node 0's persistence is the one inspected by the assertions below.
        let persistence = persistence_options[0].clone().create().await.unwrap();

        // Build the config with PoS hook
        let l1_url = network_config.l1_url();

        let upgrade = Upgrade::trivial(version(0, 3));

        let testnet_config = TestNetworkConfigBuilder::with_num_nodes()
            .api_config(query_api_options)
            .network_config(network_config.clone())
            .persistences(persistence_options.clone())
            .pos_hook(
                DelegationConfig::MultipleDelegators,
                stake_table_version,
                upgrade,
            )
            .await
            .expect("Pos deployment failed")
            .build();

        // Start the network.
        let test_network = TestNetwork::new(testnet_config, upgrade).await;

        let client: Client<ServerError, SequencerApiVersion> = Client::new(
            format!("http://localhost:{query_service_port}")
                .parse()
                .unwrap(),
        );
        client.connect(None).await;
        tracing::info!(query_service_port, "server running");

        // Wait until the network enters epoch 3 by consuming 40 blocks
        // (two epochs of `epoch_height` = 20).
        let _initial_blocks = client
            .socket("availability/stream/blocks/0")
            .subscribe::<BlockQueryData<SeqTypes>>()
            .await
            .unwrap()
            .take(40)
            .try_collect::<Vec<_>>()
            .await
            .unwrap();
        // Load initial persisted events and validate they exist.
        let membership_coordinator = test_network
            .server
            .consensus()
            .read()
            .await
            .membership_coordinator
            .clone();

        let l1_client = L1Client::new(vec![l1_url]).unwrap();
        let node_state = test_network.server.node_state();
        let chain_config = node_state.chain_config;
        let stake_table_contract = chain_config.stake_table_contract.unwrap();

        let current_membership = membership_coordinator.membership();
        // Scope the membership read lock so it is released before waiting for
        // more blocks.
        {
            let membership_state = current_membership.read().await;
            let stake_table_fetcher = membership_state.fetcher();

            let block1 = anvil_provider
                .get_block_number()
                .await
                .expect("latest l1 block");

            assert_events_eq(
                &persistence,
                block1,
                stake_table_fetcher,
                &l1_client,
                stake_table_contract,
            )
            .await?;
        }
        // Advance further (65 more blocks) and re-check against the newer L1
        // head, so later events are also covered.
        let _epoch_4_blocks = client
            .socket("availability/stream/blocks/0")
            .subscribe::<BlockQueryData<SeqTypes>>()
            .await
            .unwrap()
            .take(65)
            .try_collect::<Vec<_>>()
            .await
            .unwrap();
        let block2 = anvil_provider
            .get_block_number()
            .await
            .expect("latest l1 block");

        {
            let membership_state = current_membership.read().await;
            let stake_table_fetcher = membership_state.fetcher();

            assert_events_eq(
                &persistence,
                block2,
                stake_table_fetcher,
                &l1_client,
                stake_table_contract,
            )
            .await?;
        }
        Ok(())
    }
1579
1580    #[rstest_reuse::apply(persistence_types)]
1581    pub async fn test_stake_table_background_fetching<P: TestablePersistence>(
1582        #[values(StakeTableContractVersion::V1, StakeTableContractVersion::V2)]
1583        stake_table_version: StakeTableContractVersion,
1584        _p: PhantomData<P>,
1585    ) -> anyhow::Result<()> {
1586        use espresso_types::v0_3::ChainConfig;
1587        use hotshot_contract_adapter::stake_table::StakeTableContractVersion;
1588
1589        let blocks_per_epoch = 10;
1590
1591        let network_config = TestConfigBuilder::<1>::default()
1592            .epoch_height(blocks_per_epoch)
1593            .build();
1594
1595        let anvil_provider = network_config.anvil().unwrap();
1596
1597        let (genesis_state, genesis_stake) = light_client_genesis_from_stake_table(
1598            &network_config.hotshot_config().hotshot_stake_table(),
1599            STAKE_TABLE_CAPACITY_FOR_TEST,
1600        )
1601        .unwrap();
1602
1603        let (_, priv_keys): (Vec<_>, Vec<_>) = (0..20)
1604            .map(|i| <PubKey as SignatureKey>::generated_from_seed_indexed([1; 32], i as u64))
1605            .unzip();
1606        let state_key_pairs = (0..20)
1607            .map(|i| StateKeyPair::generate_from_seed_indexed([2; 32], i as u64))
1608            .collect::<Vec<_>>();
1609
1610        let validators = staking_priv_keys(&priv_keys, &state_key_pairs, 20);
1611
1612        let deployer = ProviderBuilder::new()
1613            .wallet(EthereumWallet::from(network_config.signer().clone()))
1614            .connect_http(network_config.l1_url().clone());
1615
1616        let mut contracts = Contracts::new();
1617        let args = DeployerArgsBuilder::default()
1618            .deployer(deployer.clone())
1619            .rpc_url(network_config.l1_url().clone())
1620            .mock_light_client(true)
1621            .genesis_lc_state(genesis_state)
1622            .genesis_st_state(genesis_stake)
1623            .blocks_per_epoch(blocks_per_epoch)
1624            .epoch_start_block(1)
1625            .exit_escrow_period(U256::from(max(
1626                blocks_per_epoch * 15 + 100,
1627                DEFAULT_EXIT_ESCROW_PERIOD_SECONDS,
1628            )))
1629            .multisig_pauser(network_config.signer().address())
1630            .token_name("Espresso".to_string())
1631            .token_symbol("ESP".to_string())
1632            .initial_token_supply(U256::from(3590000000u64))
1633            .ops_timelock_delay(U256::from(0))
1634            .ops_timelock_admin(network_config.signer().address())
1635            .ops_timelock_proposers(vec![network_config.signer().address()])
1636            .ops_timelock_executors(vec![network_config.signer().address()])
1637            .safe_exit_timelock_delay(U256::from(10))
1638            .safe_exit_timelock_admin(network_config.signer().address())
1639            .safe_exit_timelock_proposers(vec![network_config.signer().address()])
1640            .safe_exit_timelock_executors(vec![network_config.signer().address()])
1641            .build()
1642            .unwrap();
1643
1644        match stake_table_version {
1645            StakeTableContractVersion::V1 => args.deploy_to_stake_table_v1(&mut contracts).await,
1646            StakeTableContractVersion::V2 => args.deploy_all(&mut contracts).await,
1647        }
1648        .expect("contracts deployed");
1649
1650        let st_addr = contracts
1651            .address(Contract::StakeTableProxy)
1652            .expect("StakeTableProxy deployed");
1653        let l1_url = network_config.l1_url().clone();
1654
1655        let mut planned_txns = StakingTransactions::create(
1656            l1_url.clone(),
1657            &deployer,
1658            st_addr,
1659            validators,
1660            None,
1661            DelegationConfig::MultipleDelegators,
1662        )
1663        .await
1664        .expect("stake table setup failed");
1665
1666        planned_txns
1667            .apply_prerequisites()
1668            .await
1669            .expect("prerequisites failed");
1670
1671        // Ensure we have at least one stake table affecting transaction
1672        planned_txns.apply_one().await.expect("send tx failed");
1673
1674        // new block every 1s
1675        anvil_provider
1676            .anvil_set_interval_mining(1)
1677            .await
1678            .expect("interval mining");
1679
1680        // spawn a separate task
1681        // this is going to keep registering validators and multiple delegators
1682        // the interval mining is set to 1s so each transaction finalization would take atleast 1s
1683        spawn({
1684            async move {
1685                {
1686                    while let Some(receipt) =
1687                        planned_txns.apply_one().await.expect("send tx failed")
1688                    {
1689                        tracing::debug!(?receipt, "transaction finalized");
1690                    }
1691                }
1692            }
1693        });
1694
1695        let storage = P::tmp_storage().await;
1696        let persistence = P::options(&storage).create().await.unwrap();
1697
1698        let l1_client = L1ClientOptions {
1699            stake_table_update_interval: Duration::from_secs(7),
1700            l1_retry_delay: Duration::from_millis(10),
1701            l1_events_max_block_range: 10000,
1702            ..Default::default()
1703        }
1704        .connect(vec![l1_url])
1705        .unwrap();
1706        l1_client.spawn_tasks().await;
1707
1708        let fetcher = Fetcher::new(
1709            Arc::new(NullStateCatchup::default()),
1710            Arc::new(Mutex::new(persistence.clone())),
1711            l1_client.clone(),
1712            ChainConfig {
1713                stake_table_contract: Some(st_addr),
1714                base_fee: 0.into(),
1715                ..Default::default()
1716            },
1717        );
1718
1719        // sleep so that we have enough events
1720        sleep(Duration::from_secs(20)).await;
1721
1722        fetcher.spawn_update_loop().await;
1723        let mut prev_l1_block = 0;
1724        let mut prev_events_len = 0;
1725        for _i in 0..10 {
1726            // Wait for more than update interval to assert that persistence was updated
1727            // L1 update interval is 7s in this test
1728
1729            tokio::time::sleep(std::time::Duration::from_secs(8)).await;
1730
1731            let block = anvil_provider
1732                .get_block_number()
1733                .await
1734                .expect("latest l1 block");
1735
1736            let (read_offset, persisted_events) = persistence.load_events(0, block).await?;
1737            let read_offset = read_offset.unwrap();
1738            let l1_block = match read_offset {
1739                EventsPersistenceRead::Complete => block,
1740                EventsPersistenceRead::UntilL1Block(block) => block,
1741            };
1742
1743            tracing::info!("{l1_block:?}, persistence events = {persisted_events:?}.");
1744            assert!(persisted_events.len() > prev_events_len);
1745
1746            assert!(l1_block > prev_l1_block, "events not updated");
1747
1748            let contract_events =
1749                Fetcher::fetch_events_from_contract(l1_client.clone(), st_addr, None, l1_block)
1750                    .await?;
1751            assert_eq!(persisted_events, contract_events);
1752
1753            prev_l1_block = l1_block;
1754            prev_events_len = persisted_events.len();
1755        }
1756
1757        Ok(())
1758    }
1759
1760    #[rstest_reuse::apply(persistence_types)]
1761    pub async fn test_membership_persistence<P: TestablePersistence>(
1762        _p: PhantomData<P>,
1763    ) -> anyhow::Result<()> {
1764        let tmp = P::tmp_storage().await;
1765        let mut opt = P::options(&tmp);
1766
1767        let storage = opt.create().await.unwrap();
1768
1769        let validator = AuthenticatedValidator::mock();
1770        let mut st = IndexMap::new();
1771        st.insert(validator.account, validator);
1772
1773        storage
1774            .store_stake(EpochNumber::new(10), st.clone(), None, None)
1775            .await?;
1776
1777        let (table, ..) = storage.load_stake(EpochNumber::new(10)).await?.unwrap();
1778        assert_eq!(st, table);
1779
1780        let val2 = AuthenticatedValidator::mock();
1781        let mut st2 = IndexMap::new();
1782        st2.insert(val2.account, val2);
1783        storage
1784            .store_stake(EpochNumber::new(11), st2.clone(), None, None)
1785            .await?;
1786
1787        let tables = storage.load_latest_stake(4).await?.unwrap();
1788        let mut iter = tables.iter();
1789        assert_eq!(
1790            Some(&(EpochNumber::new(11), (st2.clone(), None), None)),
1791            iter.next()
1792        );
1793        assert_eq!(Some(&(EpochNumber::new(10), (st, None), None)), iter.next());
1794        assert_eq!(None, iter.next());
1795
1796        for i in 0..=20 {
1797            storage
1798                .store_stake(EpochNumber::new(i), st2.clone(), None, None)
1799                .await?;
1800        }
1801
1802        let tables = storage.load_latest_stake(5).await?.unwrap();
1803        let mut iter = tables.iter();
1804        assert_eq!(
1805            Some(&(EpochNumber::new(20), (st2.clone(), None), None)),
1806            iter.next()
1807        );
1808        assert_eq!(
1809            Some(&(EpochNumber::new(19), (st2.clone(), None), None)),
1810            iter.next()
1811        );
1812        assert_eq!(
1813            Some(&(EpochNumber::new(18), (st2.clone(), None), None)),
1814            iter.next()
1815        );
1816        assert_eq!(
1817            Some(&(EpochNumber::new(17), (st2.clone(), None), None)),
1818            iter.next()
1819        );
1820        assert_eq!(
1821            Some(&(EpochNumber::new(16), (st2, None), None)),
1822            iter.next()
1823        );
1824        assert_eq!(None, iter.next());
1825
1826        Ok(())
1827    }
1828
1829    #[rstest_reuse::apply(persistence_types)]
1830    pub async fn test_delete_stake_tables<P: TestablePersistence>(
1831        _p: PhantomData<P>,
1832    ) -> anyhow::Result<()> {
1833        let tmp = P::tmp_storage().await;
1834        let mut opt = P::options(&tmp);
1835        let storage = opt.create().await.unwrap();
1836
1837        let event1 = StakeTableEvent::Delegate(Delegated {
1838            delegator: Address::ZERO,
1839            validator: Address::ZERO,
1840            amount: U256::from(100),
1841        });
1842        let event2 = StakeTableEvent::Delegate(Delegated {
1843            delegator: Address::ZERO,
1844            validator: Address::ZERO,
1845            amount: U256::from(200),
1846        });
1847
1848        let l1_block = 42u64;
1849        let events: Vec<(EventKey, StakeTableEvent)> =
1850            vec![((l1_block, 0), event1), ((l1_block, 1), event2)];
1851
1852        storage.store_events(l1_block, events.clone()).await?;
1853
1854        let (read_offset, loaded_events) = storage.load_events(0_u64, l1_block).await?;
1855        assert!(read_offset.is_some());
1856        assert_eq!(loaded_events.len(), 2);
1857        assert_eq!(loaded_events, events);
1858
1859        // Store some validators
1860        let v = RegisteredValidator::mock();
1861        let mut vmap = IndexMap::new();
1862        vmap.insert(v.account, v);
1863        storage
1864            .store_all_validators(EpochNumber::new(1), vmap.clone())
1865            .await?;
1866
1867        let loaded = storage
1868            .load_all_validators(EpochNumber::new(1), 0, 10)
1869            .await?;
1870        assert_eq!(loaded.len(), 1);
1871
1872        storage.delete_stake_tables().await?;
1873
1874        // Events cleared
1875        let (read_offset, loaded_events) = storage.load_events(0_u64, l1_block).await?;
1876        assert!(read_offset.is_none());
1877        assert!(loaded_events.is_empty());
1878
1879        // Validators cleared
1880        let loaded = storage
1881            .load_all_validators(EpochNumber::new(1), 0, 10)
1882            .await
1883            .unwrap_or_default();
1884        assert!(loaded.is_empty());
1885
1886        let event3 = StakeTableEvent::Delegate(Delegated {
1887            delegator: Address::ZERO,
1888            validator: Address::ZERO,
1889            amount: U256::from(300),
1890        });
1891        let new_events: Vec<(EventKey, StakeTableEvent)> = vec![((l1_block, 0), event3)];
1892        storage.store_events(l1_block, new_events.clone()).await?;
1893
1894        let (read_offset, loaded_events) = storage.load_events(0_u64, l1_block).await?;
1895        assert!(read_offset.is_some());
1896        assert_eq!(loaded_events.len(), 1);
1897        assert_eq!(loaded_events, new_events);
1898
1899        Ok(())
1900    }
1901
1902    #[rstest_reuse::apply(persistence_types)]
1903    pub async fn test_store_and_load_all_validators<P: TestablePersistence>(
1904        _p: PhantomData<P>,
1905    ) -> anyhow::Result<()> {
1906        let tmp = P::tmp_storage().await;
1907        let mut opt = P::options(&tmp);
1908        let storage = opt.create().await.unwrap();
1909
1910        let mut vmap1 = IndexMap::new();
1911        for _i in 0..25 {
1912            let v = RegisteredValidator::mock();
1913            vmap1.insert(v.account, v);
1914        }
1915        storage
1916            .store_all_validators(EpochNumber::new(10), vmap1.clone())
1917            .await?;
1918
1919        let mut expected_all: Vec<_> = vmap1.clone().into_values().collect();
1920        expected_all.sort_by_key(|v| v.account);
1921
1922        // Load all
1923        let loaded_all = storage
1924            .load_all_validators(EpochNumber::new(10), 0, 100)
1925            .await?;
1926        // SQLite returns a different ordered list even though there is an `ORDER BY address ASC` clause
1927        assert_eq!(expected_all, loaded_all);
1928
1929        // Load first 10
1930        let loaded_first_10 = storage
1931            .load_all_validators(EpochNumber::new(10), 0, 10)
1932            .await?;
1933
1934        assert_eq!(expected_all[..10], loaded_first_10);
1935
1936        // Load next 10
1937        let loaded_next_10 = storage
1938            .load_all_validators(EpochNumber::new(10), 10, 10)
1939            .await?;
1940
1941        assert_eq!(expected_all[10..20], loaded_next_10);
1942
1943        // Load remaining 5
1944        let loaded_last_5 = storage
1945            .load_all_validators(EpochNumber::new(10), 20, 10)
1946            .await?;
1947
1948        assert_eq!(expected_all[20..], loaded_last_5);
1949
1950        // offset beyond size should return empty
1951        let loaded_empty = storage
1952            .load_all_validators(EpochNumber::new(10), 100, 10)
1953            .await?;
1954        assert!(loaded_empty.is_empty());
1955
1956        // epoch 11
1957        let validator2 = RegisteredValidator::mock();
1958        let mut vmap2 = IndexMap::new();
1959        vmap2.insert(validator2.account, validator2.clone());
1960
1961        storage
1962            .store_all_validators(EpochNumber::new(11), vmap2.clone())
1963            .await?;
1964
1965        let mut expected_epoch11: Vec<_> = vmap2.clone().into_values().collect();
1966        expected_epoch11.sort_by_key(|v| v.account);
1967
1968        let loaded2 = storage
1969            .load_all_validators(EpochNumber::new(11), 0, 100)
1970            .await?;
1971
1972        assert_eq!(expected_epoch11, loaded2);
1973
1974        // Epoch 10 still there
1975        let loaded1_again = storage
1976            .load_all_validators(EpochNumber::new(10), 0, 100)
1977            .await?;
1978
1979        assert_eq!(expected_all, loaded1_again);
1980
1981        Ok(())
1982    }
1983
    /// Decide leaf 0 (with a consumer that fails to emit events), then decide leaf 2
    /// while skipping leaf 1. The second decide must flush both leaves as TWO
    /// separate `Decide` events — the chain is split because the leaves are
    /// non-consecutive — and attach `deciding_qc` only to the final event.
    #[rstest_reuse::apply(persistence_types)]
    pub async fn test_non_consecutive_decide<P: TestablePersistence>(_p: PhantomData<P>) {
        let tmp = P::tmp_storage().await;
        let storage = P::connect(&tmp).await;

        // Build a genesis leaf; its proposal is mutated below to derive a second
        // leaf at height/view 2, leaving a gap at height 1.
        let genesis_leaf: Leaf2 = Leaf2::genesis(
            &ValidatedState::default(),
            &NodeState::mock(),
            TEST_VERSIONS.test.base,
        )
        .await;
        let mut quorum_proposal = QuorumProposalWrapper::<SeqTypes> {
            proposal: QuorumProposal2::<SeqTypes> {
                epoch: None,
                block_header: genesis_leaf.block_header().clone(),
                view_number: genesis_leaf.view_number(),
                justify_qc: QuorumCertificate2::genesis(
                    &ValidatedState::default(),
                    &NodeState::mock(),
                    TEST_VERSIONS.test,
                )
                .await,
                upgrade_certificate: None,
                view_change_evidence: None,
                next_drb_result: None,
                next_epoch_justify_qc: None,
                state_cert: None,
            },
        };

        // Leaf at view 0 (genesis-derived).
        let leaf0 = Leaf2::from_quorum_proposal(&quorum_proposal);

        // Reuse the same proposal for a leaf at view/height 2 whose justify QC
        // points at view 1 — deliberately NOT consecutive with leaf0.
        quorum_proposal.proposal.view_number = ViewNumber::new(2);
        *quorum_proposal.proposal.block_header.height_mut() = 2;
        quorum_proposal.proposal.justify_qc.view_number = ViewNumber::new(1);
        let leaf2 = Leaf2::from_quorum_proposal(&quorum_proposal);

        // Wire each QC's leaf commitment to its own leaf so the pairs are valid.
        let mut qc0 = leaf0.justify_qc();
        qc0.data.leaf_commit = Committable::commit(&leaf0);

        let mut qc2 = leaf2.justify_qc();
        qc2.view_number += 1;
        qc2.data.leaf_commit = Committable::commit(&leaf2);

        // The QC that finalizes the whole decide; one view past qc2.
        let mut deciding_qc = qc2.clone();
        deciding_qc.view_number += 1;

        // Decide the first leaf, but fail to generate a decide event.
        storage
            .append_decided_leaves(
                ViewNumber::new(0),
                [(
                    &leaf_info(leaf0.clone()),
                    CertificatePair::non_epoch_change(qc0),
                )],
                None,
                &FailConsumer,
            )
            .await
            .unwrap();

        // Later, decide a new leaf, but skipping some leaf in the middle. This should generate
        // decide events for both the leaves, correctly separating into two events since the leaves
        // are non-consecutive, and correctly applying `deciding_qc` only to the last event.
        let consumer = EventCollector::default();
        storage
            .append_decided_leaves(
                ViewNumber::new(2),
                [(
                    &leaf_info(leaf2.clone()),
                    CertificatePair::non_epoch_change(qc2),
                )],
                Some(Arc::new(CertificatePair::non_epoch_change(
                    deciding_qc.clone(),
                ))),
                &consumer,
            )
            .await
            .unwrap();

        // Exactly two events: one per leaf, in decide order.
        let events = consumer.events.read().await;
        assert_eq!(events.len(), 2);

        // First event: the previously-failed leaf0, with no deciding QC attached.
        let EventType::Decide {
            leaf_chain: leaf_chain0,
            deciding_qc: deciding_qc0,
            ..
        } = &events[0].event
        else {
            panic!("expected decide event, got {:?}", events[0].event);
        };
        assert_eq!(leaf_chain0.len(), 1);
        assert_eq!(leaf_chain0[0].leaf, leaf0);
        assert_eq!(*deciding_qc0, None);

        // Second event: leaf2, carrying the deciding QC.
        let EventType::Decide {
            leaf_chain: leaf_chain2,
            deciding_qc: deciding_qc2,
            ..
        } = &events[1].event
        else {
            panic!("expected decide event, got {:?}", events[1].event);
        };
        assert_eq!(leaf_chain2.len(), 1);
        assert_eq!(leaf_chain2[0].leaf, leaf2);
        assert_eq!(deciding_qc2.as_ref().unwrap().qc(), &deciding_qc);
    }
2091}