Skip to main content

hotshot_types/
consensus.rs

1// Copyright (c) 2021-2024 Espresso Systems (espressosys.com)
2// This file is part of the HotShot repository.
3
4// You should have received a copy of the MIT License
5// along with the HotShot repository. If not, see <https://mit-license.org/>.
6
7//! Provides the core consensus types
8
9use std::{
10    collections::{BTreeMap, HashMap, HashSet},
11    mem::ManuallyDrop,
12    ops::{Deref, DerefMut},
13    sync::Arc,
14};
15
16use alloy::primitives::U256;
17use async_lock::{RwLock, RwLockReadGuard, RwLockUpgradableReadGuard, RwLockWriteGuard};
18use committable::{Commitment, Committable};
19use hotshot_utils::anytrace::*;
20use tracing::instrument;
21use vec1::Vec1;
22
23pub use crate::utils::{View, ViewInner};
24use crate::{
25    constants::EPOCH_PARTICIPATION_HISTORY,
26    data::{
27        EpochNumber, Leaf2, QuorumProposalWrapper, VidCommitment, VidDisperse,
28        VidDisperseAndDuration, VidDisperseShare, ViewNumber,
29    },
30    epoch_membership::EpochMembershipCoordinator,
31    error::HotShotError,
32    event::{HotShotAction, LeafInfo},
33    message::{Proposal, UpgradeLock},
34    simple_certificate::{
35        DaCertificate2, LightClientStateUpdateCertificateV2, NextEpochQuorumCertificate2,
36        QuorumCertificate2,
37    },
38    simple_vote::HasEpoch,
39    stake_table::{HSStakeTable, StakeTableEntries},
40    traits::{
41        BlockPayload, ValidatedState,
42        block_contents::{BlockHeader, BuilderFee},
43        metrics::{Counter, Gauge, Histogram, Metrics, NoMetrics},
44        node_implementation::NodeType,
45        signature_key::{SignatureKey, StakeTableEntryType},
46    },
47    utils::{
48        BuilderCommitment, LeafCommitment, StateAndDelta, Terminator, epoch_from_block_number,
49        is_epoch_root, is_epoch_transition, is_last_block, is_transition_block,
50        option_epoch_from_block_number,
51    },
52    vote::{Certificate, HasViewNumber},
53};
54
55/// A type alias for `HashMap<Commitment<T>, T>`
56pub type CommitmentMap<T> = HashMap<Commitment<T>, T>;
57
58/// A type alias for `BTreeMap<T::Time, HashMap<T::SignatureKey, BTreeMap<T::Epoch, Proposal<T, VidDisperseShare<T>>>>>`
59pub type VidShares<TYPES> = BTreeMap<
60    ViewNumber,
61    HashMap<
62        <TYPES as NodeType>::SignatureKey,
63        BTreeMap<Option<EpochNumber>, Proposal<TYPES, VidDisperseShare<TYPES>>>,
64    >,
65>;
66
67/// Type alias for consensus state wrapped in a lock.
68pub type LockedConsensusState<TYPES> = Arc<RwLock<Consensus<TYPES>>>;
69
70/// A thin wrapper around `LockedConsensusState` that helps debugging locks
71#[derive(Clone, Debug)]
72pub struct OuterConsensus<TYPES: NodeType> {
73    /// Inner `LockedConsensusState`
74    pub inner_consensus: LockedConsensusState<TYPES>,
75}
76
77impl<TYPES: NodeType> OuterConsensus<TYPES> {
78    /// Create a new instance of `OuterConsensus`, hopefully uniquely named
79    pub fn new(consensus: LockedConsensusState<TYPES>) -> Self {
80        Self {
81            inner_consensus: consensus,
82        }
83    }
84
85    /// Locks inner consensus for reading and leaves debug traces
86    #[instrument(skip_all, target = "OuterConsensus")]
87    pub async fn read(&self) -> ConsensusReadLockGuard<'_, TYPES> {
88        tracing::trace!("Trying to acquire read lock on consensus");
89        let ret = self.inner_consensus.read().await;
90        tracing::trace!("Acquired read lock on consensus");
91        ConsensusReadLockGuard::new(ret)
92    }
93
94    /// Locks inner consensus for writing and leaves debug traces
95    #[instrument(skip_all, target = "OuterConsensus")]
96    pub async fn write(&self) -> ConsensusWriteLockGuard<'_, TYPES> {
97        tracing::trace!("Trying to acquire write lock on consensus");
98        let ret = self.inner_consensus.write().await;
99        tracing::trace!("Acquired write lock on consensus");
100        ConsensusWriteLockGuard::new(ret)
101    }
102
103    /// Tries to acquire write lock on inner consensus and leaves debug traces
104    #[instrument(skip_all, target = "OuterConsensus")]
105    pub fn try_write(&self) -> Option<ConsensusWriteLockGuard<'_, TYPES>> {
106        tracing::trace!("Trying to acquire write lock on consensus");
107        let ret = self.inner_consensus.try_write();
108        if let Some(guard) = ret {
109            tracing::trace!("Acquired write lock on consensus");
110            Some(ConsensusWriteLockGuard::new(guard))
111        } else {
112            tracing::trace!("Failed to acquire write lock");
113            None
114        }
115    }
116
117    /// Acquires upgradable read lock on inner consensus and leaves debug traces
118    #[instrument(skip_all, target = "OuterConsensus")]
119    pub async fn upgradable_read(&self) -> ConsensusUpgradableReadLockGuard<'_, TYPES> {
120        tracing::trace!("Trying to acquire upgradable read lock on consensus");
121        let ret = self.inner_consensus.upgradable_read().await;
122        tracing::trace!("Acquired upgradable read lock on consensus");
123        ConsensusUpgradableReadLockGuard::new(ret)
124    }
125
126    /// Tries to acquire read lock on inner consensus and leaves debug traces
127    #[instrument(skip_all, target = "OuterConsensus")]
128    pub fn try_read(&self) -> Option<ConsensusReadLockGuard<'_, TYPES>> {
129        tracing::trace!("Trying to acquire read lock on consensus");
130        let ret = self.inner_consensus.try_read();
131        if let Some(guard) = ret {
132            tracing::trace!("Acquired read lock on consensus");
133            Some(ConsensusReadLockGuard::new(guard))
134        } else {
135            tracing::trace!("Failed to acquire read lock");
136            None
137        }
138    }
139}
140
141/// A thin wrapper around `RwLockReadGuard` for `Consensus` that leaves debug traces when the lock is freed
142pub struct ConsensusReadLockGuard<'a, TYPES: NodeType> {
143    /// Inner `RwLockReadGuard`
144    lock_guard: RwLockReadGuard<'a, Consensus<TYPES>>,
145}
146
147impl<'a, TYPES: NodeType> ConsensusReadLockGuard<'a, TYPES> {
148    /// Creates a new instance of `ConsensusReadLockGuard` with the same name as parent `OuterConsensus`
149    #[must_use]
150    pub fn new(lock_guard: RwLockReadGuard<'a, Consensus<TYPES>>) -> Self {
151        Self { lock_guard }
152    }
153}
154
155impl<TYPES: NodeType> Deref for ConsensusReadLockGuard<'_, TYPES> {
156    type Target = Consensus<TYPES>;
157    fn deref(&self) -> &Self::Target {
158        &self.lock_guard
159    }
160}
161
162impl<TYPES: NodeType> Drop for ConsensusReadLockGuard<'_, TYPES> {
163    #[instrument(skip_all, target = "ConsensusReadLockGuard")]
164    fn drop(&mut self) {
165        tracing::trace!("Read lock on consensus dropped");
166    }
167}
168
169/// A thin wrapper around `RwLockWriteGuard` for `Consensus` that leaves debug traces when the lock is freed
170pub struct ConsensusWriteLockGuard<'a, TYPES: NodeType> {
171    /// Inner `RwLockWriteGuard`
172    lock_guard: RwLockWriteGuard<'a, Consensus<TYPES>>,
173}
174
175impl<'a, TYPES: NodeType> ConsensusWriteLockGuard<'a, TYPES> {
176    /// Creates a new instance of `ConsensusWriteLockGuard` with the same name as parent `OuterConsensus`
177    #[must_use]
178    pub fn new(lock_guard: RwLockWriteGuard<'a, Consensus<TYPES>>) -> Self {
179        Self { lock_guard }
180    }
181}
182
183impl<TYPES: NodeType> Deref for ConsensusWriteLockGuard<'_, TYPES> {
184    type Target = Consensus<TYPES>;
185    fn deref(&self) -> &Self::Target {
186        &self.lock_guard
187    }
188}
189
190impl<TYPES: NodeType> DerefMut for ConsensusWriteLockGuard<'_, TYPES> {
191    fn deref_mut(&mut self) -> &mut Self::Target {
192        &mut self.lock_guard
193    }
194}
195
196impl<TYPES: NodeType> Drop for ConsensusWriteLockGuard<'_, TYPES> {
197    #[instrument(skip_all, target = "ConsensusWriteLockGuard")]
198    fn drop(&mut self) {
199        tracing::debug!("Write lock on consensus dropped");
200    }
201}
202
203/// A thin wrapper around `RwLockUpgradableReadGuard` for `Consensus` that leaves debug traces when the lock is freed or upgraded
204pub struct ConsensusUpgradableReadLockGuard<'a, TYPES: NodeType> {
205    /// Inner `RwLockUpgradableReadGuard`
206    lock_guard: ManuallyDrop<RwLockUpgradableReadGuard<'a, Consensus<TYPES>>>,
207    /// A helper bool to indicate whether inner lock has been unsafely taken or not
208    taken: bool,
209}
210
211impl<'a, TYPES: NodeType> ConsensusUpgradableReadLockGuard<'a, TYPES> {
212    /// Creates a new instance of `ConsensusUpgradableReadLockGuard` with the same name as parent `OuterConsensus`
213    #[must_use]
214    pub fn new(lock_guard: RwLockUpgradableReadGuard<'a, Consensus<TYPES>>) -> Self {
215        Self {
216            lock_guard: ManuallyDrop::new(lock_guard),
217            taken: false,
218        }
219    }
220
221    /// Upgrades the inner `RwLockUpgradableReadGuard` and leaves debug traces
222    #[instrument(skip_all, target = "ConsensusUpgradableReadLockGuard")]
223    #[allow(unused_assignments)] // `taken` is read in Drop impl
224    pub async fn upgrade(mut guard: Self) -> ConsensusWriteLockGuard<'a, TYPES> {
225        let inner_guard = unsafe { ManuallyDrop::take(&mut guard.lock_guard) };
226        guard.taken = true;
227        tracing::debug!("Trying to upgrade upgradable read lock on consensus");
228        let ret = RwLockUpgradableReadGuard::upgrade(inner_guard).await;
229        tracing::debug!("Upgraded upgradable read lock on consensus");
230        ConsensusWriteLockGuard::new(ret)
231    }
232}
233
234impl<TYPES: NodeType> Deref for ConsensusUpgradableReadLockGuard<'_, TYPES> {
235    type Target = Consensus<TYPES>;
236
237    fn deref(&self) -> &Self::Target {
238        &self.lock_guard
239    }
240}
241
242impl<TYPES: NodeType> Drop for ConsensusUpgradableReadLockGuard<'_, TYPES> {
243    #[instrument(skip_all, target = "ConsensusUpgradableReadLockGuard")]
244    fn drop(&mut self) {
245        if !self.taken {
246            unsafe { ManuallyDrop::drop(&mut self.lock_guard) }
247            tracing::debug!("Upgradable read lock on consensus dropped");
248        }
249    }
250}
251
252/// A bundle of views that we have most recently performed some action
253#[derive(Debug, Clone, Copy)]
254struct HotShotActionViews {
255    /// View we last proposed in to the Quorum
256    proposed: ViewNumber,
257    /// View we last voted in for a QuorumProposal
258    voted: ViewNumber,
259    /// View we last proposed to the DA committee
260    da_proposed: ViewNumber,
261    /// View we lasted voted for DA proposal
262    da_vote: ViewNumber,
263}
264
265impl Default for HotShotActionViews {
266    fn default() -> Self {
267        let genesis = ViewNumber::genesis();
268        Self {
269            proposed: genesis,
270            voted: genesis,
271            da_proposed: genesis,
272            da_vote: genesis,
273        }
274    }
275}
276impl HotShotActionViews {
277    /// Create HotShotActionViews from a view number
278    fn from_view(view: ViewNumber) -> Self {
279        Self {
280            proposed: view,
281            voted: view,
282            da_proposed: view,
283            da_vote: view,
284        }
285    }
286}
287
288type ValidatorParticipationMap<TYPES> = HashMap<<TYPES as NodeType>::SignatureKey, (u64, u64)>;
289
290#[derive(Debug, Clone)]
291struct ValidatorParticipation<TYPES: NodeType> {
292    epoch: EpochNumber,
293    /// Current epoch participation by key maps key -> (num leader, num times proposed)
294    current_epoch_participation: ValidatorParticipationMap<TYPES>,
295
296    /// Previous epochs participation ordered by epoch number, maps epoch -> key -> (num leader, num times proposed)
297    previous_epoch_participation: BTreeMap<EpochNumber, ValidatorParticipationMap<TYPES>>,
298}
299
300impl<TYPES: NodeType> ValidatorParticipation<TYPES> {
301    fn new() -> Self {
302        Self {
303            epoch: EpochNumber::genesis(),
304            current_epoch_participation: HashMap::new(),
305            previous_epoch_participation: BTreeMap::new(),
306        }
307    }
308
309    fn update_participation(
310        &mut self,
311        key: TYPES::SignatureKey,
312        epoch: EpochNumber,
313        proposed: bool,
314    ) {
315        if epoch != self.epoch {
316            return;
317        }
318        let entry = self
319            .current_epoch_participation
320            .entry(key)
321            .or_insert((0, 0));
322        if proposed {
323            entry.1 += 1;
324        }
325        entry.0 += 1;
326    }
327
328    fn update_participation_epoch(&mut self, epoch: EpochNumber) {
329        if epoch <= self.epoch {
330            return;
331        }
332        self.previous_epoch_participation
333            .insert(self.epoch, self.current_epoch_participation.clone());
334
335        self.previous_epoch_participation =
336            self.previous_epoch_participation
337                .split_off(&EpochNumber::new(
338                    self.epoch.saturating_sub(EPOCH_PARTICIPATION_HISTORY),
339                ));
340
341        self.epoch = epoch;
342        self.current_epoch_participation = HashMap::new();
343    }
344
345    fn current_proposal_participation(&self) -> HashMap<TYPES::SignatureKey, f64> {
346        self.current_epoch_participation
347            .iter()
348            .map(|(key, (leader, proposed))| {
349                (
350                    key.clone(),
351                    if *leader == 0 {
352                        0.0
353                    } else {
354                        *proposed as f64 / *leader as f64
355                    },
356                )
357            })
358            .collect()
359    }
360    fn proposal_participation(&self, epoch: EpochNumber) -> HashMap<TYPES::SignatureKey, f64> {
361        let tracked_participation = if epoch == self.epoch {
362            self.current_epoch_participation.clone()
363        } else {
364            self.previous_epoch_participation
365                .get(&epoch)
366                .unwrap_or(&HashMap::new())
367                .clone()
368        };
369
370        tracked_participation
371            .iter()
372            .map(|(key, (leader, proposed))| {
373                (
374                    key.clone(),
375                    if *leader == 0 {
376                        0.0
377                    } else {
378                        *proposed as f64 / *leader as f64
379                    },
380                )
381            })
382            .collect()
383    }
384
385    fn current_epoch(&self) -> EpochNumber {
386        self.epoch
387    }
388}
389
390type VoteParticipationMap<TYPES> = (
391    HashMap<<<TYPES as NodeType>::SignatureKey as SignatureKey>::VerificationKeyType, u64>,
392    u64,
393);
394
395#[derive(Clone, Debug)]
396struct VoteParticipation<TYPES: NodeType> {
397    /// Current epoch
398    epoch: Option<EpochNumber>,
399
400    /// Current stake_table
401    stake_table: HSStakeTable<TYPES>,
402
403    /// Success threshold
404    success_threshold: U256,
405
406    /// Set of views in the current epoch
407    view_set: HashSet<ViewNumber>,
408
409    /// Number of views in the current epoch
410    current_epoch_num_views: u64,
411
412    /// Current epoch participation by key maps key -> num times voted
413    current_epoch_participation:
414        HashMap<<TYPES::SignatureKey as SignatureKey>::VerificationKeyType, u64>,
415
416    /// Previous epochs participation ordered by epoch number, maps epoch -> key -> num times voted
417    previous_epoch_participation: BTreeMap<Option<EpochNumber>, VoteParticipationMap<TYPES>>,
418}
419
420impl<TYPES: NodeType> VoteParticipation<TYPES> {
421    fn new(
422        stake_table: HSStakeTable<TYPES>,
423        success_threshold: U256,
424        epoch: Option<EpochNumber>,
425    ) -> Self {
426        let current_epoch_participation: HashMap<_, _> = stake_table
427            .iter()
428            .map({
429                |peer_config| {
430                    (
431                        peer_config
432                            .stake_table_entry
433                            .public_key()
434                            .to_verification_key(),
435                        0u64,
436                    )
437                }
438            })
439            .collect();
440        Self {
441            epoch,
442            stake_table,
443            success_threshold,
444            view_set: HashSet::new(),
445            current_epoch_num_views: 0u64,
446            current_epoch_participation,
447            previous_epoch_participation: BTreeMap::new(),
448        }
449    }
450
451    fn update_participation(&mut self, qc: QuorumCertificate2<TYPES>) -> Result<()> {
452        ensure!(
453            qc.epoch() == self.epoch,
454            info!(
455                "Incorrect epoch while updating vote participation, current epoch: {:?}, QC epoch \
456                 {:?}",
457                self.epoch,
458                qc.epoch()
459            )
460        );
461        ensure!(
462            !self.view_set.contains(&qc.view_number()),
463            info!(
464                "Participation for view {} already updated",
465                qc.view_number()
466            )
467        );
468        let signers = qc
469            .signers(
470                &StakeTableEntries::<TYPES>::from(self.stake_table.clone()).0,
471                self.success_threshold,
472            )
473            .context(|e| warn!("Tracing signers: {e}"))?;
474        for vk in signers {
475            let Some(votes) = self.current_epoch_participation.get_mut(&vk) else {
476                bail!(warn!(
477                    "Trying to update vote participation for unknown key: {:?}",
478                    vk
479                ));
480            };
481            *votes += 1;
482        }
483        self.view_set.insert(qc.view_number());
484        self.current_epoch_num_views += 1;
485        Ok(())
486    }
487
488    fn update_participation_epoch(
489        &mut self,
490        stake_table: HSStakeTable<TYPES>,
491        success_threshold: U256,
492        epoch: Option<EpochNumber>,
493    ) -> Result<()> {
494        ensure!(
495            epoch >= self.epoch,
496            warn!(
497                "New epoch less than current epoch while updating vote participation epoch, \
498                 current epoch: {:?}, new epoch {:?}",
499                self.epoch, epoch
500            )
501        );
502        // Same epoch, do nothing
503        if epoch == self.epoch {
504            return Ok(());
505        }
506
507        self.previous_epoch_participation.insert(
508            self.epoch,
509            (
510                self.current_epoch_participation.clone(),
511                self.current_epoch_num_views,
512            ),
513        );
514
515        self.previous_epoch_participation = self.previous_epoch_participation.split_off(
516            &self
517                .epoch
518                .map(|e| EpochNumber::new(e.saturating_sub(EPOCH_PARTICIPATION_HISTORY))),
519        );
520
521        self.previous_epoch_participation.insert(
522            self.epoch,
523            (
524                self.current_epoch_participation.clone(),
525                self.current_epoch_num_views,
526            ),
527        );
528
529        self.previous_epoch_participation = self.previous_epoch_participation.split_off(
530            &self
531                .epoch
532                .map(|e| EpochNumber::new(e.saturating_sub(EPOCH_PARTICIPATION_HISTORY))),
533        );
534
535        self.epoch = epoch;
536        self.current_epoch_num_views = 0;
537        self.view_set = HashSet::new();
538        let current_epoch_participation: HashMap<_, _> = stake_table
539            .iter()
540            .map({
541                |peer_config| {
542                    (
543                        peer_config
544                            .stake_table_entry
545                            .public_key()
546                            .to_verification_key(),
547                        0u64,
548                    )
549                }
550            })
551            .collect();
552        self.current_epoch_participation = current_epoch_participation;
553        self.stake_table = stake_table;
554        self.success_threshold = success_threshold;
555        Ok(())
556    }
557
558    fn current_vote_participation(
559        &self,
560    ) -> HashMap<<TYPES::SignatureKey as SignatureKey>::VerificationKeyType, f64> {
561        self.current_epoch_participation
562            .iter()
563            .map(|(key, votes)| {
564                (
565                    key.clone(),
566                    Self::calculate_ratio(votes, self.current_epoch_num_views),
567                )
568            })
569            .collect()
570    }
571    fn vote_participation(
572        &self,
573        epoch: Option<EpochNumber>,
574    ) -> HashMap<<TYPES::SignatureKey as SignatureKey>::VerificationKeyType, f64> {
575        let tracked_participation = if epoch == self.epoch {
576            (
577                self.current_epoch_participation.clone(),
578                self.current_epoch_num_views,
579            )
580        } else {
581            self.previous_epoch_participation
582                .get(&epoch)
583                .unwrap_or(&(HashMap::new(), 0))
584                .clone()
585        };
586
587        tracked_participation
588            .0
589            .iter()
590            .map(|(key, votes)| {
591                (
592                    key.clone(),
593                    Self::calculate_ratio(votes, tracked_participation.1),
594                )
595            })
596            .collect()
597    }
598
599    fn calculate_ratio(num_votes: &u64, total_views: u64) -> f64 {
600        if total_views == 0 {
601            0.0
602        } else {
603            *num_votes as f64 / total_views as f64
604        }
605    }
606
607    fn current_epoch(&self) -> Option<EpochNumber> {
608        self.epoch
609    }
610}
611
612/// A reference to the consensus algorithm
613///
614/// This will contain the state of all rounds.
615#[derive(derive_more::Debug, Clone)]
616pub struct Consensus<TYPES: NodeType> {
617    /// The validated states that are currently loaded in memory.
618    validated_state_map: BTreeMap<ViewNumber, View<TYPES>>,
619
620    /// All the VID shares we've received for current and future views.
621    vid_shares: VidShares<TYPES>,
622
623    /// All the DA certs we've received for current and future views.
624    /// view -> DA cert
625    saved_da_certs: HashMap<ViewNumber, DaCertificate2<TYPES>>,
626
627    /// View number that is currently on.
628    cur_view: ViewNumber,
629
630    /// Epoch number that is currently on.
631    cur_epoch: Option<EpochNumber>,
632
633    /// Last proposals we sent out, None if we haven't proposed yet.
634    /// Prevents duplicate proposals, and can be served to those trying to catchup
635    last_proposals: BTreeMap<ViewNumber, Proposal<TYPES, QuorumProposalWrapper<TYPES>>>,
636
637    /// last view had a successful decide event
638    last_decided_view: ViewNumber,
639
640    /// The `locked_qc` view number
641    locked_view: ViewNumber,
642
643    /// Map of leaf hash -> leaf
644    /// - contains undecided leaves
645    /// - includes the MOST RECENT decided leaf
646    saved_leaves: CommitmentMap<Leaf2<TYPES>>,
647
648    /// Bundle of views which we performed the most recent action
649    /// visibible to the network.  Actions are votes and proposals
650    /// for DA and Quorum
651    last_actions: HotShotActionViews,
652
653    /// Saved payloads.
654    ///
655    /// Encoded transactions for every view if we got a payload for that view.
656    saved_payloads: BTreeMap<ViewNumber, Arc<PayloadWithMetadata<TYPES>>>,
657
658    /// the highqc per spec
659    high_qc: QuorumCertificate2<TYPES>,
660
661    /// The high QC for the next epoch
662    next_epoch_high_qc: Option<NextEpochQuorumCertificate2<TYPES>>,
663
664    /// Track the participation of each validator in the current epoch and previous epoch
665    validator_participation: ValidatorParticipation<TYPES>,
666
667    /// Track the vote participation of each node in the current epoch and previous epoch
668    vote_participation: VoteParticipation<TYPES>,
669
670    /// A reference to the metrics trait
671    pub metrics: Arc<ConsensusMetricsValue>,
672
673    /// Number of blocks in an epoch, zero means there are no epochs
674    pub epoch_height: u64,
675
676    /// Number of iterations for the DRB calculation, taken from HotShotConfig
677    pub drb_difficulty: u64,
678
679    /// Number of iterations for the DRB calculation post-difficulty upgrade, taken from HotShotConfig
680    pub drb_upgrade_difficulty: u64,
681
682    /// The transition QC for the current epoch
683    transition_qc: Option<(
684        QuorumCertificate2<TYPES>,
685        NextEpochQuorumCertificate2<TYPES>,
686    )>,
687
688    /// The highest block number that we have seen
689    pub highest_block: u64,
690    /// The light client state update certificate
691    pub state_cert: Option<LightClientStateUpdateCertificateV2<TYPES>>,
692}
693
694/// This struct holds a payload and its metadata
695#[derive(Debug, Clone, Hash, Eq, PartialEq)]
696pub struct PayloadWithMetadata<TYPES: NodeType> {
697    pub payload: TYPES::BlockPayload,
698    pub metadata: <TYPES::BlockPayload as BlockPayload<TYPES>>::Metadata,
699}
700
701/// Contains several `ConsensusMetrics` that we're interested in from the consensus interfaces
702#[derive(Clone, Debug)]
703pub struct ConsensusMetricsValue {
704    /// The number of last synced block height
705    pub last_synced_block_height: Box<dyn Gauge>,
706    /// The number of last decided view
707    pub last_decided_view: Box<dyn Gauge>,
708    /// The number of the last voted view
709    pub last_voted_view: Box<dyn Gauge>,
710    /// Number of timestamp for the last decided time
711    pub last_decided_time: Box<dyn Gauge>,
712    /// The current view
713    pub current_view: Box<dyn Gauge>,
714    /// Number of views that are in-flight since the last decided view
715    pub number_of_views_since_last_decide: Box<dyn Gauge>,
716    /// Number of views that are in-flight since the last anchor view
717    pub number_of_views_per_decide_event: Box<dyn Histogram>,
718    /// Duration of views as leader
719    pub view_duration_as_leader: Box<dyn Histogram>,
720    /// Number of invalid QCs we've seen since the last commit.
721    pub invalid_qc: Box<dyn Gauge>,
722    /// Number of outstanding transactions
723    pub outstanding_transactions: Box<dyn Gauge>,
724    /// Memory size in bytes of the serialized transactions still outstanding
725    pub outstanding_transactions_memory_size: Box<dyn Gauge>,
726    /// Number of views that timed out
727    pub number_of_timeouts: Box<dyn Counter>,
728    /// Number of views that timed out as leader
729    pub number_of_timeouts_as_leader: Box<dyn Counter>,
730    /// The number of empty blocks that have been proposed
731    pub number_of_empty_blocks_proposed: Box<dyn Counter>,
732    /// Number of events in the hotshot event queue
733    pub internal_event_queue_len: Box<dyn Gauge>,
734    /// Time from proposal creation to decide time
735    pub proposal_to_decide_time: Box<dyn Histogram>,
736    /// Time from proposal received to proposal creation
737    pub previous_proposal_to_proposal_time: Box<dyn Histogram>,
738    /// Finalized bytes per view
739    pub finalized_bytes: Box<dyn Histogram>,
740    /// The duration of the validate and apply header
741    pub validate_and_apply_header_duration: Box<dyn Histogram>,
742    /// The duration of update leaf
743    pub update_leaf_duration: Box<dyn Histogram>,
744    /// The time it took to calculate the disperse
745    pub vid_disperse_duration: Box<dyn Histogram>,
746}
747
748impl ConsensusMetricsValue {
749    /// Create a new instance of this [`ConsensusMetricsValue`] struct, setting all the counters and gauges
750    #[must_use]
751    pub fn new(metrics: &dyn Metrics) -> Self {
752        Self {
753            last_synced_block_height: metrics
754                .create_gauge(String::from("last_synced_block_height"), None),
755            last_decided_view: metrics.create_gauge(String::from("last_decided_view"), None),
756            last_voted_view: metrics.create_gauge(String::from("last_voted_view"), None),
757            last_decided_time: metrics.create_gauge(String::from("last_decided_time"), None),
758            current_view: metrics.create_gauge(String::from("current_view"), None),
759            number_of_views_since_last_decide: metrics
760                .create_gauge(String::from("number_of_views_since_last_decide"), None),
761            number_of_views_per_decide_event: metrics
762                .create_histogram(String::from("number_of_views_per_decide_event"), None),
763            view_duration_as_leader: metrics
764                .create_histogram(String::from("view_duration_as_leader"), None),
765            invalid_qc: metrics.create_gauge(String::from("invalid_qc"), None),
766            outstanding_transactions: metrics
767                .create_gauge(String::from("outstanding_transactions"), None),
768            outstanding_transactions_memory_size: metrics
769                .create_gauge(String::from("outstanding_transactions_memory_size"), None),
770            number_of_timeouts: metrics.create_counter(String::from("number_of_timeouts"), None),
771            number_of_timeouts_as_leader: metrics
772                .create_counter(String::from("number_of_timeouts_as_leader"), None),
773            number_of_empty_blocks_proposed: metrics
774                .create_counter(String::from("number_of_empty_blocks_proposed"), None),
775            internal_event_queue_len: metrics
776                .create_gauge(String::from("internal_event_queue_len"), None),
777            proposal_to_decide_time: metrics
778                .create_histogram(String::from("proposal_to_decide_time"), None),
779            previous_proposal_to_proposal_time: metrics
780                .create_histogram(String::from("previous_proposal_to_proposal_time"), None),
781            finalized_bytes: metrics.create_histogram(String::from("finalized_bytes"), None),
782            validate_and_apply_header_duration: metrics.create_histogram(
783                String::from("validate_and_apply_header_duration"),
784                Some("seconds".to_string()),
785            ),
786            update_leaf_duration: metrics.create_histogram(
787                String::from("update_leaf_duration"),
788                Some("seconds".to_string()),
789            ),
790            vid_disperse_duration: metrics.create_histogram(
791                String::from("vid_disperse_duration"),
792                Some("seconds".to_string()),
793            ),
794        }
795    }
796}
797
798impl Default for ConsensusMetricsValue {
799    fn default() -> Self {
800        Self::new(&*NoMetrics::boxed())
801    }
802}
803
804impl<TYPES: NodeType> Consensus<TYPES> {
805    /// Constructor.
806    #[allow(clippy::too_many_arguments)]
807    pub fn new(
808        validated_state_map: BTreeMap<ViewNumber, View<TYPES>>,
809        vid_shares: Option<VidShares<TYPES>>,
810        cur_view: ViewNumber,
811        cur_epoch: Option<EpochNumber>,
812        locked_view: ViewNumber,
813        last_decided_view: ViewNumber,
814        last_actioned_view: ViewNumber,
815        last_proposals: BTreeMap<ViewNumber, Proposal<TYPES, QuorumProposalWrapper<TYPES>>>,
816        saved_leaves: CommitmentMap<Leaf2<TYPES>>,
817        saved_payloads: BTreeMap<ViewNumber, Arc<PayloadWithMetadata<TYPES>>>,
818        high_qc: QuorumCertificate2<TYPES>,
819        next_epoch_high_qc: Option<NextEpochQuorumCertificate2<TYPES>>,
820        metrics: Arc<ConsensusMetricsValue>,
821        epoch_height: u64,
822        state_cert: Option<LightClientStateUpdateCertificateV2<TYPES>>,
823        drb_difficulty: u64,
824        drb_upgrade_difficulty: u64,
825        stake_table: HSStakeTable<TYPES>,
826        success_threshold: U256,
827    ) -> Self {
828        let transition_qc = if let Some(ref next_epoch_high_qc) = next_epoch_high_qc {
829            if high_qc
830                .data
831                .block_number
832                .is_some_and(|bn| is_transition_block(bn, epoch_height))
833            {
834                if high_qc.data.leaf_commit == next_epoch_high_qc.data.leaf_commit {
835                    Some((high_qc.clone(), next_epoch_high_qc.clone()))
836                } else {
837                    tracing::error!("Next epoch high QC has different leaf commit to high QC");
838                    None
839                }
840            } else {
841                None
842            }
843        } else {
844            None
845        };
846        Consensus {
847            validated_state_map,
848            vid_shares: vid_shares.unwrap_or_default(),
849            saved_da_certs: HashMap::new(),
850            cur_view,
851            cur_epoch,
852            last_decided_view,
853            last_proposals,
854            last_actions: HotShotActionViews::from_view(last_actioned_view),
855            locked_view,
856            saved_leaves,
857            saved_payloads,
858            high_qc,
859            next_epoch_high_qc,
860            metrics,
861            epoch_height,
862            transition_qc,
863            highest_block: 0,
864            state_cert,
865            drb_difficulty,
866            validator_participation: ValidatorParticipation::new(),
867            vote_participation: VoteParticipation::new(stake_table, success_threshold, cur_epoch),
868            drb_upgrade_difficulty,
869        }
870    }
871
872    /// Get the current view.
873    pub fn cur_view(&self) -> ViewNumber {
874        self.cur_view
875    }
876
877    /// Get the current epoch.
878    pub fn cur_epoch(&self) -> Option<EpochNumber> {
879        self.cur_epoch
880    }
881
882    /// Get the last decided view.
883    pub fn last_decided_view(&self) -> ViewNumber {
884        self.last_decided_view
885    }
886
887    /// Get the locked view.
888    pub fn locked_view(&self) -> ViewNumber {
889        self.locked_view
890    }
891
892    /// Get the high QC.
893    pub fn high_qc(&self) -> &QuorumCertificate2<TYPES> {
894        &self.high_qc
895    }
896
897    /// Get the transition QC.
898    pub fn transition_qc(
899        &self,
900    ) -> Option<&(
901        QuorumCertificate2<TYPES>,
902        NextEpochQuorumCertificate2<TYPES>,
903    )> {
904        self.transition_qc.as_ref()
905    }
906
907    ///Update the highest block number
908    pub fn update_highest_block(&mut self, block_number: u64) {
909        if block_number > self.highest_block {
910            self.highest_block = block_number;
911            return;
912        }
913
914        if is_epoch_transition(block_number, self.epoch_height) {
915            let new_epoch = epoch_from_block_number(block_number, self.epoch_height);
916            let high_epoch = epoch_from_block_number(self.highest_block, self.epoch_height);
917            if new_epoch >= high_epoch {
918                self.highest_block = block_number;
919            }
920        }
921    }
922
923    /// Update the transition QC.
924    pub fn update_transition_qc(
925        &mut self,
926        qc: QuorumCertificate2<TYPES>,
927        next_epoch_qc: NextEpochQuorumCertificate2<TYPES>,
928    ) {
929        if next_epoch_qc.data.leaf_commit != qc.data().leaf_commit {
930            tracing::error!(
931                "Next epoch QC for view {} has different leaf commit {:?} to {:?}",
932                qc.view_number(),
933                next_epoch_qc.data.leaf_commit,
934                qc.data().leaf_commit
935            );
936            return;
937        }
938        if let Some((transition_qc, _)) = &self.transition_qc
939            && transition_qc.view_number() >= qc.view_number()
940        {
941            return;
942        }
943        self.transition_qc = Some((qc, next_epoch_qc));
944    }
945
946    /// Get the current light client state certificate
947    pub fn state_cert(&self) -> Option<&LightClientStateUpdateCertificateV2<TYPES>> {
948        self.state_cert.as_ref()
949    }
950
951    /// Get the next epoch high QC.
952    pub fn next_epoch_high_qc(&self) -> Option<&NextEpochQuorumCertificate2<TYPES>> {
953        self.next_epoch_high_qc.as_ref()
954    }
955
956    /// Get the validated state map.
957    pub fn validated_state_map(&self) -> &BTreeMap<ViewNumber, View<TYPES>> {
958        &self.validated_state_map
959    }
960
961    /// Get the saved leaves.
962    pub fn saved_leaves(&self) -> &CommitmentMap<Leaf2<TYPES>> {
963        &self.saved_leaves
964    }
965
966    /// Get the saved payloads.
967    pub fn saved_payloads(&self) -> &BTreeMap<ViewNumber, Arc<PayloadWithMetadata<TYPES>>> {
968        &self.saved_payloads
969    }
970
971    /// Get the vid shares.
972    pub fn vid_shares(&self) -> &VidShares<TYPES> {
973        &self.vid_shares
974    }
975
976    /// Get the saved DA certs.
977    pub fn saved_da_certs(&self) -> &HashMap<ViewNumber, DaCertificate2<TYPES>> {
978        &self.saved_da_certs
979    }
980
981    /// Get the map of our recent proposals
982    pub fn last_proposals(
983        &self,
984    ) -> &BTreeMap<ViewNumber, Proposal<TYPES, QuorumProposalWrapper<TYPES>>> {
985        &self.last_proposals
986    }
987
988    /// Update the current view.
989    /// # Errors
990    /// Can return an error when the new view_number is not higher than the existing view number.
991    pub fn update_view(&mut self, view_number: ViewNumber) -> Result<()> {
992        ensure!(
993            view_number > self.cur_view,
994            debug!("New view isn't newer than the current view.")
995        );
996        self.cur_view = view_number;
997        Ok(())
998    }
999
1000    /// Update the validator participation
1001    pub fn update_validator_participation(
1002        &mut self,
1003        key: TYPES::SignatureKey,
1004        epoch: EpochNumber,
1005        proposed: bool,
1006    ) {
1007        self.validator_participation
1008            .update_participation(key, epoch, proposed);
1009    }
1010
1011    /// Update the validator participation epoch
1012    pub fn update_validator_participation_epoch(&mut self, epoch: EpochNumber) {
1013        self.validator_participation
1014            .update_participation_epoch(epoch);
1015    }
1016
1017    /// Get the current proposal participation
1018    pub fn current_proposal_participation(&self) -> HashMap<TYPES::SignatureKey, f64> {
1019        self.validator_participation
1020            .current_proposal_participation()
1021    }
1022
1023    /// Get the current proposal participation epoch
1024    pub fn current_proposal_participation_epoch(&self) -> EpochNumber {
1025        self.validator_participation.current_epoch()
1026    }
1027
1028    /// Get the proposal participation for a given epoch
1029    pub fn proposal_participation(&self, epoch: EpochNumber) -> HashMap<TYPES::SignatureKey, f64> {
1030        self.validator_participation.proposal_participation(epoch)
1031    }
1032
1033    /// Update the vote participation
1034    pub fn update_vote_participation(&mut self, qc: QuorumCertificate2<TYPES>) -> Result<()> {
1035        self.vote_participation.update_participation(qc)
1036    }
1037
1038    /// Update the vote participation epoch
1039    pub fn update_vote_participation_epoch(
1040        &mut self,
1041        stake_table: HSStakeTable<TYPES>,
1042        success_threshold: U256,
1043        epoch: Option<EpochNumber>,
1044    ) -> Result<()> {
1045        self.vote_participation
1046            .update_participation_epoch(stake_table, success_threshold, epoch)
1047    }
1048
1049    /// Get the current vote participation
1050    pub fn current_vote_participation(
1051        &self,
1052    ) -> HashMap<<TYPES::SignatureKey as SignatureKey>::VerificationKeyType, f64> {
1053        self.vote_participation.current_vote_participation()
1054    }
1055
1056    /// Get the current vote participation
1057    pub fn current_vote_participation_epoch(&self) -> Option<EpochNumber> {
1058        self.vote_participation.current_epoch()
1059    }
1060
1061    /// Get the previous vote participation
1062    pub fn vote_participation(
1063        &self,
1064        epoch: Option<EpochNumber>,
1065    ) -> HashMap<<TYPES::SignatureKey as SignatureKey>::VerificationKeyType, f64> {
1066        self.vote_participation.vote_participation(epoch)
1067    }
1068
1069    /// Get the parent Leaf Info from a given leaf and our public key.
1070    /// Returns None if we don't have the data in out state
1071    pub async fn parent_leaf_info(
1072        &self,
1073        leaf: &Leaf2<TYPES>,
1074        public_key: &TYPES::SignatureKey,
1075    ) -> Option<LeafInfo<TYPES>> {
1076        let parent_view_number = leaf.justify_qc().view_number();
1077        let parent_epoch = leaf.justify_qc().epoch();
1078        let parent_leaf = self
1079            .saved_leaves
1080            .get(&leaf.justify_qc().data().leaf_commit)?;
1081        let parent_state_and_delta = self.state_and_delta(parent_view_number);
1082        let (Some(state), delta) = parent_state_and_delta else {
1083            return None;
1084        };
1085
1086        let parent_vid = self
1087            .vid_shares()
1088            .get(&parent_view_number)
1089            .and_then(|key_map| key_map.get(public_key))
1090            .and_then(|epoch_map| epoch_map.get(&parent_epoch))
1091            .map(|prop| prop.data.clone());
1092
1093        let state_cert = if parent_leaf.with_epoch
1094            && is_epoch_root(parent_leaf.block_header().block_number(), self.epoch_height)
1095        {
1096            match self.state_cert() {
1097                // Sanity check that the state cert is for the same view as the parent leaf
1098                Some(state_cert)
1099                    if state_cert.light_client_state.view_number == parent_view_number.u64() =>
1100                {
1101                    Some(state_cert.clone())
1102                },
1103                _ => None,
1104            }
1105        } else {
1106            None
1107        };
1108
1109        Some(LeafInfo {
1110            leaf: parent_leaf.clone(),
1111            state,
1112            delta,
1113            vid_share: parent_vid,
1114            state_cert,
1115        })
1116    }
1117
1118    /// Update the current epoch.
1119    /// # Errors
1120    /// Can return an error when the new epoch_number is not higher than the existing epoch number.
1121    pub fn update_epoch(&mut self, epoch_number: EpochNumber) -> Result<()> {
1122        ensure!(
1123            self.cur_epoch.is_none() || Some(epoch_number) > self.cur_epoch,
1124            debug!("New epoch isn't newer than the current epoch.")
1125        );
1126        tracing::trace!(
1127            "Updating epoch from {:?} to {}",
1128            self.cur_epoch,
1129            epoch_number
1130        );
1131        self.cur_epoch = Some(epoch_number);
1132        Ok(())
1133    }
1134
1135    /// Update the last actioned view internally for votes and proposals
1136    ///
1137    /// Returns true if the action is for a newer view than the last action of that type
1138    pub fn update_action(&mut self, action: HotShotAction, view: ViewNumber) -> bool {
1139        let old_view = match action {
1140            HotShotAction::Vote => &mut self.last_actions.voted,
1141            HotShotAction::Propose => &mut self.last_actions.proposed,
1142            HotShotAction::DaPropose => &mut self.last_actions.da_proposed,
1143            HotShotAction::DaVote => {
1144                if view > self.last_actions.da_vote {
1145                    self.last_actions.da_vote = view;
1146                }
1147                // TODO Add logic to prevent double voting.  For now the simple check if
1148                // the last voted view is less than the view we are trying to vote doesn't work
1149                // because the leader of view n + 1 may propose to the DA (and we would vote)
1150                // before the leader of view n.
1151                return true;
1152            },
1153            _ => return true,
1154        };
1155        if view > *old_view {
1156            *old_view = view;
1157            return true;
1158        }
1159        false
1160    }
1161
1162    /// reset last actions to genesis so we can resend events in tests
1163    pub fn reset_actions(&mut self) {
1164        self.last_actions = HotShotActionViews::default();
1165    }
1166
1167    /// Update the last proposal.
1168    ///
1169    /// # Errors
1170    /// Can return an error when the new view_number is not higher than the existing proposed view number.
1171    pub fn update_proposed_view(
1172        &mut self,
1173        proposal: Proposal<TYPES, QuorumProposalWrapper<TYPES>>,
1174    ) -> Result<()> {
1175        ensure!(
1176            proposal.data.view_number()
1177                > self
1178                    .last_proposals
1179                    .last_key_value()
1180                    .map_or(ViewNumber::genesis(), |(k, _)| { *k }),
1181            debug!("New view isn't newer than the previously proposed view.")
1182        );
1183        self.last_proposals
1184            .insert(proposal.data.view_number(), proposal);
1185        Ok(())
1186    }
1187
1188    /// Update the last decided view.
1189    ///
1190    /// # Errors
1191    /// Can return an error when the new view_number is not higher than the existing decided view number.
1192    pub fn update_last_decided_view(&mut self, view_number: ViewNumber) -> Result<()> {
1193        ensure!(
1194            view_number > self.last_decided_view,
1195            debug!("New view isn't newer than the previously decided view.")
1196        );
1197        self.last_decided_view = view_number;
1198        Ok(())
1199    }
1200
1201    /// Update the locked view.
1202    ///
1203    /// # Errors
1204    /// Can return an error when the new view_number is not higher than the existing locked view number.
1205    pub fn update_locked_view(&mut self, view_number: ViewNumber) -> Result<()> {
1206        ensure!(
1207            view_number > self.locked_view,
1208            debug!("New view isn't newer than the previously locked view.")
1209        );
1210        self.locked_view = view_number;
1211        Ok(())
1212    }
1213
1214    /// Update the validated state map with a new view_number/view combo.
1215    ///
1216    /// # Errors
1217    /// Can return an error when the new view contains less information than the existing view
1218    /// with the same view number.
1219    pub fn update_da_view(
1220        &mut self,
1221        view_number: ViewNumber,
1222        epoch: Option<EpochNumber>,
1223        payload_commitment: VidCommitment,
1224    ) -> Result<()> {
1225        let view = View {
1226            view_inner: ViewInner::Da {
1227                payload_commitment,
1228                epoch,
1229            },
1230        };
1231        self.update_validated_state_map(view_number, view)
1232    }
1233
1234    /// Update the validated state map with a new view_number/view combo.
1235    ///
1236    /// # Errors
1237    /// Can return an error when the new view contains less information than the existing view
1238    /// with the same view number.
1239    pub fn update_leaf(
1240        &mut self,
1241        leaf: Leaf2<TYPES>,
1242        state: Arc<TYPES::ValidatedState>,
1243        delta: Option<Arc<<TYPES::ValidatedState as ValidatedState<TYPES>>::Delta>>,
1244    ) -> Result<()> {
1245        let view_number = leaf.view_number();
1246        let epoch =
1247            option_epoch_from_block_number(leaf.with_epoch, leaf.height(), self.epoch_height);
1248        let view = View {
1249            view_inner: ViewInner::Leaf {
1250                leaf: leaf.commit(),
1251                state,
1252                delta,
1253                epoch,
1254            },
1255        };
1256        self.update_validated_state_map(view_number, view)?;
1257        self.update_saved_leaves(leaf);
1258        Ok(())
1259    }
1260
1261    /// Update the validated state map with a new view_number/view combo.
1262    ///
1263    /// # Errors
1264    /// Can return an error when the new view contains less information than the existing view
1265    /// with the same view number.
1266    fn update_validated_state_map(
1267        &mut self,
1268        view_number: ViewNumber,
1269        new_view: View<TYPES>,
1270    ) -> Result<()> {
1271        if let Some(existing_view) = self.validated_state_map().get(&view_number)
1272            && let ViewInner::Leaf {
1273                delta: ref existing_delta,
1274                ..
1275            } = existing_view.view_inner
1276        {
1277            if let ViewInner::Leaf {
1278                delta: ref new_delta,
1279                ..
1280            } = new_view.view_inner
1281            {
1282                ensure!(
1283                    new_delta.is_some() || existing_delta.is_none(),
1284                    debug!(
1285                        "Skipping the state update to not override a `Leaf` view with `Some` \
1286                         state delta."
1287                    )
1288                );
1289            } else {
1290                bail!(
1291                    "Skipping the state update to not override a `Leaf` view with a non-`Leaf` \
1292                     view."
1293                );
1294            }
1295        }
1296        self.validated_state_map.insert(view_number, new_view);
1297        Ok(())
1298    }
1299
1300    /// Update the saved leaves with a new leaf.
1301    fn update_saved_leaves(&mut self, leaf: Leaf2<TYPES>) {
1302        self.saved_leaves.insert(leaf.commit(), leaf);
1303    }
1304
1305    /// Update the saved payloads with a new encoded transaction.
1306    ///
1307    /// # Errors
1308    /// Can return an error when there's an existing payload corresponding to the same view number.
1309    pub fn update_saved_payloads(
1310        &mut self,
1311        view_number: ViewNumber,
1312        payload: Arc<PayloadWithMetadata<TYPES>>,
1313    ) -> Result<()> {
1314        ensure!(
1315            !self.saved_payloads.contains_key(&view_number),
1316            "Payload with the same view already exists."
1317        );
1318        self.saved_payloads.insert(view_number, payload);
1319        Ok(())
1320    }
1321
1322    /// Update the high QC if given a newer one.
1323    /// # Errors
1324    /// Can return an error when the provided high_qc is not newer than the existing entry.
1325    pub fn update_high_qc(&mut self, high_qc: QuorumCertificate2<TYPES>) -> Result<()> {
1326        if self.high_qc == high_qc {
1327            return Ok(());
1328        }
1329        // make sure the we don't update the high QC unless is't a higher view
1330        ensure!(
1331            high_qc.view_number > self.high_qc.view_number,
1332            debug!("High QC with an equal or higher view exists.")
1333        );
1334        tracing::debug!("Updating high QC");
1335        self.high_qc = high_qc;
1336
1337        Ok(())
1338    }
1339
1340    /// Update the next epoch high QC if given a newer one.
1341    /// # Errors
1342    /// Can return an error when the provided high_qc is not newer than the existing entry.
1343    /// # Panics
1344    /// It can't actually panic. If the option is None, we will not call unwrap on it.
1345    pub fn update_next_epoch_high_qc(
1346        &mut self,
1347        high_qc: NextEpochQuorumCertificate2<TYPES>,
1348    ) -> Result<()> {
1349        if self.next_epoch_high_qc.as_ref() == Some(&high_qc) {
1350            return Ok(());
1351        }
1352        if let Some(next_epoch_high_qc) = self.next_epoch_high_qc() {
1353            ensure!(
1354                high_qc.view_number > next_epoch_high_qc.view_number,
1355                debug!("Next epoch high QC with an equal or higher view exists.")
1356            );
1357        }
1358        tracing::debug!("Updating next epoch high QC");
1359        self.next_epoch_high_qc = Some(high_qc);
1360
1361        Ok(())
1362    }
1363
1364    /// Resets high qc and next epoch qc to the provided transition qc.
1365    /// # Errors
1366    /// Can return an error when the provided high_qc is not newer than the existing entry.
1367    pub fn reset_high_qc(
1368        &mut self,
1369        high_qc: QuorumCertificate2<TYPES>,
1370        next_epoch_qc: NextEpochQuorumCertificate2<TYPES>,
1371    ) -> Result<()> {
1372        ensure!(
1373            high_qc.data.leaf_commit == next_epoch_qc.data.leaf_commit,
1374            error!("High QC's and next epoch QC's leaf commits do not match.")
1375        );
1376        if self.high_qc == high_qc {
1377            return Ok(());
1378        }
1379        let same_epoch = high_qc.data.block_number.is_some_and(|bn| {
1380            let current_qc = self.high_qc();
1381            let Some(high_bn) = current_qc.data.block_number else {
1382                return false;
1383            };
1384            epoch_from_block_number(bn + 1, self.epoch_height)
1385                == epoch_from_block_number(high_bn + 1, self.epoch_height)
1386        });
1387        ensure!(
1388            high_qc
1389                .data
1390                .block_number
1391                .is_some_and(|bn| is_transition_block(bn, self.epoch_height))
1392                && same_epoch,
1393            error!("Provided QC is not a transition QC.")
1394        );
1395        tracing::debug!("Resetting high QC and next epoch high QC");
1396        self.high_qc = high_qc;
1397        self.next_epoch_high_qc = Some(next_epoch_qc);
1398
1399        Ok(())
1400    }
1401
1402    /// Update the light client state update certificate if given a newer one.
1403    /// # Errors
1404    /// Can return an error when the provided state_cert is not newer than the existing entry.
1405    pub fn update_state_cert(
1406        &mut self,
1407        state_cert: LightClientStateUpdateCertificateV2<TYPES>,
1408    ) -> Result<()> {
1409        if let Some(existing_state_cert) = &self.state_cert {
1410            ensure!(
1411                state_cert.epoch > existing_state_cert.epoch,
1412                debug!(
1413                    "Light client state update certification with an equal or higher epoch exists."
1414                )
1415            );
1416        }
1417        tracing::debug!("Updating light client state update certification");
1418        self.state_cert = Some(state_cert);
1419
1420        Ok(())
1421    }
1422
1423    /// Add a new entry to the vid_shares map.
1424    pub fn update_vid_shares(
1425        &mut self,
1426        view_number: ViewNumber,
1427        disperse: Proposal<TYPES, VidDisperseShare<TYPES>>,
1428    ) {
1429        self.vid_shares
1430            .entry(view_number)
1431            .or_default()
1432            .entry(disperse.data.recipient_key().clone())
1433            .or_default()
1434            .insert(disperse.data.target_epoch(), disperse);
1435    }
1436
1437    /// Add a new entry to the da_certs map.
1438    pub fn update_saved_da_certs(&mut self, view_number: ViewNumber, cert: DaCertificate2<TYPES>) {
1439        self.saved_da_certs.insert(view_number, cert);
1440    }
1441
1442    /// gather information from the parent chain of leaves
1443    /// # Errors
1444    /// If the leaf or its ancestors are not found in storage
1445    pub fn visit_leaf_ancestors<F>(
1446        &self,
1447        start_from: ViewNumber,
1448        terminator: Terminator<ViewNumber>,
1449        ok_when_finished: bool,
1450        mut f: F,
1451    ) -> std::result::Result<(), HotShotError<TYPES>>
1452    where
1453        F: FnMut(
1454            &Leaf2<TYPES>,
1455            Arc<<TYPES as NodeType>::ValidatedState>,
1456            Option<Arc<<<TYPES as NodeType>::ValidatedState as ValidatedState<TYPES>>::Delta>>,
1457        ) -> bool,
1458    {
1459        let mut next_leaf = if let Some(view) = self.validated_state_map.get(&start_from) {
1460            view.leaf_commitment().ok_or_else(|| {
1461                HotShotError::InvalidState(format!(
1462                    "Visited failed view {start_from} leaf. Expected successful leaf"
1463                ))
1464            })?
1465        } else {
1466            return Err(HotShotError::InvalidState(format!(
1467                "View {start_from} leaf does not exist in state map "
1468            )));
1469        };
1470
1471        while let Some(leaf) = self.saved_leaves.get(&next_leaf) {
1472            let view = leaf.view_number();
1473            if let (Some(state), delta) = self.state_and_delta(view) {
1474                if let Terminator::Exclusive(stop_before) = terminator
1475                    && stop_before == view
1476                {
1477                    if ok_when_finished {
1478                        return Ok(());
1479                    }
1480                    break;
1481                }
1482                next_leaf = leaf.parent_commitment();
1483                if !f(leaf, state, delta) {
1484                    return Ok(());
1485                }
1486                if let Terminator::Inclusive(stop_after) = terminator
1487                    && stop_after == view
1488                {
1489                    if ok_when_finished {
1490                        return Ok(());
1491                    }
1492                    break;
1493                }
1494            } else {
1495                return Err(HotShotError::InvalidState(format!(
1496                    "View {view} state does not exist in state map"
1497                )));
1498            }
1499        }
1500        Err(HotShotError::MissingLeaf(next_leaf))
1501    }
1502
1503    /// Garbage collects based on state change right now, this removes from both the
1504    /// `saved_payloads` and `validated_state_map` fields of `Consensus`.
1505    /// # Panics
1506    /// On inconsistent stored entries
1507    pub fn collect_garbage(&mut self, old_anchor_view: ViewNumber, new_anchor_view: ViewNumber) {
1508        // Nothing to collect
1509        if new_anchor_view <= old_anchor_view {
1510            return;
1511        }
1512        let gc_view = ViewNumber::new(new_anchor_view.saturating_sub(1));
1513        // state check
1514        let anchor_entry = self
1515            .validated_state_map
1516            .iter()
1517            .next()
1518            .expect("INCONSISTENT STATE: anchor leaf not in state map!");
1519        if **anchor_entry.0 != old_anchor_view.saturating_sub(1) {
1520            tracing::info!(
1521                "Something about GC has failed. Older leaf exists than the previous anchor leaf."
1522            );
1523        }
1524        // perform gc
1525        self.saved_da_certs
1526            .retain(|view_number, _| *view_number >= old_anchor_view);
1527        self.validated_state_map
1528            .range(..gc_view)
1529            .filter_map(|(_view_number, view)| view.leaf_commitment())
1530            .for_each(|leaf| {
1531                self.saved_leaves.remove(&leaf);
1532            });
1533        self.validated_state_map = self.validated_state_map.split_off(&gc_view);
1534        self.saved_payloads = self.saved_payloads.split_off(&gc_view);
1535        self.vid_shares = self.vid_shares.split_off(&gc_view);
1536        self.last_proposals = self.last_proposals.split_off(&gc_view);
1537    }
1538
1539    /// Gets the last decided leaf.
1540    ///
1541    /// # Panics
1542    /// if the last decided view's leaf does not exist in the state map or saved leaves, which
1543    /// should never happen.
1544    #[must_use]
1545    pub fn decided_leaf(&self) -> Leaf2<TYPES> {
1546        let decided_view_num = self.last_decided_view;
1547        let view = self.validated_state_map.get(&decided_view_num).unwrap();
1548        let leaf = view
1549            .leaf_commitment()
1550            .expect("Decided leaf not found! Consensus internally inconsistent");
1551        self.saved_leaves.get(&leaf).unwrap().clone()
1552    }
1553
1554    pub fn undecided_leaves(&self) -> Vec<Leaf2<TYPES>> {
1555        self.saved_leaves.values().cloned().collect::<Vec<_>>()
1556    }
1557
1558    /// Gets the validated state with the given view number, if in the state map.
1559    #[must_use]
1560    pub fn state(&self, view_number: ViewNumber) -> Option<&Arc<TYPES::ValidatedState>> {
1561        match self.validated_state_map.get(&view_number) {
1562            Some(view) => view.state(),
1563            None => None,
1564        }
1565    }
1566
1567    /// Gets the validated state and state delta with the given view number, if in the state map.
1568    #[must_use]
1569    pub fn state_and_delta(&self, view_number: ViewNumber) -> StateAndDelta<TYPES> {
1570        match self.validated_state_map.get(&view_number) {
1571            Some(view) => view.state_and_delta(),
1572            None => (None, None),
1573        }
1574    }
1575
1576    /// Gets the last decided validated state.
1577    ///
1578    /// # Panics
1579    /// If the last decided view's state does not exist in the state map, which should never
1580    /// happen.
1581    #[must_use]
1582    pub fn decided_state(&self) -> Arc<TYPES::ValidatedState> {
1583        let decided_view_num = self.last_decided_view;
1584        self.state_and_delta(decided_view_num)
1585            .0
1586            .expect("Decided state not found! Consensus internally inconsistent")
1587    }
1588
1589    /// Associated helper function:
1590    /// Takes `LockedConsensusState` which will be updated; locks it for read and write accordingly.
1591    /// Calculates `VidDisperse` based on the view, the txns and the membership,
1592    /// and updates `vid_shares` map with the signed `VidDisperseShare` proposals.
1593    /// Returned `Option` indicates whether the update has actually happened or not.
1594    #[instrument(skip_all, target = "Consensus", fields(view = *view))]
1595    pub async fn calculate_and_update_vid(
1596        consensus: OuterConsensus<TYPES>,
1597        view: ViewNumber,
1598        target_epoch: Option<EpochNumber>,
1599        membership_coordinator: EpochMembershipCoordinator<TYPES>,
1600        private_key: &<TYPES::SignatureKey as SignatureKey>::PrivateKey,
1601        upgrade_lock: &UpgradeLock<TYPES>,
1602    ) -> Option<()> {
1603        let payload_with_metadata = Arc::clone(consensus.read().await.saved_payloads().get(&view)?);
1604        let epoch = consensus
1605            .read()
1606            .await
1607            .validated_state_map()
1608            .get(&view)?
1609            .view_inner
1610            .epoch()?;
1611
1612        let VidDisperseAndDuration {
1613            disperse: vid,
1614            duration: disperse_duration,
1615        } = VidDisperse::calculate_vid_disperse(
1616            &payload_with_metadata.payload,
1617            &membership_coordinator,
1618            view,
1619            target_epoch,
1620            epoch,
1621            &payload_with_metadata.metadata,
1622            upgrade_lock,
1623        )
1624        .await
1625        .ok()?;
1626
1627        let mut consensus_writer = consensus.write().await;
1628        consensus_writer
1629            .metrics
1630            .vid_disperse_duration
1631            .add_point(disperse_duration.as_secs_f64());
1632        for share in vid.to_shares() {
1633            if let Some(prop) = share.to_proposal(private_key) {
1634                consensus_writer.update_vid_shares(view, prop);
1635            }
1636        }
1637
1638        Some(())
1639    }
1640    /// Returns true if a given leaf is for the epoch transition
1641    pub fn is_epoch_transition(&self, leaf_commit: LeafCommitment<TYPES>) -> bool {
1642        let Some(leaf) = self.saved_leaves.get(&leaf_commit) else {
1643            tracing::trace!("We don't have a leaf corresponding to the leaf commit");
1644            return false;
1645        };
1646        let block_height = leaf.height();
1647        is_epoch_transition(block_height, self.epoch_height)
1648    }
1649
1650    /// Returns true if our high QC is for one of the epoch transition blocks
1651    pub fn is_high_qc_for_epoch_transition(&self) -> bool {
1652        let Some(block_height) = self.high_qc().data.block_number else {
1653            return false;
1654        };
1655        is_epoch_transition(block_height, self.epoch_height)
1656    }
1657
1658    /// Returns true if the `parent_leaf` formed an eQC for the previous epoch to the `proposed_leaf`
1659    pub fn check_eqc(&self, proposed_leaf: &Leaf2<TYPES>, parent_leaf: &Leaf2<TYPES>) -> bool {
1660        if parent_leaf.view_number() == ViewNumber::genesis() {
1661            return true;
1662        }
1663        let new_epoch = epoch_from_block_number(proposed_leaf.height(), self.epoch_height);
1664        let old_epoch = epoch_from_block_number(parent_leaf.height(), self.epoch_height);
1665
1666        new_epoch - 1 == old_epoch && is_last_block(parent_leaf.height(), self.epoch_height)
1667    }
1668}
1669
1670/// Alias for the block payload commitment and the associated metadata. The primary data
1671/// needed in order to submit a proposal.
1672#[derive(Eq, PartialEq, Debug, Clone)]
1673pub struct CommitmentAndMetadata<TYPES: NodeType> {
1674    /// Vid Commitment
1675    pub commitment: VidCommitment,
1676    /// Builder Commitment
1677    pub builder_commitment: BuilderCommitment,
1678    /// Metadata for the block payload
1679    pub metadata: <TYPES::BlockPayload as BlockPayload<TYPES>>::Metadata,
1680    /// Builder fee data
1681    pub fees: Vec1<BuilderFee<TYPES>>,
1682    /// View number this block is for
1683    pub block_view: ViewNumber,
1684}