Skip to main content

espresso_node/
persistence.rs

1//! Sequencer node persistence.
2//!
3//! This module implements the persistence required for a sequencer node to rejoin the network and
4//! resume participating in consensus, in the event that its process crashes or is killed and loses
5//! all in-memory state.
6//!
7//! This is distinct from the query service persistent storage found in the `api` module, which is
8//! an extension that node operators can opt into. This module defines the minimum level of
9//! persistence which is _required_ to run a node.
10
11use std::collections::HashMap;
12
13use alloy::primitives::{Address, U256};
14use anyhow::Context;
15use async_trait::async_trait;
16use espresso_types::{
17    PubKey,
18    v0_3::{ChainConfig, RegisteredValidator},
19};
20
21pub mod fs;
22pub mod no_storage;
23mod persistence_metrics;
24pub mod sql;
25
26/// RegisteredValidator without x25519_key/p2p_addr fields.
27/// Used for migrating data written before x25519 support was added.
28#[derive(serde::Serialize, serde::Deserialize)]
29pub(crate) struct RegisteredValidatorNoX25519 {
30    pub account: Address,
31    pub stake_table_key: PubKey,
32    pub state_ver_key: hotshot_types::light_client::StateVerKey,
33    pub stake: U256,
34    pub commission: u16,
35    pub delegators: HashMap<Address, U256>,
36    pub authenticated: bool,
37}
38
39impl RegisteredValidatorNoX25519 {
40    pub fn migrate(self) -> RegisteredValidator<PubKey> {
41        RegisteredValidator {
42            account: self.account,
43            stake_table_key: self.stake_table_key,
44            state_ver_key: self.state_ver_key,
45            stake: self.stake,
46            commission: self.commission,
47            delegators: self.delegators,
48            authenticated: self.authenticated,
49            x25519_key: None,
50            p2p_addr: None,
51        }
52    }
53}
54
55/// Update a `NetworkConfig` that may have originally been persisted with an old version.
56fn migrate_network_config(
57    mut network_config: serde_json::Value,
58) -> anyhow::Result<serde_json::Value> {
59    let config = network_config
60        .get_mut("config")
61        .context("missing field `config`")?
62        .as_object_mut()
63        .context("`config` must be an object")?;
64
65    if !config.contains_key("builder_urls") {
66        // When multi-builder support was added, the configuration field `builder_url: Url` was
67        // replaced by an array `builder_urls: Vec<Url>`. If the saved config has no `builder_urls`
68        // field, it is older than this change. Populate `builder_urls` with a singleton array
69        // formed from the old value of `builder_url`, and delete the no longer used `builder_url`.
70        let url = config
71            .remove("builder_url")
72            .context("missing field `builder_url`")?;
73        config.insert("builder_urls".into(), vec![url].into());
74    }
75
76    // HotShotConfig was upgraded to include parameters for proposing and voting on upgrades.
77    // Configs which were persisted before this upgrade may be missing these parameters. This
78    // migration initializes them with a default. By default, we use JS MAX_SAFE_INTEGER for the
79    // start parameters so that nodes will never do an upgrade, unless explicitly configured
80    // otherwise.
81    if !config.contains_key("start_proposing_view") {
82        config.insert("start_proposing_view".into(), 9007199254740991u64.into());
83    }
84    if !config.contains_key("stop_proposing_view") {
85        config.insert("stop_proposing_view".into(), 0.into());
86    }
87    if !config.contains_key("start_voting_view") {
88        config.insert("start_voting_view".into(), 9007199254740991u64.into());
89    }
90    if !config.contains_key("stop_voting_view") {
91        config.insert("stop_voting_view".into(), 0.into());
92    }
93    if !config.contains_key("start_proposing_time") {
94        config.insert("start_proposing_time".into(), 9007199254740991u64.into());
95    }
96    if !config.contains_key("stop_proposing_time") {
97        config.insert("stop_proposing_time".into(), 0.into());
98    }
99    if !config.contains_key("start_voting_time") {
100        config.insert("start_voting_time".into(), 9007199254740991u64.into());
101    }
102    if !config.contains_key("stop_voting_time") {
103        config.insert("stop_voting_time".into(), 0.into());
104    }
105
106    // HotShotConfig was upgraded to include an `epoch_height` parameter. Initialize with a default
107    // if missing.
108    if !config.contains_key("epoch_height") {
109        config.insert("epoch_height".into(), 0.into());
110    }
111
112    // HotShotConfig was upgraded to include `drb_difficulty` and `drb_upgrade_difficulty` parameters. Initialize with a default
113    // if missing.
114    if !config.contains_key("drb_difficulty") {
115        config.insert("drb_difficulty".into(), 0.into());
116    }
117    if !config.contains_key("drb_upgrade_difficulty") {
118        config.insert("drb_upgrade_difficulty".into(), 0.into());
119    }
120
121    // HotShotConfig was upgraded to include `da_committeees`. Initialize with an empty `da_committees` if missing.
122    if !config.contains_key("da_committees") {
123        config.insert("da_committees".into(), serde_json::json!([]));
124    }
125
126    Ok(network_config)
127}
128
129#[async_trait]
130pub trait ChainConfigPersistence: Sized + Send + Sync {
131    async fn insert_chain_config(&mut self, chain_config: ChainConfig) -> anyhow::Result<()>;
132}
133
134#[cfg(test)]
135mod tests {
136    use std::{cmp::max, collections::BTreeMap, marker::PhantomData, sync::Arc, time::Duration};
137
138    use alloy::{
139        network::EthereumWallet,
140        primitives::{Address, U256},
141        providers::{Provider, ProviderBuilder, ext::AnvilApi},
142    };
143    use anyhow::bail;
144    use async_lock::{Mutex, RwLock};
145    use async_trait::async_trait;
146    use committable::{Commitment, Committable};
147    use espresso_contract_deployer::{
148        Contract, Contracts, DEFAULT_EXIT_ESCROW_PERIOD_SECONDS, builder::DeployerArgsBuilder,
149        network_config::light_client_genesis_from_stake_table,
150    };
151    use espresso_types::{
152        Event, L1Client, L1ClientOptions, Leaf, Leaf2, NodeState, PubKey, SeqTypes, ValidatedState,
153        traits::{
154            EventConsumer, EventsPersistenceRead, MembershipPersistence, NullEventConsumer,
155            PersistenceOptions, SequencerPersistence,
156        },
157        v0_3::{AuthenticatedValidator, EventKey, Fetcher, RegisteredValidator, StakeTableEvent},
158    };
159    use futures::{StreamExt, TryStreamExt, future::join_all};
160    use hotshot::{
161        InitializerEpochInfo,
162        types::{BLSPubKey, SignatureKey},
163    };
164    use hotshot_contract_adapter::{
165        sol_types::StakeTableV3::Delegated, stake_table::StakeTableContractVersion,
166    };
167    use hotshot_example_types::node_types::TEST_VERSIONS;
168    use hotshot_query_service::{availability::BlockQueryData, testing::mocks::MOCK_UPGRADE};
169    use hotshot_types::{
170        data::{
171            DaProposal2, EpochNumber, QuorumProposal2, QuorumProposalWrapper, VidCommitment,
172            ViewNumber, ns_table::parse_ns_table, vid_commitment, vid_disperse::AvidMDisperseShare,
173        },
174        event::{EventType, HotShotAction, LeafInfo},
175        light_client::StateKeyPair,
176        message::{Proposal, UpgradeLock, convert_proposal},
177        new_protocol::CoordinatorEvent,
178        simple_certificate::{
179            CertificatePair, NextEpochQuorumCertificate2, QuorumCertificate, QuorumCertificate2,
180            UpgradeCertificate,
181        },
182        simple_vote::{NextEpochQuorumData2, QuorumData2, UpgradeProposalData, VersionedVoteData},
183        traits::{EncodeBytes, block_contents::BlockHeader},
184        utils::EpochTransitionIndicator,
185        vid::avidm::{AvidMScheme, init_avidm_param},
186        vote::HasViewNumber,
187    };
188    use indexmap::IndexMap;
189    use staking_cli::demo::{DelegationConfig, StakingTransactions};
190    use surf_disco::Client;
191    use test_utils::reserve_tcp_port;
192    use tide_disco::error::ServerError;
193    use tokio::{spawn, time::sleep};
194    use vbs::version::Version;
195    use versions::{Upgrade, version};
196
197    use crate::{
198        RECENT_STAKE_TABLES_LIMIT, SequencerApiVersion,
199        api::{
200            Options,
201            test_helpers::{STAKE_TABLE_CAPACITY_FOR_TEST, TestNetwork, TestNetworkConfigBuilder},
202        },
203        catchup::NullStateCatchup,
204        testing::{TestConfigBuilder, staking_priv_keys},
205    };
206
207    #[async_trait]
208    pub trait TestablePersistence: SequencerPersistence + MembershipPersistence {
209        type Storage: Sync;
210
211        async fn tmp_storage() -> Self::Storage;
212        fn options(storage: &Self::Storage) -> impl PersistenceOptions<Persistence = Self>;
213
214        async fn connect(storage: &Self::Storage) -> Self {
215            Self::options(storage).create().await.unwrap()
216        }
217    }
218
    // Reusable rstest template: a test that applies it is run once per `#[case]` below, i.e.
    // once with the SQL persistence type and once with the file system persistence type. The
    // type is passed via `PhantomData`, so no value of the persistence type is needed here.
    // Each case runs as a `tokio` multi-threaded test wrapped by `test_log`.
    #[rstest_reuse::template]
    #[rstest::rstest]
    #[case(PhantomData::<crate::persistence::sql::Persistence>)]
    #[case(PhantomData::<crate::persistence::fs::Persistence>)]
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    pub fn persistence_types<P: TestablePersistence>(#[case] _p: PhantomData<P>) {}
225
226    #[derive(Clone, Debug, Default)]
227    struct EventCollector {
228        events: Arc<RwLock<Vec<Event>>>,
229    }
230
231    impl EventCollector {
232        async fn leaf_chain(&self) -> Vec<LeafInfo<SeqTypes>> {
233            self.events
234                .read()
235                .await
236                .iter()
237                .flat_map(|event| {
238                    let EventType::Decide { leaf_chain, .. } = &event.event else {
239                        panic!("expected decide event, got {event:?}");
240                    };
241                    leaf_chain.iter().cloned().rev()
242                })
243                .collect::<Vec<_>>()
244        }
245    }
246
247    #[async_trait]
248    impl EventConsumer for EventCollector {
249        async fn handle_event(&self, event: &CoordinatorEvent<SeqTypes>) -> anyhow::Result<()> {
250            if let CoordinatorEvent::LegacyEvent(event) = event {
251                self.events.write().await.push(event.clone());
252            }
253            Ok(())
254        }
255    }
256
257    #[derive(Clone, Copy, Debug)]
258    struct FailConsumer;
259
260    #[async_trait]
261    impl EventConsumer for FailConsumer {
262        async fn handle_event(&self, _: &CoordinatorEvent<SeqTypes>) -> anyhow::Result<()> {
263            bail!("mock error injection");
264        }
265    }
266
267    #[rstest_reuse::apply(persistence_types)]
268    pub async fn test_voted_view<P: TestablePersistence>(_p: PhantomData<P>) {
269        let tmp = P::tmp_storage().await;
270        let storage = P::connect(&tmp).await;
271
272        // Initially, there is no saved view.
273        assert_eq!(storage.load_latest_acted_view().await.unwrap(), None);
274
275        // Store a view.
276        let view1 = ViewNumber::genesis();
277        storage
278            .record_action(view1, None, HotShotAction::Vote)
279            .await
280            .unwrap();
281        assert_eq!(
282            storage.load_latest_acted_view().await.unwrap().unwrap(),
283            view1
284        );
285
286        // Store a newer view, make sure storage gets updated.
287        let view2 = view1 + 1;
288        storage
289            .record_action(view2, None, HotShotAction::Vote)
290            .await
291            .unwrap();
292        assert_eq!(
293            storage.load_latest_acted_view().await.unwrap().unwrap(),
294            view2
295        );
296
297        // Store an old view, make sure storage is unchanged.
298        storage
299            .record_action(view1, None, HotShotAction::Vote)
300            .await
301            .unwrap();
302        assert_eq!(
303            storage.load_latest_acted_view().await.unwrap().unwrap(),
304            view2
305        );
306    }
307
308    #[rstest_reuse::apply(persistence_types)]
309    pub async fn test_restart_view<P: TestablePersistence>(_p: PhantomData<P>) {
310        let tmp = P::tmp_storage().await;
311        let storage = P::connect(&tmp).await;
312
313        // Initially, there is no saved view.
314        assert_eq!(storage.load_restart_view().await.unwrap(), None);
315
316        // Store a view.
317        let view1 = ViewNumber::genesis();
318        storage
319            .record_action(view1, None, HotShotAction::Vote)
320            .await
321            .unwrap();
322        assert_eq!(
323            storage.load_restart_view().await.unwrap().unwrap(),
324            view1 + 1
325        );
326
327        // Store a newer view, make sure storage gets updated.
328        let view2 = view1 + 1;
329        storage
330            .record_action(view2, None, HotShotAction::Vote)
331            .await
332            .unwrap();
333        assert_eq!(
334            storage.load_restart_view().await.unwrap().unwrap(),
335            view2 + 1
336        );
337
338        // Store an old view, make sure storage is unchanged.
339        storage
340            .record_action(view1, None, HotShotAction::Vote)
341            .await
342            .unwrap();
343        assert_eq!(
344            storage.load_restart_view().await.unwrap().unwrap(),
345            view2 + 1
346        );
347
348        // store a higher proposed view, make sure storage is unchanged.
349        storage
350            .record_action(view2 + 1, None, HotShotAction::Propose)
351            .await
352            .unwrap();
353        assert_eq!(
354            storage.load_restart_view().await.unwrap().unwrap(),
355            view2 + 1
356        );
357
358        // store a higher timeout vote view, make sure storage is unchanged.
359        storage
360            .record_action(view2 + 1, None, HotShotAction::TimeoutVote)
361            .await
362            .unwrap();
363        assert_eq!(
364            storage.load_restart_view().await.unwrap().unwrap(),
365            view2 + 1
366        );
367    }
368
369    #[rstest_reuse::apply(persistence_types)]
370    pub async fn test_store_drb_input<P: TestablePersistence>(_p: PhantomData<P>) {
371        use hotshot_types::drb::DrbInput;
372
373        let tmp = P::tmp_storage().await;
374        let storage = P::connect(&tmp).await;
375        let difficulty_level = 10;
376
377        // Initially, there is no saved info.
378        if storage.load_drb_input(10).await.is_ok() {
379            panic!("unexpected nonempty drb_input");
380        }
381
382        let drb_input_1 = DrbInput {
383            epoch: 10,
384            iteration: 10,
385            value: [0u8; 32],
386            difficulty_level,
387        };
388
389        let drb_input_2 = DrbInput {
390            epoch: 10,
391            iteration: 20,
392            value: [0u8; 32],
393            difficulty_level,
394        };
395
396        let drb_input_3 = DrbInput {
397            epoch: 10,
398            iteration: 30,
399            value: [0u8; 32],
400            difficulty_level,
401        };
402
403        let _ = storage.store_drb_input(drb_input_1.clone()).await;
404
405        assert_eq!(storage.load_drb_input(10).await.unwrap(), drb_input_1);
406
407        let _ = storage.store_drb_input(drb_input_3.clone()).await;
408
409        // check that the drb input is overwritten
410        assert_eq!(storage.load_drb_input(10).await.unwrap(), drb_input_3);
411
412        let _ = storage.store_drb_input(drb_input_2.clone()).await;
413
414        // check that the drb input is not overwritten by the older value
415        assert_eq!(storage.load_drb_input(10).await.unwrap(), drb_input_3);
416    }
417
418    #[rstest_reuse::apply(persistence_types)]
419    pub async fn test_epoch_info<P: TestablePersistence>(_p: PhantomData<P>) {
420        let tmp = P::tmp_storage().await;
421        let storage = P::connect(&tmp).await;
422
423        // Initially, there is no saved info.
424        assert_eq!(storage.load_start_epoch_info().await.unwrap(), Vec::new());
425
426        // Store a drb result.
427        storage
428            .store_drb_result(EpochNumber::new(1), [1; 32])
429            .await
430            .unwrap();
431        assert_eq!(
432            storage.load_start_epoch_info().await.unwrap(),
433            vec![InitializerEpochInfo::<SeqTypes> {
434                epoch: EpochNumber::new(1),
435                drb_result: [1; 32],
436                block_header: None,
437            }]
438        );
439
440        // Store a second DRB result
441        storage
442            .store_drb_result(EpochNumber::new(2), [3; 32])
443            .await
444            .unwrap();
445        assert_eq!(
446            storage.load_start_epoch_info().await.unwrap(),
447            vec![
448                InitializerEpochInfo::<SeqTypes> {
449                    epoch: EpochNumber::new(1),
450                    drb_result: [1; 32],
451                    block_header: None,
452                },
453                InitializerEpochInfo::<SeqTypes> {
454                    epoch: EpochNumber::new(2),
455                    drb_result: [3; 32],
456                    block_header: None,
457                }
458            ]
459        );
460
461        // Make a header
462        let instance_state = NodeState::mock();
463        let validated_state = hotshot_types::traits::ValidatedState::genesis(&instance_state).0;
464        let leaf: Leaf2 = Leaf::genesis(&validated_state, &instance_state, MOCK_UPGRADE.base)
465            .await
466            .into();
467        let header = leaf.block_header().clone();
468
469        // Test storing the header
470        storage
471            .store_epoch_root(EpochNumber::new(1), header.clone())
472            .await
473            .unwrap();
474        assert_eq!(
475            storage.load_start_epoch_info().await.unwrap(),
476            vec![
477                InitializerEpochInfo::<SeqTypes> {
478                    epoch: EpochNumber::new(1),
479                    drb_result: [1; 32],
480                    block_header: Some(header.clone()),
481                },
482                InitializerEpochInfo::<SeqTypes> {
483                    epoch: EpochNumber::new(2),
484                    drb_result: [3; 32],
485                    block_header: None,
486                }
487            ]
488        );
489
490        // Store more than the limit
491        let total_epochs = RECENT_STAKE_TABLES_LIMIT + 10;
492        for i in 0..total_epochs {
493            let epoch = EpochNumber::new(i);
494            let drb = [i as u8; 32];
495            storage
496                .store_drb_result(epoch, drb)
497                .await
498                .unwrap_or_else(|_| panic!("Failed to store DRB result for epoch {i}"));
499        }
500
501        let results = storage.load_start_epoch_info().await.unwrap();
502
503        // Check that only the most recent RECENT_STAKE_TABLES_LIMIT epochs are returned
504        assert_eq!(
505            results.len(),
506            RECENT_STAKE_TABLES_LIMIT as usize,
507            "Should return only the most recent {RECENT_STAKE_TABLES_LIMIT} epochs",
508        );
509
510        for (i, info) in results.iter().enumerate() {
511            let expected_epoch =
512                EpochNumber::new(total_epochs - RECENT_STAKE_TABLES_LIMIT + i as u64);
513            let expected_drb = [(total_epochs - RECENT_STAKE_TABLES_LIMIT + i as u64) as u8; 32];
514            assert_eq!(info.epoch, expected_epoch, "invalid epoch at index {i}",);
515            assert_eq!(info.drb_result, expected_drb, "invalid DRB at index {i}",);
516            assert!(info.block_header.is_none(), "Expected no block header");
517        }
518    }
519
520    fn leaf_info(leaf: Leaf2) -> LeafInfo<SeqTypes> {
521        LeafInfo {
522            leaf,
523            vid_share: None,
524            state: Default::default(),
525            delta: None,
526            state_cert: None,
527        }
528    }
529
530    #[rstest_reuse::apply(persistence_types)]
531    pub async fn test_append_and_decide<P: TestablePersistence>(_p: PhantomData<P>) {
532        let tmp = P::tmp_storage().await;
533        let storage = P::connect(&tmp).await;
534
535        // Test append VID
536        assert_eq!(
537            storage.load_vid_share(ViewNumber::new(0)).await.unwrap(),
538            None
539        );
540
541        let leaf: Leaf2 = Leaf2::genesis(
542            &ValidatedState::default(),
543            &NodeState::mock(),
544            TEST_VERSIONS.test.base,
545        )
546        .await;
547        let leaf_payload = leaf.block_payload().unwrap();
548        let leaf_payload_bytes_arc = leaf_payload.encode();
549
550        let avidm_param = init_avidm_param(2).unwrap();
551        let weights = vec![1u32; 2];
552
553        let ns_table = parse_ns_table(
554            leaf_payload.byte_len().as_usize(),
555            &leaf_payload.ns_table().encode(),
556        );
557        let (payload_commitment, shares) =
558            AvidMScheme::ns_disperse(&avidm_param, &weights, &leaf_payload_bytes_arc, ns_table)
559                .unwrap();
560
561        let (pubkey, privkey) = BLSPubKey::generated_from_seed_indexed([0; 32], 1);
562        let signature = PubKey::sign(&privkey, &[]).unwrap();
563        let mut vid = AvidMDisperseShare::<SeqTypes> {
564            view_number: ViewNumber::new(0),
565            payload_commitment,
566            share: shares[0].clone(),
567            recipient_key: pubkey,
568            epoch: Some(EpochNumber::new(0)),
569            target_epoch: Some(EpochNumber::new(0)),
570            common: avidm_param,
571        };
572        let mut quorum_proposal = Proposal {
573            data: QuorumProposalWrapper::<SeqTypes> {
574                proposal: QuorumProposal2::<SeqTypes> {
575                    epoch: None,
576                    block_header: leaf.block_header().clone(),
577                    view_number: ViewNumber::genesis(),
578                    justify_qc: QuorumCertificate2::genesis(
579                        &ValidatedState::default(),
580                        &NodeState::mock(),
581                        TEST_VERSIONS.test,
582                    )
583                    .await,
584                    upgrade_certificate: None,
585                    view_change_evidence: None,
586                    next_drb_result: None,
587                    next_epoch_justify_qc: None,
588                    state_cert: None,
589                },
590            },
591            signature,
592            _pd: Default::default(),
593        };
594
595        let vid_share0 = convert_proposal(vid.clone().to_proposal(&privkey).unwrap().clone());
596
597        storage.append_vid(&vid_share0).await.unwrap();
598
599        assert_eq!(
600            storage.load_vid_share(ViewNumber::new(0)).await.unwrap(),
601            Some(vid_share0.clone())
602        );
603
604        vid.view_number = ViewNumber::new(1);
605
606        let vid_share1 = convert_proposal(vid.clone().to_proposal(&privkey).unwrap().clone());
607        storage.append_vid(&vid_share1).await.unwrap();
608
609        assert_eq!(
610            storage.load_vid_share(vid.view_number()).await.unwrap(),
611            Some(vid_share1.clone())
612        );
613
614        vid.view_number = ViewNumber::new(2);
615
616        let vid_share2 = convert_proposal(vid.clone().to_proposal(&privkey).unwrap().clone());
617        storage.append_vid(&vid_share2).await.unwrap();
618
619        assert_eq!(
620            storage.load_vid_share(vid.view_number()).await.unwrap(),
621            Some(vid_share2.clone())
622        );
623
624        vid.view_number = ViewNumber::new(3);
625
626        let vid_share3 = convert_proposal(vid.clone().to_proposal(&privkey).unwrap().clone());
627        storage.append_vid(&vid_share3).await.unwrap();
628
629        assert_eq!(
630            storage.load_vid_share(vid.view_number()).await.unwrap(),
631            Some(vid_share3.clone())
632        );
633
634        let block_payload_signature = BLSPubKey::sign(&privkey, &leaf_payload_bytes_arc)
635            .expect("Failed to sign block payload");
636
637        let da_proposal_inner = DaProposal2::<SeqTypes> {
638            encoded_transactions: leaf_payload_bytes_arc.clone(),
639            metadata: leaf_payload.ns_table().clone(),
640            view_number: ViewNumber::new(0),
641            epoch: None,
642            epoch_transition_indicator: EpochTransitionIndicator::NotInTransition,
643        };
644
645        let da_proposal = Proposal {
646            data: da_proposal_inner,
647            signature: block_payload_signature,
648            _pd: Default::default(),
649        };
650
651        let vid_commitment = vid_commitment(
652            &leaf_payload_bytes_arc,
653            &leaf.block_header().metadata().encode(),
654            2,
655            TEST_VERSIONS.test.base,
656        );
657
658        storage
659            .append_da2(&da_proposal, vid_commitment)
660            .await
661            .unwrap();
662
663        assert_eq!(
664            storage.load_da_proposal(ViewNumber::new(0)).await.unwrap(),
665            Some(da_proposal.clone())
666        );
667
668        let mut da_proposal1 = da_proposal.clone();
669        da_proposal1.data.view_number = ViewNumber::new(1);
670        storage
671            .append_da2(&da_proposal1.clone(), vid_commitment)
672            .await
673            .unwrap();
674
675        assert_eq!(
676            storage
677                .load_da_proposal(da_proposal1.data.view_number)
678                .await
679                .unwrap(),
680            Some(da_proposal1.clone())
681        );
682
683        let mut da_proposal2 = da_proposal1.clone();
684        da_proposal2.data.view_number = ViewNumber::new(2);
685        storage
686            .append_da2(&da_proposal2.clone(), vid_commitment)
687            .await
688            .unwrap();
689
690        assert_eq!(
691            storage
692                .load_da_proposal(da_proposal2.data.view_number)
693                .await
694                .unwrap(),
695            Some(da_proposal2.clone())
696        );
697
698        let mut da_proposal3 = da_proposal2.clone();
699        da_proposal3.data.view_number = ViewNumber::new(3);
700        storage
701            .append_da2(&da_proposal3.clone(), vid_commitment)
702            .await
703            .unwrap();
704
705        assert_eq!(
706            storage
707                .load_da_proposal(da_proposal3.data.view_number)
708                .await
709                .unwrap(),
710            Some(da_proposal3.clone())
711        );
712
713        let quorum_proposal1 = quorum_proposal.clone();
714
715        storage
716            .append_quorum_proposal2(&quorum_proposal1)
717            .await
718            .unwrap();
719
720        assert_eq!(
721            storage.load_quorum_proposals().await.unwrap(),
722            BTreeMap::from_iter([(ViewNumber::genesis(), quorum_proposal1.clone())])
723        );
724
725        quorum_proposal.data.proposal.view_number = ViewNumber::new(1);
726        let quorum_proposal2 = quorum_proposal.clone();
727        storage
728            .append_quorum_proposal2(&quorum_proposal2)
729            .await
730            .unwrap();
731
732        assert_eq!(
733            storage.load_quorum_proposals().await.unwrap(),
734            BTreeMap::from_iter([
735                (ViewNumber::genesis(), quorum_proposal1.clone()),
736                (ViewNumber::new(1), quorum_proposal2.clone())
737            ])
738        );
739
740        quorum_proposal.data.proposal.view_number = ViewNumber::new(2);
741        quorum_proposal.data.proposal.justify_qc.view_number = ViewNumber::new(1);
742        let quorum_proposal3 = quorum_proposal.clone();
743        storage
744            .append_quorum_proposal2(&quorum_proposal3)
745            .await
746            .unwrap();
747
748        assert_eq!(
749            storage.load_quorum_proposals().await.unwrap(),
750            BTreeMap::from_iter([
751                (ViewNumber::genesis(), quorum_proposal1.clone()),
752                (ViewNumber::new(1), quorum_proposal2.clone()),
753                (ViewNumber::new(2), quorum_proposal3.clone())
754            ])
755        );
756
757        quorum_proposal.data.proposal.view_number = ViewNumber::new(3);
758        quorum_proposal.data.proposal.justify_qc.view_number = ViewNumber::new(2);
759
760        // This one should stick around after GC runs.
761        let quorum_proposal4 = quorum_proposal.clone();
762        storage
763            .append_quorum_proposal2(&quorum_proposal4)
764            .await
765            .unwrap();
766
767        assert_eq!(
768            storage.load_quorum_proposals().await.unwrap(),
769            BTreeMap::from_iter([
770                (ViewNumber::genesis(), quorum_proposal1.clone()),
771                (ViewNumber::new(1), quorum_proposal2.clone()),
772                (ViewNumber::new(2), quorum_proposal3.clone()),
773                (ViewNumber::new(3), quorum_proposal4.clone())
774            ])
775        );
776
777        // Test decide and garbage collection. Pass in a leaf chain with no VID shares or payloads,
778        // so we have to fetch the missing data from storage.
779        let leaves = [
780            Leaf2::from_quorum_proposal(&quorum_proposal1.data),
781            Leaf2::from_quorum_proposal(&quorum_proposal2.data),
782            Leaf2::from_quorum_proposal(&quorum_proposal3.data),
783            Leaf2::from_quorum_proposal(&quorum_proposal4.data),
784        ];
785        let mut final_qc = leaves[3].justify_qc();
786        final_qc.view_number += 1;
787        final_qc.data.leaf_commit = Committable::commit(&leaf);
788        let qcs = [
789            leaves[1].justify_qc(),
790            leaves[2].justify_qc(),
791            leaves[3].justify_qc(),
792            final_qc,
793        ];
794
795        assert_eq!(
796            storage.load_anchor_view().await.unwrap(),
797            ViewNumber::genesis()
798        );
799
800        let consumer = EventCollector::default();
801        let leaf_chain = leaves
802            .iter()
803            .take(3)
804            .map(|leaf| leaf_info(leaf.clone()))
805            .zip(&qcs)
806            .collect::<Vec<_>>();
807        tracing::info!(?leaf_chain, "decide view 2");
808        storage
809            .append_decided_leaves(
810                ViewNumber::new(2),
811                leaf_chain
812                    .iter()
813                    .map(|(leaf, qc)| (leaf, CertificatePair::non_epoch_change((*qc).clone()))),
814                None,
815                &consumer,
816            )
817            .await
818            .unwrap();
819        assert_eq!(
820            storage.load_anchor_view().await.unwrap(),
821            ViewNumber::new(2)
822        );
823
824        for i in 0..=2 {
825            assert_eq!(
826                storage.load_da_proposal(ViewNumber::new(i)).await.unwrap(),
827                None
828            );
829
830            assert_eq!(
831                storage.load_vid_share(ViewNumber::new(i)).await.unwrap(),
832                None
833            );
834        }
835
836        assert_eq!(
837            storage.load_da_proposal(ViewNumber::new(3)).await.unwrap(),
838            Some(da_proposal3)
839        );
840
841        assert_eq!(
842            storage.load_vid_share(ViewNumber::new(3)).await.unwrap(),
843            Some(convert_proposal(vid_share3.clone()))
844        );
845
846        let proposals = storage.load_quorum_proposals().await.unwrap();
847        assert_eq!(
848            proposals,
849            BTreeMap::from_iter([(ViewNumber::new(3), quorum_proposal4)])
850        );
851
852        // A decide event should have been processed.
853        for (leaf, info) in leaves.iter().zip(consumer.leaf_chain().await.iter()) {
854            assert_eq!(info.leaf, *leaf);
855            let decided_vid_share = info.vid_share.as_ref().unwrap();
856            assert_eq!(decided_vid_share.view_number(), leaf.view_number());
857        }
858
859        // The decided leaf should not have been garbage collected.
860        assert_eq!(
861            storage.load_anchor_leaf().await.unwrap(),
862            Some((leaves[2].clone(), qcs[2].clone()))
863        );
864        assert_eq!(
865            storage.load_anchor_view().await.unwrap(),
866            leaves[2].view_number()
867        );
868
869        // Process a second decide event.
870        let consumer = EventCollector::default();
871        tracing::info!(leaf = ?leaves[3], qc = ?qcs[3], "decide view 3");
872        storage
873            .append_decided_leaves(
874                ViewNumber::new(3),
875                vec![(
876                    &leaf_info(leaves[3].clone()),
877                    CertificatePair::non_epoch_change(qcs[3].clone()),
878                )],
879                None,
880                &consumer,
881            )
882            .await
883            .unwrap();
884        assert_eq!(
885            storage.load_anchor_view().await.unwrap(),
886            ViewNumber::new(3)
887        );
888
889        // A decide event should have been processed.
890        let events = consumer.events.read().await;
891        assert_eq!(events.len(), 1);
892        assert_eq!(events[0].view_number, ViewNumber::new(3));
893        let EventType::Decide {
894            committing_qc,
895            leaf_chain,
896            ..
897        } = &events[0].event
898        else {
899            panic!("expected decide event, got {:?}", events[0]);
900        };
901        assert_eq!(*committing_qc.qc(), qcs[3]);
902        assert_eq!(leaf_chain.len(), 1);
903        let info = &leaf_chain[0];
904        assert_eq!(info.leaf, leaves[3]);
905
906        // The remaining data should have been GCed.
907        assert_eq!(
908            storage.load_da_proposal(ViewNumber::new(3)).await.unwrap(),
909            None
910        );
911
912        assert_eq!(
913            storage.load_vid_share(ViewNumber::new(3)).await.unwrap(),
914            None
915        );
916        assert_eq!(
917            storage.load_quorum_proposals().await.unwrap(),
918            BTreeMap::new()
919        );
920    }
921
922    #[rstest_reuse::apply(persistence_types)]
923    pub async fn test_upgrade_certificate<P: TestablePersistence>(_p: PhantomData<P>) {
924        let tmp = P::tmp_storage().await;
925        let storage = P::connect(&tmp).await;
926
927        // Test get upgrade certificate
928        assert_eq!(storage.load_upgrade_certificate().await.unwrap(), None);
929
930        let upgrade_data = UpgradeProposalData {
931            old_version: Version { major: 0, minor: 1 },
932            new_version: Version { major: 1, minor: 0 },
933            decide_by: ViewNumber::genesis(),
934            new_version_hash: Default::default(),
935            old_version_last_view: ViewNumber::genesis(),
936            new_version_first_view: ViewNumber::genesis(),
937        };
938
939        let decide_upgrade_certificate = UpgradeCertificate::<SeqTypes>::new(
940            upgrade_data.clone(),
941            upgrade_data.commit(),
942            ViewNumber::genesis(),
943            Default::default(),
944            Default::default(),
945        );
946        let res = storage
947            .store_upgrade_certificate(Some(decide_upgrade_certificate.clone()))
948            .await;
949        assert!(res.is_ok());
950
951        let res = storage.load_upgrade_certificate().await.unwrap();
952        let view_number = res.unwrap().view_number;
953        assert_eq!(view_number, ViewNumber::genesis());
954
955        let new_view_number_for_certificate = ViewNumber::new(50);
956        let mut new_upgrade_certificate = decide_upgrade_certificate.clone();
957        new_upgrade_certificate.view_number = new_view_number_for_certificate;
958
959        let res = storage
960            .store_upgrade_certificate(Some(new_upgrade_certificate.clone()))
961            .await;
962        assert!(res.is_ok());
963
964        let res = storage.load_upgrade_certificate().await.unwrap();
965        let view_number = res.unwrap().view_number;
966        assert_eq!(view_number, new_view_number_for_certificate);
967    }
968
969    #[rstest_reuse::apply(persistence_types)]
970    pub async fn test_next_epoch_quorum_certificate<P: TestablePersistence>(_p: PhantomData<P>) {
971        let tmp = P::tmp_storage().await;
972        let storage = P::connect(&tmp).await;
973
974        //  test that next epoch qc2 does not exist
975        assert_eq!(
976            storage.load_next_epoch_quorum_certificate().await.unwrap(),
977            None
978        );
979
980        let upgrade_lock = UpgradeLock::<SeqTypes>::new(TEST_VERSIONS.test);
981
982        let genesis_view = ViewNumber::genesis();
983
984        let leaf = Leaf2::genesis(
985            &ValidatedState::default(),
986            &NodeState::default(),
987            TEST_VERSIONS.test.base,
988        )
989        .await;
990        let data: NextEpochQuorumData2<SeqTypes> = QuorumData2 {
991            leaf_commit: leaf.commit(),
992            epoch: Some(EpochNumber::new(1)),
993            block_number: Some(leaf.height()),
994        }
995        .into();
996
997        let versioned_data =
998            VersionedVoteData::new_infallible(data.clone(), genesis_view, &upgrade_lock);
999
1000        let bytes: [u8; 32] = versioned_data.commit().into();
1001
1002        let next_epoch_qc = NextEpochQuorumCertificate2::new(
1003            data,
1004            Commitment::from_raw(bytes),
1005            genesis_view,
1006            None,
1007            PhantomData,
1008        );
1009
1010        let res = storage
1011            .store_next_epoch_quorum_certificate(next_epoch_qc.clone())
1012            .await;
1013        assert!(res.is_ok());
1014
1015        let res = storage.load_next_epoch_quorum_certificate().await.unwrap();
1016        let view_number = res.unwrap().view_number;
1017        assert_eq!(view_number, ViewNumber::genesis());
1018
1019        let new_view_number_for_qc = ViewNumber::new(50);
1020        let mut new_qc = next_epoch_qc.clone();
1021        new_qc.view_number = new_view_number_for_qc;
1022
1023        let res = storage
1024            .store_next_epoch_quorum_certificate(new_qc.clone())
1025            .await;
1026        assert!(res.is_ok());
1027
1028        let res = storage.load_next_epoch_quorum_certificate().await.unwrap();
1029        let view_number = res.unwrap().view_number;
1030        assert_eq!(view_number, new_view_number_for_qc);
1031    }
1032
1033    #[rstest_reuse::apply(persistence_types)]
1034    pub async fn test_decide_with_failing_event_consumer<P: TestablePersistence>(
1035        _p: PhantomData<P>,
1036    ) {
1037        let tmp = P::tmp_storage().await;
1038        let storage = P::connect(&tmp).await;
1039
1040        // Create a short blockchain.
1041        let mut chain = vec![];
1042
1043        let leaf: Leaf2 = Leaf::genesis(
1044            &ValidatedState::default(),
1045            &NodeState::mock(),
1046            MOCK_UPGRADE.base,
1047        )
1048        .await
1049        .into();
1050        let leaf_payload = leaf.block_payload().unwrap();
1051        let leaf_payload_bytes_arc = leaf_payload.encode();
1052        let avidm_param = init_avidm_param(2).unwrap();
1053        let weights = vec![1u32; 2];
1054        let ns_table = parse_ns_table(
1055            leaf_payload.byte_len().as_usize(),
1056            &leaf_payload.ns_table().encode(),
1057        );
1058        let (payload_commitment, shares) =
1059            AvidMScheme::ns_disperse(&avidm_param, &weights, &leaf_payload_bytes_arc, ns_table)
1060                .unwrap();
1061
1062        let (pubkey, privkey) = BLSPubKey::generated_from_seed_indexed([0; 32], 1);
1063        let mut vid = AvidMDisperseShare::<SeqTypes> {
1064            view_number: ViewNumber::new(0),
1065            payload_commitment,
1066            share: shares[0].clone(),
1067            recipient_key: pubkey,
1068            epoch: Some(EpochNumber::new(0)),
1069            target_epoch: Some(EpochNumber::new(0)),
1070            common: avidm_param,
1071        }
1072        .to_proposal(&privkey)
1073        .unwrap()
1074        .clone();
1075        let mut quorum_proposal = QuorumProposalWrapper::<SeqTypes> {
1076            proposal: QuorumProposal2::<SeqTypes> {
1077                block_header: leaf.block_header().clone(),
1078                view_number: ViewNumber::genesis(),
1079                justify_qc: QuorumCertificate::genesis(
1080                    &ValidatedState::default(),
1081                    &NodeState::mock(),
1082                    TEST_VERSIONS.test,
1083                )
1084                .await
1085                .to_qc2(),
1086                upgrade_certificate: None,
1087                view_change_evidence: None,
1088                next_drb_result: None,
1089                next_epoch_justify_qc: None,
1090                epoch: None,
1091                state_cert: None,
1092            },
1093        };
1094        let mut qc = QuorumCertificate2::genesis(
1095            &ValidatedState::default(),
1096            &NodeState::mock(),
1097            TEST_VERSIONS.test,
1098        )
1099        .await;
1100
1101        let block_payload_signature = BLSPubKey::sign(&privkey, &leaf_payload_bytes_arc)
1102            .expect("Failed to sign block payload");
1103        let mut da_proposal = Proposal {
1104            data: DaProposal2::<SeqTypes> {
1105                encoded_transactions: leaf_payload_bytes_arc.clone(),
1106                metadata: leaf_payload.ns_table().clone(),
1107                view_number: ViewNumber::new(0),
1108                epoch: Some(EpochNumber::new(0)),
1109                epoch_transition_indicator: EpochTransitionIndicator::NotInTransition,
1110            },
1111            signature: block_payload_signature,
1112            _pd: Default::default(),
1113        };
1114
1115        let vid_commitment = vid_commitment(
1116            &leaf_payload_bytes_arc,
1117            &leaf.block_header().metadata().encode(),
1118            2,
1119            TEST_VERSIONS.test.base,
1120        );
1121
1122        for i in 0..4 {
1123            quorum_proposal.proposal.view_number = ViewNumber::new(i);
1124            let leaf = Leaf2::from_quorum_proposal(&quorum_proposal);
1125            qc.view_number = leaf.view_number();
1126            qc.data.leaf_commit = Committable::commit(&leaf);
1127            vid.data.view_number = leaf.view_number();
1128            da_proposal.data.view_number = leaf.view_number();
1129            chain.push((leaf.clone(), qc.clone(), vid.clone(), da_proposal.clone()));
1130        }
1131
1132        // Add proposals.
1133        for (_, _, vid, da) in &chain {
1134            tracing::info!(?da, ?vid, "insert proposal");
1135            storage.append_da2(da, vid_commitment).await.unwrap();
1136            storage
1137                .append_vid(&convert_proposal(vid.clone()))
1138                .await
1139                .unwrap();
1140        }
1141
1142        // Decide 2 leaves, but fail in event processing.
1143        let leaf_chain = chain
1144            .iter()
1145            .take(2)
1146            .map(|(leaf, qc, ..)| (leaf_info(leaf.clone()), qc.clone()))
1147            .collect::<Vec<_>>();
1148        tracing::info!("decide with event handling failure");
1149        storage
1150            .append_decided_leaves(
1151                ViewNumber::new(1),
1152                leaf_chain
1153                    .iter()
1154                    .map(|(leaf, qc)| (leaf, CertificatePair::non_epoch_change(qc.clone()))),
1155                None,
1156                &FailConsumer,
1157            )
1158            .await
1159            .unwrap();
1160        // No garbage collection should have run.
1161        for i in 0..4 {
1162            tracing::info!(i, "check proposal availability");
1163            assert!(
1164                storage
1165                    .load_vid_share(ViewNumber::new(i))
1166                    .await
1167                    .unwrap()
1168                    .is_some()
1169            );
1170            assert!(
1171                storage
1172                    .load_da_proposal(ViewNumber::new(i))
1173                    .await
1174                    .unwrap()
1175                    .is_some()
1176            );
1177        }
1178        tracing::info!("check anchor leaf updated");
1179        assert_eq!(
1180            storage
1181                .load_anchor_leaf()
1182                .await
1183                .unwrap()
1184                .unwrap()
1185                .0
1186                .view_number(),
1187            ViewNumber::new(1)
1188        );
1189        assert_eq!(
1190            storage.load_anchor_view().await.unwrap(),
1191            ViewNumber::new(1)
1192        );
1193
1194        // Now decide remaining leaves successfully. We should now garbage collect and process a
1195        // decide event for all the leaves.
1196        let consumer = EventCollector::default();
1197        let leaf_chain = chain
1198            .iter()
1199            .skip(2)
1200            .map(|(leaf, qc, ..)| (leaf_info(leaf.clone()), qc.clone()))
1201            .collect::<Vec<_>>();
1202        tracing::info!("decide successfully");
1203        storage
1204            .append_decided_leaves(
1205                ViewNumber::new(3),
1206                leaf_chain
1207                    .iter()
1208                    .map(|(leaf, qc)| (leaf, CertificatePair::non_epoch_change(qc.clone()))),
1209                None,
1210                &consumer,
1211            )
1212            .await
1213            .unwrap();
1214        // Garbage collection should have run.
1215        for i in 0..4 {
1216            tracing::info!(i, "check proposal garbage collected");
1217            assert!(
1218                storage
1219                    .load_vid_share(ViewNumber::new(i))
1220                    .await
1221                    .unwrap()
1222                    .is_none()
1223            );
1224            assert!(
1225                storage
1226                    .load_da_proposal(ViewNumber::new(i))
1227                    .await
1228                    .unwrap()
1229                    .is_none()
1230            );
1231        }
1232        tracing::info!("check anchor leaf updated");
1233        assert_eq!(
1234            storage
1235                .load_anchor_leaf()
1236                .await
1237                .unwrap()
1238                .unwrap()
1239                .0
1240                .view_number(),
1241            ViewNumber::new(3)
1242        );
1243        assert_eq!(
1244            storage.load_anchor_view().await.unwrap(),
1245            ViewNumber::new(3)
1246        );
1247
1248        // Check decide event.
1249        tracing::info!("check decide event");
1250        let leaf_chain = consumer.leaf_chain().await;
1251        assert_eq!(leaf_chain.len(), 4, "{leaf_chain:#?}");
1252        for ((leaf, ..), info) in chain.iter().zip(leaf_chain.iter()) {
1253            assert_eq!(info.leaf, *leaf);
1254            let decided_vid_share = info.vid_share.as_ref().unwrap();
1255            assert_eq!(decided_vid_share.view_number(), leaf.view_number());
1256            assert!(info.leaf.block_payload().is_some());
1257        }
1258    }
1259
1260    #[rstest_reuse::apply(persistence_types)]
1261    pub async fn test_pruning<P: TestablePersistence>(_p: PhantomData<P>) {
1262        let tmp = P::tmp_storage().await;
1263
1264        let mut options = P::options(&tmp);
1265        options.set_view_retention(1);
1266        let storage = options.create().await.unwrap();
1267
1268        // Add some "old" data, from view 0.
1269        let leaf = Leaf::genesis(
1270            &ValidatedState::default(),
1271            &NodeState::mock(),
1272            MOCK_UPGRADE.base,
1273        )
1274        .await;
1275        let leaf_payload = leaf.block_payload().unwrap();
1276        let leaf_payload_bytes_arc = leaf_payload.encode();
1277        let avidm_param = init_avidm_param(2).unwrap();
1278        let weights = vec![1u32; 2];
1279
1280        let ns_table = parse_ns_table(
1281            leaf_payload.byte_len().as_usize(),
1282            &leaf_payload.ns_table().encode(),
1283        );
1284        let (payload_commitment, shares) =
1285            AvidMScheme::ns_disperse(&avidm_param, &weights, &leaf_payload_bytes_arc, ns_table)
1286                .unwrap();
1287
1288        let (pubkey, privkey) = BLSPubKey::generated_from_seed_indexed([0; 32], 1);
1289        let vid_share = convert_proposal(
1290            AvidMDisperseShare::<SeqTypes> {
1291                view_number: ViewNumber::new(0),
1292                payload_commitment,
1293                share: shares[0].clone(),
1294                recipient_key: pubkey,
1295                epoch: None,
1296                target_epoch: None,
1297                common: avidm_param,
1298            }
1299            .to_proposal(&privkey)
1300            .unwrap()
1301            .clone(),
1302        );
1303
1304        let quorum_proposal = QuorumProposalWrapper::<SeqTypes> {
1305            proposal: QuorumProposal2::<SeqTypes> {
1306                block_header: leaf.block_header().clone(),
1307                view_number: ViewNumber::genesis(),
1308                justify_qc: QuorumCertificate::genesis(
1309                    &ValidatedState::default(),
1310                    &NodeState::mock(),
1311                    TEST_VERSIONS.test,
1312                )
1313                .await
1314                .to_qc2(),
1315                upgrade_certificate: None,
1316                view_change_evidence: None,
1317                next_drb_result: None,
1318                next_epoch_justify_qc: None,
1319                epoch: None,
1320                state_cert: None,
1321            },
1322        };
1323        let quorum_proposal_signature =
1324            BLSPubKey::sign(&privkey, &bincode::serialize(&quorum_proposal).unwrap())
1325                .expect("Failed to sign quorum proposal");
1326        let quorum_proposal = Proposal {
1327            data: quorum_proposal,
1328            signature: quorum_proposal_signature,
1329            _pd: Default::default(),
1330        };
1331
1332        let block_payload_signature = BLSPubKey::sign(&privkey, &leaf_payload_bytes_arc)
1333            .expect("Failed to sign block payload");
1334        let da_proposal = Proposal {
1335            data: DaProposal2::<SeqTypes> {
1336                encoded_transactions: leaf_payload_bytes_arc,
1337                metadata: leaf_payload.ns_table().clone(),
1338                view_number: ViewNumber::new(0),
1339                epoch: None,
1340                epoch_transition_indicator: EpochTransitionIndicator::NotInTransition,
1341            },
1342            signature: block_payload_signature,
1343            _pd: Default::default(),
1344        };
1345
1346        storage
1347            .append_da2(&da_proposal, VidCommitment::V1(payload_commitment))
1348            .await
1349            .unwrap();
1350        storage.append_vid(&vid_share).await.unwrap();
1351        storage
1352            .append_quorum_proposal2(&quorum_proposal)
1353            .await
1354            .unwrap();
1355
1356        // Decide a newer view, view 1.
1357        storage
1358            .append_decided_leaves(ViewNumber::new(1), [], None, &NullEventConsumer)
1359            .await
1360            .unwrap();
1361
1362        // The old data is not more than the retention period (1 view) old, so it should not be
1363        // GCed.
1364        assert_eq!(
1365            storage
1366                .load_da_proposal(ViewNumber::new(0))
1367                .await
1368                .unwrap()
1369                .unwrap(),
1370            da_proposal
1371        );
1372        assert_eq!(
1373            storage
1374                .load_vid_share(ViewNumber::new(0))
1375                .await
1376                .unwrap()
1377                .unwrap(),
1378            vid_share
1379        );
1380        assert_eq!(
1381            storage
1382                .load_quorum_proposal(ViewNumber::new(0))
1383                .await
1384                .unwrap(),
1385            quorum_proposal
1386        );
1387
1388        // Decide an even newer view, triggering GC of the old data.
1389        storage
1390            .append_decided_leaves(ViewNumber::new(2), [], None, &NullEventConsumer)
1391            .await
1392            .unwrap();
1393        assert!(
1394            storage
1395                .load_da_proposal(ViewNumber::new(0))
1396                .await
1397                .unwrap()
1398                .is_none()
1399        );
1400        assert!(
1401            storage
1402                .load_vid_share(ViewNumber::new(0))
1403                .await
1404                .unwrap()
1405                .is_none()
1406        );
1407        assert!(
1408            storage
1409                .load_quorum_proposal(ViewNumber::new(0))
1410                .await
1411                .is_err()
1412        );
1413    }
1414
1415    async fn assert_events_eq<P: TestablePersistence>(
1416        persistence: &P,
1417        block: u64,
1418        stake_table_fetcher: &Fetcher,
1419        l1_client: &L1Client,
1420        stake_table_contract: Address,
1421    ) -> anyhow::Result<()> {
1422        // Load persisted events
1423        let (stored_l1, events) = persistence.load_events(0, block).await?;
1424        assert!(!events.is_empty());
1425        assert!(stored_l1.is_some());
1426        assert!(events.iter().all(|((l1_block, _), _)| *l1_block <= block));
1427        // Fetch events directly from the contract and compare with persisted data
1428        let contract_events = Fetcher::fetch_events_from_contract(
1429            l1_client.clone(),
1430            stake_table_contract,
1431            None,
1432            block,
1433        )
1434        .await?;
1435        assert_eq!(
1436            contract_events, events,
1437            "Events from contract and persistence do not match"
1438        );
1439
1440        // Fetch events from stake table fetcher and compare with persisted data
1441        let fetched_events = stake_table_fetcher
1442            .fetch_and_store_stake_table_events(stake_table_contract, block)
1443            .await?;
1444        assert_eq!(fetched_events, events);
1445
1446        Ok(())
1447    }
1448
1449    // test for validating stake table event fetching from persistence,
1450    // ensuring that persisted data matches the on-chain events and that event fetcher work correctly.
1451    #[rstest_reuse::apply(persistence_types)]
1452    pub async fn test_stake_table_fetching_from_persistence<P: TestablePersistence>(
1453        #[values(
1454            StakeTableContractVersion::V1,
1455            StakeTableContractVersion::V2,
1456            StakeTableContractVersion::V3
1457        )]
1458        stake_table_version: StakeTableContractVersion,
1459        _p: PhantomData<P>,
1460    ) -> anyhow::Result<()> {
1461        let epoch_height = 20;
1462
1463        let network_config = TestConfigBuilder::default()
1464            .epoch_height(epoch_height)
1465            .build();
1466
1467        let anvil_provider = network_config.anvil().unwrap();
1468
1469        let query_service_port =
1470            reserve_tcp_port().expect("OS should have ephemeral ports available");
1471        let query_api_options = Options::with_port(query_service_port);
1472
1473        const NODE_COUNT: usize = 2;
1474
1475        let storage = join_all((0..NODE_COUNT).map(|_| P::tmp_storage())).await;
1476        let persistence_options: [_; NODE_COUNT] = storage
1477            .iter()
1478            .map(P::options)
1479            .collect::<Vec<_>>()
1480            .try_into()
1481            .unwrap();
1482
1483        let persistence = persistence_options[0].clone().create().await.unwrap();
1484
1485        // Build the config with PoS hook
1486        let l1_url = network_config.l1_url();
1487
1488        let upgrade = Upgrade::trivial(version(0, 3));
1489
1490        let testnet_config = TestNetworkConfigBuilder::with_num_nodes()
1491            .api_config(query_api_options)
1492            .network_config(network_config.clone())
1493            .persistences(persistence_options.clone())
1494            .pos_hook(
1495                DelegationConfig::MultipleDelegators,
1496                stake_table_version,
1497                upgrade,
1498            )
1499            .await
1500            .expect("Pos deployment failed")
1501            .build();
1502
1503        //start the network
1504        let test_network = TestNetwork::new(testnet_config, upgrade).await;
1505
1506        let client: Client<ServerError, SequencerApiVersion> = Client::new(
1507            format!("http://localhost:{query_service_port}")
1508                .parse()
1509                .unwrap(),
1510        );
1511        client.connect(None).await;
1512        tracing::info!(query_service_port, "server running");
1513
1514        // wait until we enter in epoch 3
1515        let _initial_blocks = client
1516            .socket("availability/stream/blocks/0")
1517            .subscribe::<BlockQueryData<SeqTypes>>()
1518            .await
1519            .unwrap()
1520            .take(40)
1521            .try_collect::<Vec<_>>()
1522            .await
1523            .unwrap();
1524        // Load initial persisted events and validate they exist.
1525        let membership_coordinator = test_network
1526            .server
1527            .consensus_handle()
1528            .membership_coordinator()
1529            .await;
1530
1531        let l1_client = L1Client::new(vec![l1_url]).unwrap();
1532        let node_state = test_network.server.node_state();
1533        let chain_config = node_state.chain_config;
1534        let stake_table_contract = chain_config.stake_table_contract.unwrap();
1535
1536        let current_membership = membership_coordinator.membership();
1537        {
1538            let membership_state = current_membership;
1539            let stake_table_fetcher = membership_state.fetcher();
1540
1541            let block1 = anvil_provider
1542                .get_block_number()
1543                .await
1544                .expect("latest l1 block");
1545
1546            assert_events_eq(
1547                &persistence,
1548                block1,
1549                stake_table_fetcher,
1550                &l1_client,
1551                stake_table_contract,
1552            )
1553            .await?;
1554        }
1555        let _epoch_4_blocks = client
1556            .socket("availability/stream/blocks/0")
1557            .subscribe::<BlockQueryData<SeqTypes>>()
1558            .await
1559            .unwrap()
1560            .take(65)
1561            .try_collect::<Vec<_>>()
1562            .await
1563            .unwrap();
1564        let block2 = anvil_provider
1565            .get_block_number()
1566            .await
1567            .expect("latest l1 block");
1568
1569        {
1570            let membership_state = current_membership;
1571            let stake_table_fetcher = membership_state.fetcher();
1572
1573            assert_events_eq(
1574                &persistence,
1575                block2,
1576                stake_table_fetcher,
1577                &l1_client,
1578                stake_table_contract,
1579            )
1580            .await?;
1581        }
1582        Ok(())
1583    }
1584
1585    #[rstest_reuse::apply(persistence_types)]
1586    pub async fn test_stake_table_background_fetching<P: TestablePersistence>(
1587        #[values(
1588            StakeTableContractVersion::V1,
1589            StakeTableContractVersion::V2,
1590            StakeTableContractVersion::V3
1591        )]
1592        stake_table_version: StakeTableContractVersion,
1593        _p: PhantomData<P>,
1594    ) -> anyhow::Result<()> {
1595        use espresso_types::v0_3::ChainConfig;
1596        use hotshot_contract_adapter::stake_table::StakeTableContractVersion;
1597
1598        let blocks_per_epoch = 10;
1599
1600        let network_config = TestConfigBuilder::<1>::default()
1601            .epoch_height(blocks_per_epoch)
1602            .build();
1603
1604        let anvil_provider = network_config.anvil().unwrap();
1605
1606        let (genesis_state, genesis_stake) = light_client_genesis_from_stake_table(
1607            &network_config.hotshot_config().hotshot_stake_table(),
1608            STAKE_TABLE_CAPACITY_FOR_TEST,
1609        )
1610        .unwrap();
1611
1612        let (_, priv_keys): (Vec<_>, Vec<_>) = (0..20)
1613            .map(|i| <PubKey as SignatureKey>::generated_from_seed_indexed([1; 32], i as u64))
1614            .unzip();
1615        let state_key_pairs = (0..20)
1616            .map(|i| StateKeyPair::generate_from_seed_indexed([2; 32], i as u64))
1617            .collect::<Vec<_>>();
1618
1619        let validators = staking_priv_keys(&priv_keys, &state_key_pairs, 20);
1620
1621        let deployer = ProviderBuilder::new()
1622            .wallet(EthereumWallet::from(network_config.signer().clone()))
1623            .connect_http(network_config.l1_url().clone());
1624
1625        let mut contracts = Contracts::new();
1626        let args = DeployerArgsBuilder::default()
1627            .deployer(deployer.clone())
1628            .rpc_url(network_config.l1_url().clone())
1629            .mock_light_client(true)
1630            .genesis_lc_state(genesis_state)
1631            .genesis_st_state(genesis_stake)
1632            .blocks_per_epoch(blocks_per_epoch)
1633            .epoch_start_block(1)
1634            .exit_escrow_period(U256::from(max(
1635                blocks_per_epoch * 15 + 100,
1636                DEFAULT_EXIT_ESCROW_PERIOD_SECONDS,
1637            )))
1638            .multisig_pauser(network_config.signer().address())
1639            .token_name("Espresso".to_string())
1640            .token_symbol("ESP".to_string())
1641            .initial_token_supply(U256::from(3590000000u64))
1642            .ops_timelock_delay(U256::from(0))
1643            .ops_timelock_admin(network_config.signer().address())
1644            .ops_timelock_proposers(vec![network_config.signer().address()])
1645            .ops_timelock_executors(vec![network_config.signer().address()])
1646            .safe_exit_timelock_delay(U256::from(10))
1647            .safe_exit_timelock_admin(network_config.signer().address())
1648            .safe_exit_timelock_proposers(vec![network_config.signer().address()])
1649            .safe_exit_timelock_executors(vec![network_config.signer().address()])
1650            .build()
1651            .unwrap();
1652
1653        match stake_table_version {
1654            StakeTableContractVersion::V1 => args.deploy_to_stake_table_v1(&mut contracts).await,
1655            StakeTableContractVersion::V2 => args.deploy_to_stake_table_v2(&mut contracts).await,
1656            StakeTableContractVersion::V3 => args.deploy_to_stake_table_v3(&mut contracts).await,
1657        }
1658        .expect("contracts deployed");
1659
1660        let st_addr = contracts
1661            .address(Contract::StakeTableProxy)
1662            .expect("StakeTableProxy deployed");
1663        let l1_url = network_config.l1_url().clone();
1664
1665        let mut planned_txns = StakingTransactions::create(
1666            l1_url.clone(),
1667            &deployer,
1668            st_addr,
1669            validators,
1670            None,
1671            DelegationConfig::MultipleDelegators,
1672        )
1673        .await
1674        .expect("stake table setup failed");
1675
1676        planned_txns
1677            .apply_prerequisites()
1678            .await
1679            .expect("prerequisites failed");
1680
1681        // Ensure we have at least one stake table affecting transaction
1682        planned_txns.apply_one().await.expect("send tx failed");
1683
1684        // new block every 1s
1685        anvil_provider
1686            .anvil_set_interval_mining(1)
1687            .await
1688            .expect("interval mining");
1689
1690        // spawn a separate task
1691        // this is going to keep registering validators and multiple delegators
1692        // the interval mining is set to 1s so each transaction finalization would take atleast 1s
1693        spawn({
1694            async move {
1695                {
1696                    while let Some(receipt) =
1697                        planned_txns.apply_one().await.expect("send tx failed")
1698                    {
1699                        tracing::debug!(?receipt, "transaction finalized");
1700                    }
1701                }
1702            }
1703        });
1704
1705        let storage = P::tmp_storage().await;
1706        let persistence = P::options(&storage).create().await.unwrap();
1707
1708        let l1_client = L1ClientOptions {
1709            stake_table_update_interval: Duration::from_secs(7),
1710            l1_retry_delay: Duration::from_millis(10),
1711            l1_events_max_block_range: 10000,
1712            ..Default::default()
1713        }
1714        .connect(vec![l1_url])
1715        .unwrap();
1716        l1_client.spawn_tasks().await;
1717
1718        let fetcher = Fetcher::new(
1719            Arc::new(NullStateCatchup::default()),
1720            Arc::new(Mutex::new(persistence.clone())),
1721            l1_client.clone(),
1722            ChainConfig {
1723                stake_table_contract: Some(st_addr),
1724                base_fee: 0.into(),
1725                ..Default::default()
1726            },
1727        );
1728
1729        // sleep so that we have enough events
1730        sleep(Duration::from_secs(20)).await;
1731
1732        fetcher.spawn_update_loop().await;
1733        let mut prev_l1_block = 0;
1734        let mut prev_events_len = 0;
1735        for _i in 0..10 {
1736            // Wait for more than update interval to assert that persistence was updated
1737            // L1 update interval is 7s in this test
1738
1739            tokio::time::sleep(std::time::Duration::from_secs(8)).await;
1740
1741            let block = anvil_provider
1742                .get_block_number()
1743                .await
1744                .expect("latest l1 block");
1745
1746            let (read_offset, persisted_events) = persistence.load_events(0, block).await?;
1747            let read_offset = read_offset.unwrap();
1748            let l1_block = match read_offset {
1749                EventsPersistenceRead::Complete => block,
1750                EventsPersistenceRead::UntilL1Block(block) => block,
1751            };
1752
1753            tracing::info!("{l1_block:?}, persistence events = {persisted_events:?}.");
1754            assert!(persisted_events.len() > prev_events_len);
1755
1756            assert!(l1_block > prev_l1_block, "events not updated");
1757
1758            let contract_events =
1759                Fetcher::fetch_events_from_contract(l1_client.clone(), st_addr, None, l1_block)
1760                    .await?;
1761            assert_eq!(persisted_events, contract_events);
1762
1763            prev_l1_block = l1_block;
1764            prev_events_len = persisted_events.len();
1765        }
1766
1767        Ok(())
1768    }
1769
1770    #[rstest_reuse::apply(persistence_types)]
1771    pub async fn test_membership_persistence<P: TestablePersistence>(
1772        _p: PhantomData<P>,
1773    ) -> anyhow::Result<()> {
1774        let tmp = P::tmp_storage().await;
1775        let mut opt = P::options(&tmp);
1776
1777        let storage = opt.create().await.unwrap();
1778
1779        let validator = AuthenticatedValidator::mock();
1780        let mut st = IndexMap::new();
1781        st.insert(validator.account, validator);
1782
1783        storage
1784            .store_stake(EpochNumber::new(10), st.clone(), None, None)
1785            .await?;
1786
1787        let (table, ..) = storage.load_stake(EpochNumber::new(10)).await?.unwrap();
1788        assert_eq!(st, table);
1789
1790        let val2 = AuthenticatedValidator::mock();
1791        let mut st2 = IndexMap::new();
1792        st2.insert(val2.account, val2);
1793        storage
1794            .store_stake(EpochNumber::new(11), st2.clone(), None, None)
1795            .await?;
1796
1797        let tables = storage.load_latest_stake(4).await?.unwrap();
1798        let mut iter = tables.iter();
1799        assert_eq!(
1800            Some(&(EpochNumber::new(11), (st2.clone(), None), None)),
1801            iter.next()
1802        );
1803        assert_eq!(Some(&(EpochNumber::new(10), (st, None), None)), iter.next());
1804        assert_eq!(None, iter.next());
1805
1806        for i in 0..=20 {
1807            storage
1808                .store_stake(EpochNumber::new(i), st2.clone(), None, None)
1809                .await?;
1810        }
1811
1812        let tables = storage.load_latest_stake(5).await?.unwrap();
1813        let mut iter = tables.iter();
1814        assert_eq!(
1815            Some(&(EpochNumber::new(20), (st2.clone(), None), None)),
1816            iter.next()
1817        );
1818        assert_eq!(
1819            Some(&(EpochNumber::new(19), (st2.clone(), None), None)),
1820            iter.next()
1821        );
1822        assert_eq!(
1823            Some(&(EpochNumber::new(18), (st2.clone(), None), None)),
1824            iter.next()
1825        );
1826        assert_eq!(
1827            Some(&(EpochNumber::new(17), (st2.clone(), None), None)),
1828            iter.next()
1829        );
1830        assert_eq!(
1831            Some(&(EpochNumber::new(16), (st2, None), None)),
1832            iter.next()
1833        );
1834        assert_eq!(None, iter.next());
1835
1836        Ok(())
1837    }
1838
1839    #[rstest_reuse::apply(persistence_types)]
1840    pub async fn test_delete_stake_tables<P: TestablePersistence>(
1841        _p: PhantomData<P>,
1842    ) -> anyhow::Result<()> {
1843        let tmp = P::tmp_storage().await;
1844        let mut opt = P::options(&tmp);
1845        let storage = opt.create().await.unwrap();
1846
1847        let event1 = StakeTableEvent::Delegate(Delegated {
1848            delegator: Address::ZERO,
1849            validator: Address::ZERO,
1850            amount: U256::from(100),
1851        });
1852        let event2 = StakeTableEvent::Delegate(Delegated {
1853            delegator: Address::ZERO,
1854            validator: Address::ZERO,
1855            amount: U256::from(200),
1856        });
1857
1858        let l1_block = 42u64;
1859        let events: Vec<(EventKey, StakeTableEvent)> =
1860            vec![((l1_block, 0), event1), ((l1_block, 1), event2)];
1861
1862        storage.store_events(l1_block, events.clone()).await?;
1863
1864        let (read_offset, loaded_events) = storage.load_events(0_u64, l1_block).await?;
1865        assert!(read_offset.is_some());
1866        assert_eq!(loaded_events.len(), 2);
1867        assert_eq!(loaded_events, events);
1868
1869        // Store some validators
1870        let v = RegisteredValidator::mock();
1871        let mut vmap = IndexMap::new();
1872        vmap.insert(v.account, v);
1873        storage
1874            .store_all_validators(EpochNumber::new(1), vmap.clone())
1875            .await?;
1876
1877        let loaded = storage
1878            .load_all_validators(EpochNumber::new(1), 0, 10)
1879            .await?;
1880        assert_eq!(loaded.len(), 1);
1881
1882        storage.delete_stake_tables().await?;
1883
1884        // Events cleared
1885        let (read_offset, loaded_events) = storage.load_events(0_u64, l1_block).await?;
1886        assert!(read_offset.is_none());
1887        assert!(loaded_events.is_empty());
1888
1889        // Validators cleared
1890        let loaded = storage
1891            .load_all_validators(EpochNumber::new(1), 0, 10)
1892            .await
1893            .unwrap_or_default();
1894        assert!(loaded.is_empty());
1895
1896        let event3 = StakeTableEvent::Delegate(Delegated {
1897            delegator: Address::ZERO,
1898            validator: Address::ZERO,
1899            amount: U256::from(300),
1900        });
1901        let new_events: Vec<(EventKey, StakeTableEvent)> = vec![((l1_block, 0), event3)];
1902        storage.store_events(l1_block, new_events.clone()).await?;
1903
1904        let (read_offset, loaded_events) = storage.load_events(0_u64, l1_block).await?;
1905        assert!(read_offset.is_some());
1906        assert_eq!(loaded_events.len(), 1);
1907        assert_eq!(loaded_events, new_events);
1908
1909        Ok(())
1910    }
1911
1912    #[rstest_reuse::apply(persistence_types)]
1913    pub async fn test_store_and_load_all_validators<P: TestablePersistence>(
1914        _p: PhantomData<P>,
1915    ) -> anyhow::Result<()> {
1916        let tmp = P::tmp_storage().await;
1917        let mut opt = P::options(&tmp);
1918        let storage = opt.create().await.unwrap();
1919
1920        let mut vmap1 = IndexMap::new();
1921        for _i in 0..25 {
1922            let v = RegisteredValidator::mock();
1923            vmap1.insert(v.account, v);
1924        }
1925        storage
1926            .store_all_validators(EpochNumber::new(10), vmap1.clone())
1927            .await?;
1928
1929        let mut expected_all: Vec<_> = vmap1.clone().into_values().collect();
1930        expected_all.sort_by_key(|v| v.account);
1931
1932        // Load all
1933        let loaded_all = storage
1934            .load_all_validators(EpochNumber::new(10), 0, 100)
1935            .await?;
1936        // SQLite returns a different ordered list even though there is an `ORDER BY address ASC` clause
1937        assert_eq!(expected_all, loaded_all);
1938
1939        // Load first 10
1940        let loaded_first_10 = storage
1941            .load_all_validators(EpochNumber::new(10), 0, 10)
1942            .await?;
1943
1944        assert_eq!(expected_all[..10], loaded_first_10);
1945
1946        // Load next 10
1947        let loaded_next_10 = storage
1948            .load_all_validators(EpochNumber::new(10), 10, 10)
1949            .await?;
1950
1951        assert_eq!(expected_all[10..20], loaded_next_10);
1952
1953        // Load remaining 5
1954        let loaded_last_5 = storage
1955            .load_all_validators(EpochNumber::new(10), 20, 10)
1956            .await?;
1957
1958        assert_eq!(expected_all[20..], loaded_last_5);
1959
1960        // offset beyond size should return empty
1961        let loaded_empty = storage
1962            .load_all_validators(EpochNumber::new(10), 100, 10)
1963            .await?;
1964        assert!(loaded_empty.is_empty());
1965
1966        // epoch 11
1967        let validator2 = RegisteredValidator::mock();
1968        let mut vmap2 = IndexMap::new();
1969        vmap2.insert(validator2.account, validator2.clone());
1970
1971        storage
1972            .store_all_validators(EpochNumber::new(11), vmap2.clone())
1973            .await?;
1974
1975        let mut expected_epoch11: Vec<_> = vmap2.clone().into_values().collect();
1976        expected_epoch11.sort_by_key(|v| v.account);
1977
1978        let loaded2 = storage
1979            .load_all_validators(EpochNumber::new(11), 0, 100)
1980            .await?;
1981
1982        assert_eq!(expected_epoch11, loaded2);
1983
1984        // Epoch 10 still there
1985        let loaded1_again = storage
1986            .load_all_validators(EpochNumber::new(10), 0, 100)
1987            .await?;
1988
1989        assert_eq!(expected_all, loaded1_again);
1990
1991        Ok(())
1992    }
1993
1994    #[rstest_reuse::apply(persistence_types)]
1995    pub async fn test_non_consecutive_decide<P: TestablePersistence>(_p: PhantomData<P>) {
1996        let tmp = P::tmp_storage().await;
1997        let storage = P::connect(&tmp).await;
1998
1999        let genesis_leaf: Leaf2 = Leaf2::genesis(
2000            &ValidatedState::default(),
2001            &NodeState::mock(),
2002            TEST_VERSIONS.test.base,
2003        )
2004        .await;
2005        let mut quorum_proposal = QuorumProposalWrapper::<SeqTypes> {
2006            proposal: QuorumProposal2::<SeqTypes> {
2007                epoch: None,
2008                block_header: genesis_leaf.block_header().clone(),
2009                view_number: genesis_leaf.view_number(),
2010                justify_qc: QuorumCertificate2::genesis(
2011                    &ValidatedState::default(),
2012                    &NodeState::mock(),
2013                    TEST_VERSIONS.test,
2014                )
2015                .await,
2016                upgrade_certificate: None,
2017                view_change_evidence: None,
2018                next_drb_result: None,
2019                next_epoch_justify_qc: None,
2020                state_cert: None,
2021            },
2022        };
2023
2024        let leaf0 = Leaf2::from_quorum_proposal(&quorum_proposal);
2025
2026        quorum_proposal.proposal.view_number = ViewNumber::new(2);
2027        *quorum_proposal.proposal.block_header.height_mut() = 2;
2028        quorum_proposal.proposal.justify_qc.view_number = ViewNumber::new(1);
2029        let leaf2 = Leaf2::from_quorum_proposal(&quorum_proposal);
2030
2031        let mut qc0 = leaf0.justify_qc();
2032        qc0.data.leaf_commit = Committable::commit(&leaf0);
2033
2034        let mut qc2 = leaf2.justify_qc();
2035        qc2.view_number += 1;
2036        qc2.data.leaf_commit = Committable::commit(&leaf2);
2037
2038        let mut deciding_qc = qc2.clone();
2039        deciding_qc.view_number += 1;
2040
2041        // Decide the first leaf, but fail to generate a decide event.
2042        storage
2043            .append_decided_leaves(
2044                ViewNumber::new(0),
2045                [(
2046                    &leaf_info(leaf0.clone()),
2047                    CertificatePair::non_epoch_change(qc0),
2048                )],
2049                None,
2050                &FailConsumer,
2051            )
2052            .await
2053            .unwrap();
2054
2055        // Later, decide a new leaf, but skipping some leaf in the middle. This should generate
2056        // decide events for both the leaves, correctly separating into two events since the leaves
2057        // are non-consecutive, and correctly applying `deciding_qc` only to the last event.
2058        let consumer = EventCollector::default();
2059        storage
2060            .append_decided_leaves(
2061                ViewNumber::new(2),
2062                [(
2063                    &leaf_info(leaf2.clone()),
2064                    CertificatePair::non_epoch_change(qc2),
2065                )],
2066                Some(Arc::new(CertificatePair::non_epoch_change(
2067                    deciding_qc.clone(),
2068                ))),
2069                &consumer,
2070            )
2071            .await
2072            .unwrap();
2073
2074        let events = consumer.events.read().await;
2075        assert_eq!(events.len(), 2);
2076
2077        let EventType::Decide {
2078            leaf_chain: leaf_chain0,
2079            deciding_qc: deciding_qc0,
2080            ..
2081        } = &events[0].event
2082        else {
2083            panic!("expected decide event, got {:?}", events[0].event);
2084        };
2085        assert_eq!(leaf_chain0.len(), 1);
2086        assert_eq!(leaf_chain0[0].leaf, leaf0);
2087        assert_eq!(*deciding_qc0, None);
2088
2089        let EventType::Decide {
2090            leaf_chain: leaf_chain2,
2091            deciding_qc: deciding_qc2,
2092            ..
2093        } = &events[1].event
2094        else {
2095            panic!("expected decide event, got {:?}", events[1].event);
2096        };
2097        assert_eq!(leaf_chain2.len(), 1);
2098        assert_eq!(leaf_chain2[0].leaf, leaf2);
2099        assert_eq!(deciding_qc2.as_ref().unwrap().qc(), &deciding_qc);
2100    }
2101}