espresso_types/v0/
traits.rs

1//! This module contains all the traits used for building the sequencer types.
2//! It also includes some trait implementations that cannot be implemented in an external crate.
3use std::{cmp::max, collections::BTreeMap, fmt::Debug, ops::Range, sync::Arc};
4
5use alloy::primitives::{Address, U256};
6use anyhow::{Context, bail, ensure};
7use async_trait::async_trait;
8use committable::Commitment;
9use futures::{FutureExt, TryFutureExt};
10use hotshot::{HotShotInitializer, InitializerEpochInfo, types::EventType};
11use hotshot_libp2p_networking::network::behaviours::dht::store::persistent::DhtPersistentStorage;
12use hotshot_types::{
13    data::{
14        DaProposal, DaProposal2, EpochNumber, QuorumProposal, QuorumProposal2,
15        QuorumProposalWrapper, VidCommitment, VidDisperseShare, ViewNumber,
16    },
17    drb::{DrbInput, DrbResult},
18    event::{HotShotAction, LeafInfo},
19    message::{Proposal, convert_proposal},
20    simple_certificate::{
21        CertificatePair, LightClientStateUpdateCertificateV2, NextEpochQuorumCertificate2,
22        QuorumCertificate, QuorumCertificate2, UpgradeCertificate,
23    },
24    stake_table::HSStakeTable,
25    traits::{
26        ValidatedState as HotShotState, metrics::Metrics, node_implementation::NodeType,
27        storage::Storage,
28    },
29    utils::genesis_epoch_from_version,
30    vote::HasViewNumber,
31};
32use indexmap::IndexMap;
33use serde::{Serialize, de::DeserializeOwned};
34use versions::Upgrade;
35
36use super::{
37    impls::NodeState,
38    utils::BackoffParams,
39    v0_3::{EventKey, IndexedStake, StakeTableEvent},
40};
41use crate::{
42    AuthenticatedValidatorMap, BlockMerkleTree, Event, FeeAccount, FeeAccountProof,
43    FeeMerkleCommitment, Leaf2, NetworkConfig, PubKey, SeqTypes,
44    v0::impls::{StakeTableHash, ValidatedState},
45    v0_3::{
46        ChainConfig, RegisteredValidator, RewardAccountProofV1, RewardAccountV1, RewardAmount,
47        RewardMerkleCommitmentV1,
48    },
49    v0_4::{PermittedRewardMerkleTreeV2, RewardAccountV2, RewardMerkleCommitmentV2},
50};
51
52#[async_trait]
53pub trait StateCatchup: Send + Sync {
54    /// Fetch the leaf at the given height without retrying on transient errors.
55    async fn try_fetch_leaf(
56        &self,
57        retry: usize,
58        height: u64,
59        stake_table: HSStakeTable<SeqTypes>,
60        success_threshold: U256,
61    ) -> anyhow::Result<Leaf2>;
62
63    /// Fetch the leaf at the given height, retrying on transient errors.
64    async fn fetch_leaf(
65        &self,
66        height: u64,
67        stake_table: HSStakeTable<SeqTypes>,
68        success_threshold: U256,
69    ) -> anyhow::Result<Leaf2> {
70        self.backoff()
71            .retry(self, |provider, retry| {
72                let stake_table_clone = stake_table.clone();
73                async move {
74                    provider
75                        .try_fetch_leaf(retry, height, stake_table_clone, success_threshold)
76                        .await
77                }
78                .boxed()
79            })
80            .await
81    }
82
83    /// Fetch the given list of accounts without retrying on transient errors.
84    async fn try_fetch_accounts(
85        &self,
86        retry: usize,
87        instance: &NodeState,
88        height: u64,
89        view: ViewNumber,
90        fee_merkle_tree_root: FeeMerkleCommitment,
91        accounts: &[FeeAccount],
92    ) -> anyhow::Result<Vec<FeeAccountProof>>;
93
94    /// Fetch the given list of accounts, retrying on transient errors.
95    async fn fetch_accounts(
96        &self,
97        instance: &NodeState,
98        height: u64,
99        view: ViewNumber,
100        fee_merkle_tree_root: FeeMerkleCommitment,
101        accounts: Vec<FeeAccount>,
102    ) -> anyhow::Result<Vec<FeeAccountProof>> {
103        self.backoff()
104            .retry(self, |provider, retry| {
105                let accounts = &accounts;
106                async move {
107                    provider
108                        .try_fetch_accounts(
109                            retry,
110                            instance,
111                            height,
112                            view,
113                            fee_merkle_tree_root,
114                            accounts,
115                        )
116                        .await
117                        .map_err(|err| {
118                            err.context(format!(
119                                "fetching accounts {accounts:?}, height {height}, view {view}"
120                            ))
121                        })
122                }
123                .boxed()
124            })
125            .await
126    }
127
128    /// Fetch and remember the blocks frontier without retrying on transient errors.
129    async fn try_remember_blocks_merkle_tree(
130        &self,
131        retry: usize,
132        instance: &NodeState,
133        height: u64,
134        view: ViewNumber,
135        mt: &mut BlockMerkleTree,
136    ) -> anyhow::Result<()>;
137
138    /// Fetch and remember the blocks frontier, retrying on transient errors.
139    async fn remember_blocks_merkle_tree(
140        &self,
141        instance: &NodeState,
142        height: u64,
143        view: ViewNumber,
144        mt: &mut BlockMerkleTree,
145    ) -> anyhow::Result<()> {
146        self.backoff()
147            .retry(mt, |mt, retry| {
148                self.try_remember_blocks_merkle_tree(retry, instance, height, view, mt)
149                    .map_err(|err| err.context(format!("fetching frontier using {}", self.name())))
150                    .boxed()
151            })
152            .await
153    }
154
155    /// Fetch the chain config without retrying on transient errors.
156    async fn try_fetch_chain_config(
157        &self,
158        retry: usize,
159        commitment: Commitment<ChainConfig>,
160    ) -> anyhow::Result<ChainConfig>;
161
162    /// Fetch the chain config, retrying on transient errors.
163    async fn fetch_chain_config(
164        &self,
165        commitment: Commitment<ChainConfig>,
166    ) -> anyhow::Result<ChainConfig> {
167        self.backoff()
168            .retry(self, |provider, retry| {
169                provider
170                    .try_fetch_chain_config(retry, commitment)
171                    .map_err(|err| err.context("fetching chain config"))
172                    .boxed()
173            })
174            .await
175    }
176
177    /// Fetch the given reward merkle tree without retrying on transient errors.
178    async fn try_fetch_reward_merkle_tree_v2(
179        &self,
180        retry: usize,
181        height: u64,
182        view: ViewNumber,
183        reward_merkle_tree_root: RewardMerkleCommitmentV2,
184        accounts: Arc<Vec<RewardAccountV2>>,
185    ) -> anyhow::Result<PermittedRewardMerkleTreeV2>;
186
187    async fn fetch_reward_merkle_tree_v2(
188        &self,
189        height: u64,
190        view: ViewNumber,
191        reward_merkle_tree_root: RewardMerkleCommitmentV2,
192        accounts: Arc<Vec<RewardAccountV2>>,
193    ) -> anyhow::Result<PermittedRewardMerkleTreeV2> {
194        self.backoff()
195            .retry(self, |provider, retry| {
196                let accounts = accounts.clone();
197                async move {
198                    provider
199                        .try_fetch_reward_merkle_tree_v2(
200                            retry,
201                            height,
202                            view,
203                            reward_merkle_tree_root,
204                            accounts,
205                        )
206                        .await
207                        .map_err(|err| {
208                            err.context(format!("fetching reward merkle tree for height {height}"))
209                        })
210                }
211                .boxed()
212            })
213            .await
214    }
215
216    /// Fetch the given list of reward accounts without retrying on transient errors.
217    async fn try_fetch_reward_accounts_v1(
218        &self,
219        retry: usize,
220        instance: &NodeState,
221        height: u64,
222        view: ViewNumber,
223        reward_merkle_tree_root: RewardMerkleCommitmentV1,
224        accounts: &[RewardAccountV1],
225    ) -> anyhow::Result<Vec<RewardAccountProofV1>>;
226
227    /// Fetch the given list of reward accounts, retrying on transient errors.
228    async fn fetch_reward_accounts_v1(
229        &self,
230        instance: &NodeState,
231        height: u64,
232        view: ViewNumber,
233        reward_merkle_tree_root: RewardMerkleCommitmentV1,
234        accounts: Vec<RewardAccountV1>,
235    ) -> anyhow::Result<Vec<RewardAccountProofV1>> {
236        self.backoff()
237            .retry(self, |provider, retry| {
238                let accounts = &accounts;
239                async move {
240                    provider
241                        .try_fetch_reward_accounts_v1(
242                            retry,
243                            instance,
244                            height,
245                            view,
246                            reward_merkle_tree_root,
247                            accounts,
248                        )
249                        .await
250                        .map_err(|err| {
251                            err.context(format!(
252                                "fetching v1 reward accounts {accounts:?}, height {height}, view \
253                                 {view}"
254                            ))
255                        })
256                }
257                .boxed()
258            })
259            .await
260    }
261
262    /// Fetch the state certificate for a given epoch without retrying on transient errors.
263    async fn try_fetch_state_cert(
264        &self,
265        retry: usize,
266        epoch: u64,
267    ) -> anyhow::Result<LightClientStateUpdateCertificateV2<SeqTypes>>;
268
269    /// Fetch the state certificate for a given epoch, retrying on transient errors.
270    async fn fetch_state_cert(
271        &self,
272        epoch: u64,
273    ) -> anyhow::Result<LightClientStateUpdateCertificateV2<SeqTypes>> {
274        self.backoff()
275            .retry(self, |provider, retry| {
276                provider
277                    .try_fetch_state_cert(retry, epoch)
278                    .map_err(|err| err.context(format!("fetching state cert for epoch {epoch}")))
279                    .boxed()
280            })
281            .await
282    }
283
284    /// Returns true if the catchup provider is local (e.g. does not make calls to remote resources).
285    fn is_local(&self) -> bool;
286
287    /// Returns the backoff parameters for the catchup provider.
288    fn backoff(&self) -> &BackoffParams;
289
290    /// Returns the name of the catchup provider.
291    fn name(&self) -> String;
292}
293
294#[async_trait]
295impl<T: StateCatchup + ?Sized> StateCatchup for Arc<T> {
296    async fn try_fetch_leaf(
297        &self,
298        retry: usize,
299        height: u64,
300        stake_table: HSStakeTable<SeqTypes>,
301        success_threshold: U256,
302    ) -> anyhow::Result<Leaf2> {
303        (**self)
304            .try_fetch_leaf(retry, height, stake_table, success_threshold)
305            .await
306    }
307
308    async fn fetch_leaf(
309        &self,
310        height: u64,
311        stake_table: HSStakeTable<SeqTypes>,
312        success_threshold: U256,
313    ) -> anyhow::Result<Leaf2> {
314        (**self)
315            .fetch_leaf(height, stake_table, success_threshold)
316            .await
317    }
318
319    async fn try_fetch_accounts(
320        &self,
321        retry: usize,
322        instance: &NodeState,
323        height: u64,
324        view: ViewNumber,
325        fee_merkle_tree_root: FeeMerkleCommitment,
326        accounts: &[FeeAccount],
327    ) -> anyhow::Result<Vec<FeeAccountProof>> {
328        (**self)
329            .try_fetch_accounts(
330                retry,
331                instance,
332                height,
333                view,
334                fee_merkle_tree_root,
335                accounts,
336            )
337            .await
338    }
339
340    async fn fetch_accounts(
341        &self,
342        instance: &NodeState,
343        height: u64,
344        view: ViewNumber,
345        fee_merkle_tree_root: FeeMerkleCommitment,
346        accounts: Vec<FeeAccount>,
347    ) -> anyhow::Result<Vec<FeeAccountProof>> {
348        (**self)
349            .fetch_accounts(instance, height, view, fee_merkle_tree_root, accounts)
350            .await
351    }
352
353    async fn try_remember_blocks_merkle_tree(
354        &self,
355        retry: usize,
356        instance: &NodeState,
357        height: u64,
358        view: ViewNumber,
359        mt: &mut BlockMerkleTree,
360    ) -> anyhow::Result<()> {
361        (**self)
362            .try_remember_blocks_merkle_tree(retry, instance, height, view, mt)
363            .await
364    }
365
366    async fn remember_blocks_merkle_tree(
367        &self,
368        instance: &NodeState,
369        height: u64,
370        view: ViewNumber,
371        mt: &mut BlockMerkleTree,
372    ) -> anyhow::Result<()> {
373        (**self)
374            .remember_blocks_merkle_tree(instance, height, view, mt)
375            .await
376    }
377
378    async fn try_fetch_chain_config(
379        &self,
380        retry: usize,
381        commitment: Commitment<ChainConfig>,
382    ) -> anyhow::Result<ChainConfig> {
383        (**self).try_fetch_chain_config(retry, commitment).await
384    }
385
386    async fn fetch_chain_config(
387        &self,
388        commitment: Commitment<ChainConfig>,
389    ) -> anyhow::Result<ChainConfig> {
390        (**self).fetch_chain_config(commitment).await
391    }
392
393    async fn try_fetch_reward_merkle_tree_v2(
394        &self,
395        retry: usize,
396        height: u64,
397        view: ViewNumber,
398        reward_merkle_tree_root: RewardMerkleCommitmentV2,
399        accounts: Arc<Vec<RewardAccountV2>>,
400    ) -> anyhow::Result<PermittedRewardMerkleTreeV2> {
401        (**self)
402            .try_fetch_reward_merkle_tree_v2(retry, height, view, reward_merkle_tree_root, accounts)
403            .await
404    }
405
406    async fn fetch_reward_merkle_tree_v2(
407        &self,
408        height: u64,
409        view: ViewNumber,
410        reward_merkle_tree_root: RewardMerkleCommitmentV2,
411        accounts: Arc<Vec<RewardAccountV2>>,
412    ) -> anyhow::Result<PermittedRewardMerkleTreeV2> {
413        (**self)
414            .fetch_reward_merkle_tree_v2(height, view, reward_merkle_tree_root, accounts)
415            .await
416    }
417
418    async fn try_fetch_reward_accounts_v1(
419        &self,
420        retry: usize,
421        instance: &NodeState,
422        height: u64,
423        view: ViewNumber,
424        reward_merkle_tree_root: RewardMerkleCommitmentV1,
425        accounts: &[RewardAccountV1],
426    ) -> anyhow::Result<Vec<RewardAccountProofV1>> {
427        (**self)
428            .try_fetch_reward_accounts_v1(
429                retry,
430                instance,
431                height,
432                view,
433                reward_merkle_tree_root,
434                accounts,
435            )
436            .await
437    }
438
439    async fn fetch_reward_accounts_v1(
440        &self,
441        instance: &NodeState,
442        height: u64,
443        view: ViewNumber,
444        reward_merkle_tree_root: RewardMerkleCommitmentV1,
445        accounts: Vec<RewardAccountV1>,
446    ) -> anyhow::Result<Vec<RewardAccountProofV1>> {
447        (**self)
448            .fetch_reward_accounts_v1(instance, height, view, reward_merkle_tree_root, accounts)
449            .await
450    }
451
452    async fn try_fetch_state_cert(
453        &self,
454        retry: usize,
455        epoch: u64,
456    ) -> anyhow::Result<LightClientStateUpdateCertificateV2<SeqTypes>> {
457        (**self).try_fetch_state_cert(retry, epoch).await
458    }
459
460    async fn fetch_state_cert(
461        &self,
462        epoch: u64,
463    ) -> anyhow::Result<LightClientStateUpdateCertificateV2<SeqTypes>> {
464        (**self).fetch_state_cert(epoch).await
465    }
466
467    fn backoff(&self) -> &BackoffParams {
468        (**self).backoff()
469    }
470
471    fn name(&self) -> String {
472        (**self).name()
473    }
474
475    fn is_local(&self) -> bool {
476        (**self).is_local()
477    }
478}
479
/// Options from which a persistence backend can be created.
#[async_trait]
pub trait PersistenceOptions: Clone + Send + Sync + Debug + 'static {
    /// The persistence backend produced by [`create`](Self::create).
    type Persistence: SequencerPersistence + MembershipPersistence;

    /// Set the view retention carried by these options.
    // NOTE(review): presumably the number of views of consensus data kept before garbage
    // collection — confirm against the implementations.
    fn set_view_retention(&mut self, view_retention: u64);
    /// Create the persistence backend described by these options.
    async fn create(&mut self) -> anyhow::Result<Self::Persistence>;
    /// Consume the options and reset the storage they refer to.
    // NOTE(review): presumably this discards all persisted data — confirm.
    async fn reset(self) -> anyhow::Result<()>;
}
488
/// Determine the read state based on the queried block range.
///
/// - If the persistence returned events up to the requested block, the read is complete.
/// - Otherwise, indicate that the read is up to the last processed block.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum EventsPersistenceRead {
    /// The persistence returned events up to the requested block.
    Complete,
    /// The read only covers events up to the last processed block (the `u64` is presumably that
    /// L1 block number — confirm).
    UntilL1Block(u64),
}
497
/// Tuple type for stake table data: (validators, block_reward, stake_table_hash)
///
/// The block reward and the stake table hash are optional.
pub type StakeTuple = (
    AuthenticatedValidatorMap,
    Option<RewardAmount>,
    Option<StakeTableHash>,
);
504
#[async_trait]
/// Trait used by `Memberships` implementations to interact with the persistence layer.
pub trait MembershipPersistence: Send + Sync + 'static {
    /// Load the stake table for `epoch` from storage.
    ///
    /// The `Option` in the return type leaves room for an epoch with no stored stake table.
    async fn load_stake(&self, epoch: EpochNumber) -> anyhow::Result<Option<StakeTuple>>;

    /// Load stake tables from storage for the latest `limit` known epochs.
    async fn load_latest_stake(&self, limit: u64) -> anyhow::Result<Option<Vec<IndexedStake>>>;

    /// Store the stake table at `epoch` in the persistence layer, together with the optional block
    /// reward and stake table hash.
    async fn store_stake(
        &self,
        epoch: EpochNumber,
        stake: AuthenticatedValidatorMap,
        block_reward: Option<RewardAmount>,
        stake_table_hash: Option<StakeTableHash>,
    ) -> anyhow::Result<()>;

    // NOTE(review): presumably `l1_finalized` is recorded as the L1 block up to which events have
    // been processed — confirm against the implementations.
    /// Store stake table `events` along with the `l1_finalized` block number.
    async fn store_events(
        &self,
        l1_finalized: u64,
        events: Vec<(EventKey, StakeTableEvent)>,
    ) -> anyhow::Result<()>;
    // NOTE(review): presumably only events between `from_l1_block` and `l1_finalized` are
    // returned, and a `None` read state means nothing has been stored yet — confirm.
    /// Load stored stake table events.
    ///
    /// Returns the events together with an optional [`EventsPersistenceRead`] describing how far
    /// the read reached.
    async fn load_events(
        &self,
        from_l1_block: u64,
        l1_finalized: u64,
    ) -> anyhow::Result<(
        Option<EventsPersistenceRead>,
        Vec<(EventKey, StakeTableEvent)>,
    )>;

    /// Delete all stake table events, the L1 block tracker, and the epoch DRB and root data.
    async fn delete_stake_tables(&self) -> anyhow::Result<()>;

    /// Store the registered validators for `epoch`, keyed by address.
    async fn store_all_validators(
        &self,
        epoch: EpochNumber,
        all_validators: IndexMap<Address, RegisteredValidator<PubKey>>,
    ) -> anyhow::Result<()>;

    // NOTE(review): `offset` and `limit` look like pagination parameters — confirm.
    /// Load registered validators stored for `epoch`.
    async fn load_all_validators(
        &self,
        epoch: EpochNumber,
        offset: u64,
        limit: u64,
    ) -> anyhow::Result<Vec<RegisteredValidator<PubKey>>>;
}
553
554#[async_trait]
555pub trait SequencerPersistence:
556    Sized + Send + Sync + Clone + 'static + DhtPersistentStorage + MembershipPersistence
557{
    // NOTE(review): presumably migrates persisted reward Merkle tree data to the V2 format —
    // confirm against the implementations.
    /// Run the reward Merkle tree V2 migration for this storage.
    async fn migrate_reward_merkle_tree_v2(&self) -> anyhow::Result<()>;
559
560    /// Use this storage as a state catchup backend, if supported.
561    fn into_catchup_provider(
562        self,
563        _backoff: BackoffParams,
564    ) -> anyhow::Result<Arc<dyn StateCatchup>> {
565        bail!("state catchup is not implemented for this persistence type");
566    }
567
    /// Load the orchestrator config from storage.
    ///
    /// Returns `None` if no config exists (we are joining a network for the first time). Fails with
    /// `Err` if it could not be determined whether a config exists or not.
    async fn load_config(&self) -> anyhow::Result<Option<NetworkConfig>>;

    /// Save the orchestrator config to storage.
    async fn save_config(&self, cfg: &NetworkConfig) -> anyhow::Result<()>;

    // NOTE(review): no `save_voted_view` method is visible in this part of the trait, so the
    // intra-doc link below may be stale; `record_action` looks like the method that persists
    // acted-upon views — confirm and update the link.
    /// Load the highest view saved with [`save_voted_view`](Self::save_voted_view).
    ///
    /// Returns `None` if no view has been saved.
    async fn load_latest_acted_view(&self) -> anyhow::Result<Option<ViewNumber>>;

    /// Load the view to restart from.
    ///
    /// Returns `None` if no restart view has been saved.
    async fn load_restart_view(&self) -> anyhow::Result<Option<ViewNumber>>;
582
    /// Load the proposals saved by consensus, keyed by view number.
    async fn load_quorum_proposals(
        &self,
    ) -> anyhow::Result<BTreeMap<ViewNumber, Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>>>>;

    // NOTE(review): the return type has no `Option`, so a missing proposal is presumably reported
    // as an error — confirm against the implementations.
    /// Load the quorum proposal saved for `view`.
    async fn load_quorum_proposal(
        &self,
        view: ViewNumber,
    ) -> anyhow::Result<Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>>>;

    /// Load the VID share saved for `view`, if any.
    async fn load_vid_share(
        &self,
        view: ViewNumber,
    ) -> anyhow::Result<Option<Proposal<SeqTypes, VidDisperseShare<SeqTypes>>>>;
    /// Load the DA proposal saved for `view`, if any.
    async fn load_da_proposal(
        &self,
        view: ViewNumber,
    ) -> anyhow::Result<Option<Proposal<SeqTypes, DaProposal2<SeqTypes>>>>;
    /// Load the saved upgrade certificate, if any.
    async fn load_upgrade_certificate(
        &self,
    ) -> anyhow::Result<Option<UpgradeCertificate<SeqTypes>>>;
    /// Load the saved per-epoch information used to initialize HotShot.
    async fn load_start_epoch_info(&self) -> anyhow::Result<Vec<InitializerEpochInfo<SeqTypes>>>;
    /// Load the saved light client state update certificate, if any.
    async fn load_state_cert(
        &self,
    ) -> anyhow::Result<Option<LightClientStateUpdateCertificateV2<SeqTypes>>>;

    /// Get a state certificate for an epoch.
    ///
    /// The `Option` in the return type leaves room for an epoch with no stored certificate.
    async fn get_state_cert_by_epoch(
        &self,
        epoch: u64,
    ) -> anyhow::Result<Option<LightClientStateUpdateCertificateV2<SeqTypes>>>;

    /// Insert a state certificate for a given epoch.
    async fn insert_state_cert(
        &self,
        epoch: u64,
        cert: LightClientStateUpdateCertificateV2<SeqTypes>,
    ) -> anyhow::Result<()>;
621
622    /// Load the latest known consensus state.
623    ///
624    /// Returns an initializer to resume HotShot from the latest saved state (or start from genesis,
625    /// if there is no saved state). Also returns the anchor view number, which can be used as a
626    /// reference point to process any events which were not processed before a previous shutdown,
627    /// if applicable,.
628    async fn load_consensus_state(
629        &self,
630        state: NodeState,
631        upgrade: Upgrade,
632    ) -> anyhow::Result<(HotShotInitializer<SeqTypes>, Option<ViewNumber>)> {
633        let genesis_validated_state = ValidatedState::genesis(&state).0;
634        let highest_voted_view = match self
635            .load_latest_acted_view()
636            .await
637            .context("loading last voted view")?
638        {
639            Some(view) => {
640                tracing::info!(?view, "starting with last actioned view");
641                view
642            },
643            None => {
644                tracing::info!("no saved view, starting from genesis");
645                ViewNumber::genesis()
646            },
647        };
648
649        let restart_view = match self
650            .load_restart_view()
651            .await
652            .context("loading restart view")?
653        {
654            Some(view) => {
655                tracing::info!(?view, "starting from saved view");
656                view
657            },
658            None => {
659                tracing::info!("no saved view, starting from genesis");
660                ViewNumber::genesis()
661            },
662        };
663        let next_epoch_high_qc = self
664            .load_next_epoch_quorum_certificate()
665            .await
666            .context("loading next epoch qc")?;
667        let (leaf, mut high_qc, anchor_view) = match self
668            .load_anchor_leaf()
669            .await
670            .context("loading anchor leaf")?
671        {
672            Some((leaf, high_qc)) => {
673                tracing::info!(?leaf, ?high_qc, "starting from saved leaf");
674                ensure!(
675                    leaf.view_number() == high_qc.view_number,
676                    format!(
677                        "loaded anchor leaf from view {}, but high QC is from view {}",
678                        leaf.view_number(),
679                        high_qc.view_number
680                    )
681                );
682
683                let anchor_view = leaf.view_number();
684                (leaf, high_qc, Some(anchor_view))
685            },
686            None => {
687                tracing::info!("no saved leaf, starting from genesis leaf");
688                (
689                    hotshot_types::data::Leaf2::genesis(
690                        &genesis_validated_state,
691                        &state,
692                        upgrade.base,
693                    )
694                    .await,
695                    QuorumCertificate2::genesis(&genesis_validated_state, &state, upgrade).await,
696                    None,
697                )
698            },
699        };
700
701        if let Some((extended_high_qc, _)) = self.load_eqc().await
702            && extended_high_qc.view_number() > high_qc.view_number()
703        {
704            high_qc = extended_high_qc
705        }
706
707        let validated_state = if leaf.block_header().height() == 0 {
708            // If we are starting from genesis, we can provide the full state.
709            genesis_validated_state
710        } else {
711            // Otherwise, we will have to construct a sparse state and fetch missing data during
712            // catchup.
713            ValidatedState::from_header(leaf.block_header())
714        };
715
716        // If we are not starting from genesis, we start from the view following the maximum view
717        // between `highest_voted_view` and `leaf.view_number`. This prevents double votes from
718        // starting in a view in which we had already voted before the restart, and prevents
719        // unnecessary catchup from starting in a view earlier than the anchor leaf.
720        let restart_view = max(restart_view, leaf.view_number());
721        // TODO:
722        let epoch = genesis_epoch_from_version(upgrade.base);
723
724        let config = self.load_config().await.context("loading config")?;
725        let epoch_height = config
726            .as_ref()
727            .map(|c| c.config.epoch_height)
728            .unwrap_or_default();
729        let epoch_start_block = config
730            .as_ref()
731            .map(|c| c.config.epoch_start_block)
732            .unwrap_or_default();
733
734        let saved_proposals = self
735            .load_quorum_proposals()
736            .await
737            .context("loading saved proposals")?;
738
739        let upgrade_certificate = self
740            .load_upgrade_certificate()
741            .await
742            .context("loading upgrade certificate")?;
743
744        let start_epoch_info = self
745            .load_start_epoch_info()
746            .await
747            .context("loading start epoch info")?;
748
749        let state_cert = self
750            .load_state_cert()
751            .await
752            .context("loading light client state update certificate")?;
753
754        tracing::warn!(
755            ?leaf,
756            ?restart_view,
757            ?epoch,
758            ?high_qc,
759            ?validated_state,
760            ?state_cert,
761            "loaded consensus state"
762        );
763
764        Ok((
765            HotShotInitializer {
766                instance_state: state,
767                epoch_height,
768                epoch_start_block,
769                anchor_leaf: leaf,
770                anchor_state: Arc::new(validated_state),
771                anchor_state_delta: None,
772                start_view: restart_view,
773                start_epoch: epoch,
774                last_actioned_view: highest_voted_view,
775                saved_proposals,
776                high_qc,
777                next_epoch_high_qc,
778                decided_upgrade_certificate: upgrade_certificate,
779                undecided_leaves: Default::default(),
780                undecided_state: Default::default(),
781                saved_vid_shares: Default::default(), // TODO: implement saved_vid_shares
782                start_epoch_info,
783                state_cert,
784            },
785            anchor_view,
786        ))
787    }
788
789    /// Update storage based on an event from consensus.
790    async fn handle_event(&self, event: &Event, consumer: &(impl EventConsumer + 'static)) {
791        if let EventType::Decide {
792            leaf_chain,
793            committing_qc,
794            deciding_qc,
795            ..
796        } = &event.event
797        {
798            let Some(LeafInfo { leaf, .. }) = leaf_chain.first() else {
799                // No new leaves.
800                return;
801            };
802
803            // Associate each decided leaf with a QC.
804            let chain = leaf_chain.iter().zip(
805                // The first (most recent) leaf corresponds to the QC triggering the decide event.
806                std::iter::once((**committing_qc).clone())
807                    // Moving backwards in the chain, each leaf corresponds with the subsequent
808                    // leaf's justify QC.
809                    .chain(leaf_chain.iter().map(|leaf| CertificatePair::for_parent(&leaf.leaf))),
810            );
811
812            if let Err(err) = self
813                .append_decided_leaves(leaf.view_number(), chain, deciding_qc.clone(), consumer)
814                .await
815            {
816                tracing::error!(
817                    "failed to save decided leaves, chain may not be up to date: {err:#}"
818                );
819                return;
820            }
821        }
822    }
823
    /// Append decided leaves to persistent storage and emit a corresponding event.
    ///
    /// `consumer` will be sent a `Decide` event containing all decided leaves in persistent storage
    /// up to and including `view`. If available in persistent storage, full block payloads and VID
    /// info will also be included for each leaf.
    ///
    /// Once the new decided leaves have been processed, old data up to `view` will be garbage
    /// collected. The consumer's handling of this event is a prerequisite for the completion of
    /// garbage collection: if the consumer fails to process the event, no data is deleted. This
    /// ensures that, if called repeatedly, all decided leaves ever recorded in consensus storage
    /// will eventually be passed to the consumer.
    ///
    /// Note that the converse is not true: if garbage collection fails, it is not guaranteed that
    /// the consumer hasn't processed the decide event. Thus, in rare cases, some events may be
    /// processed twice, or the consumer may get two events which share a subset of their data.
    /// Thus, it is the consumer's responsibility to make sure its handling of each leaf is
    /// idempotent.
    ///
    /// If the consumer fails to handle the new decide event, it may be retried, or simply postponed
    /// until the next decide, at which point all persisted leaves from the failed GC run will be
    /// included in the event along with subsequently decided leaves.
    ///
    /// This functionality is useful for keeping a separate view of the blockchain in sync with the
    /// consensus storage. For example, the `consumer` could be used for moving data from consensus
    /// storage to long-term archival storage.
    // NOTE(review): the text above refers to `view`; the corresponding parameter is named
    // `decided_view`.
    async fn append_decided_leaves(
        &self,
        decided_view: ViewNumber,
        leaf_chain: impl IntoIterator<Item = (&LeafInfo<SeqTypes>, CertificatePair<SeqTypes>)> + Send,
        deciding_qc: Option<Arc<CertificatePair<SeqTypes>>>,
        consumer: &(impl EventConsumer + 'static),
    ) -> anyhow::Result<()>;
856
    /// Load the anchor leaf and its quorum certificate from storage.
    ///
    /// `Ok(None)` presumably means no anchor leaf has been stored yet (NOTE(review): confirm);
    /// [`Self::load_anchor_view`] maps that case to the genesis view.
857    async fn load_anchor_leaf(
858        &self,
859    ) -> anyhow::Result<Option<(Leaf2, QuorumCertificate2<SeqTypes>)>>;
    /// Persist a VID disperse share proposal.
    ///
    /// The `Storage` implementation for `Arc<P>` later in this file forwards its own `append_vid`
    /// to this method.
860    async fn append_vid(
861        &self,
862        proposal: &Proposal<SeqTypes, VidDisperseShare<SeqTypes>>,
863    ) -> anyhow::Result<()>;
    /// Persist a [`DaProposal`] proposal together with `vid_commit`.
    ///
    /// NOTE(review): `vid_commit` is presumably the VID commitment belonging to the proposal's
    /// payload — confirm against the implementations.
864    async fn append_da(
865        &self,
866        proposal: &Proposal<SeqTypes, DaProposal<SeqTypes>>,
867        vid_commit: VidCommitment,
868    ) -> anyhow::Result<()>;
    /// Record that `action` was taken in `view` (and in `epoch`, when one is given).
    ///
    /// NOTE(review): what implementations do with the recorded action is not visible here.
869    async fn record_action(
870        &self,
871        view: ViewNumber,
872        epoch: Option<EpochNumber>,
873        action: HotShotAction,
874    ) -> anyhow::Result<()>;
875
    /// Persist a quorum proposal given in its wrapper form.
    ///
    /// This is the single sink for quorum proposals in this file: the default
    /// [`Self::append_proposal2`] forwards here, and the `Storage` implementation for `Arc<P>`
    /// converts both `QuorumProposal` and `QuorumProposal2` to the wrapper type before calling it.
876    async fn append_quorum_proposal2(
877        &self,
878        proposal: &Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>>,
879    ) -> anyhow::Result<()>;
880
881    /// Update the current eQC in storage.
    ///
    /// The eQC is the pair (`_high_qc`, `_next_epoch_high_qc`), mirroring what
    /// [`Self::load_eqc`] returns. The `Storage::update_eqc` implementation for `Arc<P>` later in
    /// this file only calls this when the incoming high QC's view is not older than the stored one.
882    async fn store_eqc(
883        &self,
884        _high_qc: QuorumCertificate2<SeqTypes>,
885        _next_epoch_high_qc: NextEpochQuorumCertificate2<SeqTypes>,
886    ) -> anyhow::Result<()>;
887
888    /// Load the current eQC from storage.
    ///
    /// The return type is a plain `Option`, not a `Result`, so callers cannot distinguish "nothing
    /// stored" from a failed load. NOTE(review): confirm how implementations treat load errors.
889    async fn load_eqc(
890        &self,
891    ) -> Option<(
892        QuorumCertificate2<SeqTypes>,
893        NextEpochQuorumCertificate2<SeqTypes>,
894    )>;
895
    /// Persist the decided upgrade certificate.
    ///
    /// `None` is an accepted value; NOTE(review): presumably it records that no upgrade
    /// certificate is currently decided — confirm against the implementations.
896    async fn store_upgrade_certificate(
897        &self,
898        decided_upgrade_certificate: Option<UpgradeCertificate<SeqTypes>>,
899    ) -> anyhow::Result<()>;
900
901    async fn migrate_storage(&self) -> anyhow::Result<()> {
902        tracing::warn!("migrating consensus data...");
903
904        self.migrate_anchor_leaf().await?;
905        self.migrate_da_proposals().await?;
906        self.migrate_vid_shares().await?;
907        self.migrate_quorum_proposals().await?;
908        self.migrate_quorum_certificates().await?;
909        self.migrate_reward_merkle_tree_v2()
910            .await
911            .context("failed to migrate reward merkle tree v2")?;
912        self.migrate_x25519_keys().await?;
913        tracing::warn!("consensus storage has been migrated to new types");
914
915        Ok(())
916    }
917
    /// Migration step for x25519 keys; the last step run by [`Self::migrate_storage`].
    ///
    /// NOTE(review): what exactly is migrated is implementation-defined and not visible here.
918    async fn migrate_x25519_keys(&self) -> anyhow::Result<()>;
919
    /// Migration step for the anchor leaf; the first step run by [`Self::migrate_storage`].
920    async fn migrate_anchor_leaf(&self) -> anyhow::Result<()>;
    /// Migration step for DA proposals; run by [`Self::migrate_storage`].
921    async fn migrate_da_proposals(&self) -> anyhow::Result<()>;
    /// Migration step for VID shares; run by [`Self::migrate_storage`].
922    async fn migrate_vid_shares(&self) -> anyhow::Result<()>;
    /// Migration step for quorum proposals; run by [`Self::migrate_storage`].
923    async fn migrate_quorum_proposals(&self) -> anyhow::Result<()>;
    /// Migration step for quorum certificates; run by [`Self::migrate_storage`].
924    async fn migrate_quorum_certificates(&self) -> anyhow::Result<()>;
925
926    async fn load_anchor_view(&self) -> anyhow::Result<ViewNumber> {
927        match self.load_anchor_leaf().await? {
928            Some((leaf, _)) => Ok(leaf.view_number()),
929            None => Ok(ViewNumber::genesis()),
930        }
931    }
932
    /// Persist `high_qc` as the next-epoch quorum certificate.
    ///
    /// NOTE(review): presumably replaces any previously stored certificate — confirm.
933    async fn store_next_epoch_quorum_certificate(
934        &self,
935        high_qc: NextEpochQuorumCertificate2<SeqTypes>,
936    ) -> anyhow::Result<()>;
937
    /// Load the stored next-epoch quorum certificate.
    ///
    /// `Ok(None)` presumably means none has been stored (NOTE(review): confirm).
938    async fn load_next_epoch_quorum_certificate(
939        &self,
940    ) -> anyhow::Result<Option<NextEpochQuorumCertificate2<SeqTypes>>>;
941
    /// Persist a [`DaProposal2`] proposal together with `vid_commit`.
    ///
    /// Same shape as [`Self::append_da`], but for the `DaProposal2` proposal type.
942    async fn append_da2(
943        &self,
944        proposal: &Proposal<SeqTypes, DaProposal2<SeqTypes>>,
945        vid_commit: VidCommitment,
946    ) -> anyhow::Result<()>;
947
948    async fn append_proposal2(
949        &self,
950        proposal: &Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>>,
951    ) -> anyhow::Result<()> {
952        self.append_quorum_proposal2(proposal).await
953    }
954
    /// Persist `drb_result` for `epoch`.
955    async fn store_drb_result(
956        &self,
957        epoch: EpochNumber,
958        drb_result: DrbResult,
959    ) -> anyhow::Result<()>;
    /// Persist a DRB input. Counterpart of [`Self::load_drb_input`].
960    async fn store_drb_input(&self, drb_input: DrbInput) -> anyhow::Result<()>;
    /// Load the DRB input for `epoch`.
    ///
    /// `epoch` is a raw `u64` here, unlike the `EpochNumber` taken by the neighbouring methods;
    /// this matches the `Storage::load_drb_input` signature implemented for `Arc<P>` in this file.
    /// The return type has no `Option`, so a missing input presumably surfaces as an error
    /// (NOTE(review): confirm).
961    async fn load_drb_input(&self, epoch: u64) -> anyhow::Result<DrbInput>;
    /// Persist `block_header` as the epoch root for `epoch`.
    ///
    /// NOTE(review): "epoch root" is taken from the method name; its precise meaning is defined
    /// outside this file.
962    async fn store_epoch_root(
963        &self,
964        epoch: EpochNumber,
965        block_header: <SeqTypes as NodeType>::BlockHeader,
966    ) -> anyhow::Result<()>;
    /// Persist a light client state update certificate.
    ///
    /// The `Storage::update_state_cert` implementation for `Arc<P>` in this file forwards here.
967    async fn add_state_cert(
968        &self,
969        state_cert: LightClientStateUpdateCertificateV2<SeqTypes>,
970    ) -> anyhow::Result<()>;
971
    /// Attach a metrics backend to this persistence layer.
    ///
    /// This is a synchronous method and takes `&mut self`, unlike the `&self` async methods
    /// declared above it. NOTE(review): which metrics get reported is implementation-defined.
972    fn enable_metrics(&mut self, metrics: &dyn Metrics);
973}
974
/// A sink for [`Event`]s.
///
/// Used as the `consumer` of `append_decided_leaves` above, whose documentation spells out the
/// contract: returning an error means the event was not processed, and handling must be
/// idempotent because an event may be delivered more than once.
975#[async_trait]
976pub trait EventConsumer: Debug + Send + Sync {
    /// Process one event, returning an error if it could not be handled.
977    async fn handle_event(&self, event: &Event) -> anyhow::Result<()>;
978}
979
980#[async_trait]
981impl<T> EventConsumer for Box<T>
982where
983    T: EventConsumer + ?Sized,
984{
985    async fn handle_event(&self, event: &Event) -> anyhow::Result<()> {
986        (**self).handle_event(event).await
987    }
988}
989
/// An [`EventConsumer`] that ignores every event and always reports success.
///
/// Being a unit struct, it carries no state.
990#[derive(Clone, Copy, Debug)]
991pub struct NullEventConsumer;
992
993#[async_trait]
994impl EventConsumer for NullEventConsumer {
    /// Discard `_event` and return `Ok(())`.
995    async fn handle_event(&self, _event: &Event) -> anyhow::Result<()> {
996        Ok(())
997    }
998}
999
1000#[async_trait]
1001impl<P: SequencerPersistence> Storage<SeqTypes> for Arc<P> {
1002    async fn append_vid(
1003        &self,
1004        proposal: &Proposal<SeqTypes, VidDisperseShare<SeqTypes>>,
1005    ) -> anyhow::Result<()> {
1006        (**self).append_vid(proposal).await
1007    }
1008
1009    async fn append_da(
1010        &self,
1011        proposal: &Proposal<SeqTypes, DaProposal<SeqTypes>>,
1012        vid_commit: VidCommitment,
1013    ) -> anyhow::Result<()> {
1014        (**self).append_da(proposal, vid_commit).await
1015    }
1016
1017    async fn append_da2(
1018        &self,
1019        proposal: &Proposal<SeqTypes, DaProposal2<SeqTypes>>,
1020        vid_commit: VidCommitment,
1021    ) -> anyhow::Result<()> {
1022        (**self).append_da2(proposal, vid_commit).await
1023    }
1024
1025    async fn record_action(
1026        &self,
1027        view: ViewNumber,
1028        epoch: Option<EpochNumber>,
1029        action: HotShotAction,
1030    ) -> anyhow::Result<()> {
1031        (**self).record_action(view, epoch, action).await
1032    }
1033
1034    async fn update_high_qc(&self, _high_qc: QuorumCertificate<SeqTypes>) -> anyhow::Result<()> {
1035        Ok(())
1036    }
1037
1038    async fn append_proposal(
1039        &self,
1040        proposal: &Proposal<SeqTypes, QuorumProposal<SeqTypes>>,
1041    ) -> anyhow::Result<()> {
1042        (**self)
1043            .append_quorum_proposal2(&convert_proposal(proposal.clone()))
1044            .await
1045    }
1046
1047    async fn append_proposal2(
1048        &self,
1049        proposal: &Proposal<SeqTypes, QuorumProposal2<SeqTypes>>,
1050    ) -> anyhow::Result<()> {
1051        let proposal_qp_wrapper: Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>> =
1052            convert_proposal(proposal.clone());
1053        (**self).append_quorum_proposal2(&proposal_qp_wrapper).await
1054    }
1055
1056    async fn update_high_qc2(&self, _high_qc: QuorumCertificate2<SeqTypes>) -> anyhow::Result<()> {
1057        Ok(())
1058    }
1059
1060    /// Update the current eQC in storage.
1061    async fn update_eqc(
1062        &self,
1063        high_qc: QuorumCertificate2<SeqTypes>,
1064        next_epoch_high_qc: NextEpochQuorumCertificate2<SeqTypes>,
1065    ) -> anyhow::Result<()> {
1066        if let Some((existing_high_qc, _)) = (**self).load_eqc().await
1067            && high_qc.view_number() < existing_high_qc.view_number()
1068        {
1069            return Ok(());
1070        }
1071
1072        (**self).store_eqc(high_qc, next_epoch_high_qc).await
1073    }
1074
1075    async fn update_next_epoch_high_qc2(
1076        &self,
1077        _next_epoch_high_qc: NextEpochQuorumCertificate2<SeqTypes>,
1078    ) -> anyhow::Result<()> {
1079        Ok(())
1080    }
1081
1082    async fn update_decided_upgrade_certificate(
1083        &self,
1084        decided_upgrade_certificate: Option<UpgradeCertificate<SeqTypes>>,
1085    ) -> anyhow::Result<()> {
1086        (**self)
1087            .store_upgrade_certificate(decided_upgrade_certificate)
1088            .await
1089    }
1090
1091    async fn store_drb_result(
1092        &self,
1093        epoch: EpochNumber,
1094        drb_result: DrbResult,
1095    ) -> anyhow::Result<()> {
1096        (**self).store_drb_result(epoch, drb_result).await
1097    }
1098
1099    async fn store_epoch_root(
1100        &self,
1101        epoch: EpochNumber,
1102        block_header: <SeqTypes as NodeType>::BlockHeader,
1103    ) -> anyhow::Result<()> {
1104        (**self).store_epoch_root(epoch, block_header).await
1105    }
1106
1107    async fn store_drb_input(&self, drb_input: DrbInput) -> anyhow::Result<()> {
1108        (**self).store_drb_input(drb_input).await
1109    }
1110
1111    async fn load_drb_input(&self, epoch: u64) -> anyhow::Result<DrbInput> {
1112        (**self).load_drb_input(epoch).await
1113    }
1114
1115    async fn update_state_cert(
1116        &self,
1117        state_cert: LightClientStateUpdateCertificateV2<SeqTypes>,
1118    ) -> anyhow::Result<()> {
1119        (**self).add_state_cert(state_cert).await
1120    }
1121}
1122
1123/// Data that can be deserialized from a subslice of namespace payload bytes.
1124///
1125/// Companion trait for [`NsPayloadBytesRange`], which specifies the subslice of
1126/// namespace payload bytes to read.
///
/// The lifetime `'a` is that of the borrowed byte slice, which lets an implementor borrow from
/// the slice instead of copying out of it.
1127pub trait FromNsPayloadBytes<'a> {
1128    /// Deserialize `Self` from namespace payload bytes.
    ///
    /// The signature is infallible: it returns `Self`, not a `Result`.
1129    fn from_payload_bytes(bytes: &'a [u8]) -> Self;
1130}
1131
1132/// Specifies a subslice of namespace payload bytes to read.
1133///
1134/// Companion trait for [`FromNsPayloadBytes`], which holds data that can be
1135/// deserialized from that subslice of bytes.
1136pub trait NsPayloadBytesRange<'a> {
    /// The type deserialized from the bytes selected by [`Self::ns_payload_range`].
1137    type Output: FromNsPayloadBytes<'a>;
1138
1139    /// Range relative to this ns payload
1140    fn ns_payload_range(&self) -> Range<usize>;
1141}
1142
1143/// Types which can be deserialized from either integers or strings.
1144///
1145/// Some types can be represented as an integer or a string in human-readable formats like JSON or
1146/// TOML. For example, 1 GWEI might be represented by the integer `1000000000` or the string `"1
1147/// gwei"`. Such types can implement `FromStringOrInteger` and then use [`impl_string_or_integer`]
1148/// to derive this user-friendly serialization.
1149///
1150/// These types are assumed to have an efficient representation as an integral type in Rust --
1151/// [`Self::Binary`] -- and will be serialized to and from this type when using a non-human-readable
1152/// encoding. With human readable encodings, serialization is always to a string.
1153pub trait FromStringOrInteger: Sized {
    /// Representation used with non-human-readable encodings.
1154    type Binary: Serialize + DeserializeOwned;
    /// Integer representation accepted when deserializing from a human-readable format.
1155    type Integer: Serialize + DeserializeOwned;
1156
    /// Build `Self` from its [`Self::Binary`] representation.
1157    fn from_binary(b: Self::Binary) -> anyhow::Result<Self>;
    /// Build `Self` from a string such as `"1 gwei"` in the example above.
1158    fn from_string(s: String) -> anyhow::Result<Self>;
    /// Build `Self` from an integer such as `1000000000` in the example above.
1159    fn from_integer(i: Self::Integer) -> anyhow::Result<Self>;
1160
    /// Convert `self` to its [`Self::Binary`] representation.
1161    fn to_binary(&self) -> anyhow::Result<Self::Binary>;
    /// Convert `self` to its string form. There is no `to_integer` counterpart because
    /// human-readable serialization is always to a string.
1162    fn to_string(&self) -> anyhow::Result<String>;
1163}