Skip to main content

espresso_types/v0/
traits.rs

1//! This module contains all the traits used for building the sequencer types.
2//! It also includes some trait implementations that cannot be implemented in an external crate.
3use std::{cmp::max, collections::BTreeMap, fmt::Debug, ops::Range, sync::Arc};
4
5use alloy::primitives::{Address, U256};
6use anyhow::{Context, bail, ensure};
7use async_trait::async_trait;
8use committable::Commitment;
9use futures::{FutureExt, TryFutureExt};
10use hotshot::{HotShotInitializer, InitializerEpochInfo, types::EventType};
11use hotshot_libp2p_networking::network::behaviours::dht::store::persistent::DhtPersistentStorage;
12use hotshot_new_protocol::{message::Certificate2, storage::NewProtocolStorage};
13use hotshot_types::{
14    data::{
15        DaProposal, DaProposal2, EpochNumber, QuorumProposal, QuorumProposal2,
16        QuorumProposalWrapper, VidCommitment, VidDisperseShare, ViewNumber,
17    },
18    drb::{DrbInput, DrbResult},
19    event::{HotShotAction, LeafInfo},
20    message::{Proposal, convert_proposal},
21    new_protocol::CoordinatorEvent,
22    simple_certificate::{
23        CertificatePair, LightClientStateUpdateCertificateV2, NextEpochQuorumCertificate2,
24        QuorumCertificate, QuorumCertificate2, UpgradeCertificate,
25    },
26    stake_table::HSStakeTable,
27    traits::{
28        ValidatedState as HotShotState, metrics::Metrics, node_implementation::NodeType,
29        storage::Storage,
30    },
31    utils::genesis_epoch_from_version,
32    vote::HasViewNumber,
33};
34use indexmap::IndexMap;
35use serde::{Serialize, de::DeserializeOwned};
36use versions::Upgrade;
37
38use super::{
39    impls::NodeState,
40    utils::BackoffParams,
41    v0_3::{EventKey, IndexedStake, StakeTableEvent},
42};
43use crate::{
44    AuthenticatedValidatorMap, BlockMerkleTree, FeeAccount, FeeAccountProof, FeeMerkleCommitment,
45    Leaf2, NetworkConfig, PubKey, SeqTypes,
46    v0::impls::{StakeTableHash, ValidatedState},
47    v0_3::{
48        ChainConfig, RegisteredValidator, RewardAccountProofV1, RewardAccountV1, RewardAmount,
49        RewardMerkleCommitmentV1,
50    },
51    v0_4::{PermittedRewardMerkleTreeV2, RewardAccountV2, RewardMerkleCommitmentV2},
52};
53
54#[async_trait]
55pub trait StateCatchup: Send + Sync {
56    /// Fetch the leaf at the given height without retrying on transient errors.
57    async fn try_fetch_leaf(
58        &self,
59        retry: usize,
60        height: u64,
61        stake_table: HSStakeTable<SeqTypes>,
62        success_threshold: U256,
63    ) -> anyhow::Result<Leaf2>;
64
65    /// Fetch the leaf at the given height, retrying on transient errors.
66    async fn fetch_leaf(
67        &self,
68        height: u64,
69        stake_table: HSStakeTable<SeqTypes>,
70        success_threshold: U256,
71    ) -> anyhow::Result<Leaf2> {
72        self.backoff()
73            .retry(self, |provider, retry| {
74                let stake_table_clone = stake_table.clone();
75                async move {
76                    provider
77                        .try_fetch_leaf(retry, height, stake_table_clone, success_threshold)
78                        .await
79                }
80                .boxed()
81            })
82            .await
83    }
84
85    /// Fetch the given list of accounts without retrying on transient errors.
86    async fn try_fetch_accounts(
87        &self,
88        retry: usize,
89        instance: &NodeState,
90        height: u64,
91        view: ViewNumber,
92        fee_merkle_tree_root: FeeMerkleCommitment,
93        accounts: &[FeeAccount],
94    ) -> anyhow::Result<Vec<FeeAccountProof>>;
95
96    /// Fetch the given list of accounts, retrying on transient errors.
97    async fn fetch_accounts(
98        &self,
99        instance: &NodeState,
100        height: u64,
101        view: ViewNumber,
102        fee_merkle_tree_root: FeeMerkleCommitment,
103        accounts: Vec<FeeAccount>,
104    ) -> anyhow::Result<Vec<FeeAccountProof>> {
105        self.backoff()
106            .retry(self, |provider, retry| {
107                let accounts = &accounts;
108                async move {
109                    provider
110                        .try_fetch_accounts(
111                            retry,
112                            instance,
113                            height,
114                            view,
115                            fee_merkle_tree_root,
116                            accounts,
117                        )
118                        .await
119                        .map_err(|err| {
120                            err.context(format!(
121                                "fetching accounts {accounts:?}, height {height}, view {view}"
122                            ))
123                        })
124                }
125                .boxed()
126            })
127            .await
128    }
129
130    /// Fetch and remember the blocks frontier without retrying on transient errors.
131    async fn try_remember_blocks_merkle_tree(
132        &self,
133        retry: usize,
134        instance: &NodeState,
135        height: u64,
136        view: ViewNumber,
137        mt: &mut BlockMerkleTree,
138    ) -> anyhow::Result<()>;
139
140    /// Fetch and remember the blocks frontier, retrying on transient errors.
141    async fn remember_blocks_merkle_tree(
142        &self,
143        instance: &NodeState,
144        height: u64,
145        view: ViewNumber,
146        mt: &mut BlockMerkleTree,
147    ) -> anyhow::Result<()> {
148        self.backoff()
149            .retry(mt, |mt, retry| {
150                self.try_remember_blocks_merkle_tree(retry, instance, height, view, mt)
151                    .map_err(|err| err.context(format!("fetching frontier using {}", self.name())))
152                    .boxed()
153            })
154            .await
155    }
156
157    /// Fetch the chain config without retrying on transient errors.
158    async fn try_fetch_chain_config(
159        &self,
160        retry: usize,
161        commitment: Commitment<ChainConfig>,
162    ) -> anyhow::Result<ChainConfig>;
163
164    /// Fetch the chain config, retrying on transient errors.
165    async fn fetch_chain_config(
166        &self,
167        commitment: Commitment<ChainConfig>,
168    ) -> anyhow::Result<ChainConfig> {
169        self.backoff()
170            .retry(self, |provider, retry| {
171                provider
172                    .try_fetch_chain_config(retry, commitment)
173                    .map_err(|err| err.context("fetching chain config"))
174                    .boxed()
175            })
176            .await
177    }
178
179    /// Fetch the given reward merkle tree without retrying on transient errors.
180    async fn try_fetch_reward_merkle_tree_v2(
181        &self,
182        retry: usize,
183        height: u64,
184        view: ViewNumber,
185        reward_merkle_tree_root: RewardMerkleCommitmentV2,
186        accounts: Arc<Vec<RewardAccountV2>>,
187    ) -> anyhow::Result<PermittedRewardMerkleTreeV2>;
188
189    async fn fetch_reward_merkle_tree_v2(
190        &self,
191        height: u64,
192        view: ViewNumber,
193        reward_merkle_tree_root: RewardMerkleCommitmentV2,
194        accounts: Arc<Vec<RewardAccountV2>>,
195    ) -> anyhow::Result<PermittedRewardMerkleTreeV2> {
196        self.backoff()
197            .retry(self, |provider, retry| {
198                let accounts = accounts.clone();
199                async move {
200                    provider
201                        .try_fetch_reward_merkle_tree_v2(
202                            retry,
203                            height,
204                            view,
205                            reward_merkle_tree_root,
206                            accounts,
207                        )
208                        .await
209                        .map_err(|err| {
210                            err.context(format!("fetching reward merkle tree for height {height}"))
211                        })
212                }
213                .boxed()
214            })
215            .await
216    }
217
218    /// Fetch the given list of reward accounts without retrying on transient errors.
219    async fn try_fetch_reward_accounts_v1(
220        &self,
221        retry: usize,
222        instance: &NodeState,
223        height: u64,
224        view: ViewNumber,
225        reward_merkle_tree_root: RewardMerkleCommitmentV1,
226        accounts: &[RewardAccountV1],
227    ) -> anyhow::Result<Vec<RewardAccountProofV1>>;
228
229    /// Fetch the given list of reward accounts, retrying on transient errors.
230    async fn fetch_reward_accounts_v1(
231        &self,
232        instance: &NodeState,
233        height: u64,
234        view: ViewNumber,
235        reward_merkle_tree_root: RewardMerkleCommitmentV1,
236        accounts: Vec<RewardAccountV1>,
237    ) -> anyhow::Result<Vec<RewardAccountProofV1>> {
238        self.backoff()
239            .retry(self, |provider, retry| {
240                let accounts = &accounts;
241                async move {
242                    provider
243                        .try_fetch_reward_accounts_v1(
244                            retry,
245                            instance,
246                            height,
247                            view,
248                            reward_merkle_tree_root,
249                            accounts,
250                        )
251                        .await
252                        .map_err(|err| {
253                            err.context(format!(
254                                "fetching v1 reward accounts {accounts:?}, height {height}, view \
255                                 {view}"
256                            ))
257                        })
258                }
259                .boxed()
260            })
261            .await
262    }
263
264    /// Fetch the state certificate for a given epoch without retrying on transient errors.
265    async fn try_fetch_state_cert(
266        &self,
267        retry: usize,
268        epoch: u64,
269    ) -> anyhow::Result<LightClientStateUpdateCertificateV2<SeqTypes>>;
270
271    /// Fetch the state certificate for a given epoch, retrying on transient errors.
272    async fn fetch_state_cert(
273        &self,
274        epoch: u64,
275    ) -> anyhow::Result<LightClientStateUpdateCertificateV2<SeqTypes>> {
276        self.backoff()
277            .retry(self, |provider, retry| {
278                provider
279                    .try_fetch_state_cert(retry, epoch)
280                    .map_err(|err| err.context(format!("fetching state cert for epoch {epoch}")))
281                    .boxed()
282            })
283            .await
284    }
285
286    /// Returns true if the catchup provider is local (e.g. does not make calls to remote resources).
287    fn is_local(&self) -> bool;
288
289    /// Returns the backoff parameters for the catchup provider.
290    fn backoff(&self) -> &BackoffParams;
291
292    /// Returns the name of the catchup provider.
293    fn name(&self) -> String;
294}
295
296#[async_trait]
297impl<T: StateCatchup + ?Sized> StateCatchup for Arc<T> {
298    async fn try_fetch_leaf(
299        &self,
300        retry: usize,
301        height: u64,
302        stake_table: HSStakeTable<SeqTypes>,
303        success_threshold: U256,
304    ) -> anyhow::Result<Leaf2> {
305        (**self)
306            .try_fetch_leaf(retry, height, stake_table, success_threshold)
307            .await
308    }
309
310    async fn fetch_leaf(
311        &self,
312        height: u64,
313        stake_table: HSStakeTable<SeqTypes>,
314        success_threshold: U256,
315    ) -> anyhow::Result<Leaf2> {
316        (**self)
317            .fetch_leaf(height, stake_table, success_threshold)
318            .await
319    }
320
321    async fn try_fetch_accounts(
322        &self,
323        retry: usize,
324        instance: &NodeState,
325        height: u64,
326        view: ViewNumber,
327        fee_merkle_tree_root: FeeMerkleCommitment,
328        accounts: &[FeeAccount],
329    ) -> anyhow::Result<Vec<FeeAccountProof>> {
330        (**self)
331            .try_fetch_accounts(
332                retry,
333                instance,
334                height,
335                view,
336                fee_merkle_tree_root,
337                accounts,
338            )
339            .await
340    }
341
342    async fn fetch_accounts(
343        &self,
344        instance: &NodeState,
345        height: u64,
346        view: ViewNumber,
347        fee_merkle_tree_root: FeeMerkleCommitment,
348        accounts: Vec<FeeAccount>,
349    ) -> anyhow::Result<Vec<FeeAccountProof>> {
350        (**self)
351            .fetch_accounts(instance, height, view, fee_merkle_tree_root, accounts)
352            .await
353    }
354
355    async fn try_remember_blocks_merkle_tree(
356        &self,
357        retry: usize,
358        instance: &NodeState,
359        height: u64,
360        view: ViewNumber,
361        mt: &mut BlockMerkleTree,
362    ) -> anyhow::Result<()> {
363        (**self)
364            .try_remember_blocks_merkle_tree(retry, instance, height, view, mt)
365            .await
366    }
367
368    async fn remember_blocks_merkle_tree(
369        &self,
370        instance: &NodeState,
371        height: u64,
372        view: ViewNumber,
373        mt: &mut BlockMerkleTree,
374    ) -> anyhow::Result<()> {
375        (**self)
376            .remember_blocks_merkle_tree(instance, height, view, mt)
377            .await
378    }
379
380    async fn try_fetch_chain_config(
381        &self,
382        retry: usize,
383        commitment: Commitment<ChainConfig>,
384    ) -> anyhow::Result<ChainConfig> {
385        (**self).try_fetch_chain_config(retry, commitment).await
386    }
387
388    async fn fetch_chain_config(
389        &self,
390        commitment: Commitment<ChainConfig>,
391    ) -> anyhow::Result<ChainConfig> {
392        (**self).fetch_chain_config(commitment).await
393    }
394
395    async fn try_fetch_reward_merkle_tree_v2(
396        &self,
397        retry: usize,
398        height: u64,
399        view: ViewNumber,
400        reward_merkle_tree_root: RewardMerkleCommitmentV2,
401        accounts: Arc<Vec<RewardAccountV2>>,
402    ) -> anyhow::Result<PermittedRewardMerkleTreeV2> {
403        (**self)
404            .try_fetch_reward_merkle_tree_v2(retry, height, view, reward_merkle_tree_root, accounts)
405            .await
406    }
407
408    async fn fetch_reward_merkle_tree_v2(
409        &self,
410        height: u64,
411        view: ViewNumber,
412        reward_merkle_tree_root: RewardMerkleCommitmentV2,
413        accounts: Arc<Vec<RewardAccountV2>>,
414    ) -> anyhow::Result<PermittedRewardMerkleTreeV2> {
415        (**self)
416            .fetch_reward_merkle_tree_v2(height, view, reward_merkle_tree_root, accounts)
417            .await
418    }
419
420    async fn try_fetch_reward_accounts_v1(
421        &self,
422        retry: usize,
423        instance: &NodeState,
424        height: u64,
425        view: ViewNumber,
426        reward_merkle_tree_root: RewardMerkleCommitmentV1,
427        accounts: &[RewardAccountV1],
428    ) -> anyhow::Result<Vec<RewardAccountProofV1>> {
429        (**self)
430            .try_fetch_reward_accounts_v1(
431                retry,
432                instance,
433                height,
434                view,
435                reward_merkle_tree_root,
436                accounts,
437            )
438            .await
439    }
440
441    async fn fetch_reward_accounts_v1(
442        &self,
443        instance: &NodeState,
444        height: u64,
445        view: ViewNumber,
446        reward_merkle_tree_root: RewardMerkleCommitmentV1,
447        accounts: Vec<RewardAccountV1>,
448    ) -> anyhow::Result<Vec<RewardAccountProofV1>> {
449        (**self)
450            .fetch_reward_accounts_v1(instance, height, view, reward_merkle_tree_root, accounts)
451            .await
452    }
453
454    async fn try_fetch_state_cert(
455        &self,
456        retry: usize,
457        epoch: u64,
458    ) -> anyhow::Result<LightClientStateUpdateCertificateV2<SeqTypes>> {
459        (**self).try_fetch_state_cert(retry, epoch).await
460    }
461
462    async fn fetch_state_cert(
463        &self,
464        epoch: u64,
465    ) -> anyhow::Result<LightClientStateUpdateCertificateV2<SeqTypes>> {
466        (**self).fetch_state_cert(epoch).await
467    }
468
469    fn backoff(&self) -> &BackoffParams {
470        (**self).backoff()
471    }
472
473    fn name(&self) -> String {
474        (**self).name()
475    }
476
477    fn is_local(&self) -> bool {
478        (**self).is_local()
479    }
480}
481
/// Options from which a persistence backend can be created.
///
/// The associated [`Persistence`](Self::Persistence) type must implement both
/// [`SequencerPersistence`] and [`MembershipPersistence`].
#[async_trait]
pub trait PersistenceOptions: Clone + Send + Sync + Debug + 'static {
    /// The storage type produced by [`create`](Self::create).
    type Persistence: SequencerPersistence + MembershipPersistence;

    /// Set the view retention to `view_retention`.
    // NOTE(review): presumably the number of views worth of data kept before garbage collection;
    // the exact meaning is defined by implementors — confirm.
    fn set_view_retention(&mut self, view_retention: u64);
    /// Create a [`Self::Persistence`] from these options.
    async fn create(&mut self) -> anyhow::Result<Self::Persistence>;
    /// Consume these options and reset the storage they refer to.
    // NOTE(review): presumably this deletes previously persisted data — confirm against
    // implementors before relying on it.
    async fn reset(self) -> anyhow::Result<()>;
}
490
/// Determine the read state based on the queried block range.
///
/// - If the persistence returned events up to the requested block, the read is complete.
/// - Otherwise, indicate that the read is up to the last processed block.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum EventsPersistenceRead {
    /// The persistence returned events up to the requested block.
    Complete,
    /// The persistence returned events only up to the contained L1 block, i.e. the last
    /// processed block.
    UntilL1Block(u64),
}
499
/// Tuple type for stake table data: `(validators, block_reward, stake_table_hash)`.
///
/// This is the payload returned by [`MembershipPersistence::load_stake`]. The block reward and
/// the stake table hash are both optional.
pub type StakeTuple = (
    AuthenticatedValidatorMap,
    Option<RewardAmount>,
    Option<StakeTableHash>,
);
506
#[async_trait]
/// Trait used by `Memberships` implementations to interact with persistence layer.
pub trait MembershipPersistence: Send + Sync + 'static {
    /// Load the stake table for `epoch` from storage.
    ///
    /// The `Option` in the return type leaves room for no stake table being stored for `epoch`.
    async fn load_stake(&self, epoch: EpochNumber) -> anyhow::Result<Option<StakeTuple>>;

    /// Load stake tables from storage for the latest known epochs.
    // NOTE(review): `limit` presumably caps the number of epochs returned — confirm against
    // implementors.
    async fn load_latest_stake(&self, limit: u64) -> anyhow::Result<Option<Vec<IndexedStake>>>;

    /// Store stake table at `epoch` in the persistence layer
    ///
    /// `block_reward` and `stake_table_hash` are optional and are stored alongside the validators
    /// in `stake`; together they mirror the fields of [`StakeTuple`].
    async fn store_stake(
        &self,
        epoch: EpochNumber,
        stake: AuthenticatedValidatorMap,
        block_reward: Option<RewardAmount>,
        stake_table_hash: Option<StakeTableHash>,
    ) -> anyhow::Result<()>;

    /// Store stake table `events` in the persistence layer.
    // NOTE(review): `l1_finalized` is presumably the finalized L1 block up to which `events` were
    // collected — confirm against implementors.
    async fn store_events(
        &self,
        l1_finalized: u64,
        events: Vec<(EventKey, StakeTableEvent)>,
    ) -> anyhow::Result<()>;
    /// Load stored stake table events.
    ///
    /// Returns the loaded events together with an optional [`EventsPersistenceRead`] describing
    /// how far the read got.
    // NOTE(review): presumably the queried range is `from_l1_block..=l1_finalized`, and a `None`
    // read state means nothing was stored — confirm against implementors.
    async fn load_events(
        &self,
        from_l1_block: u64,
        l1_finalized: u64,
    ) -> anyhow::Result<(
        Option<EventsPersistenceRead>,
        Vec<(EventKey, StakeTableEvent)>,
    )>;

    /// Delete all stake table events, the L1 block tracker, and the epoch DRB and root data.
    async fn delete_stake_tables(&self) -> anyhow::Result<()>;

    /// Store the registered validators `all_validators`, keyed by address, for `epoch`.
    async fn store_all_validators(
        &self,
        epoch: EpochNumber,
        all_validators: IndexMap<Address, RegisteredValidator<PubKey>>,
    ) -> anyhow::Result<()>;

    /// Load registered validators stored for `epoch`.
    // NOTE(review): `offset` and `limit` look like pagination parameters (skip `offset` entries,
    // return at most `limit`) — confirm against implementors.
    async fn load_all_validators(
        &self,
        epoch: EpochNumber,
        offset: u64,
        limit: u64,
    ) -> anyhow::Result<Vec<RegisteredValidator<PubKey>>>;
}
555
556#[async_trait]
557pub trait SequencerPersistence:
558    Sized + Send + Sync + Clone + 'static + DhtPersistentStorage + MembershipPersistence
559{
    /// Run the reward merkle tree V2 migration for this storage.
    // NOTE(review): what exactly is migrated (and whether the call is idempotent) is defined by
    // implementors — confirm before documenting further.
    async fn migrate_reward_merkle_tree_v2(&self) -> anyhow::Result<()>;
561
562    /// Use this storage as a state catchup backend, if supported.
563    fn into_catchup_provider(
564        self,
565        _backoff: BackoffParams,
566    ) -> anyhow::Result<Arc<dyn StateCatchup>> {
567        bail!("state catchup is not implemented for this persistence type");
568    }
569
    /// Load the orchestrator config from storage.
    ///
    /// Returns `None` if no config exists (we are joining a network for the first time). Fails with
    /// `Err` if it could not be determined whether a config exists or not.
    async fn load_config(&self) -> anyhow::Result<Option<NetworkConfig>>;

    /// Save the orchestrator config to storage.
    async fn save_config(&self, cfg: &NetworkConfig) -> anyhow::Result<()>;

    /// Load the highest view saved with [`save_voted_view`](Self::save_voted_view).
    ///
    /// [`load_consensus_state`](Self::load_consensus_state) falls back to the genesis view when
    /// this returns `None`.
    async fn load_latest_acted_view(&self) -> anyhow::Result<Option<ViewNumber>>;

    /// Load the view to restart from.
    ///
    /// [`load_consensus_state`](Self::load_consensus_state) falls back to the genesis view when
    /// this returns `None`.
    async fn load_restart_view(&self) -> anyhow::Result<Option<ViewNumber>>;

    /// Load the proposals saved by consensus
    ///
    /// The result is keyed by the view number of each proposal.
    async fn load_quorum_proposals(
        &self,
    ) -> anyhow::Result<BTreeMap<ViewNumber, Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>>>>;

    /// Load the quorum proposal saved for `view`.
    // NOTE(review): unlike the sibling loaders there is no `Option` in the return type, so a
    // missing proposal presumably surfaces as an error — confirm against implementors.
    async fn load_quorum_proposal(
        &self,
        view: ViewNumber,
    ) -> anyhow::Result<Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>>>;

    /// Load the VID share saved for `view`, if any.
    async fn load_vid_share(
        &self,
        view: ViewNumber,
    ) -> anyhow::Result<Option<Proposal<SeqTypes, VidDisperseShare<SeqTypes>>>>;
    /// Load the DA proposal saved for `view`, if any.
    async fn load_da_proposal(
        &self,
        view: ViewNumber,
    ) -> anyhow::Result<Option<Proposal<SeqTypes, DaProposal2<SeqTypes>>>>;
    /// Load the saved upgrade certificate, if any.
    async fn load_upgrade_certificate(
        &self,
    ) -> anyhow::Result<Option<UpgradeCertificate<SeqTypes>>>;
    /// Load the saved epoch information handed to the HotShot initializer as `start_epoch_info`.
    async fn load_start_epoch_info(&self) -> anyhow::Result<Vec<InitializerEpochInfo<SeqTypes>>>;
    /// Load the saved light client state update certificate, if any.
    // NOTE(review): presumably the most recent certificate; use
    // `get_state_cert_by_epoch` for a specific epoch — confirm against implementors.
    async fn load_state_cert(
        &self,
    ) -> anyhow::Result<Option<LightClientStateUpdateCertificateV2<SeqTypes>>>;

    /// Get a state certificate for an epoch.
    async fn get_state_cert_by_epoch(
        &self,
        epoch: u64,
    ) -> anyhow::Result<Option<LightClientStateUpdateCertificateV2<SeqTypes>>>;

    /// Insert a state certificate for a given epoch.
    async fn insert_state_cert(
        &self,
        epoch: u64,
        cert: LightClientStateUpdateCertificateV2<SeqTypes>,
    ) -> anyhow::Result<()>;
623
624    /// Load the latest known consensus state.
625    ///
626    /// Returns an initializer to resume HotShot from the latest saved state (or start from genesis,
627    /// if there is no saved state). Also returns the anchor view number, which can be used as a
628    /// reference point to process any events which were not processed before a previous shutdown,
629    /// if applicable,.
630    async fn load_consensus_state(
631        &self,
632        state: NodeState,
633        upgrade: Upgrade,
634    ) -> anyhow::Result<(HotShotInitializer<SeqTypes>, Option<ViewNumber>)> {
635        let genesis_validated_state = ValidatedState::genesis(&state).0;
636        let highest_voted_view = match self
637            .load_latest_acted_view()
638            .await
639            .context("loading last voted view")?
640        {
641            Some(view) => {
642                tracing::info!(?view, "starting with last actioned view");
643                view
644            },
645            None => {
646                tracing::info!("no saved view, starting from genesis");
647                ViewNumber::genesis()
648            },
649        };
650
651        let restart_view = match self
652            .load_restart_view()
653            .await
654            .context("loading restart view")?
655        {
656            Some(view) => {
657                tracing::info!(?view, "starting from saved view");
658                view
659            },
660            None => {
661                tracing::info!("no saved view, starting from genesis");
662                ViewNumber::genesis()
663            },
664        };
665        let next_epoch_high_qc = self
666            .load_next_epoch_quorum_certificate()
667            .await
668            .context("loading next epoch qc")?;
669        let (leaf, mut high_qc, anchor_view) = match self
670            .load_anchor_leaf()
671            .await
672            .context("loading anchor leaf")?
673        {
674            Some((leaf, high_qc)) => {
675                tracing::info!(?leaf, ?high_qc, "starting from saved leaf");
676                ensure!(
677                    leaf.view_number() == high_qc.view_number,
678                    format!(
679                        "loaded anchor leaf from view {}, but high QC is from view {}",
680                        leaf.view_number(),
681                        high_qc.view_number
682                    )
683                );
684
685                let anchor_view = leaf.view_number();
686                (leaf, high_qc, Some(anchor_view))
687            },
688            None => {
689                tracing::info!("no saved leaf, starting from genesis leaf");
690                (
691                    hotshot_types::data::Leaf2::genesis(
692                        &genesis_validated_state,
693                        &state,
694                        upgrade.base,
695                    )
696                    .await,
697                    QuorumCertificate2::genesis(&genesis_validated_state, &state, upgrade).await,
698                    None,
699                )
700            },
701        };
702
703        if let Some((extended_high_qc, _)) = self.load_eqc().await
704            && extended_high_qc.view_number() > high_qc.view_number()
705        {
706            high_qc = extended_high_qc
707        }
708
709        let validated_state = if leaf.block_header().height() == 0 {
710            // If we are starting from genesis, we can provide the full state.
711            genesis_validated_state
712        } else {
713            // Otherwise, we will have to construct a sparse state and fetch missing data during
714            // catchup.
715            ValidatedState::from_header(leaf.block_header())
716        };
717
718        // If we are not starting from genesis, we start from the view following the maximum view
719        // between `highest_voted_view` and `leaf.view_number`. This prevents double votes from
720        // starting in a view in which we had already voted before the restart, and prevents
721        // unnecessary catchup from starting in a view earlier than the anchor leaf.
722        let restart_view = max(restart_view, leaf.view_number());
723        // TODO:
724        let epoch = genesis_epoch_from_version(upgrade.base);
725
726        let config = self.load_config().await.context("loading config")?;
727        let epoch_height = config
728            .as_ref()
729            .map(|c| c.config.epoch_height)
730            .unwrap_or_default();
731        let epoch_start_block = config
732            .as_ref()
733            .map(|c| c.config.epoch_start_block)
734            .unwrap_or_default();
735
736        let saved_proposals = self
737            .load_quorum_proposals()
738            .await
739            .context("loading saved proposals")?;
740
741        let upgrade_certificate = self
742            .load_upgrade_certificate()
743            .await
744            .context("loading upgrade certificate")?;
745
746        let start_epoch_info = self
747            .load_start_epoch_info()
748            .await
749            .context("loading start epoch info")?;
750
751        let state_cert = self
752            .load_state_cert()
753            .await
754            .context("loading light client state update certificate")?;
755
756        tracing::warn!(
757            ?leaf,
758            ?restart_view,
759            ?epoch,
760            ?high_qc,
761            ?validated_state,
762            ?state_cert,
763            "loaded consensus state"
764        );
765
766        Ok((
767            HotShotInitializer {
768                instance_state: state,
769                epoch_height,
770                epoch_start_block,
771                anchor_leaf: leaf,
772                anchor_state: Arc::new(validated_state),
773                anchor_state_delta: None,
774                start_view: restart_view,
775                start_epoch: epoch,
776                last_actioned_view: highest_voted_view,
777                saved_proposals,
778                high_qc,
779                next_epoch_high_qc,
780                decided_upgrade_certificate: upgrade_certificate,
781                undecided_leaves: Default::default(),
782                undecided_state: Default::default(),
783                saved_vid_shares: Default::default(), // TODO: implement saved_vid_shares
784                start_epoch_info,
785                state_cert,
786            },
787            anchor_view,
788        ))
789    }
790
791    /// Decode a consensus decide event and persist its leaves, for the consensus event loop.
792    /// Returns `Some((decided_view, deciding_qc))` on a decide so the caller can wake a background
793    /// task to run [`process_decided_events`](Self::process_decided_events); `None` otherwise.
794    ///
795    /// This is the persist-only half of a decide: query-service ingestion and GC are deferred to
796    /// [`process_decided_events`](Self::process_decided_events). Tests that want the synchronous
797    /// persist-then-process behavior use [`append_decided_leaves`](Self::append_decided_leaves).
798    async fn persist_event(
799        &self,
800        event: &CoordinatorEvent<SeqTypes>,
801        consumer: &(impl EventConsumer + 'static),
802    ) -> Option<(ViewNumber, Option<Arc<CertificatePair<SeqTypes>>>)> {
803        match event {
804            CoordinatorEvent::LegacyEvent(hotshot_event) => {
805                let EventType::Decide {
806                    leaf_chain,
807                    committing_qc,
808                    deciding_qc,
809                    ..
810                } = &hotshot_event.event
811                else {
812                    return None;
813                };
814                let LeafInfo { leaf, .. } = leaf_chain.first()?;
815                let decided_view = leaf.view_number();
816
817                let chain = leaf_chain.iter().zip(
818                    std::iter::once((**committing_qc).clone()).chain(
819                        leaf_chain
820                            .iter()
821                            .map(|leaf| CertificatePair::for_parent(&leaf.leaf)),
822                    ),
823                );
824
825                if let Err(err) = self
826                    .persist_decided_leaves(decided_view, chain, deciding_qc.clone(), consumer)
827                    .await
828                {
829                    tracing::error!(
830                        "failed to save decided leaves, chain may not be up to date: {err:#}"
831                    );
832                    return None;
833                }
834                Some((decided_view, deciding_qc.clone()))
835            },
836            CoordinatorEvent::NewDecide {
837                leaf_infos, cert1, ..
838            } => {
839                let first = leaf_infos.first()?;
840                let decided_view = first.leaf.view_number();
841
842                // `cert1` certifies the newest leaf; each newer leaf's justify_qc certifies the
843                // next older leaf.
844                let certifying_qcs = std::iter::once(cert1.clone())
845                    .chain(leaf_infos.iter().map(|info| info.leaf.justify_qc()))
846                    .take(leaf_infos.len())
847                    .map(CertificatePair::non_epoch_change);
848
849                if let Err(err) = self
850                    .persist_decided_leaves(
851                        decided_view,
852                        leaf_infos.iter().zip(certifying_qcs),
853                        None,
854                        consumer,
855                    )
856                    .await
857                {
858                    tracing::error!(
859                        "failed to save decided leaves from new protocol, chain may not be up to \
860                         date: {err:#}"
861                    );
862                    return None;
863                }
864                Some((decided_view, None))
865            },
866            _ => None,
867        }
868    }
869
870    /// Append decided leaves to persistent storage and emit a corresponding event.
871    ///
872    /// `consumer` will be sent a `Decide` event containing all decided leaves in persistent storage
873    /// up to and including `view`. If available in persistent storage, full block payloads and VID
874    /// info will also be included for each leaf.
875    ///
876    /// Once the new decided leaves have been processed, old data up to `view` will be garbage
877    /// collected The consumer's handling of this event is a prerequisite for the completion of
878    /// garbage collection: if the consumer fails to process the event, no data is deleted. This
879    /// ensures that, if called repeatedly, all decided leaves ever recorded in consensus storage
880    /// will eventually be passed to the consumer.
881    ///
882    /// Note that the converse is not true: if garbage collection fails, it is not guaranteed that
883    /// the consumer hasn't processed the decide event. Thus, in rare cases, some events may be
884    /// processed twice, or the consumer may get two events which share a subset of their data.
885    /// Thus, it is the consumer's responsibility to make sure its handling of each leaf is
886    /// idempotent.
887    ///
888    /// If the consumer fails to handle the new decide event, it may be retried, or simply postponed
889    /// until the next decide, at which point all persisted leaves from the failed GC run will be
890    /// included in the event along with subsequently decided leaves.
891    ///
892    /// This functionality is useful for keeping a separate view of the blockchain in sync with the
893    /// consensus storage. For example, the `consumer` could be used for moving data from consensus
894    /// storage to long-term archival storage.
895    ///
896    /// Convenience combinator: [`persist_decided_leaves`](Self::persist_decided_leaves) then
897    /// [`process_decided_events`](Self::process_decided_events). Production drives the two halves on
898    /// separate tasks; tests and back-compat callers use this synchronous form.
899    async fn append_decided_leaves(
900        &self,
901        decided_view: ViewNumber,
902        leaf_chain: impl IntoIterator<Item = (&LeafInfo<SeqTypes>, CertificatePair<SeqTypes>)> + Send,
903        deciding_qc: Option<Arc<CertificatePair<SeqTypes>>>,
904        consumer: &(impl EventConsumer + 'static),
905    ) -> anyhow::Result<()> {
906        self.persist_decided_leaves(decided_view, leaf_chain, deciding_qc.clone(), consumer)
907            .await?;
908        // Leaves are persisted; processing failures are non-fatal here and retried in production.
909        if let Err(err) = self
910            .process_decided_events(decided_view, deciding_qc, consumer)
911            .await
912        {
913            tracing::warn!(?decided_view, "decide event processing failed: {err:#}");
914        }
915        Ok(())
916    }
917
    /// Persist decided leaves only (the critical, must-not-lag half of a decide; also the
    /// anchor for restart recovery). Query-service ingestion and GC are deferred to
    /// [`process_decided_events`](Self::process_decided_events). Backends with no replayable storage
    /// (e.g. `NoStorage`) may instead forward decide events to `consumer` here.
    ///
    /// # Parameters
    ///
    /// * `decided_view` - view of the decide being persisted. The callers in this file pass the
    ///   view number of the first leaf in the chain.
    /// * `leaf_chain` - the decided leaves, each paired with a [`CertificatePair`]. The callers in
    ///   this file pair every leaf with the certificate that certifies it.
    /// * `deciding_qc` - optional certificate for the decide itself; the new-protocol decide path
    ///   in this file passes `None`.
    /// * `consumer` - event sink; only expected to be used here by backends that cannot replay
    ///   events later (see above).
    ///
    /// # Errors
    ///
    /// Implementation-defined. The callers in this file treat an error as "chain may not be up
    /// to date" and either log it or propagate it.
    async fn persist_decided_leaves(
        &self,
        decided_view: ViewNumber,
        leaf_chain: impl IntoIterator<Item = (&LeafInfo<SeqTypes>, CertificatePair<SeqTypes>)> + Send,
        deciding_qc: Option<Arc<CertificatePair<SeqTypes>>>,
        consumer: &(impl EventConsumer + 'static),
    ) -> anyhow::Result<()>;
929
    /// Generate decide events for `consumer` from persisted leaves, then GC processed data.
    /// Cursor-driven (e.g. `last_processed_view`): advances only on success, so it may lag
    /// consensus without losing data.
    ///
    /// Returns the highest view confirmed processed (the cursor), or `None` if nothing was
    /// processed, so the caller can track real progress. Errors are propagated; the failed range
    /// is retried on the next call.
    ///
    /// Default returns `Some(decided_view)`: backends with no replayable storage (e.g. `NoStorage`)
    /// forward events synchronously in `persist_decided_leaves` and are always caught up here.
    ///
    /// The default implementation ignores both `_deciding_qc` and `_consumer` and never fails.
    async fn process_decided_events(
        &self,
        decided_view: ViewNumber,
        _deciding_qc: Option<Arc<CertificatePair<SeqTypes>>>,
        _consumer: &(impl EventConsumer + 'static),
    ) -> anyhow::Result<Option<ViewNumber>> {
        // Nothing is deferred in the default backend, so the cursor is always `decided_view`.
        Ok(Some(decided_view))
    }
948
    /// Load the anchor leaf together with a quorum certificate, if one has been stored.
    ///
    /// Returns `Ok(None)` when no anchor leaf is available; [`load_anchor_view`](Self::load_anchor_view)
    /// maps that case to the genesis view.
    ///
    /// NOTE(review): the returned certificate is presumably the QC for the returned leaf --
    /// confirm against the implementations.
    async fn load_anchor_leaf(
        &self,
    ) -> anyhow::Result<Option<(Leaf2, QuorumCertificate2<SeqTypes>)>>;
    /// Persist a VID disperse share proposal.
    ///
    /// The `Storage` implementation for `Arc<P>` in this file forwards its `append_vid` here.
    async fn append_vid(
        &self,
        proposal: &Proposal<SeqTypes, VidDisperseShare<SeqTypes>>,
    ) -> anyhow::Result<()>;
    /// Persist a [`DaProposal`] together with its accompanying `vid_commit`.
    ///
    /// See [`append_da2`](Self::append_da2) for the [`DaProposal2`] counterpart. The `Storage`
    /// implementation for `Arc<P>` in this file forwards its `append_da` here.
    async fn append_da(
        &self,
        proposal: &Proposal<SeqTypes, DaProposal<SeqTypes>>,
        vid_commit: VidCommitment,
    ) -> anyhow::Result<()>;
    /// Record a [`HotShotAction`] for `view` and, when provided, `epoch`.
    ///
    /// The `Storage` implementation for `Arc<P>` in this file forwards its `record_action` here.
    /// NOTE(review): what is persisted per action is backend-specific -- confirm against the
    /// implementations.
    async fn record_action(
        &self,
        view: ViewNumber,
        epoch: Option<EpochNumber>,
        action: HotShotAction,
    ) -> anyhow::Result<()>;
967
    /// Persist a quorum proposal in its [`QuorumProposalWrapper`] form.
    ///
    /// This is the single entry point for storing quorum proposals in this file: the default
    /// [`append_proposal2`](Self::append_proposal2) and both `Storage::append_proposal*` methods
    /// on `Arc<P>` end up here, the latter after converting with `convert_proposal`.
    async fn append_quorum_proposal2(
        &self,
        proposal: &Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>>,
    ) -> anyhow::Result<()>;
972
    /// Persist cert2 for the given view.
    ///
    /// The default implementation stores nothing and reports success, so backends that do not
    /// override it silently drop the certificate.
    async fn append_cert2(
        &self,
        _view: ViewNumber,
        _cert2: Certificate2<SeqTypes>,
    ) -> anyhow::Result<()> {
        // No-op default: nothing is persisted.
        Ok(())
    }
981
    /// Load a persisted cert2 by view, if any.
    ///
    /// The default implementation always returns `Ok(None)`, matching the default
    /// [`append_cert2`](Self::append_cert2), which stores nothing.
    async fn load_cert2(
        &self,
        _view: ViewNumber,
    ) -> anyhow::Result<Option<Certificate2<SeqTypes>>> {
        // No-op default: there is never anything to load.
        Ok(None)
    }
989
    /// Update the current eQC in storage.
    ///
    /// The eQC is stored as a pair: the high QC and the matching next-epoch high QC.
    /// `Storage::update_eqc` on `Arc<P>` in this file only calls this when the incoming high QC
    /// is not from an older view than the stored one; this method itself is not documented to
    /// perform that check.
    async fn store_eqc(
        &self,
        _high_qc: QuorumCertificate2<SeqTypes>,
        _next_epoch_high_qc: NextEpochQuorumCertificate2<SeqTypes>,
    ) -> anyhow::Result<()>;
996
    /// Load the current eQC from storage.
    ///
    /// The pair is returned in the order [`store_eqc`](Self::store_eqc) accepts it: the high QC
    /// first, then the next-epoch high QC. The return type carries no error.
    ///
    /// NOTE(review): presumably `None` means "no eQC stored"; confirm whether implementations
    /// also map load failures to `None`.
    async fn load_eqc(
        &self,
    ) -> Option<(
        QuorumCertificate2<SeqTypes>,
        NextEpochQuorumCertificate2<SeqTypes>,
    )>;
1004
    /// Store the decided upgrade certificate, or `None`.
    ///
    /// `Storage::update_decided_upgrade_certificate` on `Arc<P>` in this file forwards here.
    /// NOTE(review): whether `None` clears a previously stored certificate is backend-specific --
    /// confirm against the implementations.
    async fn store_upgrade_certificate(
        &self,
        decided_upgrade_certificate: Option<UpgradeCertificate<SeqTypes>>,
    ) -> anyhow::Result<()>;
1009
1010    async fn migrate_storage(&self) -> anyhow::Result<()> {
1011        tracing::warn!("migrating consensus data...");
1012
1013        self.migrate_anchor_leaf().await?;
1014        self.migrate_da_proposals().await?;
1015        self.migrate_vid_shares().await?;
1016        self.migrate_quorum_proposals().await?;
1017        self.migrate_quorum_certificates().await?;
1018        self.migrate_reward_merkle_tree_v2()
1019            .await
1020            .context("failed to migrate reward merkle tree v2")?;
1021        self.migrate_x25519_keys().await?;
1022        tracing::warn!("consensus storage has been migrated to new types");
1023
1024        Ok(())
1025    }
1026
    /// Migrate x25519 keys; the final step of [`migrate_storage`](Self::migrate_storage).
    ///
    /// NOTE(review): the exact data touched is backend-specific -- confirm against the
    /// implementations.
    async fn migrate_x25519_keys(&self) -> anyhow::Result<()>;
1028
    /// Migrate the stored anchor leaf; the first step of [`migrate_storage`](Self::migrate_storage).
    async fn migrate_anchor_leaf(&self) -> anyhow::Result<()>;
    /// Migrate stored DA proposals; run by [`migrate_storage`](Self::migrate_storage) after the
    /// anchor leaf migration.
    async fn migrate_da_proposals(&self) -> anyhow::Result<()>;
    /// Migrate stored VID shares; run by [`migrate_storage`](Self::migrate_storage) after the DA
    /// proposal migration.
    async fn migrate_vid_shares(&self) -> anyhow::Result<()>;
    /// Migrate stored quorum proposals; run by [`migrate_storage`](Self::migrate_storage) after
    /// the VID share migration.
    async fn migrate_quorum_proposals(&self) -> anyhow::Result<()>;
    /// Migrate stored quorum certificates; run by [`migrate_storage`](Self::migrate_storage)
    /// after the quorum proposal migration.
    async fn migrate_quorum_certificates(&self) -> anyhow::Result<()>;
1034
1035    async fn load_anchor_view(&self) -> anyhow::Result<ViewNumber> {
1036        match self.load_anchor_leaf().await? {
1037            Some((leaf, _)) => Ok(leaf.view_number()),
1038            None => Ok(ViewNumber::genesis()),
1039        }
1040    }
1041
    /// Store `high_qc` as the next-epoch quorum certificate.
    ///
    /// Counterpart of
    /// [`load_next_epoch_quorum_certificate`](Self::load_next_epoch_quorum_certificate).
    async fn store_next_epoch_quorum_certificate(
        &self,
        high_qc: NextEpochQuorumCertificate2<SeqTypes>,
    ) -> anyhow::Result<()>;
1046
    /// Load the stored next-epoch quorum certificate, or `Ok(None)` if there is none.
    ///
    /// Counterpart of
    /// [`store_next_epoch_quorum_certificate`](Self::store_next_epoch_quorum_certificate).
    async fn load_next_epoch_quorum_certificate(
        &self,
    ) -> anyhow::Result<Option<NextEpochQuorumCertificate2<SeqTypes>>>;
1050
    /// Persist a [`DaProposal2`] together with its accompanying `vid_commit`.
    ///
    /// See [`append_da`](Self::append_da) for the [`DaProposal`] counterpart. The `Storage`
    /// implementation for `Arc<P>` in this file forwards its `append_da2` here.
    async fn append_da2(
        &self,
        proposal: &Proposal<SeqTypes, DaProposal2<SeqTypes>>,
        vid_commit: VidCommitment,
    ) -> anyhow::Result<()>;
1056
1057    async fn append_proposal2(
1058        &self,
1059        proposal: &Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>>,
1060    ) -> anyhow::Result<()> {
1061        self.append_quorum_proposal2(proposal).await
1062    }
1063
    /// Store the DRB result for `epoch`.
    ///
    /// The `Storage` implementation for `Arc<P>` in this file forwards its `store_drb_result`
    /// here.
    async fn store_drb_result(
        &self,
        epoch: EpochNumber,
        drb_result: DrbResult,
    ) -> anyhow::Result<()>;
    /// Store a DRB input.
    ///
    /// Counterpart of [`load_drb_input`](Self::load_drb_input).
    /// NOTE(review): presumably keyed by the epoch carried inside `drb_input` -- confirm.
    async fn store_drb_input(&self, drb_input: DrbInput) -> anyhow::Result<()>;
    /// Load the DRB input for `epoch`.
    ///
    /// The epoch is a raw `u64` here, unlike the [`EpochNumber`] used by
    /// [`store_drb_result`](Self::store_drb_result). The return type has no `Option`, so a
    /// missing input can only be reported as an error.
    async fn load_drb_input(&self, epoch: u64) -> anyhow::Result<DrbInput>;
    /// Store `block_header` as the epoch root for `epoch`.
    ///
    /// The `Storage` implementation for `Arc<P>` in this file forwards its `store_epoch_root`
    /// here.
    async fn store_epoch_root(
        &self,
        epoch: EpochNumber,
        block_header: <SeqTypes as NodeType>::BlockHeader,
    ) -> anyhow::Result<()>;
    /// Persist a light client state update certificate.
    ///
    /// `Storage::update_state_cert` on `Arc<P>` in this file forwards here.
    async fn add_state_cert(
        &self,
        state_cert: LightClientStateUpdateCertificateV2<SeqTypes>,
    ) -> anyhow::Result<()>;
1080
    /// Hand a [`Metrics`] handle to this persistence backend.
    ///
    /// This is the only method in this part of the trait that is synchronous and takes
    /// `&mut self`.
    /// NOTE(review): presumably the backend registers its own metrics on `metrics` -- confirm
    /// against the implementations.
    fn enable_metrics(&mut self, metrics: &dyn Metrics);
1082}
1083
/// A sink for [`CoordinatorEvent`]s.
///
/// The persistence methods in this file accept an `EventConsumer` as the destination for decide
/// events. Implementors must be `Debug + Send + Sync`.
#[async_trait]
pub trait EventConsumer: Debug + Send + Sync {
    /// Handle a single event.
    ///
    /// Returning an error reports that the event was not handled.
    async fn handle_event(&self, event: &CoordinatorEvent<SeqTypes>) -> anyhow::Result<()>;
}
1088
1089#[async_trait]
1090impl<T> EventConsumer for Box<T>
1091where
1092    T: EventConsumer + ?Sized,
1093{
1094    async fn handle_event(&self, event: &CoordinatorEvent<SeqTypes>) -> anyhow::Result<()> {
1095        (**self).handle_event(event).await
1096    }
1097}
1098
/// An [`EventConsumer`] that discards every event it is given.
#[derive(Clone, Copy, Debug)]
pub struct NullEventConsumer;

#[async_trait]
impl EventConsumer for NullEventConsumer {
    /// Ignore `_event` and report success.
    async fn handle_event(&self, _event: &CoordinatorEvent<SeqTypes>) -> anyhow::Result<()> {
        Ok(())
    }
}
1108
1109#[async_trait]
1110impl<P: SequencerPersistence> Storage<SeqTypes> for Arc<P> {
1111    async fn append_vid(
1112        &self,
1113        proposal: &Proposal<SeqTypes, VidDisperseShare<SeqTypes>>,
1114    ) -> anyhow::Result<()> {
1115        (**self).append_vid(proposal).await
1116    }
1117
1118    async fn append_da(
1119        &self,
1120        proposal: &Proposal<SeqTypes, DaProposal<SeqTypes>>,
1121        vid_commit: VidCommitment,
1122    ) -> anyhow::Result<()> {
1123        (**self).append_da(proposal, vid_commit).await
1124    }
1125
1126    async fn append_da2(
1127        &self,
1128        proposal: &Proposal<SeqTypes, DaProposal2<SeqTypes>>,
1129        vid_commit: VidCommitment,
1130    ) -> anyhow::Result<()> {
1131        (**self).append_da2(proposal, vid_commit).await
1132    }
1133
1134    async fn record_action(
1135        &self,
1136        view: ViewNumber,
1137        epoch: Option<EpochNumber>,
1138        action: HotShotAction,
1139    ) -> anyhow::Result<()> {
1140        (**self).record_action(view, epoch, action).await
1141    }
1142
1143    async fn update_high_qc(&self, _high_qc: QuorumCertificate<SeqTypes>) -> anyhow::Result<()> {
1144        Ok(())
1145    }
1146
1147    async fn append_proposal(
1148        &self,
1149        proposal: &Proposal<SeqTypes, QuorumProposal<SeqTypes>>,
1150    ) -> anyhow::Result<()> {
1151        (**self)
1152            .append_quorum_proposal2(&convert_proposal(proposal.clone()))
1153            .await
1154    }
1155
1156    async fn append_proposal2(
1157        &self,
1158        proposal: &Proposal<SeqTypes, QuorumProposal2<SeqTypes>>,
1159    ) -> anyhow::Result<()> {
1160        let proposal_qp_wrapper: Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>> =
1161            convert_proposal(proposal.clone());
1162        (**self).append_quorum_proposal2(&proposal_qp_wrapper).await
1163    }
1164
1165    async fn update_high_qc2(&self, _high_qc: QuorumCertificate2<SeqTypes>) -> anyhow::Result<()> {
1166        Ok(())
1167    }
1168
1169    /// Update the current eQC in storage.
1170    async fn update_eqc(
1171        &self,
1172        high_qc: QuorumCertificate2<SeqTypes>,
1173        next_epoch_high_qc: NextEpochQuorumCertificate2<SeqTypes>,
1174    ) -> anyhow::Result<()> {
1175        if let Some((existing_high_qc, _)) = (**self).load_eqc().await
1176            && high_qc.view_number() < existing_high_qc.view_number()
1177        {
1178            return Ok(());
1179        }
1180
1181        (**self).store_eqc(high_qc, next_epoch_high_qc).await
1182    }
1183
1184    async fn update_next_epoch_high_qc2(
1185        &self,
1186        _next_epoch_high_qc: NextEpochQuorumCertificate2<SeqTypes>,
1187    ) -> anyhow::Result<()> {
1188        Ok(())
1189    }
1190
1191    async fn update_decided_upgrade_certificate(
1192        &self,
1193        decided_upgrade_certificate: Option<UpgradeCertificate<SeqTypes>>,
1194    ) -> anyhow::Result<()> {
1195        (**self)
1196            .store_upgrade_certificate(decided_upgrade_certificate)
1197            .await
1198    }
1199
1200    async fn store_drb_result(
1201        &self,
1202        epoch: EpochNumber,
1203        drb_result: DrbResult,
1204    ) -> anyhow::Result<()> {
1205        (**self).store_drb_result(epoch, drb_result).await
1206    }
1207
1208    async fn store_epoch_root(
1209        &self,
1210        epoch: EpochNumber,
1211        block_header: <SeqTypes as NodeType>::BlockHeader,
1212    ) -> anyhow::Result<()> {
1213        (**self).store_epoch_root(epoch, block_header).await
1214    }
1215
1216    async fn store_drb_input(&self, drb_input: DrbInput) -> anyhow::Result<()> {
1217        (**self).store_drb_input(drb_input).await
1218    }
1219
1220    async fn load_drb_input(&self, epoch: u64) -> anyhow::Result<DrbInput> {
1221        (**self).load_drb_input(epoch).await
1222    }
1223
1224    async fn update_state_cert(
1225        &self,
1226        state_cert: LightClientStateUpdateCertificateV2<SeqTypes>,
1227    ) -> anyhow::Result<()> {
1228        (**self).add_state_cert(state_cert).await
1229    }
1230}
1231
1232#[async_trait]
1233impl<P: SequencerPersistence> NewProtocolStorage<SeqTypes> for Arc<P> {
1234    async fn append_cert2(
1235        &self,
1236        view: ViewNumber,
1237        cert: Certificate2<SeqTypes>,
1238    ) -> anyhow::Result<()> {
1239        (**self).append_cert2(view, cert).await
1240    }
1241}
1242
/// Data that can be deserialized from a subslice of namespace payload bytes.
///
/// Companion trait for [`NsPayloadBytesRange`], which specifies the subslice of
/// namespace payload bytes to read.
///
/// The lifetime `'a` is that of the byte slice, so an implementor may borrow from the bytes
/// rather than copy them.
pub trait FromNsPayloadBytes<'a> {
    /// Deserialize `Self` from namespace payload bytes.
    ///
    /// The signature is infallible: there is no way to report malformed input to the caller.
    fn from_payload_bytes(bytes: &'a [u8]) -> Self;
}
1251
/// Specifies a subslice of namespace payload bytes to read.
///
/// Companion trait for [`FromNsPayloadBytes`], which holds data that can be
/// deserialized from that subslice of bytes.
pub trait NsPayloadBytesRange<'a> {
    /// The type that is deserialized from the bytes in this range.
    type Output: FromNsPayloadBytes<'a>;

    /// Range relative to this ns payload
    fn ns_payload_range(&self) -> Range<usize>;
}
1262
/// Types which can be deserialized from either integers or strings.
///
/// Some types can be represented as an integer or a string in human-readable formats like JSON or
/// TOML. For example, 1 GWEI might be represented by the integer `1000000000` or the string `"1
/// gwei"`. Such types can implement `FromStringOrInteger` and then use [`impl_string_or_integer`]
/// to derive this user-friendly serialization.
///
/// These types are assumed to have an efficient representation as an integral type in Rust --
/// [`Self::Binary`] -- and will be serialized to and from this type when using a non-human-readable
/// encoding. With human readable encodings, serialization is always to a string.
pub trait FromStringOrInteger: Sized {
    /// Representation used with non-human-readable encodings.
    type Binary: Serialize + DeserializeOwned;
    /// Integer representation accepted from human-readable formats.
    type Integer: Serialize + DeserializeOwned;

    /// Build `Self` from its [`Self::Binary`] representation.
    fn from_binary(b: Self::Binary) -> anyhow::Result<Self>;
    /// Build `Self` from a string, e.g. `"1 gwei"`.
    fn from_string(s: String) -> anyhow::Result<Self>;
    /// Build `Self` from an integer, e.g. `1000000000`.
    fn from_integer(i: Self::Integer) -> anyhow::Result<Self>;

    /// Convert `self` to its [`Self::Binary`] representation.
    fn to_binary(&self) -> anyhow::Result<Self::Binary>;
    /// Convert `self` to the string form used with human-readable encodings.
    fn to_string(&self) -> anyhow::Result<String>;
}