1use std::{cmp::max, collections::BTreeMap, fmt::Debug, ops::Range, sync::Arc};
4
5use alloy::primitives::{Address, U256};
6use anyhow::{Context, bail, ensure};
7use async_trait::async_trait;
8use committable::Commitment;
9use futures::{FutureExt, TryFutureExt};
10use hotshot::{HotShotInitializer, InitializerEpochInfo, types::EventType};
11use hotshot_libp2p_networking::network::behaviours::dht::store::persistent::DhtPersistentStorage;
12use hotshot_new_protocol::{message::Certificate2, storage::NewProtocolStorage};
13use hotshot_types::{
14 data::{
15 DaProposal, DaProposal2, EpochNumber, QuorumProposal, QuorumProposal2,
16 QuorumProposalWrapper, VidCommitment, VidDisperseShare, ViewNumber,
17 },
18 drb::{DrbInput, DrbResult},
19 event::{HotShotAction, LeafInfo},
20 message::{Proposal, convert_proposal},
21 new_protocol::CoordinatorEvent,
22 simple_certificate::{
23 CertificatePair, LightClientStateUpdateCertificateV2, NextEpochQuorumCertificate2,
24 QuorumCertificate, QuorumCertificate2, UpgradeCertificate,
25 },
26 stake_table::HSStakeTable,
27 traits::{
28 ValidatedState as HotShotState, metrics::Metrics, node_implementation::NodeType,
29 storage::Storage,
30 },
31 utils::genesis_epoch_from_version,
32 vote::HasViewNumber,
33};
34use indexmap::IndexMap;
35use serde::{Serialize, de::DeserializeOwned};
36use versions::Upgrade;
37
38use super::{
39 impls::NodeState,
40 utils::BackoffParams,
41 v0_3::{EventKey, IndexedStake, StakeTableEvent},
42};
43use crate::{
44 AuthenticatedValidatorMap, BlockMerkleTree, FeeAccount, FeeAccountProof, FeeMerkleCommitment,
45 Leaf2, NetworkConfig, PubKey, SeqTypes,
46 v0::impls::{StakeTableHash, ValidatedState},
47 v0_3::{
48 ChainConfig, RegisteredValidator, RewardAccountProofV1, RewardAccountV1, RewardAmount,
49 RewardMerkleCommitmentV1,
50 },
51 v0_4::{PermittedRewardMerkleTreeV2, RewardAccountV2, RewardMerkleCommitmentV2},
52};
53
54#[async_trait]
55pub trait StateCatchup: Send + Sync {
56 async fn try_fetch_leaf(
58 &self,
59 retry: usize,
60 height: u64,
61 stake_table: HSStakeTable<SeqTypes>,
62 success_threshold: U256,
63 ) -> anyhow::Result<Leaf2>;
64
65 async fn fetch_leaf(
67 &self,
68 height: u64,
69 stake_table: HSStakeTable<SeqTypes>,
70 success_threshold: U256,
71 ) -> anyhow::Result<Leaf2> {
72 self.backoff()
73 .retry(self, |provider, retry| {
74 let stake_table_clone = stake_table.clone();
75 async move {
76 provider
77 .try_fetch_leaf(retry, height, stake_table_clone, success_threshold)
78 .await
79 }
80 .boxed()
81 })
82 .await
83 }
84
85 async fn try_fetch_accounts(
87 &self,
88 retry: usize,
89 instance: &NodeState,
90 height: u64,
91 view: ViewNumber,
92 fee_merkle_tree_root: FeeMerkleCommitment,
93 accounts: &[FeeAccount],
94 ) -> anyhow::Result<Vec<FeeAccountProof>>;
95
96 async fn fetch_accounts(
98 &self,
99 instance: &NodeState,
100 height: u64,
101 view: ViewNumber,
102 fee_merkle_tree_root: FeeMerkleCommitment,
103 accounts: Vec<FeeAccount>,
104 ) -> anyhow::Result<Vec<FeeAccountProof>> {
105 self.backoff()
106 .retry(self, |provider, retry| {
107 let accounts = &accounts;
108 async move {
109 provider
110 .try_fetch_accounts(
111 retry,
112 instance,
113 height,
114 view,
115 fee_merkle_tree_root,
116 accounts,
117 )
118 .await
119 .map_err(|err| {
120 err.context(format!(
121 "fetching accounts {accounts:?}, height {height}, view {view}"
122 ))
123 })
124 }
125 .boxed()
126 })
127 .await
128 }
129
130 async fn try_remember_blocks_merkle_tree(
132 &self,
133 retry: usize,
134 instance: &NodeState,
135 height: u64,
136 view: ViewNumber,
137 mt: &mut BlockMerkleTree,
138 ) -> anyhow::Result<()>;
139
140 async fn remember_blocks_merkle_tree(
142 &self,
143 instance: &NodeState,
144 height: u64,
145 view: ViewNumber,
146 mt: &mut BlockMerkleTree,
147 ) -> anyhow::Result<()> {
148 self.backoff()
149 .retry(mt, |mt, retry| {
150 self.try_remember_blocks_merkle_tree(retry, instance, height, view, mt)
151 .map_err(|err| err.context(format!("fetching frontier using {}", self.name())))
152 .boxed()
153 })
154 .await
155 }
156
157 async fn try_fetch_chain_config(
159 &self,
160 retry: usize,
161 commitment: Commitment<ChainConfig>,
162 ) -> anyhow::Result<ChainConfig>;
163
164 async fn fetch_chain_config(
166 &self,
167 commitment: Commitment<ChainConfig>,
168 ) -> anyhow::Result<ChainConfig> {
169 self.backoff()
170 .retry(self, |provider, retry| {
171 provider
172 .try_fetch_chain_config(retry, commitment)
173 .map_err(|err| err.context("fetching chain config"))
174 .boxed()
175 })
176 .await
177 }
178
179 async fn try_fetch_reward_merkle_tree_v2(
181 &self,
182 retry: usize,
183 height: u64,
184 view: ViewNumber,
185 reward_merkle_tree_root: RewardMerkleCommitmentV2,
186 accounts: Arc<Vec<RewardAccountV2>>,
187 ) -> anyhow::Result<PermittedRewardMerkleTreeV2>;
188
189 async fn fetch_reward_merkle_tree_v2(
190 &self,
191 height: u64,
192 view: ViewNumber,
193 reward_merkle_tree_root: RewardMerkleCommitmentV2,
194 accounts: Arc<Vec<RewardAccountV2>>,
195 ) -> anyhow::Result<PermittedRewardMerkleTreeV2> {
196 self.backoff()
197 .retry(self, |provider, retry| {
198 let accounts = accounts.clone();
199 async move {
200 provider
201 .try_fetch_reward_merkle_tree_v2(
202 retry,
203 height,
204 view,
205 reward_merkle_tree_root,
206 accounts,
207 )
208 .await
209 .map_err(|err| {
210 err.context(format!("fetching reward merkle tree for height {height}"))
211 })
212 }
213 .boxed()
214 })
215 .await
216 }
217
218 async fn try_fetch_reward_accounts_v1(
220 &self,
221 retry: usize,
222 instance: &NodeState,
223 height: u64,
224 view: ViewNumber,
225 reward_merkle_tree_root: RewardMerkleCommitmentV1,
226 accounts: &[RewardAccountV1],
227 ) -> anyhow::Result<Vec<RewardAccountProofV1>>;
228
229 async fn fetch_reward_accounts_v1(
231 &self,
232 instance: &NodeState,
233 height: u64,
234 view: ViewNumber,
235 reward_merkle_tree_root: RewardMerkleCommitmentV1,
236 accounts: Vec<RewardAccountV1>,
237 ) -> anyhow::Result<Vec<RewardAccountProofV1>> {
238 self.backoff()
239 .retry(self, |provider, retry| {
240 let accounts = &accounts;
241 async move {
242 provider
243 .try_fetch_reward_accounts_v1(
244 retry,
245 instance,
246 height,
247 view,
248 reward_merkle_tree_root,
249 accounts,
250 )
251 .await
252 .map_err(|err| {
253 err.context(format!(
254 "fetching v1 reward accounts {accounts:?}, height {height}, view \
255 {view}"
256 ))
257 })
258 }
259 .boxed()
260 })
261 .await
262 }
263
264 async fn try_fetch_state_cert(
266 &self,
267 retry: usize,
268 epoch: u64,
269 ) -> anyhow::Result<LightClientStateUpdateCertificateV2<SeqTypes>>;
270
271 async fn fetch_state_cert(
273 &self,
274 epoch: u64,
275 ) -> anyhow::Result<LightClientStateUpdateCertificateV2<SeqTypes>> {
276 self.backoff()
277 .retry(self, |provider, retry| {
278 provider
279 .try_fetch_state_cert(retry, epoch)
280 .map_err(|err| err.context(format!("fetching state cert for epoch {epoch}")))
281 .boxed()
282 })
283 .await
284 }
285
286 fn is_local(&self) -> bool;
288
289 fn backoff(&self) -> &BackoffParams;
291
292 fn name(&self) -> String;
294}
295
296#[async_trait]
297impl<T: StateCatchup + ?Sized> StateCatchup for Arc<T> {
298 async fn try_fetch_leaf(
299 &self,
300 retry: usize,
301 height: u64,
302 stake_table: HSStakeTable<SeqTypes>,
303 success_threshold: U256,
304 ) -> anyhow::Result<Leaf2> {
305 (**self)
306 .try_fetch_leaf(retry, height, stake_table, success_threshold)
307 .await
308 }
309
310 async fn fetch_leaf(
311 &self,
312 height: u64,
313 stake_table: HSStakeTable<SeqTypes>,
314 success_threshold: U256,
315 ) -> anyhow::Result<Leaf2> {
316 (**self)
317 .fetch_leaf(height, stake_table, success_threshold)
318 .await
319 }
320
321 async fn try_fetch_accounts(
322 &self,
323 retry: usize,
324 instance: &NodeState,
325 height: u64,
326 view: ViewNumber,
327 fee_merkle_tree_root: FeeMerkleCommitment,
328 accounts: &[FeeAccount],
329 ) -> anyhow::Result<Vec<FeeAccountProof>> {
330 (**self)
331 .try_fetch_accounts(
332 retry,
333 instance,
334 height,
335 view,
336 fee_merkle_tree_root,
337 accounts,
338 )
339 .await
340 }
341
342 async fn fetch_accounts(
343 &self,
344 instance: &NodeState,
345 height: u64,
346 view: ViewNumber,
347 fee_merkle_tree_root: FeeMerkleCommitment,
348 accounts: Vec<FeeAccount>,
349 ) -> anyhow::Result<Vec<FeeAccountProof>> {
350 (**self)
351 .fetch_accounts(instance, height, view, fee_merkle_tree_root, accounts)
352 .await
353 }
354
355 async fn try_remember_blocks_merkle_tree(
356 &self,
357 retry: usize,
358 instance: &NodeState,
359 height: u64,
360 view: ViewNumber,
361 mt: &mut BlockMerkleTree,
362 ) -> anyhow::Result<()> {
363 (**self)
364 .try_remember_blocks_merkle_tree(retry, instance, height, view, mt)
365 .await
366 }
367
368 async fn remember_blocks_merkle_tree(
369 &self,
370 instance: &NodeState,
371 height: u64,
372 view: ViewNumber,
373 mt: &mut BlockMerkleTree,
374 ) -> anyhow::Result<()> {
375 (**self)
376 .remember_blocks_merkle_tree(instance, height, view, mt)
377 .await
378 }
379
380 async fn try_fetch_chain_config(
381 &self,
382 retry: usize,
383 commitment: Commitment<ChainConfig>,
384 ) -> anyhow::Result<ChainConfig> {
385 (**self).try_fetch_chain_config(retry, commitment).await
386 }
387
388 async fn fetch_chain_config(
389 &self,
390 commitment: Commitment<ChainConfig>,
391 ) -> anyhow::Result<ChainConfig> {
392 (**self).fetch_chain_config(commitment).await
393 }
394
395 async fn try_fetch_reward_merkle_tree_v2(
396 &self,
397 retry: usize,
398 height: u64,
399 view: ViewNumber,
400 reward_merkle_tree_root: RewardMerkleCommitmentV2,
401 accounts: Arc<Vec<RewardAccountV2>>,
402 ) -> anyhow::Result<PermittedRewardMerkleTreeV2> {
403 (**self)
404 .try_fetch_reward_merkle_tree_v2(retry, height, view, reward_merkle_tree_root, accounts)
405 .await
406 }
407
408 async fn fetch_reward_merkle_tree_v2(
409 &self,
410 height: u64,
411 view: ViewNumber,
412 reward_merkle_tree_root: RewardMerkleCommitmentV2,
413 accounts: Arc<Vec<RewardAccountV2>>,
414 ) -> anyhow::Result<PermittedRewardMerkleTreeV2> {
415 (**self)
416 .fetch_reward_merkle_tree_v2(height, view, reward_merkle_tree_root, accounts)
417 .await
418 }
419
420 async fn try_fetch_reward_accounts_v1(
421 &self,
422 retry: usize,
423 instance: &NodeState,
424 height: u64,
425 view: ViewNumber,
426 reward_merkle_tree_root: RewardMerkleCommitmentV1,
427 accounts: &[RewardAccountV1],
428 ) -> anyhow::Result<Vec<RewardAccountProofV1>> {
429 (**self)
430 .try_fetch_reward_accounts_v1(
431 retry,
432 instance,
433 height,
434 view,
435 reward_merkle_tree_root,
436 accounts,
437 )
438 .await
439 }
440
441 async fn fetch_reward_accounts_v1(
442 &self,
443 instance: &NodeState,
444 height: u64,
445 view: ViewNumber,
446 reward_merkle_tree_root: RewardMerkleCommitmentV1,
447 accounts: Vec<RewardAccountV1>,
448 ) -> anyhow::Result<Vec<RewardAccountProofV1>> {
449 (**self)
450 .fetch_reward_accounts_v1(instance, height, view, reward_merkle_tree_root, accounts)
451 .await
452 }
453
454 async fn try_fetch_state_cert(
455 &self,
456 retry: usize,
457 epoch: u64,
458 ) -> anyhow::Result<LightClientStateUpdateCertificateV2<SeqTypes>> {
459 (**self).try_fetch_state_cert(retry, epoch).await
460 }
461
462 async fn fetch_state_cert(
463 &self,
464 epoch: u64,
465 ) -> anyhow::Result<LightClientStateUpdateCertificateV2<SeqTypes>> {
466 (**self).fetch_state_cert(epoch).await
467 }
468
469 fn backoff(&self) -> &BackoffParams {
470 (**self).backoff()
471 }
472
473 fn name(&self) -> String {
474 (**self).name()
475 }
476
477 fn is_local(&self) -> bool {
478 (**self).is_local()
479 }
480}
481
/// Configuration from which a persistence backend can be created or reset.
#[async_trait]
pub trait PersistenceOptions: Clone + Send + Sync + Debug + 'static {
    /// The persistence backend produced by [`Self::create`].
    type Persistence: SequencerPersistence + MembershipPersistence;

    /// Set the view retention value.
    ///
    /// NOTE(review): presumably the number of views' worth of data the backend keeps before
    /// pruning — confirm against the implementations.
    fn set_view_retention(&mut self, view_retention: u64);

    /// Create the persistence backend described by these options.
    async fn create(&mut self) -> anyhow::Result<Self::Persistence>;

    /// Reset the backend described by these options, consuming the options.
    ///
    /// NOTE(review): presumably this wipes previously stored data — confirm against the
    /// implementations.
    async fn reset(self) -> anyhow::Result<()>;
}
490
/// Marker returned by [`MembershipPersistence::load_events`] alongside the loaded events.
///
/// NOTE(review): the variant meanings below are inferred from their names — confirm against the
/// `load_events` implementations.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum EventsPersistenceRead {
    /// Presumably: the stored events fully cover the requested range.
    Complete,
    /// Presumably: the stored events only reach the contained L1 block number.
    UntilL1Block(u64),
}
499
/// Stake data for one epoch, as returned by [`MembershipPersistence::load_stake`].
///
/// The components mirror the arguments of [`MembershipPersistence::store_stake`]: the
/// authenticated validator map, the optional block reward and the optional stake table hash.
pub type StakeTuple = (
    AuthenticatedValidatorMap,
    Option<RewardAmount>,
    Option<StakeTableHash>,
);
506
/// Persistence for stake tables, stake table events and validator sets.
#[async_trait]
pub trait MembershipPersistence: Send + Sync + 'static {
    /// Load the stake data stored for `epoch`, if any.
    async fn load_stake(&self, epoch: EpochNumber) -> anyhow::Result<Option<StakeTuple>>;

    /// Load stored stake entries, bounded by `limit`.
    ///
    /// NOTE(review): presumably returns the `limit` most recent epochs — confirm ordering
    /// against the implementations.
    async fn load_latest_stake(&self, limit: u64) -> anyhow::Result<Option<Vec<IndexedStake>>>;

    /// Store the stake data for `epoch`: the validator map plus the optional block reward and
    /// stake table hash.
    async fn store_stake(
        &self,
        epoch: EpochNumber,
        stake: AuthenticatedValidatorMap,
        block_reward: Option<RewardAmount>,
        stake_table_hash: Option<StakeTableHash>,
    ) -> anyhow::Result<()>;

    /// Store stake table `events` together with the `l1_finalized` block number they were
    /// collected against.
    async fn store_events(
        &self,
        l1_finalized: u64,
        events: Vec<(EventKey, StakeTableEvent)>,
    ) -> anyhow::Result<()>;

    /// Load stored stake table events for the range described by `from_l1_block` and
    /// `l1_finalized`.
    ///
    /// Returns the events together with an optional [`EventsPersistenceRead`] marker.
    /// NOTE(review): the marker presumably tells the caller how much of the requested range the
    /// stored events cover — confirm against the implementations.
    async fn load_events(
        &self,
        from_l1_block: u64,
        l1_finalized: u64,
    ) -> anyhow::Result<(
        Option<EventsPersistenceRead>,
        Vec<(EventKey, StakeTableEvent)>,
    )>;

    /// Delete the stored stake tables.
    async fn delete_stake_tables(&self) -> anyhow::Result<()>;

    /// Store the full set of registered validators for `epoch`, keyed by address.
    async fn store_all_validators(
        &self,
        epoch: EpochNumber,
        all_validators: IndexMap<Address, RegisteredValidator<PubKey>>,
    ) -> anyhow::Result<()>;

    /// Load a page of the registered validators stored for `epoch`, selected by `offset` and
    /// `limit`.
    async fn load_all_validators(
        &self,
        epoch: EpochNumber,
        offset: u64,
        limit: u64,
    ) -> anyhow::Result<Vec<RegisteredValidator<PubKey>>>;
}
555
556#[async_trait]
557pub trait SequencerPersistence:
558 Sized + Send + Sync + Clone + 'static + DhtPersistentStorage + MembershipPersistence
559{
560 async fn migrate_reward_merkle_tree_v2(&self) -> anyhow::Result<()>;
561
562 fn into_catchup_provider(
564 self,
565 _backoff: BackoffParams,
566 ) -> anyhow::Result<Arc<dyn StateCatchup>> {
567 bail!("state catchup is not implemented for this persistence type");
568 }
569
570 async fn load_config(&self) -> anyhow::Result<Option<NetworkConfig>>;
575
576 async fn save_config(&self, cfg: &NetworkConfig) -> anyhow::Result<()>;
578
579 async fn load_latest_acted_view(&self) -> anyhow::Result<Option<ViewNumber>>;
581
582 async fn load_restart_view(&self) -> anyhow::Result<Option<ViewNumber>>;
584
585 async fn load_quorum_proposals(
587 &self,
588 ) -> anyhow::Result<BTreeMap<ViewNumber, Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>>>>;
589
590 async fn load_quorum_proposal(
591 &self,
592 view: ViewNumber,
593 ) -> anyhow::Result<Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>>>;
594
595 async fn load_vid_share(
596 &self,
597 view: ViewNumber,
598 ) -> anyhow::Result<Option<Proposal<SeqTypes, VidDisperseShare<SeqTypes>>>>;
599 async fn load_da_proposal(
600 &self,
601 view: ViewNumber,
602 ) -> anyhow::Result<Option<Proposal<SeqTypes, DaProposal2<SeqTypes>>>>;
603 async fn load_upgrade_certificate(
604 &self,
605 ) -> anyhow::Result<Option<UpgradeCertificate<SeqTypes>>>;
606 async fn load_start_epoch_info(&self) -> anyhow::Result<Vec<InitializerEpochInfo<SeqTypes>>>;
607 async fn load_state_cert(
608 &self,
609 ) -> anyhow::Result<Option<LightClientStateUpdateCertificateV2<SeqTypes>>>;
610
611 async fn get_state_cert_by_epoch(
613 &self,
614 epoch: u64,
615 ) -> anyhow::Result<Option<LightClientStateUpdateCertificateV2<SeqTypes>>>;
616
617 async fn insert_state_cert(
619 &self,
620 epoch: u64,
621 cert: LightClientStateUpdateCertificateV2<SeqTypes>,
622 ) -> anyhow::Result<()>;
623
624 async fn load_consensus_state(
631 &self,
632 state: NodeState,
633 upgrade: Upgrade,
634 ) -> anyhow::Result<(HotShotInitializer<SeqTypes>, Option<ViewNumber>)> {
635 let genesis_validated_state = ValidatedState::genesis(&state).0;
636 let highest_voted_view = match self
637 .load_latest_acted_view()
638 .await
639 .context("loading last voted view")?
640 {
641 Some(view) => {
642 tracing::info!(?view, "starting with last actioned view");
643 view
644 },
645 None => {
646 tracing::info!("no saved view, starting from genesis");
647 ViewNumber::genesis()
648 },
649 };
650
651 let restart_view = match self
652 .load_restart_view()
653 .await
654 .context("loading restart view")?
655 {
656 Some(view) => {
657 tracing::info!(?view, "starting from saved view");
658 view
659 },
660 None => {
661 tracing::info!("no saved view, starting from genesis");
662 ViewNumber::genesis()
663 },
664 };
665 let next_epoch_high_qc = self
666 .load_next_epoch_quorum_certificate()
667 .await
668 .context("loading next epoch qc")?;
669 let (leaf, mut high_qc, anchor_view) = match self
670 .load_anchor_leaf()
671 .await
672 .context("loading anchor leaf")?
673 {
674 Some((leaf, high_qc)) => {
675 tracing::info!(?leaf, ?high_qc, "starting from saved leaf");
676 ensure!(
677 leaf.view_number() == high_qc.view_number,
678 format!(
679 "loaded anchor leaf from view {}, but high QC is from view {}",
680 leaf.view_number(),
681 high_qc.view_number
682 )
683 );
684
685 let anchor_view = leaf.view_number();
686 (leaf, high_qc, Some(anchor_view))
687 },
688 None => {
689 tracing::info!("no saved leaf, starting from genesis leaf");
690 (
691 hotshot_types::data::Leaf2::genesis(
692 &genesis_validated_state,
693 &state,
694 upgrade.base,
695 )
696 .await,
697 QuorumCertificate2::genesis(&genesis_validated_state, &state, upgrade).await,
698 None,
699 )
700 },
701 };
702
703 if let Some((extended_high_qc, _)) = self.load_eqc().await
704 && extended_high_qc.view_number() > high_qc.view_number()
705 {
706 high_qc = extended_high_qc
707 }
708
709 let validated_state = if leaf.block_header().height() == 0 {
710 genesis_validated_state
712 } else {
713 ValidatedState::from_header(leaf.block_header())
716 };
717
718 let restart_view = max(restart_view, leaf.view_number());
723 let epoch = genesis_epoch_from_version(upgrade.base);
725
726 let config = self.load_config().await.context("loading config")?;
727 let epoch_height = config
728 .as_ref()
729 .map(|c| c.config.epoch_height)
730 .unwrap_or_default();
731 let epoch_start_block = config
732 .as_ref()
733 .map(|c| c.config.epoch_start_block)
734 .unwrap_or_default();
735
736 let saved_proposals = self
737 .load_quorum_proposals()
738 .await
739 .context("loading saved proposals")?;
740
741 let upgrade_certificate = self
742 .load_upgrade_certificate()
743 .await
744 .context("loading upgrade certificate")?;
745
746 let start_epoch_info = self
747 .load_start_epoch_info()
748 .await
749 .context("loading start epoch info")?;
750
751 let state_cert = self
752 .load_state_cert()
753 .await
754 .context("loading light client state update certificate")?;
755
756 tracing::warn!(
757 ?leaf,
758 ?restart_view,
759 ?epoch,
760 ?high_qc,
761 ?validated_state,
762 ?state_cert,
763 "loaded consensus state"
764 );
765
766 Ok((
767 HotShotInitializer {
768 instance_state: state,
769 epoch_height,
770 epoch_start_block,
771 anchor_leaf: leaf,
772 anchor_state: Arc::new(validated_state),
773 anchor_state_delta: None,
774 start_view: restart_view,
775 start_epoch: epoch,
776 last_actioned_view: highest_voted_view,
777 saved_proposals,
778 high_qc,
779 next_epoch_high_qc,
780 decided_upgrade_certificate: upgrade_certificate,
781 undecided_leaves: Default::default(),
782 undecided_state: Default::default(),
783 saved_vid_shares: Default::default(), start_epoch_info,
785 state_cert,
786 },
787 anchor_view,
788 ))
789 }
790
791 async fn persist_event(
799 &self,
800 event: &CoordinatorEvent<SeqTypes>,
801 consumer: &(impl EventConsumer + 'static),
802 ) -> Option<(ViewNumber, Option<Arc<CertificatePair<SeqTypes>>>)> {
803 match event {
804 CoordinatorEvent::LegacyEvent(hotshot_event) => {
805 let EventType::Decide {
806 leaf_chain,
807 committing_qc,
808 deciding_qc,
809 ..
810 } = &hotshot_event.event
811 else {
812 return None;
813 };
814 let LeafInfo { leaf, .. } = leaf_chain.first()?;
815 let decided_view = leaf.view_number();
816
817 let chain = leaf_chain.iter().zip(
818 std::iter::once((**committing_qc).clone()).chain(
819 leaf_chain
820 .iter()
821 .map(|leaf| CertificatePair::for_parent(&leaf.leaf)),
822 ),
823 );
824
825 if let Err(err) = self
826 .persist_decided_leaves(decided_view, chain, deciding_qc.clone(), consumer)
827 .await
828 {
829 tracing::error!(
830 "failed to save decided leaves, chain may not be up to date: {err:#}"
831 );
832 return None;
833 }
834 Some((decided_view, deciding_qc.clone()))
835 },
836 CoordinatorEvent::NewDecide {
837 leaf_infos, cert1, ..
838 } => {
839 let first = leaf_infos.first()?;
840 let decided_view = first.leaf.view_number();
841
842 let certifying_qcs = std::iter::once(cert1.clone())
845 .chain(leaf_infos.iter().map(|info| info.leaf.justify_qc()))
846 .take(leaf_infos.len())
847 .map(CertificatePair::non_epoch_change);
848
849 if let Err(err) = self
850 .persist_decided_leaves(
851 decided_view,
852 leaf_infos.iter().zip(certifying_qcs),
853 None,
854 consumer,
855 )
856 .await
857 {
858 tracing::error!(
859 "failed to save decided leaves from new protocol, chain may not be up to \
860 date: {err:#}"
861 );
862 return None;
863 }
864 Some((decided_view, None))
865 },
866 _ => None,
867 }
868 }
869
870 async fn append_decided_leaves(
900 &self,
901 decided_view: ViewNumber,
902 leaf_chain: impl IntoIterator<Item = (&LeafInfo<SeqTypes>, CertificatePair<SeqTypes>)> + Send,
903 deciding_qc: Option<Arc<CertificatePair<SeqTypes>>>,
904 consumer: &(impl EventConsumer + 'static),
905 ) -> anyhow::Result<()> {
906 self.persist_decided_leaves(decided_view, leaf_chain, deciding_qc.clone(), consumer)
907 .await?;
908 if let Err(err) = self
910 .process_decided_events(decided_view, deciding_qc, consumer)
911 .await
912 {
913 tracing::warn!(?decided_view, "decide event processing failed: {err:#}");
914 }
915 Ok(())
916 }
917
918 async fn persist_decided_leaves(
923 &self,
924 decided_view: ViewNumber,
925 leaf_chain: impl IntoIterator<Item = (&LeafInfo<SeqTypes>, CertificatePair<SeqTypes>)> + Send,
926 deciding_qc: Option<Arc<CertificatePair<SeqTypes>>>,
927 consumer: &(impl EventConsumer + 'static),
928 ) -> anyhow::Result<()>;
929
930 async fn process_decided_events(
941 &self,
942 decided_view: ViewNumber,
943 _deciding_qc: Option<Arc<CertificatePair<SeqTypes>>>,
944 _consumer: &(impl EventConsumer + 'static),
945 ) -> anyhow::Result<Option<ViewNumber>> {
946 Ok(Some(decided_view))
947 }
948
949 async fn load_anchor_leaf(
950 &self,
951 ) -> anyhow::Result<Option<(Leaf2, QuorumCertificate2<SeqTypes>)>>;
952 async fn append_vid(
953 &self,
954 proposal: &Proposal<SeqTypes, VidDisperseShare<SeqTypes>>,
955 ) -> anyhow::Result<()>;
956 async fn append_da(
957 &self,
958 proposal: &Proposal<SeqTypes, DaProposal<SeqTypes>>,
959 vid_commit: VidCommitment,
960 ) -> anyhow::Result<()>;
961 async fn record_action(
962 &self,
963 view: ViewNumber,
964 epoch: Option<EpochNumber>,
965 action: HotShotAction,
966 ) -> anyhow::Result<()>;
967
968 async fn append_quorum_proposal2(
969 &self,
970 proposal: &Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>>,
971 ) -> anyhow::Result<()>;
972
973 async fn append_cert2(
975 &self,
976 _view: ViewNumber,
977 _cert2: Certificate2<SeqTypes>,
978 ) -> anyhow::Result<()> {
979 Ok(())
980 }
981
982 async fn load_cert2(
984 &self,
985 _view: ViewNumber,
986 ) -> anyhow::Result<Option<Certificate2<SeqTypes>>> {
987 Ok(None)
988 }
989
990 async fn store_eqc(
992 &self,
993 _high_qc: QuorumCertificate2<SeqTypes>,
994 _next_epoch_high_qc: NextEpochQuorumCertificate2<SeqTypes>,
995 ) -> anyhow::Result<()>;
996
997 async fn load_eqc(
999 &self,
1000 ) -> Option<(
1001 QuorumCertificate2<SeqTypes>,
1002 NextEpochQuorumCertificate2<SeqTypes>,
1003 )>;
1004
1005 async fn store_upgrade_certificate(
1006 &self,
1007 decided_upgrade_certificate: Option<UpgradeCertificate<SeqTypes>>,
1008 ) -> anyhow::Result<()>;
1009
1010 async fn migrate_storage(&self) -> anyhow::Result<()> {
1011 tracing::warn!("migrating consensus data...");
1012
1013 self.migrate_anchor_leaf().await?;
1014 self.migrate_da_proposals().await?;
1015 self.migrate_vid_shares().await?;
1016 self.migrate_quorum_proposals().await?;
1017 self.migrate_quorum_certificates().await?;
1018 self.migrate_reward_merkle_tree_v2()
1019 .await
1020 .context("failed to migrate reward merkle tree v2")?;
1021 self.migrate_x25519_keys().await?;
1022 tracing::warn!("consensus storage has been migrated to new types");
1023
1024 Ok(())
1025 }
1026
1027 async fn migrate_x25519_keys(&self) -> anyhow::Result<()>;
1028
1029 async fn migrate_anchor_leaf(&self) -> anyhow::Result<()>;
1030 async fn migrate_da_proposals(&self) -> anyhow::Result<()>;
1031 async fn migrate_vid_shares(&self) -> anyhow::Result<()>;
1032 async fn migrate_quorum_proposals(&self) -> anyhow::Result<()>;
1033 async fn migrate_quorum_certificates(&self) -> anyhow::Result<()>;
1034
1035 async fn load_anchor_view(&self) -> anyhow::Result<ViewNumber> {
1036 match self.load_anchor_leaf().await? {
1037 Some((leaf, _)) => Ok(leaf.view_number()),
1038 None => Ok(ViewNumber::genesis()),
1039 }
1040 }
1041
1042 async fn store_next_epoch_quorum_certificate(
1043 &self,
1044 high_qc: NextEpochQuorumCertificate2<SeqTypes>,
1045 ) -> anyhow::Result<()>;
1046
1047 async fn load_next_epoch_quorum_certificate(
1048 &self,
1049 ) -> anyhow::Result<Option<NextEpochQuorumCertificate2<SeqTypes>>>;
1050
1051 async fn append_da2(
1052 &self,
1053 proposal: &Proposal<SeqTypes, DaProposal2<SeqTypes>>,
1054 vid_commit: VidCommitment,
1055 ) -> anyhow::Result<()>;
1056
1057 async fn append_proposal2(
1058 &self,
1059 proposal: &Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>>,
1060 ) -> anyhow::Result<()> {
1061 self.append_quorum_proposal2(proposal).await
1062 }
1063
1064 async fn store_drb_result(
1065 &self,
1066 epoch: EpochNumber,
1067 drb_result: DrbResult,
1068 ) -> anyhow::Result<()>;
1069 async fn store_drb_input(&self, drb_input: DrbInput) -> anyhow::Result<()>;
1070 async fn load_drb_input(&self, epoch: u64) -> anyhow::Result<DrbInput>;
1071 async fn store_epoch_root(
1072 &self,
1073 epoch: EpochNumber,
1074 block_header: <SeqTypes as NodeType>::BlockHeader,
1075 ) -> anyhow::Result<()>;
1076 async fn add_state_cert(
1077 &self,
1078 state_cert: LightClientStateUpdateCertificateV2<SeqTypes>,
1079 ) -> anyhow::Result<()>;
1080
1081 fn enable_metrics(&mut self, metrics: &dyn Metrics);
1082}
1083
/// A consumer of coordinator events.
///
/// References to an `EventConsumer` are passed into the decide-persistence methods of
/// [`SequencerPersistence`].
#[async_trait]
pub trait EventConsumer: Debug + Send + Sync {
    /// Handle a single event, reporting failure through the returned result.
    async fn handle_event(&self, event: &CoordinatorEvent<SeqTypes>) -> anyhow::Result<()>;
}
1088
1089#[async_trait]
1090impl<T> EventConsumer for Box<T>
1091where
1092 T: EventConsumer + ?Sized,
1093{
1094 async fn handle_event(&self, event: &CoordinatorEvent<SeqTypes>) -> anyhow::Result<()> {
1095 (**self).handle_event(event).await
1096 }
1097}
1098
/// An [`EventConsumer`] which ignores every event it is given.
#[derive(Clone, Copy, Debug)]
pub struct NullEventConsumer;
1101
#[async_trait]
impl EventConsumer for NullEventConsumer {
    /// Discards the event and always reports success.
    async fn handle_event(&self, _event: &CoordinatorEvent<SeqTypes>) -> anyhow::Result<()> {
        Ok(())
    }
}
1108
1109#[async_trait]
1110impl<P: SequencerPersistence> Storage<SeqTypes> for Arc<P> {
1111 async fn append_vid(
1112 &self,
1113 proposal: &Proposal<SeqTypes, VidDisperseShare<SeqTypes>>,
1114 ) -> anyhow::Result<()> {
1115 (**self).append_vid(proposal).await
1116 }
1117
1118 async fn append_da(
1119 &self,
1120 proposal: &Proposal<SeqTypes, DaProposal<SeqTypes>>,
1121 vid_commit: VidCommitment,
1122 ) -> anyhow::Result<()> {
1123 (**self).append_da(proposal, vid_commit).await
1124 }
1125
1126 async fn append_da2(
1127 &self,
1128 proposal: &Proposal<SeqTypes, DaProposal2<SeqTypes>>,
1129 vid_commit: VidCommitment,
1130 ) -> anyhow::Result<()> {
1131 (**self).append_da2(proposal, vid_commit).await
1132 }
1133
1134 async fn record_action(
1135 &self,
1136 view: ViewNumber,
1137 epoch: Option<EpochNumber>,
1138 action: HotShotAction,
1139 ) -> anyhow::Result<()> {
1140 (**self).record_action(view, epoch, action).await
1141 }
1142
1143 async fn update_high_qc(&self, _high_qc: QuorumCertificate<SeqTypes>) -> anyhow::Result<()> {
1144 Ok(())
1145 }
1146
1147 async fn append_proposal(
1148 &self,
1149 proposal: &Proposal<SeqTypes, QuorumProposal<SeqTypes>>,
1150 ) -> anyhow::Result<()> {
1151 (**self)
1152 .append_quorum_proposal2(&convert_proposal(proposal.clone()))
1153 .await
1154 }
1155
1156 async fn append_proposal2(
1157 &self,
1158 proposal: &Proposal<SeqTypes, QuorumProposal2<SeqTypes>>,
1159 ) -> anyhow::Result<()> {
1160 let proposal_qp_wrapper: Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>> =
1161 convert_proposal(proposal.clone());
1162 (**self).append_quorum_proposal2(&proposal_qp_wrapper).await
1163 }
1164
1165 async fn update_high_qc2(&self, _high_qc: QuorumCertificate2<SeqTypes>) -> anyhow::Result<()> {
1166 Ok(())
1167 }
1168
1169 async fn update_eqc(
1171 &self,
1172 high_qc: QuorumCertificate2<SeqTypes>,
1173 next_epoch_high_qc: NextEpochQuorumCertificate2<SeqTypes>,
1174 ) -> anyhow::Result<()> {
1175 if let Some((existing_high_qc, _)) = (**self).load_eqc().await
1176 && high_qc.view_number() < existing_high_qc.view_number()
1177 {
1178 return Ok(());
1179 }
1180
1181 (**self).store_eqc(high_qc, next_epoch_high_qc).await
1182 }
1183
1184 async fn update_next_epoch_high_qc2(
1185 &self,
1186 _next_epoch_high_qc: NextEpochQuorumCertificate2<SeqTypes>,
1187 ) -> anyhow::Result<()> {
1188 Ok(())
1189 }
1190
1191 async fn update_decided_upgrade_certificate(
1192 &self,
1193 decided_upgrade_certificate: Option<UpgradeCertificate<SeqTypes>>,
1194 ) -> anyhow::Result<()> {
1195 (**self)
1196 .store_upgrade_certificate(decided_upgrade_certificate)
1197 .await
1198 }
1199
1200 async fn store_drb_result(
1201 &self,
1202 epoch: EpochNumber,
1203 drb_result: DrbResult,
1204 ) -> anyhow::Result<()> {
1205 (**self).store_drb_result(epoch, drb_result).await
1206 }
1207
1208 async fn store_epoch_root(
1209 &self,
1210 epoch: EpochNumber,
1211 block_header: <SeqTypes as NodeType>::BlockHeader,
1212 ) -> anyhow::Result<()> {
1213 (**self).store_epoch_root(epoch, block_header).await
1214 }
1215
1216 async fn store_drb_input(&self, drb_input: DrbInput) -> anyhow::Result<()> {
1217 (**self).store_drb_input(drb_input).await
1218 }
1219
1220 async fn load_drb_input(&self, epoch: u64) -> anyhow::Result<DrbInput> {
1221 (**self).load_drb_input(epoch).await
1222 }
1223
1224 async fn update_state_cert(
1225 &self,
1226 state_cert: LightClientStateUpdateCertificateV2<SeqTypes>,
1227 ) -> anyhow::Result<()> {
1228 (**self).add_state_cert(state_cert).await
1229 }
1230}
1231
1232#[async_trait]
1233impl<P: SequencerPersistence> NewProtocolStorage<SeqTypes> for Arc<P> {
1234 async fn append_cert2(
1235 &self,
1236 view: ViewNumber,
1237 cert: Certificate2<SeqTypes>,
1238 ) -> anyhow::Result<()> {
1239 (**self).append_cert2(view, cert).await
1240 }
1241}
1242
/// Types which can be built from a borrowed byte slice.
///
/// NOTE(review): going by the name, the bytes are a slice of a namespace payload — confirm
/// against the implementors.
pub trait FromNsPayloadBytes<'a> {
    /// Build `Self` from `bytes`. The conversion is infallible by signature.
    fn from_payload_bytes(bytes: &'a [u8]) -> Self;
}
1251
/// Types which describe a byte range and the type to be read from that range.
///
/// NOTE(review): going by the name, the range indexes into a namespace payload — confirm
/// against the implementors.
pub trait NsPayloadBytesRange<'a> {
    /// The type that can be built from the bytes in the range.
    type Output: FromNsPayloadBytes<'a>;

    /// The byte range described by `self`.
    fn ns_payload_range(&self) -> Range<usize>;
}
1262
/// Types which can be constructed from a binary form, a string or an integer, and converted
/// back to the binary form or to a string.
///
/// NOTE(review): presumably used to (de)serialize values that may appear as either strings or
/// integers in human-readable formats, with `Binary` used for binary formats — confirm against
/// the users of this trait.
pub trait FromStringOrInteger: Sized {
    /// The binary representation of this type.
    type Binary: Serialize + DeserializeOwned;
    /// The integer representation of this type.
    type Integer: Serialize + DeserializeOwned;

    /// Construct `Self` from its binary representation.
    fn from_binary(b: Self::Binary) -> anyhow::Result<Self>;
    /// Construct `Self` from a string.
    fn from_string(s: String) -> anyhow::Result<Self>;
    /// Construct `Self` from its integer representation.
    fn from_integer(i: Self::Integer) -> anyhow::Result<Self>;

    /// Convert `self` to its binary representation.
    fn to_binary(&self) -> anyhow::Result<Self::Binary>;
    /// Convert `self` to a string.
    fn to_string(&self) -> anyhow::Result<String>;
}