Skip to main content

hotshot/
lib.rs

1// Copyright (c) 2021-2024 Espresso Systems (espressosys.com)
2// This file is part of the HotShot repository.
3
4// You should have received a copy of the MIT License
5// along with the HotShot repository. If not, see <https://mit-license.org/>.
6
7//! Provides a generic rust implementation of the `HotShot` BFT protocol
8//!
9
10// Documentation module
11#[cfg(feature = "docs")]
12pub mod documentation;
13
14use committable::Committable;
15use futures::future::{Either, select};
16use hotshot_types::{
17    drb::{DrbResult, INITIAL_DRB_RESULT, drb_difficulty_selector},
18    epoch_membership::EpochMembershipCoordinator,
19    message::UpgradeLock,
20    simple_certificate::{CertificatePair, LightClientStateUpdateCertificateV2},
21    traits::{
22        block_contents::BlockHeader, election::Membership, network::BroadcastDelay,
23        signature_key::StateSignatureKey, storage::Storage,
24    },
25    utils::{epoch_from_block_number, is_ge_epoch_root},
26};
27use rand::Rng;
28
29/// Contains traits consumed by [`SystemContext`]
30pub mod traits;
31/// Contains types used by the crate
32pub mod types;
33
34pub mod tasks;
35use hotshot_types::data::QuorumProposalWrapper;
36use versions::{EPOCH_VERSION, Upgrade};
37
38/// Contains helper functions for the crate
39pub mod helpers;
40
41use std::{
42    collections::{BTreeMap, HashMap},
43    num::NonZeroUsize,
44    sync::Arc,
45    time::Duration,
46};
47
48use alloy::primitives::U256;
49use async_broadcast::{InactiveReceiver, Receiver, Sender, broadcast};
50use async_lock::RwLock;
51use async_trait::async_trait;
52use futures::join;
53use hotshot_task::task::{ConsensusTaskRegistry, NetworkTaskRegistry};
54use hotshot_task_impls::{events::HotShotEvent, helpers::broadcast_event};
55// Internal
56/// Reexport error type
57pub use hotshot_types::error::HotShotError;
58use hotshot_types::{
59    HotShotConfig,
60    consensus::{
61        Consensus, ConsensusMetricsValue, OuterConsensus, PayloadWithMetadata, VidShares, View,
62        ViewInner,
63    },
64    constants::{EVENT_CHANNEL_SIZE, EXTERNAL_EVENT_CHANNEL_SIZE},
65    data::{EpochNumber, Leaf2, ViewNumber},
66    event::{EventType, LeafInfo},
67    message::{DataMessage, Message, MessageKind, Proposal},
68    simple_certificate::{NextEpochQuorumCertificate2, QuorumCertificate2, UpgradeCertificate},
69    stake_table::HSStakeTable,
70    storage_metrics::StorageMetricsValue,
71    traits::{
72        consensus_api::ConsensusApi, network::ConnectedNetwork, node_implementation::NodeType,
73        signature_key::SignatureKey, states::ValidatedState,
74    },
75    utils::{genesis_epoch_from_version, option_epoch_from_block_number},
76};
77use hotshot_utils::warn;
78/// Reexport rand crate
79pub use rand;
80use tokio::{spawn, time::sleep};
81use tracing::{debug, instrument, trace};
82
83// -- Rexports
84// External
85use crate::{
86    tasks::{add_consensus_tasks, add_network_tasks},
87    traits::NodeImplementation,
88    types::{Event, SystemContextHandle},
89};
90
/// Length, in bytes, of a 512-bit hash (512 / 8 = 64)
pub const H_512: usize = 64;
/// Length, in bytes, of a 256-bit hash (256 / 8 = 32)
pub const H_256: usize = 32;
95
/// Holds the state needed to participate in `HotShot` consensus
///
/// Instances are assembled by [`SystemContext::new`] and
/// [`SystemContext::new_from_channels`], both of which hand the context back inside an
/// [`Arc`].
pub struct SystemContext<TYPES: NodeType, I: NodeImplementation<TYPES>> {
    /// The public key of this node; also used as the `sender` of outgoing messages
    public_key: TYPES::SignatureKey,

    /// The private key of this node
    private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,

    /// The private key to sign the light client state
    state_private_key: <TYPES::StateSignatureKey as StateSignatureKey>::StatePrivateKey,

    /// Configuration items for this hotshot instance
    pub config: HotShotConfig<TYPES>,

    /// The underlying network
    pub network: Arc<I::Network>,

    /// Memberships used by consensus
    pub membership_coordinator: EpochMembershipCoordinator<TYPES>,

    /// Consensus metrics; the same `Arc` is also handed to the `Consensus` instance at
    /// construction time.
    metrics: Arc<ConsensusMetricsValue>,

    /// The shared consensus state, wrapped in an [`OuterConsensus`]
    consensus: OuterConsensus<TYPES>,

    /// Immutable instance state
    instance_state: Arc<TYPES::InstanceState>,

    /// The view to enter when first starting consensus
    start_view: ViewNumber,

    /// The epoch to enter when first starting consensus
    start_epoch: Option<EpochNumber>,

    /// Access to the output event stream.
    ///
    /// As constructed by `new_from_channels`, this is a second handle onto the same channel
    /// as `external_event_stream`.
    output_event_stream: (Sender<Event<TYPES>>, InactiveReceiver<Event<TYPES>>),

    /// External event stream for communication with the application.
    pub(crate) external_event_stream: (Sender<Event<TYPES>>, InactiveReceiver<Event<TYPES>>),

    /// Anchored leaf provided by the initializer.
    anchored_leaf: Leaf2<TYPES>,

    /// Access to the internal event stream, in case we need to, say, shut something down.
    /// The receiver half is kept inactive.
    #[allow(clippy::type_complexity)]
    internal_event_stream: (
        Sender<Arc<HotShotEvent<TYPES>>>,
        InactiveReceiver<Arc<HotShotEvent<TYPES>>>,
    ),

    /// uid for instrumentation (recorded as the `id` field of this type's tracing spans)
    pub id: u64,

    /// Reference to the internal storage for consensus datum.
    pub storage: I::Storage,

    /// Storage metrics
    pub storage_metrics: Arc<StorageMetricsValue>,

    /// shared lock for upgrade information
    pub upgrade_lock: UpgradeLock<TYPES>,
}
159impl<TYPES: NodeType, I: NodeImplementation<TYPES>> Clone for SystemContext<TYPES, I> {
160    #![allow(deprecated)]
161    fn clone(&self) -> Self {
162        Self {
163            public_key: self.public_key.clone(),
164            private_key: self.private_key.clone(),
165            state_private_key: self.state_private_key.clone(),
166            config: self.config.clone(),
167            network: Arc::clone(&self.network),
168            membership_coordinator: self.membership_coordinator.clone(),
169            metrics: Arc::clone(&self.metrics),
170            consensus: self.consensus.clone(),
171            instance_state: Arc::clone(&self.instance_state),
172            start_view: self.start_view,
173            start_epoch: self.start_epoch,
174            output_event_stream: self.output_event_stream.clone(),
175            external_event_stream: self.external_event_stream.clone(),
176            anchored_leaf: self.anchored_leaf.clone(),
177            internal_event_stream: self.internal_event_stream.clone(),
178            id: self.id,
179            storage: self.storage.clone(),
180            storage_metrics: Arc::clone(&self.storage_metrics),
181            upgrade_lock: self.upgrade_lock.clone(),
182        }
183    }
184}
185
186impl<TYPES: NodeType, I: NodeImplementation<TYPES>> SystemContext<TYPES, I> {
187    #![allow(deprecated)]
188    /// Creates a new [`Arc<SystemContext>`] with the given configuration options.
189    ///
190    /// To do a full initialization, use `fn init` instead, which will set up background tasks as
191    /// well.
192    ///
193    /// Use this instead of `init` if you want to start the tasks manually
194    ///
195    /// # Panics
196    ///
197    /// Panics if storage migration fails.
198    #[allow(clippy::too_many_arguments)]
199    pub async fn new(
200        public_key: TYPES::SignatureKey,
201        private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
202        state_private_key: <TYPES::StateSignatureKey as StateSignatureKey>::StatePrivateKey,
203        nonce: u64,
204        config: HotShotConfig<TYPES>,
205        upgrade: versions::Upgrade,
206        memberships: EpochMembershipCoordinator<TYPES>,
207        network: Arc<I::Network>,
208        initializer: HotShotInitializer<TYPES>,
209        consensus_metrics: ConsensusMetricsValue,
210        storage: I::Storage,
211        storage_metrics: StorageMetricsValue,
212    ) -> Arc<Self> {
213        let internal_chan = broadcast(EVENT_CHANNEL_SIZE);
214        let external_chan = broadcast(EXTERNAL_EVENT_CHANNEL_SIZE);
215
216        Self::new_from_channels(
217            public_key,
218            private_key,
219            state_private_key,
220            nonce,
221            config,
222            upgrade,
223            memberships,
224            network,
225            initializer,
226            consensus_metrics,
227            storage,
228            storage_metrics,
229            internal_chan,
230            external_chan,
231        )
232        .await
233    }
234
235    /// Creates a new [`Arc<SystemContext>`] with the given configuration options.
236    ///
237    /// To do a full initialization, use `fn init` instead, which will set up background tasks as
238    /// well.
239    ///
240    /// Use this function if you want to use some preexisting channels and to spin up the tasks
241    /// and start consensus manually.  Mostly useful for tests
242    #[allow(clippy::too_many_arguments, clippy::type_complexity)]
243    pub async fn new_from_channels(
244        public_key: TYPES::SignatureKey,
245        private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
246        state_private_key: <TYPES::StateSignatureKey as StateSignatureKey>::StatePrivateKey,
247        nonce: u64,
248        config: HotShotConfig<TYPES>,
249        upgrade: versions::Upgrade,
250        membership_coordinator: EpochMembershipCoordinator<TYPES>,
251        network: Arc<I::Network>,
252        initializer: HotShotInitializer<TYPES>,
253        consensus_metrics: ConsensusMetricsValue,
254        storage: I::Storage,
255        storage_metrics: StorageMetricsValue,
256        internal_channel: (
257            Sender<Arc<HotShotEvent<TYPES>>>,
258            Receiver<Arc<HotShotEvent<TYPES>>>,
259        ),
260        external_channel: (Sender<Event<TYPES>>, Receiver<Event<TYPES>>),
261    ) -> Arc<Self> {
262        debug!("Creating a new hotshot");
263
264        tracing::warn!("Starting consensus with HotShotConfig:\n\n {config:?}");
265
266        let consensus_metrics = Arc::new(consensus_metrics);
267        let storage_metrics = Arc::new(storage_metrics);
268        let anchored_leaf = initializer.anchor_leaf;
269        let instance_state = initializer.instance_state;
270
271        let (internal_tx, internal_rx) = internal_channel;
272        let (mut external_tx, external_rx) = external_channel;
273
274        let mut internal_rx = internal_rx.new_receiver();
275
276        let mut external_rx = external_rx.new_receiver();
277
278        // Allow overflow on the internal channel as well. We don't want to block consensus if we
279        // have a slow receiver
280        internal_rx.set_overflow(true);
281        // Allow overflow on the external channel, otherwise sending to it may block.
282        external_rx.set_overflow(true);
283
284        tracing::warn!(
285            "Starting consensus with versions:\n\n Base: {:?}\nUpgrade: {:?}.",
286            upgrade.base,
287            upgrade.target
288        );
289        tracing::warn!(
290            "Loading previously decided upgrade certificate from storage: {:?}",
291            initializer.decided_upgrade_certificate
292        );
293
294        let upgrade_lock = UpgradeLock::<TYPES>::from_certificate(
295            upgrade,
296            &initializer.decided_upgrade_certificate,
297        );
298
299        let current_version = if let Some(cert) = initializer.decided_upgrade_certificate {
300            cert.data.new_version
301        } else {
302            upgrade.base
303        };
304
305        debug!("Setting DRB difficulty selector in membership");
306        let drb_difficulty_selector = drb_difficulty_selector(&config);
307
308        membership_coordinator.set_drb_difficulty_selector(drb_difficulty_selector);
309
310        for da_committee in &config.da_committees {
311            if current_version >= da_committee.start_version {
312                membership_coordinator.membership().add_da_committee(
313                    da_committee.start_epoch.into(),
314                    da_committee.committee.clone(),
315                );
316            }
317        }
318
319        // Get the validated state from the initializer or construct an incomplete one from the
320        // block header.
321        let validated_state = initializer.anchor_state;
322
323        load_start_epoch_info(
324            membership_coordinator.membership(),
325            &initializer.start_epoch_info,
326            config.epoch_height,
327            config.epoch_start_block,
328        )
329        .await;
330
331        // #3967 REVIEW NOTE: Should this actually be Some()? How do we know?
332        let epoch = initializer.high_qc.data.block_number.map(|block_number| {
333            EpochNumber::new(epoch_from_block_number(
334                block_number + 1,
335                config.epoch_height,
336            ))
337        });
338
339        // Insert the validated state to state map.
340        let mut validated_state_map = BTreeMap::default();
341        validated_state_map.insert(
342            anchored_leaf.view_number(),
343            View {
344                view_inner: ViewInner::Leaf {
345                    leaf: anchored_leaf.commit(),
346                    state: Arc::clone(&validated_state),
347                    delta: initializer.anchor_state_delta,
348                    epoch,
349                },
350            },
351        );
352        for (view_num, inner) in initializer.undecided_state {
353            validated_state_map.insert(view_num, inner);
354        }
355
356        let mut saved_leaves = HashMap::new();
357        let mut saved_payloads = BTreeMap::new();
358        saved_leaves.insert(anchored_leaf.commit(), anchored_leaf.clone());
359
360        for (_, leaf) in initializer.undecided_leaves {
361            saved_leaves.insert(leaf.commit(), leaf.clone());
362        }
363        if let Some(payload) = anchored_leaf.block_payload() {
364            let metadata = anchored_leaf.block_header().metadata().clone();
365            saved_payloads.insert(
366                anchored_leaf.view_number(),
367                Arc::new(PayloadWithMetadata { payload, metadata }),
368            );
369        }
370        let high_qc_block_number = initializer.high_qc.data.block_number;
371        let (stake_table, success_threshold) =
372            if let Ok(epoch_membership) = membership_coordinator.stake_table_for_epoch(epoch) {
373                (
374                    HSStakeTable::from_iter(epoch_membership.stake_table()),
375                    epoch_membership.success_threshold(),
376                )
377            } else {
378                tracing::warn!(
379                    "Failed to get stake table for epoch {:?} while creating vote participation",
380                    epoch
381                );
382                (HSStakeTable::default(), U256::MAX)
383            };
384
385        let consensus = Consensus::new(
386            validated_state_map,
387            Some(initializer.saved_vid_shares),
388            anchored_leaf.view_number(),
389            epoch,
390            anchored_leaf.view_number(),
391            anchored_leaf.view_number(),
392            initializer.last_actioned_view,
393            initializer.saved_proposals,
394            saved_leaves,
395            saved_payloads,
396            initializer.high_qc,
397            initializer.next_epoch_high_qc,
398            Arc::clone(&consensus_metrics),
399            config.epoch_height,
400            initializer.state_cert,
401            config.drb_difficulty,
402            config.drb_upgrade_difficulty,
403            stake_table,
404            success_threshold,
405        );
406
407        let consensus = Arc::new(RwLock::new(consensus));
408
409        if let Some(epoch) = epoch {
410            tracing::info!(
411                "Triggering catchup for epoch {} and next epoch {}",
412                epoch,
413                epoch + 1
414            );
415            // trigger catchup for the current and next epoch if needed
416            let _ = membership_coordinator.membership_for_epoch(Some(epoch));
417            let _ = membership_coordinator.membership_for_epoch(Some(epoch + 1));
418            // If we already have an epoch root, we can trigger catchup for the epoch
419            // which that root applies to.
420            if let Some(high_qc_block_number) = high_qc_block_number
421                && is_ge_epoch_root(high_qc_block_number, config.epoch_height)
422            {
423                let _ = membership_coordinator.stake_table_for_epoch(Some(epoch + 2));
424            }
425
426            if let Ok(drb_result) = storage.load_drb_result(epoch + 1).await {
427                tracing::error!("Writing DRB result for epoch {}", epoch + 1);
428                if let Ok(mem) = membership_coordinator.stake_table_for_epoch(Some(epoch + 1)) {
429                    mem.add_drb_result(drb_result);
430                }
431            }
432        }
433
434        // This makes it so we won't block on broadcasting if there is not a receiver
435        // Our own copy of the receiver is inactive so it doesn't count.
436        external_tx.set_await_active(false);
437
438        let inner: Arc<SystemContext<TYPES, I>> = Arc::new(SystemContext {
439            id: nonce,
440            consensus: OuterConsensus::new(consensus),
441            instance_state: Arc::new(instance_state),
442            public_key,
443            private_key,
444            state_private_key,
445            config,
446            start_view: initializer.start_view,
447            start_epoch: initializer.start_epoch,
448            network,
449            membership_coordinator,
450            metrics: Arc::clone(&consensus_metrics),
451            internal_event_stream: (internal_tx, internal_rx.deactivate()),
452            output_event_stream: (external_tx.clone(), external_rx.clone().deactivate()),
453            external_event_stream: (external_tx, external_rx.deactivate()),
454            anchored_leaf: anchored_leaf.clone(),
455            storage,
456            storage_metrics,
457            upgrade_lock,
458        });
459
460        inner
461    }
462
463    /// "Starts" consensus by sending a `Qc2Formed`, `ViewChange` events
464    ///
465    /// # Panics
466    /// Panics if sending genesis fails
467    #[instrument(skip_all, target = "SystemContext", fields(id = self.id))]
468    pub async fn start_consensus(&self) {
469        #[cfg(all(feature = "rewind", not(debug_assertions)))]
470        compile_error!("Cannot run rewind in production builds!");
471
472        debug!("Starting Consensus");
473        let consensus = self.consensus.read().await;
474
475        let first_epoch = option_epoch_from_block_number(
476            self.upgrade_lock.upgrade().base >= EPOCH_VERSION,
477            self.config.epoch_start_block,
478            self.config.epoch_height,
479        );
480        // `start_epoch` comes from the initializer, it might be the last seen epoch before restart
481        // `first_epoch` is the first epoch after the transition to the epoch version
482        // `initial_view_change_epoch` is the greater of the two, we use it with the initial view change
483        let initial_view_change_epoch = self.start_epoch.max(first_epoch);
484        #[allow(clippy::panic)]
485        self.internal_event_stream
486            .0
487            .broadcast_direct(Arc::new(HotShotEvent::ViewChange(
488                self.start_view,
489                initial_view_change_epoch,
490            )))
491            .await
492            .unwrap_or_else(|_| {
493                panic!(
494                    "Genesis Broadcast failed; event = ViewChange({:?}, {:?})",
495                    self.start_view, initial_view_change_epoch,
496                )
497            });
498
499        // Clone the event stream that we send the timeout event to
500        let event_stream = self.internal_event_stream.0.clone();
501        let next_view_timeout = self.config.next_view_timeout;
502        let start_view = self.start_view;
503        let start_epoch = self.start_epoch;
504
505        // Spawn a task that will sleep for the next view timeout and then send a timeout event
506        // if not cancelled
507        spawn({
508            async move {
509                sleep(Duration::from_millis(next_view_timeout)).await;
510                broadcast_event(
511                    Arc::new(HotShotEvent::Timeout(start_view, start_epoch)),
512                    &event_stream,
513                )
514                .await;
515            }
516        });
517        #[allow(clippy::panic)]
518        self.internal_event_stream
519            .0
520            .broadcast_direct(Arc::new(HotShotEvent::Qc2Formed(either::Left(
521                consensus.high_qc().clone(),
522            ))))
523            .await
524            .unwrap_or_else(|_| {
525                panic!(
526                    "Genesis Broadcast failed; event = Qc2Formed(either::Left({:?}))",
527                    consensus.high_qc()
528                )
529            });
530
531        {
532            // Some applications seem to expect a leaf decide event for the genesis leaf,
533            // which contains only that leaf and nothing else.
534            if self.anchored_leaf.view_number() == ViewNumber::genesis() {
535                let (validated_state, state_delta) =
536                    TYPES::ValidatedState::genesis(&self.instance_state);
537
538                let qc = QuorumCertificate2::genesis(
539                    &validated_state,
540                    self.instance_state.as_ref(),
541                    self.upgrade_lock.upgrade(),
542                )
543                .await;
544
545                broadcast_event(
546                    Event {
547                        view_number: self.anchored_leaf.view_number(),
548                        event: EventType::Decide {
549                            leaf_chain: Arc::new(vec![LeafInfo::new(
550                                self.anchored_leaf.clone(),
551                                Arc::new(validated_state),
552                                Some(Arc::new(state_delta)),
553                                None,
554                                None,
555                            )]),
556                            committing_qc: Arc::new(CertificatePair::non_epoch_change(qc)),
557                            deciding_qc: None,
558                            block_size: None,
559                        },
560                    },
561                    &self.external_event_stream.0,
562                )
563                .await;
564            }
565        }
566    }
567
568    /// Emit an external event
569    async fn send_external_event(&self, event: Event<TYPES>) {
570        debug!(?event, "send_external_event");
571        broadcast_event(event, &self.external_event_stream.0).await;
572    }
573
574    /// Publishes a transaction asynchronously to the network.
575    ///
576    /// # Errors
577    ///
578    /// Always returns Ok; does not return an error if the transaction couldn't be published to the network
579    #[instrument(skip(self), err, target = "SystemContext", fields(id = self.id))]
580    pub async fn publish_transaction_async(
581        &self,
582        transaction: TYPES::Transaction,
583    ) -> Result<(), HotShotError<TYPES>> {
584        trace!("Adding transaction to our own queue");
585
586        let api = self.clone();
587
588        let consensus_reader = api.consensus.read().await;
589        let view_number = consensus_reader.cur_view();
590        let epoch = consensus_reader.cur_epoch();
591        drop(consensus_reader);
592
593        // Wrap up a message
594        let message_kind: DataMessage<TYPES> =
595            DataMessage::SubmitTransaction(transaction.clone(), view_number);
596        let message = Message {
597            sender: api.public_key.clone(),
598            kind: MessageKind::from(message_kind),
599        };
600
601        let serialized_message = self.upgrade_lock.serialize(&message).map_err(|err| {
602            HotShotError::FailedToSerialize(format!("failed to serialize transaction: {err}"))
603        })?;
604
605        let membership = match api.membership_coordinator.membership_for_epoch(epoch) {
606            Ok(m) => m,
607            Err(e) => return Err(HotShotError::InvalidState(e.message)),
608        };
609
610        spawn(async move {
611            let memberships_da_committee_members = membership
612                .da_committee_members(view_number)
613                .cloned()
614                .collect();
615
616            join! {
617                // TODO We should have a function that can return a network error if there is one
618                // but first we'd need to ensure our network implementations can support that
619                // (and not hang instead)
620
621                // version <0, 1> currently fixed; this is the same as VERSION_0_1,
622                // and will be updated to be part of SystemContext. I wanted to use associated
623                // constants in NodeType, but that seems to be unavailable in the current Rust.
624                api
625                    .network.da_broadcast_message(
626                        view_number.u64().into(),
627                        serialized_message,
628                        memberships_da_committee_members,
629                        BroadcastDelay::None,
630                    ),
631                api
632                    .send_external_event(Event {
633                        view_number,
634                        event: EventType::Transactions {
635                            transactions: vec![transaction],
636                        },
637                    }),
638            }
639        });
640        Ok(())
641    }
642
643    /// Returns a copy of the consensus struct
644    #[must_use]
645    pub fn consensus(&self) -> Arc<RwLock<Consensus<TYPES>>> {
646        Arc::clone(&self.consensus.inner_consensus)
647    }
648
649    /// Returns a copy of the instance state
650    pub fn instance_state(&self) -> Arc<TYPES::InstanceState> {
651        Arc::clone(&self.instance_state)
652    }
653
654    /// Returns a copy of the last decided leaf
655    /// # Panics
656    /// Panics if internal leaf for consensus is inconsistent
657    #[instrument(skip_all, target = "SystemContext", fields(id = self.id))]
658    pub async fn decided_leaf(&self) -> Leaf2<TYPES> {
659        self.consensus.read().await.decided_leaf()
660    }
661
662    /// [Non-blocking] instantly returns a copy of the last decided leaf if
663    /// it is available to be read. If not, we return `None`.
664    ///
665    /// # Panics
666    /// Panics if internal state for consensus is inconsistent
667    #[must_use]
668    #[instrument(skip_all, target = "SystemContext", fields(id = self.id))]
669    pub fn try_decided_leaf(&self) -> Option<Leaf2<TYPES>> {
670        self.consensus.try_read().map(|guard| guard.decided_leaf())
671    }
672
673    /// Returns the last decided validated state.
674    ///
675    /// # Panics
676    /// Panics if internal state for consensus is inconsistent
677    #[instrument(skip_all, target = "SystemContext", fields(id = self.id))]
678    pub async fn decided_state(&self) -> Arc<TYPES::ValidatedState> {
679        Arc::clone(&self.consensus.read().await.decided_state())
680    }
681
682    /// Get the validated state from a given `view`.
683    ///
684    /// Returns the requested state, if the [`SystemContext`] is tracking this view. Consensus
685    /// tracks views that have not yet been decided but could be in the future. This function may
686    /// return [`None`] if the requested view has already been decided (but see
687    /// [`decided_state`](Self::decided_state)) or if there is no path for the requested
688    /// view to ever be decided.
689    #[instrument(skip_all, target = "SystemContext", fields(id = self.id))]
690    pub async fn state(&self, view: ViewNumber) -> Option<Arc<TYPES::ValidatedState>> {
691        self.consensus.read().await.state(view).cloned()
692    }
693
694    /// Initializes a new [`SystemContext`] and does the work of setting up all the background tasks
695    ///
696    /// Assumes networking implementation is already primed.
697    ///
698    /// Underlying `HotShot` instance starts out paused, and must be unpaused
699    ///
700    /// Upon encountering an unrecoverable error, such as a failure to send to a broadcast channel,
701    /// the `HotShot` instance will log the error and shut down.
702    ///
703    /// To construct a [`SystemContext`] without setting up tasks, use `fn new` instead.
704    /// # Errors
705    ///
706    /// Can throw an error if `Self::new` fails.
707    #[allow(clippy::too_many_arguments)]
708    pub async fn init(
709        public_key: TYPES::SignatureKey,
710        private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
711        state_private_key: <TYPES::StateSignatureKey as StateSignatureKey>::StatePrivateKey,
712        node_id: u64,
713        config: HotShotConfig<TYPES>,
714        upgrade: versions::Upgrade,
715        memberships: EpochMembershipCoordinator<TYPES>,
716        network: Arc<I::Network>,
717        initializer: HotShotInitializer<TYPES>,
718        consensus_metrics: ConsensusMetricsValue,
719        storage: I::Storage,
720        storage_metrics: StorageMetricsValue,
721    ) -> Result<
722        (
723            SystemContextHandle<TYPES, I>,
724            Sender<Arc<HotShotEvent<TYPES>>>,
725            Receiver<Arc<HotShotEvent<TYPES>>>,
726        ),
727        HotShotError<TYPES>,
728    > {
729        let hotshot = Self::new(
730            public_key,
731            private_key,
732            state_private_key,
733            node_id,
734            config,
735            upgrade,
736            memberships,
737            network,
738            initializer,
739            consensus_metrics,
740            storage,
741            storage_metrics,
742        )
743        .await;
744        let handle = Arc::clone(&hotshot).run_tasks().await;
745        let (tx, rx) = hotshot.internal_event_stream.clone();
746
747        Ok((handle, tx, rx.activate()))
748    }
    /// Returns the configured view timeout (`config.next_view_timeout`) for `self`.
    ///
    /// `start_consensus` passes this same value to `Duration::from_millis`, i.e. it is
    /// interpreted as a number of milliseconds.
    #[must_use]
    pub fn next_view_timeout(&self) -> u64 {
        self.config.next_view_timeout
    }
754}
755
756impl<TYPES: NodeType, I: NodeImplementation<TYPES>> SystemContext<TYPES, I> {
757    /// Spawn all tasks that operate on [`SystemContextHandle`].
758    ///
759    /// For a list of which tasks are being spawned, see this module's documentation.
760    pub async fn run_tasks(&self) -> SystemContextHandle<TYPES, I> {
761        let consensus_registry = ConsensusTaskRegistry::new();
762        let network_registry = NetworkTaskRegistry::new();
763
764        let output_event_stream = self.external_event_stream.clone();
765        let internal_event_stream = self.internal_event_stream.clone();
766
767        let mut handle = SystemContextHandle {
768            consensus_registry,
769            network_registry,
770            output_event_stream: output_event_stream.clone(),
771            internal_event_stream: internal_event_stream.clone(),
772            hotshot: self.clone().into(),
773            storage: self.storage.clone(),
774            network: Arc::clone(&self.network),
775            membership_coordinator: self.membership_coordinator.clone(),
776            epoch_height: self.config.epoch_height,
777        };
778
779        add_network_tasks::<TYPES, I>(&mut handle).await;
780        add_consensus_tasks::<TYPES, I>(&mut handle).await;
781
782        handle
783    }
784}
785
/// An async broadcast channel: a `(Sender, Receiver)` pair whose messages are `Arc<S>`,
/// so an event is shared between receivers rather than deep-copied.
type Channel<S> = (Sender<Arc<S>>, Receiver<Arc<S>>);
788
789/// Trait for handling messages for a node with a twin copy of consensus
790#[async_trait]
791pub trait TwinsHandlerState<TYPES, I>
792where
793    Self: std::fmt::Debug + Send + Sync,
794    TYPES: NodeType,
795    I: NodeImplementation<TYPES>,
796{
797    /// Handle a message sent to the twin from the network task, forwarding it to one of the two twins.
798    async fn send_handler(
799        &mut self,
800        event: &HotShotEvent<TYPES>,
801    ) -> Vec<Either<HotShotEvent<TYPES>, HotShotEvent<TYPES>>>;
802
803    /// Handle a message from either twin, forwarding it to the network task
804    async fn recv_handler(
805        &mut self,
806        event: &Either<HotShotEvent<TYPES>, HotShotEvent<TYPES>>,
807    ) -> Vec<HotShotEvent<TYPES>>;
808
809    /// Fuse two channels into a single channel
810    ///
811    /// Note: the channels are fused using two async loops, whose `JoinHandle`s are dropped.
812    fn fuse_channels(
813        &'static mut self,
814        left: Channel<HotShotEvent<TYPES>>,
815        right: Channel<HotShotEvent<TYPES>>,
816    ) -> Channel<HotShotEvent<TYPES>> {
817        let send_state = Arc::new(RwLock::new(self));
818        let recv_state = Arc::clone(&send_state);
819
820        let (left_sender, mut left_receiver) = (left.0, left.1);
821        let (right_sender, mut right_receiver) = (right.0, right.1);
822
823        // channel to the network task
824        let (sender_to_network, network_task_receiver) = broadcast(EVENT_CHANNEL_SIZE);
825        // channel from the network task
826        let (network_task_sender, mut receiver_from_network): Channel<HotShotEvent<TYPES>> =
827            broadcast(EVENT_CHANNEL_SIZE);
828
829        let _recv_loop_handle = spawn(async move {
830            loop {
831                let msg = match select(left_receiver.recv(), right_receiver.recv()).await {
832                    Either::Left(msg) => Either::Left(msg.0.unwrap().as_ref().clone()),
833                    Either::Right(msg) => Either::Right(msg.0.unwrap().as_ref().clone()),
834                };
835
836                let mut state = recv_state.write().await;
837                let mut result = state.recv_handler(&msg).await;
838
839                while let Some(event) = result.pop() {
840                    let _ = sender_to_network.broadcast(event.into()).await;
841                }
842            }
843        });
844
845        let _send_loop_handle = spawn(async move {
846            loop {
847                if let Ok(msg) = receiver_from_network.recv().await {
848                    let mut state = send_state.write().await;
849
850                    let mut result = state.send_handler(&msg).await;
851
852                    while let Some(event) = result.pop() {
853                        match event {
854                            Either::Left(msg) => {
855                                let _ = left_sender.broadcast(msg.into()).await;
856                            },
857                            Either::Right(msg) => {
858                                let _ = right_sender.broadcast(msg.into()).await;
859                            },
860                        }
861                    }
862                }
863            }
864        });
865
866        (network_task_sender, network_task_receiver)
867    }
868
869    #[allow(clippy::too_many_arguments)]
870    /// Spawn all tasks that operate on [`SystemContextHandle`].
871    ///
872    /// For a list of which tasks are being spawned, see this module's documentation.
873    async fn spawn_twin_handles(
874        &'static mut self,
875        public_key: TYPES::SignatureKey,
876        private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
877        state_private_key: <TYPES::StateSignatureKey as StateSignatureKey>::StatePrivateKey,
878        nonce: u64,
879        config: HotShotConfig<TYPES>,
880        upgrade: versions::Upgrade,
881        memberships: EpochMembershipCoordinator<TYPES>,
882        network: Arc<I::Network>,
883        initializer: HotShotInitializer<TYPES>,
884        consensus_metrics: ConsensusMetricsValue,
885        storage: I::Storage,
886        storage_metrics: StorageMetricsValue,
887    ) -> (SystemContextHandle<TYPES, I>, SystemContextHandle<TYPES, I>) {
888        let epoch_height = config.epoch_height;
889        let left_system_context = SystemContext::new(
890            public_key.clone(),
891            private_key.clone(),
892            state_private_key.clone(),
893            nonce,
894            config.clone(),
895            upgrade,
896            memberships.clone(),
897            Arc::clone(&network),
898            initializer.clone(),
899            consensus_metrics.clone(),
900            storage.clone(),
901            storage_metrics.clone(),
902        )
903        .await;
904        let right_system_context = SystemContext::new(
905            public_key,
906            private_key,
907            state_private_key,
908            nonce,
909            config,
910            upgrade,
911            memberships,
912            network,
913            initializer,
914            consensus_metrics,
915            storage,
916            storage_metrics,
917        )
918        .await;
919
920        // create registries for both handles
921        let left_consensus_registry = ConsensusTaskRegistry::new();
922        let left_network_registry = NetworkTaskRegistry::new();
923
924        let right_consensus_registry = ConsensusTaskRegistry::new();
925        let right_network_registry = NetworkTaskRegistry::new();
926
927        // create external channels for both handles
928        let (left_external_sender, left_external_receiver) = broadcast(EXTERNAL_EVENT_CHANNEL_SIZE);
929        let left_external_event_stream =
930            (left_external_sender, left_external_receiver.deactivate());
931
932        let (right_external_sender, right_external_receiver) =
933            broadcast(EXTERNAL_EVENT_CHANNEL_SIZE);
934        let right_external_event_stream =
935            (right_external_sender, right_external_receiver.deactivate());
936
937        // create internal channels for both handles
938        let (left_internal_sender, left_internal_receiver) = broadcast(EVENT_CHANNEL_SIZE);
939        let left_internal_event_stream = (
940            left_internal_sender.clone(),
941            left_internal_receiver.clone().deactivate(),
942        );
943
944        let (right_internal_sender, right_internal_receiver) = broadcast(EVENT_CHANNEL_SIZE);
945        let right_internal_event_stream = (
946            right_internal_sender.clone(),
947            right_internal_receiver.clone().deactivate(),
948        );
949
950        // create each handle
951        let mut left_handle = SystemContextHandle::<_, I> {
952            consensus_registry: left_consensus_registry,
953            network_registry: left_network_registry,
954            output_event_stream: left_external_event_stream.clone(),
955            internal_event_stream: left_internal_event_stream.clone(),
956            hotshot: Arc::clone(&left_system_context),
957            storage: left_system_context.storage.clone(),
958            network: Arc::clone(&left_system_context.network),
959            membership_coordinator: left_system_context.membership_coordinator.clone(),
960            epoch_height,
961        };
962
963        let mut right_handle = SystemContextHandle::<_, I> {
964            consensus_registry: right_consensus_registry,
965            network_registry: right_network_registry,
966            output_event_stream: right_external_event_stream.clone(),
967            internal_event_stream: right_internal_event_stream.clone(),
968            hotshot: Arc::clone(&right_system_context),
969            storage: right_system_context.storage.clone(),
970            network: Arc::clone(&right_system_context.network),
971            membership_coordinator: right_system_context.membership_coordinator.clone(),
972            epoch_height,
973        };
974
975        // add consensus tasks to each handle, using their individual internal event streams
976        add_consensus_tasks::<TYPES, I>(&mut left_handle).await;
977        add_consensus_tasks::<TYPES, I>(&mut right_handle).await;
978
979        // fuse the event streams from both handles before initializing the network tasks
980        let fused_internal_event_stream = self.fuse_channels(
981            (left_internal_sender, left_internal_receiver),
982            (right_internal_sender, right_internal_receiver),
983        );
984
985        // swap out the event stream on the left handle
986        left_handle.internal_event_stream = (
987            fused_internal_event_stream.0,
988            fused_internal_event_stream.1.deactivate(),
989        );
990
991        // add the network tasks to the left handle. note: because the left handle has the fused event stream, the network tasks on the left handle will handle messages from both handles.
992        add_network_tasks::<TYPES, I>(&mut left_handle).await;
993
994        // revert to the original event stream on the left handle, for any applications that want to listen to it
995        left_handle.internal_event_stream = left_internal_event_stream.clone();
996
997        (left_handle, right_handle)
998    }
999}
1000
/// A `TwinsHandlerState` that forwards each incoming message to exactly one twin,
/// chosen at random, and returns messages from both.
#[derive(Debug)]
pub struct RandomTwinsHandler;
1005
1006#[async_trait]
1007impl<TYPES: NodeType, I: NodeImplementation<TYPES>> TwinsHandlerState<TYPES, I>
1008    for RandomTwinsHandler
1009{
1010    async fn send_handler(
1011        &mut self,
1012        event: &HotShotEvent<TYPES>,
1013    ) -> Vec<Either<HotShotEvent<TYPES>, HotShotEvent<TYPES>>> {
1014        let random: bool = rand::thread_rng().r#gen();
1015
1016        #[allow(clippy::match_bool)]
1017        match random {
1018            true => vec![Either::Left(event.clone())],
1019            false => vec![Either::Right(event.clone())],
1020        }
1021    }
1022
1023    async fn recv_handler(
1024        &mut self,
1025        event: &Either<HotShotEvent<TYPES>, HotShotEvent<TYPES>>,
1026    ) -> Vec<HotShotEvent<TYPES>> {
1027        match event {
1028            Either::Left(msg) | Either::Right(msg) => vec![msg.clone()],
1029        }
1030    }
1031}
1032
/// A `TwinsHandlerState` that forwards each message to both twins,
/// and returns messages from each of them.
///
/// The handler carries no state of its own; it is a unit struct.
#[derive(Debug)]
pub struct DoubleTwinsHandler;
1037
1038#[async_trait]
1039impl<TYPES: NodeType, I: NodeImplementation<TYPES>> TwinsHandlerState<TYPES, I>
1040    for DoubleTwinsHandler
1041{
1042    async fn send_handler(
1043        &mut self,
1044        event: &HotShotEvent<TYPES>,
1045    ) -> Vec<Either<HotShotEvent<TYPES>, HotShotEvent<TYPES>>> {
1046        vec![Either::Left(event.clone()), Either::Right(event.clone())]
1047    }
1048
1049    async fn recv_handler(
1050        &mut self,
1051        event: &Either<HotShotEvent<TYPES>, HotShotEvent<TYPES>>,
1052    ) -> Vec<HotShotEvent<TYPES>> {
1053        match event {
1054            Either::Left(msg) | Either::Right(msg) => vec![msg.clone()],
1055        }
1056    }
1057}
1058
1059#[async_trait]
1060impl<TYPES: NodeType, I: NodeImplementation<TYPES>> ConsensusApi<TYPES, I>
1061    for SystemContextHandle<TYPES, I>
1062{
1063    fn total_nodes(&self) -> NonZeroUsize {
1064        self.hotshot.config.num_nodes_with_stake
1065    }
1066
1067    fn builder_timeout(&self) -> Duration {
1068        self.hotshot.config.builder_timeout
1069    }
1070
1071    async fn send_event(&self, event: Event<TYPES>) {
1072        debug!(?event, "send_event");
1073        broadcast_event(event, &self.hotshot.external_event_stream.0).await;
1074    }
1075
1076    fn public_key(&self) -> &TYPES::SignatureKey {
1077        &self.hotshot.public_key
1078    }
1079
1080    fn private_key(&self) -> &<TYPES::SignatureKey as SignatureKey>::PrivateKey {
1081        &self.hotshot.private_key
1082    }
1083
1084    fn state_private_key(
1085        &self,
1086    ) -> &<TYPES::StateSignatureKey as StateSignatureKey>::StatePrivateKey {
1087        &self.hotshot.state_private_key
1088    }
1089}
1090
/// Per-epoch information carried by a [`HotShotInitializer`] in its `start_epoch_info` list.
///
/// `load_start_epoch_info` feeds every entry into the membership: the block header (when
/// present) through `add_epoch_root`, and the DRB result through `add_drb_result`.
#[derive(Clone, Debug, PartialEq)]
pub struct InitializerEpochInfo<TYPES: NodeType> {
    /// Epoch this entry describes.
    pub epoch: EpochNumber,
    /// DRB result registered with the membership for `epoch`.
    pub drb_result: DrbResult,
    // pub stake_table: Option<StakeTable>, // TODO: Figure out how to connect this up
    /// Block header handed to `add_epoch_root`; entries without a header skip that step.
    pub block_header: Option<TYPES::BlockHeader>,
}
1098
/// Initializer struct describing the state a node starts from.
///
/// Built either with `from_genesis` (fresh start from the genesis state) or with `load`
/// (from previously saved data); the fields are public, so it can also be constructed
/// directly.
#[derive(Clone, Debug)]
pub struct HotShotInitializer<TYPES: NodeType> {
    /// Instance-level state.
    pub instance_state: TYPES::InstanceState,

    /// Epoch height
    pub epoch_height: u64,

    /// Epoch start block
    pub epoch_start_block: u64,

    /// the anchor leaf for the hotshot initializer
    pub anchor_leaf: Leaf2<TYPES>,

    /// ValidatedState for the anchor leaf
    pub anchor_state: Arc<TYPES::ValidatedState>,

    /// ValidatedState::Delta for the anchor leaf, optional.
    ///
    /// `from_genesis` fills this with the genesis delta; `load` leaves it as `None`.
    pub anchor_state_delta: Option<Arc<<TYPES::ValidatedState as ValidatedState<TYPES>>::Delta>>,

    /// Starting view number that should be equivalent to the view the node shut down with last.
    pub start_view: ViewNumber,

    /// The view we last performed an action in.  An action is proposing or voting for
    /// either the quorum or DA.
    pub last_actioned_view: ViewNumber,

    /// Starting epoch number that should be equivalent to the epoch the node shut down with last.
    pub start_epoch: Option<EpochNumber>,

    /// Highest QC that was seen, for genesis it's the genesis QC.  It should be for a view greater
    /// than `inner`s view number for the non genesis case because we must have seen higher QCs
    /// to decide on the leaf.
    ///
    /// NOTE(review): there is no field named `inner` on this struct; the sentence above
    /// presumably refers to the anchor leaf's view number — confirm.
    pub high_qc: QuorumCertificate2<TYPES>,

    /// Next epoch highest QC that was seen. This is needed to propose during epoch transition after restart.
    pub next_epoch_high_qc: Option<NextEpochQuorumCertificate2<TYPES>>,

    /// Proposals we have sent out to provide to others for catchup
    ///
    /// `update_undecided` also turns every saved proposal newer than the anchor leaf into an
    /// undecided leaf.
    pub saved_proposals: BTreeMap<ViewNumber, Proposal<TYPES, QuorumProposalWrapper<TYPES>>>,

    /// Previously decided upgrade certificate; this is necessary if an upgrade has happened and we are not restarting with the new version
    pub decided_upgrade_certificate: Option<UpgradeCertificate<TYPES>>,

    /// Undecided leaves that were seen, but not yet decided on.  These allow a restarting node
    /// to vote and propose right away if they didn't miss anything while down.
    pub undecided_leaves: BTreeMap<ViewNumber, Leaf2<TYPES>>,

    /// Not yet decided state
    ///
    /// `update_undecided` writes one entry per undecided leaf, keyed by the leaf's view number.
    pub undecided_state: BTreeMap<ViewNumber, View<TYPES>>,

    /// Saved VID shares
    pub saved_vid_shares: VidShares<TYPES>,

    /// The last formed light client state update certificate if there's any
    pub state_cert: Option<LightClientStateUpdateCertificateV2<TYPES>>,

    /// Saved epoch information. This must be sorted ascending by epoch.
    pub start_epoch_info: Vec<InitializerEpochInfo<TYPES>>,
}
1160
1161impl<TYPES: NodeType> HotShotInitializer<TYPES> {
1162    /// initialize from genesis
1163    /// # Errors
1164    /// If we are unable to apply the genesis block to the default state
1165    pub async fn from_genesis(
1166        instance_state: TYPES::InstanceState,
1167        epoch_height: u64,
1168        epoch_start_block: u64,
1169        start_epoch_info: Vec<InitializerEpochInfo<TYPES>>,
1170        upgrade: Upgrade,
1171    ) -> Result<Self, HotShotError<TYPES>> {
1172        let (validated_state, state_delta) = TYPES::ValidatedState::genesis(&instance_state);
1173        let high_qc = QuorumCertificate2::genesis(&validated_state, &instance_state, upgrade).await;
1174
1175        Ok(Self {
1176            anchor_leaf: Leaf2::genesis(&validated_state, &instance_state, upgrade.base).await,
1177            anchor_state: Arc::new(validated_state),
1178            anchor_state_delta: Some(Arc::new(state_delta)),
1179            start_view: ViewNumber::new(0),
1180            start_epoch: genesis_epoch_from_version(upgrade.base),
1181            last_actioned_view: ViewNumber::new(0),
1182            saved_proposals: BTreeMap::new(),
1183            high_qc,
1184            next_epoch_high_qc: None,
1185            decided_upgrade_certificate: None,
1186            undecided_leaves: BTreeMap::new(),
1187            undecided_state: BTreeMap::new(),
1188            instance_state,
1189            saved_vid_shares: BTreeMap::new(),
1190            epoch_height,
1191            state_cert: None,
1192            epoch_start_block,
1193            start_epoch_info,
1194        })
1195    }
1196
1197    /// Use saved proposals to update undecided leaves and state
1198    #[must_use]
1199    pub fn update_undecided(self) -> Self {
1200        let mut undecided_leaves = self.undecided_leaves.clone();
1201        let mut undecided_state = self.undecided_state.clone();
1202
1203        for proposal in self.saved_proposals.values() {
1204            // skip proposals unless they're newer than the anchor leaf
1205            if proposal.data.view_number() <= self.anchor_leaf.view_number() {
1206                continue;
1207            }
1208
1209            undecided_leaves.insert(
1210                proposal.data.view_number(),
1211                Leaf2::from_quorum_proposal(&proposal.data),
1212            );
1213        }
1214
1215        for leaf in undecided_leaves.values() {
1216            let view_inner = ViewInner::Leaf {
1217                leaf: leaf.commit(),
1218                state: Arc::new(TYPES::ValidatedState::from_header(leaf.block_header())),
1219                delta: None,
1220                epoch: leaf.epoch(self.epoch_height),
1221            };
1222            let view = View { view_inner };
1223
1224            undecided_state.insert(leaf.view_number(), view);
1225        }
1226
1227        Self {
1228            undecided_leaves,
1229            undecided_state,
1230            ..self
1231        }
1232    }
1233
1234    /// Create a `HotShotInitializer` from the given information.
1235    ///
1236    /// This function uses the anchor leaf to set the initial validated state,
1237    /// and populates `undecided_leaves` and `undecided_state` using `saved_proposals`.
1238    ///
1239    /// If you are able to or would prefer to set these yourself,
1240    /// you should use the `HotShotInitializer` constructor directly.
1241    #[allow(clippy::too_many_arguments)]
1242    pub fn load(
1243        instance_state: TYPES::InstanceState,
1244        epoch_height: u64,
1245        epoch_start_block: u64,
1246        start_epoch_info: Vec<InitializerEpochInfo<TYPES>>,
1247        anchor_leaf: Leaf2<TYPES>,
1248        (start_view, start_epoch): (ViewNumber, Option<EpochNumber>),
1249        (high_qc, next_epoch_high_qc): (
1250            QuorumCertificate2<TYPES>,
1251            Option<NextEpochQuorumCertificate2<TYPES>>,
1252        ),
1253        last_actioned_view: ViewNumber,
1254        saved_proposals: BTreeMap<ViewNumber, Proposal<TYPES, QuorumProposalWrapper<TYPES>>>,
1255        saved_vid_shares: VidShares<TYPES>,
1256        decided_upgrade_certificate: Option<UpgradeCertificate<TYPES>>,
1257        state_cert: Option<LightClientStateUpdateCertificateV2<TYPES>>,
1258    ) -> Self {
1259        let anchor_state = Arc::new(TYPES::ValidatedState::from_header(
1260            anchor_leaf.block_header(),
1261        ));
1262        let anchor_state_delta = None;
1263
1264        let initializer = Self {
1265            instance_state,
1266            epoch_height,
1267            epoch_start_block,
1268            anchor_leaf,
1269            anchor_state,
1270            anchor_state_delta,
1271            high_qc,
1272            start_view,
1273            start_epoch,
1274            last_actioned_view,
1275            saved_proposals,
1276            saved_vid_shares,
1277            next_epoch_high_qc,
1278            decided_upgrade_certificate,
1279            undecided_leaves: BTreeMap::new(),
1280            undecided_state: BTreeMap::new(),
1281            state_cert,
1282            start_epoch_info,
1283        };
1284
1285        initializer.update_undecided()
1286    }
1287}
1288
1289async fn load_start_epoch_info<TYPES: NodeType>(
1290    membership: &TYPES::Membership,
1291    start_epoch_info: &Vec<InitializerEpochInfo<TYPES>>,
1292    epoch_height: u64,
1293    epoch_start_block: u64,
1294) {
1295    let first_epoch_number =
1296        EpochNumber::new(epoch_from_block_number(epoch_start_block, epoch_height));
1297
1298    tracing::warn!("Calling set_first_epoch for epoch {first_epoch_number}");
1299    membership.set_first_epoch(first_epoch_number, INITIAL_DRB_RESULT);
1300
1301    let mut sorted_epoch_info = start_epoch_info.clone();
1302    sorted_epoch_info.sort_by_key(|info| info.epoch);
1303    for epoch_info in sorted_epoch_info {
1304        if let Some(block_header) = &epoch_info.block_header {
1305            tracing::warn!("Calling add_epoch_root for epoch {}", epoch_info.epoch);
1306
1307            membership
1308                .add_epoch_root(block_header.clone())
1309                .await
1310                .unwrap_or_else(|err| {
1311                    // REVIEW NOTE: Should we panic here? a failure here seems like it should be fatal
1312                    tracing::error!(
1313                        "Failed to add epoch root for epoch {}: {err}",
1314                        epoch_info.epoch
1315                    );
1316                });
1317        }
1318    }
1319
1320    for epoch_info in start_epoch_info {
1321        tracing::warn!("Calling add_drb_result for epoch {}", epoch_info.epoch);
1322        membership.add_drb_result(epoch_info.epoch, epoch_info.drb_result);
1323    }
1324}