1use std::{cmp::max, collections::BTreeMap, fmt::Debug, ops::Range, sync::Arc};
4
5use alloy::primitives::{Address, U256};
6use anyhow::{Context, bail, ensure};
7use async_trait::async_trait;
8use committable::Commitment;
9use futures::{FutureExt, TryFutureExt};
10use hotshot::{HotShotInitializer, InitializerEpochInfo, types::EventType};
11use hotshot_libp2p_networking::network::behaviours::dht::store::persistent::DhtPersistentStorage;
12use hotshot_types::{
13 data::{
14 DaProposal, DaProposal2, EpochNumber, QuorumProposal, QuorumProposal2,
15 QuorumProposalWrapper, VidCommitment, VidDisperseShare, ViewNumber,
16 },
17 drb::{DrbInput, DrbResult},
18 event::{HotShotAction, LeafInfo},
19 message::{Proposal, convert_proposal},
20 simple_certificate::{
21 CertificatePair, LightClientStateUpdateCertificateV2, NextEpochQuorumCertificate2,
22 QuorumCertificate, QuorumCertificate2, UpgradeCertificate,
23 },
24 stake_table::HSStakeTable,
25 traits::{
26 ValidatedState as HotShotState, metrics::Metrics, node_implementation::NodeType,
27 storage::Storage,
28 },
29 utils::genesis_epoch_from_version,
30 vote::HasViewNumber,
31};
32use indexmap::IndexMap;
33use serde::{Serialize, de::DeserializeOwned};
34use versions::Upgrade;
35
36use super::{
37 impls::NodeState,
38 utils::BackoffParams,
39 v0_3::{EventKey, IndexedStake, StakeTableEvent},
40};
41use crate::{
42 AuthenticatedValidatorMap, BlockMerkleTree, Event, FeeAccount, FeeAccountProof,
43 FeeMerkleCommitment, Leaf2, NetworkConfig, PubKey, SeqTypes,
44 v0::impls::{StakeTableHash, ValidatedState},
45 v0_3::{
46 ChainConfig, RegisteredValidator, RewardAccountProofV1, RewardAccountV1, RewardAmount,
47 RewardMerkleCommitmentV1,
48 },
49 v0_4::{PermittedRewardMerkleTreeV2, RewardAccountV2, RewardMerkleCommitmentV2},
50};
51
52#[async_trait]
53pub trait StateCatchup: Send + Sync {
54 async fn try_fetch_leaf(
56 &self,
57 retry: usize,
58 height: u64,
59 stake_table: HSStakeTable<SeqTypes>,
60 success_threshold: U256,
61 ) -> anyhow::Result<Leaf2>;
62
63 async fn fetch_leaf(
65 &self,
66 height: u64,
67 stake_table: HSStakeTable<SeqTypes>,
68 success_threshold: U256,
69 ) -> anyhow::Result<Leaf2> {
70 self.backoff()
71 .retry(self, |provider, retry| {
72 let stake_table_clone = stake_table.clone();
73 async move {
74 provider
75 .try_fetch_leaf(retry, height, stake_table_clone, success_threshold)
76 .await
77 }
78 .boxed()
79 })
80 .await
81 }
82
83 async fn try_fetch_accounts(
85 &self,
86 retry: usize,
87 instance: &NodeState,
88 height: u64,
89 view: ViewNumber,
90 fee_merkle_tree_root: FeeMerkleCommitment,
91 accounts: &[FeeAccount],
92 ) -> anyhow::Result<Vec<FeeAccountProof>>;
93
94 async fn fetch_accounts(
96 &self,
97 instance: &NodeState,
98 height: u64,
99 view: ViewNumber,
100 fee_merkle_tree_root: FeeMerkleCommitment,
101 accounts: Vec<FeeAccount>,
102 ) -> anyhow::Result<Vec<FeeAccountProof>> {
103 self.backoff()
104 .retry(self, |provider, retry| {
105 let accounts = &accounts;
106 async move {
107 provider
108 .try_fetch_accounts(
109 retry,
110 instance,
111 height,
112 view,
113 fee_merkle_tree_root,
114 accounts,
115 )
116 .await
117 .map_err(|err| {
118 err.context(format!(
119 "fetching accounts {accounts:?}, height {height}, view {view}"
120 ))
121 })
122 }
123 .boxed()
124 })
125 .await
126 }
127
128 async fn try_remember_blocks_merkle_tree(
130 &self,
131 retry: usize,
132 instance: &NodeState,
133 height: u64,
134 view: ViewNumber,
135 mt: &mut BlockMerkleTree,
136 ) -> anyhow::Result<()>;
137
138 async fn remember_blocks_merkle_tree(
140 &self,
141 instance: &NodeState,
142 height: u64,
143 view: ViewNumber,
144 mt: &mut BlockMerkleTree,
145 ) -> anyhow::Result<()> {
146 self.backoff()
147 .retry(mt, |mt, retry| {
148 self.try_remember_blocks_merkle_tree(retry, instance, height, view, mt)
149 .map_err(|err| err.context(format!("fetching frontier using {}", self.name())))
150 .boxed()
151 })
152 .await
153 }
154
155 async fn try_fetch_chain_config(
157 &self,
158 retry: usize,
159 commitment: Commitment<ChainConfig>,
160 ) -> anyhow::Result<ChainConfig>;
161
162 async fn fetch_chain_config(
164 &self,
165 commitment: Commitment<ChainConfig>,
166 ) -> anyhow::Result<ChainConfig> {
167 self.backoff()
168 .retry(self, |provider, retry| {
169 provider
170 .try_fetch_chain_config(retry, commitment)
171 .map_err(|err| err.context("fetching chain config"))
172 .boxed()
173 })
174 .await
175 }
176
177 async fn try_fetch_reward_merkle_tree_v2(
179 &self,
180 retry: usize,
181 height: u64,
182 view: ViewNumber,
183 reward_merkle_tree_root: RewardMerkleCommitmentV2,
184 accounts: Arc<Vec<RewardAccountV2>>,
185 ) -> anyhow::Result<PermittedRewardMerkleTreeV2>;
186
187 async fn fetch_reward_merkle_tree_v2(
188 &self,
189 height: u64,
190 view: ViewNumber,
191 reward_merkle_tree_root: RewardMerkleCommitmentV2,
192 accounts: Arc<Vec<RewardAccountV2>>,
193 ) -> anyhow::Result<PermittedRewardMerkleTreeV2> {
194 self.backoff()
195 .retry(self, |provider, retry| {
196 let accounts = accounts.clone();
197 async move {
198 provider
199 .try_fetch_reward_merkle_tree_v2(
200 retry,
201 height,
202 view,
203 reward_merkle_tree_root,
204 accounts,
205 )
206 .await
207 .map_err(|err| {
208 err.context(format!("fetching reward merkle tree for height {height}"))
209 })
210 }
211 .boxed()
212 })
213 .await
214 }
215
216 async fn try_fetch_reward_accounts_v1(
218 &self,
219 retry: usize,
220 instance: &NodeState,
221 height: u64,
222 view: ViewNumber,
223 reward_merkle_tree_root: RewardMerkleCommitmentV1,
224 accounts: &[RewardAccountV1],
225 ) -> anyhow::Result<Vec<RewardAccountProofV1>>;
226
227 async fn fetch_reward_accounts_v1(
229 &self,
230 instance: &NodeState,
231 height: u64,
232 view: ViewNumber,
233 reward_merkle_tree_root: RewardMerkleCommitmentV1,
234 accounts: Vec<RewardAccountV1>,
235 ) -> anyhow::Result<Vec<RewardAccountProofV1>> {
236 self.backoff()
237 .retry(self, |provider, retry| {
238 let accounts = &accounts;
239 async move {
240 provider
241 .try_fetch_reward_accounts_v1(
242 retry,
243 instance,
244 height,
245 view,
246 reward_merkle_tree_root,
247 accounts,
248 )
249 .await
250 .map_err(|err| {
251 err.context(format!(
252 "fetching v1 reward accounts {accounts:?}, height {height}, view \
253 {view}"
254 ))
255 })
256 }
257 .boxed()
258 })
259 .await
260 }
261
262 async fn try_fetch_state_cert(
264 &self,
265 retry: usize,
266 epoch: u64,
267 ) -> anyhow::Result<LightClientStateUpdateCertificateV2<SeqTypes>>;
268
269 async fn fetch_state_cert(
271 &self,
272 epoch: u64,
273 ) -> anyhow::Result<LightClientStateUpdateCertificateV2<SeqTypes>> {
274 self.backoff()
275 .retry(self, |provider, retry| {
276 provider
277 .try_fetch_state_cert(retry, epoch)
278 .map_err(|err| err.context(format!("fetching state cert for epoch {epoch}")))
279 .boxed()
280 })
281 .await
282 }
283
284 fn is_local(&self) -> bool;
286
287 fn backoff(&self) -> &BackoffParams;
289
290 fn name(&self) -> String;
292}
293
294#[async_trait]
295impl<T: StateCatchup + ?Sized> StateCatchup for Arc<T> {
296 async fn try_fetch_leaf(
297 &self,
298 retry: usize,
299 height: u64,
300 stake_table: HSStakeTable<SeqTypes>,
301 success_threshold: U256,
302 ) -> anyhow::Result<Leaf2> {
303 (**self)
304 .try_fetch_leaf(retry, height, stake_table, success_threshold)
305 .await
306 }
307
308 async fn fetch_leaf(
309 &self,
310 height: u64,
311 stake_table: HSStakeTable<SeqTypes>,
312 success_threshold: U256,
313 ) -> anyhow::Result<Leaf2> {
314 (**self)
315 .fetch_leaf(height, stake_table, success_threshold)
316 .await
317 }
318
319 async fn try_fetch_accounts(
320 &self,
321 retry: usize,
322 instance: &NodeState,
323 height: u64,
324 view: ViewNumber,
325 fee_merkle_tree_root: FeeMerkleCommitment,
326 accounts: &[FeeAccount],
327 ) -> anyhow::Result<Vec<FeeAccountProof>> {
328 (**self)
329 .try_fetch_accounts(
330 retry,
331 instance,
332 height,
333 view,
334 fee_merkle_tree_root,
335 accounts,
336 )
337 .await
338 }
339
340 async fn fetch_accounts(
341 &self,
342 instance: &NodeState,
343 height: u64,
344 view: ViewNumber,
345 fee_merkle_tree_root: FeeMerkleCommitment,
346 accounts: Vec<FeeAccount>,
347 ) -> anyhow::Result<Vec<FeeAccountProof>> {
348 (**self)
349 .fetch_accounts(instance, height, view, fee_merkle_tree_root, accounts)
350 .await
351 }
352
353 async fn try_remember_blocks_merkle_tree(
354 &self,
355 retry: usize,
356 instance: &NodeState,
357 height: u64,
358 view: ViewNumber,
359 mt: &mut BlockMerkleTree,
360 ) -> anyhow::Result<()> {
361 (**self)
362 .try_remember_blocks_merkle_tree(retry, instance, height, view, mt)
363 .await
364 }
365
366 async fn remember_blocks_merkle_tree(
367 &self,
368 instance: &NodeState,
369 height: u64,
370 view: ViewNumber,
371 mt: &mut BlockMerkleTree,
372 ) -> anyhow::Result<()> {
373 (**self)
374 .remember_blocks_merkle_tree(instance, height, view, mt)
375 .await
376 }
377
378 async fn try_fetch_chain_config(
379 &self,
380 retry: usize,
381 commitment: Commitment<ChainConfig>,
382 ) -> anyhow::Result<ChainConfig> {
383 (**self).try_fetch_chain_config(retry, commitment).await
384 }
385
386 async fn fetch_chain_config(
387 &self,
388 commitment: Commitment<ChainConfig>,
389 ) -> anyhow::Result<ChainConfig> {
390 (**self).fetch_chain_config(commitment).await
391 }
392
393 async fn try_fetch_reward_merkle_tree_v2(
394 &self,
395 retry: usize,
396 height: u64,
397 view: ViewNumber,
398 reward_merkle_tree_root: RewardMerkleCommitmentV2,
399 accounts: Arc<Vec<RewardAccountV2>>,
400 ) -> anyhow::Result<PermittedRewardMerkleTreeV2> {
401 (**self)
402 .try_fetch_reward_merkle_tree_v2(retry, height, view, reward_merkle_tree_root, accounts)
403 .await
404 }
405
406 async fn fetch_reward_merkle_tree_v2(
407 &self,
408 height: u64,
409 view: ViewNumber,
410 reward_merkle_tree_root: RewardMerkleCommitmentV2,
411 accounts: Arc<Vec<RewardAccountV2>>,
412 ) -> anyhow::Result<PermittedRewardMerkleTreeV2> {
413 (**self)
414 .fetch_reward_merkle_tree_v2(height, view, reward_merkle_tree_root, accounts)
415 .await
416 }
417
418 async fn try_fetch_reward_accounts_v1(
419 &self,
420 retry: usize,
421 instance: &NodeState,
422 height: u64,
423 view: ViewNumber,
424 reward_merkle_tree_root: RewardMerkleCommitmentV1,
425 accounts: &[RewardAccountV1],
426 ) -> anyhow::Result<Vec<RewardAccountProofV1>> {
427 (**self)
428 .try_fetch_reward_accounts_v1(
429 retry,
430 instance,
431 height,
432 view,
433 reward_merkle_tree_root,
434 accounts,
435 )
436 .await
437 }
438
439 async fn fetch_reward_accounts_v1(
440 &self,
441 instance: &NodeState,
442 height: u64,
443 view: ViewNumber,
444 reward_merkle_tree_root: RewardMerkleCommitmentV1,
445 accounts: Vec<RewardAccountV1>,
446 ) -> anyhow::Result<Vec<RewardAccountProofV1>> {
447 (**self)
448 .fetch_reward_accounts_v1(instance, height, view, reward_merkle_tree_root, accounts)
449 .await
450 }
451
452 async fn try_fetch_state_cert(
453 &self,
454 retry: usize,
455 epoch: u64,
456 ) -> anyhow::Result<LightClientStateUpdateCertificateV2<SeqTypes>> {
457 (**self).try_fetch_state_cert(retry, epoch).await
458 }
459
460 async fn fetch_state_cert(
461 &self,
462 epoch: u64,
463 ) -> anyhow::Result<LightClientStateUpdateCertificateV2<SeqTypes>> {
464 (**self).fetch_state_cert(epoch).await
465 }
466
467 fn backoff(&self) -> &BackoffParams {
468 (**self).backoff()
469 }
470
471 fn name(&self) -> String {
472 (**self).name()
473 }
474
475 fn is_local(&self) -> bool {
476 (**self).is_local()
477 }
478}
479
/// Configuration from which a persistence backend can be created.
///
/// The associated [`Persistence`](Self::Persistence) type is the backend produced by
/// [`create`](Self::create); it must implement both [`SequencerPersistence`] and
/// [`MembershipPersistence`].
#[async_trait]
pub trait PersistenceOptions: Clone + Send + Sync + Debug + 'static {
    /// The persistence backend these options create.
    type Persistence: SequencerPersistence + MembershipPersistence;

    /// Set the view retention to `view_retention`.
    ///
    /// NOTE(review): presumably the number of views of consensus data to keep before
    /// pruning; confirm against the implementations.
    fn set_view_retention(&mut self, view_retention: u64);
    /// Create a persistence backend from these options.
    async fn create(&mut self) -> anyhow::Result<Self::Persistence>;
    /// Reset the storage described by these options, consuming them.
    ///
    /// NOTE(review): presumably deletes all persisted data; confirm before calling.
    async fn reset(self) -> anyhow::Result<()>;
}
488
/// Status returned alongside the events by [`MembershipPersistence::load_events`].
///
/// NOTE(review): the variant meanings below are inferred from their names; confirm against
/// the implementations of `load_events`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum EventsPersistenceRead {
    /// Presumably: the read covered the whole requested range.
    Complete,
    /// Presumably: the read only covers events up to the contained L1 block number.
    UntilL1Block(u64),
}
497
/// Stake data for one epoch, as returned by [`MembershipPersistence::load_stake`]: the
/// validator map, an optional reward amount and an optional stake table hash.
///
/// The element types mirror the `stake`, `block_reward` and `stake_table_hash` arguments of
/// [`MembershipPersistence::store_stake`].
pub type StakeTuple = (
    AuthenticatedValidatorMap,
    Option<RewardAmount>,
    Option<StakeTableHash>,
);
504
/// Persistent storage for stake tables, stake table events and validator sets.
#[async_trait]
pub trait MembershipPersistence: Send + Sync + 'static {
    /// Load the stake data stored for `epoch`, or `None` if there is none.
    async fn load_stake(&self, epoch: EpochNumber) -> anyhow::Result<Option<StakeTuple>>;

    /// Load the most recent stored stake entries, at most `limit` of them.
    ///
    /// NOTE(review): ordering and the meaning of a `None` result are implementation-defined;
    /// confirm against the implementations.
    async fn load_latest_stake(&self, limit: u64) -> anyhow::Result<Option<Vec<IndexedStake>>>;

    /// Store the stake data (`stake`, `block_reward`, `stake_table_hash`) for `epoch`.
    async fn store_stake(
        &self,
        epoch: EpochNumber,
        stake: AuthenticatedValidatorMap,
        block_reward: Option<RewardAmount>,
        stake_table_hash: Option<StakeTableHash>,
    ) -> anyhow::Result<()>;

    /// Store stake table `events` together with the `l1_finalized` block number.
    ///
    /// NOTE(review): `l1_finalized` is presumably the finalized L1 block up to which the
    /// events were fetched; confirm with callers.
    async fn store_events(
        &self,
        l1_finalized: u64,
        events: Vec<(EventKey, StakeTableEvent)>,
    ) -> anyhow::Result<()>;
    /// Load stored stake table events for the range given by `from_l1_block` and
    /// `l1_finalized`.
    ///
    /// Returns a read status (see [`EventsPersistenceRead`]), if any, together with the
    /// events. NOTE(review): whether the range bounds are inclusive is not visible here.
    async fn load_events(
        &self,
        from_l1_block: u64,
        l1_finalized: u64,
    ) -> anyhow::Result<(
        Option<EventsPersistenceRead>,
        Vec<(EventKey, StakeTableEvent)>,
    )>;

    /// Delete the stored stake tables.
    async fn delete_stake_tables(&self) -> anyhow::Result<()>;

    /// Store the full set of registered validators for `epoch`, keyed by address.
    async fn store_all_validators(
        &self,
        epoch: EpochNumber,
        all_validators: IndexMap<Address, RegisteredValidator<PubKey>>,
    ) -> anyhow::Result<()>;

    /// Load one page of the registered validators stored for `epoch`.
    ///
    /// NOTE(review): `offset` and `limit` presumably select the page start and maximum size;
    /// the ordering is implementation-defined.
    async fn load_all_validators(
        &self,
        epoch: EpochNumber,
        offset: u64,
        limit: u64,
    ) -> anyhow::Result<Vec<RegisteredValidator<PubKey>>>;
}
553
554#[async_trait]
555pub trait SequencerPersistence:
556 Sized + Send + Sync + Clone + 'static + DhtPersistentStorage + MembershipPersistence
557{
558 async fn migrate_reward_merkle_tree_v2(&self) -> anyhow::Result<()>;
559
560 fn into_catchup_provider(
562 self,
563 _backoff: BackoffParams,
564 ) -> anyhow::Result<Arc<dyn StateCatchup>> {
565 bail!("state catchup is not implemented for this persistence type");
566 }
567
568 async fn load_config(&self) -> anyhow::Result<Option<NetworkConfig>>;
573
574 async fn save_config(&self, cfg: &NetworkConfig) -> anyhow::Result<()>;
576
577 async fn load_latest_acted_view(&self) -> anyhow::Result<Option<ViewNumber>>;
579
580 async fn load_restart_view(&self) -> anyhow::Result<Option<ViewNumber>>;
582
583 async fn load_quorum_proposals(
585 &self,
586 ) -> anyhow::Result<BTreeMap<ViewNumber, Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>>>>;
587
588 async fn load_quorum_proposal(
589 &self,
590 view: ViewNumber,
591 ) -> anyhow::Result<Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>>>;
592
593 async fn load_vid_share(
594 &self,
595 view: ViewNumber,
596 ) -> anyhow::Result<Option<Proposal<SeqTypes, VidDisperseShare<SeqTypes>>>>;
597 async fn load_da_proposal(
598 &self,
599 view: ViewNumber,
600 ) -> anyhow::Result<Option<Proposal<SeqTypes, DaProposal2<SeqTypes>>>>;
601 async fn load_upgrade_certificate(
602 &self,
603 ) -> anyhow::Result<Option<UpgradeCertificate<SeqTypes>>>;
604 async fn load_start_epoch_info(&self) -> anyhow::Result<Vec<InitializerEpochInfo<SeqTypes>>>;
605 async fn load_state_cert(
606 &self,
607 ) -> anyhow::Result<Option<LightClientStateUpdateCertificateV2<SeqTypes>>>;
608
609 async fn get_state_cert_by_epoch(
611 &self,
612 epoch: u64,
613 ) -> anyhow::Result<Option<LightClientStateUpdateCertificateV2<SeqTypes>>>;
614
615 async fn insert_state_cert(
617 &self,
618 epoch: u64,
619 cert: LightClientStateUpdateCertificateV2<SeqTypes>,
620 ) -> anyhow::Result<()>;
621
622 async fn load_consensus_state(
629 &self,
630 state: NodeState,
631 upgrade: Upgrade,
632 ) -> anyhow::Result<(HotShotInitializer<SeqTypes>, Option<ViewNumber>)> {
633 let genesis_validated_state = ValidatedState::genesis(&state).0;
634 let highest_voted_view = match self
635 .load_latest_acted_view()
636 .await
637 .context("loading last voted view")?
638 {
639 Some(view) => {
640 tracing::info!(?view, "starting with last actioned view");
641 view
642 },
643 None => {
644 tracing::info!("no saved view, starting from genesis");
645 ViewNumber::genesis()
646 },
647 };
648
649 let restart_view = match self
650 .load_restart_view()
651 .await
652 .context("loading restart view")?
653 {
654 Some(view) => {
655 tracing::info!(?view, "starting from saved view");
656 view
657 },
658 None => {
659 tracing::info!("no saved view, starting from genesis");
660 ViewNumber::genesis()
661 },
662 };
663 let next_epoch_high_qc = self
664 .load_next_epoch_quorum_certificate()
665 .await
666 .context("loading next epoch qc")?;
667 let (leaf, mut high_qc, anchor_view) = match self
668 .load_anchor_leaf()
669 .await
670 .context("loading anchor leaf")?
671 {
672 Some((leaf, high_qc)) => {
673 tracing::info!(?leaf, ?high_qc, "starting from saved leaf");
674 ensure!(
675 leaf.view_number() == high_qc.view_number,
676 format!(
677 "loaded anchor leaf from view {}, but high QC is from view {}",
678 leaf.view_number(),
679 high_qc.view_number
680 )
681 );
682
683 let anchor_view = leaf.view_number();
684 (leaf, high_qc, Some(anchor_view))
685 },
686 None => {
687 tracing::info!("no saved leaf, starting from genesis leaf");
688 (
689 hotshot_types::data::Leaf2::genesis(
690 &genesis_validated_state,
691 &state,
692 upgrade.base,
693 )
694 .await,
695 QuorumCertificate2::genesis(&genesis_validated_state, &state, upgrade).await,
696 None,
697 )
698 },
699 };
700
701 if let Some((extended_high_qc, _)) = self.load_eqc().await
702 && extended_high_qc.view_number() > high_qc.view_number()
703 {
704 high_qc = extended_high_qc
705 }
706
707 let validated_state = if leaf.block_header().height() == 0 {
708 genesis_validated_state
710 } else {
711 ValidatedState::from_header(leaf.block_header())
714 };
715
716 let restart_view = max(restart_view, leaf.view_number());
721 let epoch = genesis_epoch_from_version(upgrade.base);
723
724 let config = self.load_config().await.context("loading config")?;
725 let epoch_height = config
726 .as_ref()
727 .map(|c| c.config.epoch_height)
728 .unwrap_or_default();
729 let epoch_start_block = config
730 .as_ref()
731 .map(|c| c.config.epoch_start_block)
732 .unwrap_or_default();
733
734 let saved_proposals = self
735 .load_quorum_proposals()
736 .await
737 .context("loading saved proposals")?;
738
739 let upgrade_certificate = self
740 .load_upgrade_certificate()
741 .await
742 .context("loading upgrade certificate")?;
743
744 let start_epoch_info = self
745 .load_start_epoch_info()
746 .await
747 .context("loading start epoch info")?;
748
749 let state_cert = self
750 .load_state_cert()
751 .await
752 .context("loading light client state update certificate")?;
753
754 tracing::warn!(
755 ?leaf,
756 ?restart_view,
757 ?epoch,
758 ?high_qc,
759 ?validated_state,
760 ?state_cert,
761 "loaded consensus state"
762 );
763
764 Ok((
765 HotShotInitializer {
766 instance_state: state,
767 epoch_height,
768 epoch_start_block,
769 anchor_leaf: leaf,
770 anchor_state: Arc::new(validated_state),
771 anchor_state_delta: None,
772 start_view: restart_view,
773 start_epoch: epoch,
774 last_actioned_view: highest_voted_view,
775 saved_proposals,
776 high_qc,
777 next_epoch_high_qc,
778 decided_upgrade_certificate: upgrade_certificate,
779 undecided_leaves: Default::default(),
780 undecided_state: Default::default(),
781 saved_vid_shares: Default::default(), start_epoch_info,
783 state_cert,
784 },
785 anchor_view,
786 ))
787 }
788
789 async fn handle_event(&self, event: &Event, consumer: &(impl EventConsumer + 'static)) {
791 if let EventType::Decide {
792 leaf_chain,
793 committing_qc,
794 deciding_qc,
795 ..
796 } = &event.event
797 {
798 let Some(LeafInfo { leaf, .. }) = leaf_chain.first() else {
799 return;
801 };
802
803 let chain = leaf_chain.iter().zip(
805 std::iter::once((**committing_qc).clone())
807 .chain(leaf_chain.iter().map(|leaf| CertificatePair::for_parent(&leaf.leaf))),
810 );
811
812 if let Err(err) = self
813 .append_decided_leaves(leaf.view_number(), chain, deciding_qc.clone(), consumer)
814 .await
815 {
816 tracing::error!(
817 "failed to save decided leaves, chain may not be up to date: {err:#}"
818 );
819 return;
820 }
821 }
822 }
823
824 async fn append_decided_leaves(
850 &self,
851 decided_view: ViewNumber,
852 leaf_chain: impl IntoIterator<Item = (&LeafInfo<SeqTypes>, CertificatePair<SeqTypes>)> + Send,
853 deciding_qc: Option<Arc<CertificatePair<SeqTypes>>>,
854 consumer: &(impl EventConsumer + 'static),
855 ) -> anyhow::Result<()>;
856
857 async fn load_anchor_leaf(
858 &self,
859 ) -> anyhow::Result<Option<(Leaf2, QuorumCertificate2<SeqTypes>)>>;
860 async fn append_vid(
861 &self,
862 proposal: &Proposal<SeqTypes, VidDisperseShare<SeqTypes>>,
863 ) -> anyhow::Result<()>;
864 async fn append_da(
865 &self,
866 proposal: &Proposal<SeqTypes, DaProposal<SeqTypes>>,
867 vid_commit: VidCommitment,
868 ) -> anyhow::Result<()>;
869 async fn record_action(
870 &self,
871 view: ViewNumber,
872 epoch: Option<EpochNumber>,
873 action: HotShotAction,
874 ) -> anyhow::Result<()>;
875
876 async fn append_quorum_proposal2(
877 &self,
878 proposal: &Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>>,
879 ) -> anyhow::Result<()>;
880
881 async fn store_eqc(
883 &self,
884 _high_qc: QuorumCertificate2<SeqTypes>,
885 _next_epoch_high_qc: NextEpochQuorumCertificate2<SeqTypes>,
886 ) -> anyhow::Result<()>;
887
888 async fn load_eqc(
890 &self,
891 ) -> Option<(
892 QuorumCertificate2<SeqTypes>,
893 NextEpochQuorumCertificate2<SeqTypes>,
894 )>;
895
896 async fn store_upgrade_certificate(
897 &self,
898 decided_upgrade_certificate: Option<UpgradeCertificate<SeqTypes>>,
899 ) -> anyhow::Result<()>;
900
901 async fn migrate_storage(&self) -> anyhow::Result<()> {
902 tracing::warn!("migrating consensus data...");
903
904 self.migrate_anchor_leaf().await?;
905 self.migrate_da_proposals().await?;
906 self.migrate_vid_shares().await?;
907 self.migrate_quorum_proposals().await?;
908 self.migrate_quorum_certificates().await?;
909 self.migrate_reward_merkle_tree_v2()
910 .await
911 .context("failed to migrate reward merkle tree v2")?;
912 self.migrate_x25519_keys().await?;
913 tracing::warn!("consensus storage has been migrated to new types");
914
915 Ok(())
916 }
917
918 async fn migrate_x25519_keys(&self) -> anyhow::Result<()>;
919
920 async fn migrate_anchor_leaf(&self) -> anyhow::Result<()>;
921 async fn migrate_da_proposals(&self) -> anyhow::Result<()>;
922 async fn migrate_vid_shares(&self) -> anyhow::Result<()>;
923 async fn migrate_quorum_proposals(&self) -> anyhow::Result<()>;
924 async fn migrate_quorum_certificates(&self) -> anyhow::Result<()>;
925
926 async fn load_anchor_view(&self) -> anyhow::Result<ViewNumber> {
927 match self.load_anchor_leaf().await? {
928 Some((leaf, _)) => Ok(leaf.view_number()),
929 None => Ok(ViewNumber::genesis()),
930 }
931 }
932
933 async fn store_next_epoch_quorum_certificate(
934 &self,
935 high_qc: NextEpochQuorumCertificate2<SeqTypes>,
936 ) -> anyhow::Result<()>;
937
938 async fn load_next_epoch_quorum_certificate(
939 &self,
940 ) -> anyhow::Result<Option<NextEpochQuorumCertificate2<SeqTypes>>>;
941
942 async fn append_da2(
943 &self,
944 proposal: &Proposal<SeqTypes, DaProposal2<SeqTypes>>,
945 vid_commit: VidCommitment,
946 ) -> anyhow::Result<()>;
947
948 async fn append_proposal2(
949 &self,
950 proposal: &Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>>,
951 ) -> anyhow::Result<()> {
952 self.append_quorum_proposal2(proposal).await
953 }
954
955 async fn store_drb_result(
956 &self,
957 epoch: EpochNumber,
958 drb_result: DrbResult,
959 ) -> anyhow::Result<()>;
960 async fn store_drb_input(&self, drb_input: DrbInput) -> anyhow::Result<()>;
961 async fn load_drb_input(&self, epoch: u64) -> anyhow::Result<DrbInput>;
962 async fn store_epoch_root(
963 &self,
964 epoch: EpochNumber,
965 block_header: <SeqTypes as NodeType>::BlockHeader,
966 ) -> anyhow::Result<()>;
967 async fn add_state_cert(
968 &self,
969 state_cert: LightClientStateUpdateCertificateV2<SeqTypes>,
970 ) -> anyhow::Result<()>;
971
972 fn enable_metrics(&mut self, metrics: &dyn Metrics);
973}
974
/// A receiver of [`Event`]s, such as the `consumer` argument of
/// [`SequencerPersistence::handle_event`] and
/// [`SequencerPersistence::append_decided_leaves`].
#[async_trait]
pub trait EventConsumer: Debug + Send + Sync {
    /// Process one event, returning an error if it could not be handled.
    async fn handle_event(&self, event: &Event) -> anyhow::Result<()>;
}
979
980#[async_trait]
981impl<T> EventConsumer for Box<T>
982where
983 T: EventConsumer + ?Sized,
984{
985 async fn handle_event(&self, event: &Event) -> anyhow::Result<()> {
986 (**self).handle_event(event).await
987 }
988}
989
/// An [`EventConsumer`] that ignores every event.
#[derive(Clone, Copy, Debug)]
pub struct NullEventConsumer;

#[async_trait]
impl EventConsumer for NullEventConsumer {
    /// Discards the event and always succeeds.
    async fn handle_event(&self, _event: &Event) -> anyhow::Result<()> {
        Ok(())
    }
}
999
1000#[async_trait]
1001impl<P: SequencerPersistence> Storage<SeqTypes> for Arc<P> {
1002 async fn append_vid(
1003 &self,
1004 proposal: &Proposal<SeqTypes, VidDisperseShare<SeqTypes>>,
1005 ) -> anyhow::Result<()> {
1006 (**self).append_vid(proposal).await
1007 }
1008
1009 async fn append_da(
1010 &self,
1011 proposal: &Proposal<SeqTypes, DaProposal<SeqTypes>>,
1012 vid_commit: VidCommitment,
1013 ) -> anyhow::Result<()> {
1014 (**self).append_da(proposal, vid_commit).await
1015 }
1016
1017 async fn append_da2(
1018 &self,
1019 proposal: &Proposal<SeqTypes, DaProposal2<SeqTypes>>,
1020 vid_commit: VidCommitment,
1021 ) -> anyhow::Result<()> {
1022 (**self).append_da2(proposal, vid_commit).await
1023 }
1024
1025 async fn record_action(
1026 &self,
1027 view: ViewNumber,
1028 epoch: Option<EpochNumber>,
1029 action: HotShotAction,
1030 ) -> anyhow::Result<()> {
1031 (**self).record_action(view, epoch, action).await
1032 }
1033
1034 async fn update_high_qc(&self, _high_qc: QuorumCertificate<SeqTypes>) -> anyhow::Result<()> {
1035 Ok(())
1036 }
1037
1038 async fn append_proposal(
1039 &self,
1040 proposal: &Proposal<SeqTypes, QuorumProposal<SeqTypes>>,
1041 ) -> anyhow::Result<()> {
1042 (**self)
1043 .append_quorum_proposal2(&convert_proposal(proposal.clone()))
1044 .await
1045 }
1046
1047 async fn append_proposal2(
1048 &self,
1049 proposal: &Proposal<SeqTypes, QuorumProposal2<SeqTypes>>,
1050 ) -> anyhow::Result<()> {
1051 let proposal_qp_wrapper: Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>> =
1052 convert_proposal(proposal.clone());
1053 (**self).append_quorum_proposal2(&proposal_qp_wrapper).await
1054 }
1055
1056 async fn update_high_qc2(&self, _high_qc: QuorumCertificate2<SeqTypes>) -> anyhow::Result<()> {
1057 Ok(())
1058 }
1059
1060 async fn update_eqc(
1062 &self,
1063 high_qc: QuorumCertificate2<SeqTypes>,
1064 next_epoch_high_qc: NextEpochQuorumCertificate2<SeqTypes>,
1065 ) -> anyhow::Result<()> {
1066 if let Some((existing_high_qc, _)) = (**self).load_eqc().await
1067 && high_qc.view_number() < existing_high_qc.view_number()
1068 {
1069 return Ok(());
1070 }
1071
1072 (**self).store_eqc(high_qc, next_epoch_high_qc).await
1073 }
1074
1075 async fn update_next_epoch_high_qc2(
1076 &self,
1077 _next_epoch_high_qc: NextEpochQuorumCertificate2<SeqTypes>,
1078 ) -> anyhow::Result<()> {
1079 Ok(())
1080 }
1081
1082 async fn update_decided_upgrade_certificate(
1083 &self,
1084 decided_upgrade_certificate: Option<UpgradeCertificate<SeqTypes>>,
1085 ) -> anyhow::Result<()> {
1086 (**self)
1087 .store_upgrade_certificate(decided_upgrade_certificate)
1088 .await
1089 }
1090
1091 async fn store_drb_result(
1092 &self,
1093 epoch: EpochNumber,
1094 drb_result: DrbResult,
1095 ) -> anyhow::Result<()> {
1096 (**self).store_drb_result(epoch, drb_result).await
1097 }
1098
1099 async fn store_epoch_root(
1100 &self,
1101 epoch: EpochNumber,
1102 block_header: <SeqTypes as NodeType>::BlockHeader,
1103 ) -> anyhow::Result<()> {
1104 (**self).store_epoch_root(epoch, block_header).await
1105 }
1106
1107 async fn store_drb_input(&self, drb_input: DrbInput) -> anyhow::Result<()> {
1108 (**self).store_drb_input(drb_input).await
1109 }
1110
1111 async fn load_drb_input(&self, epoch: u64) -> anyhow::Result<DrbInput> {
1112 (**self).load_drb_input(epoch).await
1113 }
1114
1115 async fn update_state_cert(
1116 &self,
1117 state_cert: LightClientStateUpdateCertificateV2<SeqTypes>,
1118 ) -> anyhow::Result<()> {
1119 (**self).add_state_cert(state_cert).await
1120 }
1121}
1122
/// Types that can be built from a borrowed slice of payload bytes.
///
/// This is the bound on [`NsPayloadBytesRange::Output`].
/// NOTE(review): "Ns" presumably stands for "namespace"; confirm with the payload module.
pub trait FromNsPayloadBytes<'a> {
    /// Build `Self` from `bytes`. The conversion is infallible by signature.
    fn from_payload_bytes(bytes: &'a [u8]) -> Self;
}
1131
/// Types that describe a byte range within a payload and name the type stored there.
pub trait NsPayloadBytesRange<'a> {
    /// The type built from the bytes in the range.
    type Output: FromNsPayloadBytes<'a>;

    /// The byte range described by `self`.
    ///
    /// NOTE(review): presumably relative to the start of a namespace payload; confirm with
    /// callers.
    fn ns_payload_range(&self) -> Range<usize>;
}
1142
/// Types that can be built from a binary form, a string, or an integer, and converted back
/// to a binary form or a string.
///
/// NOTE(review): presumably used to derive serde implementations that accept several input
/// encodings; confirm with the code that consumes this trait.
pub trait FromStringOrInteger: Sized {
    /// Serializable binary representation of `Self`.
    type Binary: Serialize + DeserializeOwned;
    /// Serializable integer representation of `Self`.
    type Integer: Serialize + DeserializeOwned;

    /// Build `Self` from its binary representation.
    fn from_binary(b: Self::Binary) -> anyhow::Result<Self>;
    /// Build `Self` from a string.
    fn from_string(s: String) -> anyhow::Result<Self>;
    /// Build `Self` from its integer representation.
    fn from_integer(i: Self::Integer) -> anyhow::Result<Self>;

    /// Convert `self` to its binary representation.
    fn to_binary(&self) -> anyhow::Result<Self::Binary>;
    /// Convert `self` to a string.
    fn to_string(&self) -> anyhow::Result<String>;
}