Skip to main content

hotshot_new_protocol/
consensus.rs

1use std::{
2    cmp::max,
3    collections::{BTreeMap, BTreeSet},
4    marker::PhantomData,
5    sync::Arc,
6};
7
8use committable::{Commitment, Committable};
9use hotshot::traits::BlockPayload;
10use hotshot_contract_adapter::light_client::derive_signed_state_digest;
11use hotshot_types::{
12    data::{
13        BlockNumber, EpochNumber, Leaf2, VidCommitment, VidCommitment2, VidDisperse2,
14        VidDisperseShare2, ViewChangeEvidence2, ViewNumber,
15    },
16    drb::DrbResult,
17    epoch_membership::EpochMembershipCoordinator,
18    message::{Proposal as SignedProposal, UpgradeLock},
19    simple_certificate::{
20        LightClientStateUpdateCertificateV2, QuorumCertificate2, TimeoutCertificate2,
21        check_qc_state_cert_correspondence,
22    },
23    simple_vote::{
24        CheckpointData, HasEpoch, LightClientStateUpdateVote2, QuorumData2, SimpleVote,
25        TimeoutData2, TimeoutVote2, Vote2Data,
26    },
27    stake_table::{HSStakeTable, StakeTableEntries},
28    traits::{
29        block_contents::BlockHeader,
30        node_implementation::NodeType,
31        signature_key::{
32            LCV2StateSignatureKey, LCV3StateSignatureKey, SignatureKey, StateSignatureKey,
33        },
34    },
35    utils::{epoch_from_block_number, is_epoch_root, is_epoch_transition, is_last_block},
36    vote::{self, Certificate, HasViewNumber},
37};
38use tracing::{debug, instrument, warn};
39
40use crate::{
41    block::BlockAndHeaderRequest,
42    helpers::proposal_commitment,
43    logging::KeyPrefix,
44    message::{
45        Certificate1, Certificate2, CheckpointVote, EpochChangeMessage, Proposal,
46        ProposalFetchRequest, ProposalMessage, Validated, VidShareMessage, Vote1, Vote2,
47    },
48    outbox::Outbox,
49    state::{StateRequest, StateResponse},
50};
51
/// Inputs to [`Consensus::apply_pre_cutover_seed`].
///
/// Carries everything the new protocol needs to take over from the legacy
/// stack at a decided upgrade boundary: the highest legacy-decided leaf,
/// the legacy undecided chain above it, the legacy `high_qc` (if any),
/// the validated states for those leaves, and the upgrade certificate's
/// `new_version_first_view`.
///
/// The seed is passed by value and consumed when it is applied.
#[derive(Clone, Debug)]
pub struct PreCutoverSeed<T: NodeType> {
    /// Highest leaf legacy decided. Anchors `last_decided_view`; it only
    /// replaces the current anchor when its view is strictly newer.
    pub decided_anchor: Leaf2<T>,
    /// Legacy undecided chain above the anchor, oldest-first. Each leaf is
    /// installed together with a proposal rebuilt from it, and its
    /// `justify_qc` is registered as a Cert1.
    pub undecided: Vec<Leaf2<T>>,
    /// Legacy `high_qc`. `None` is allowed for cold-start tests; production
    /// seed extraction always supplies one.
    pub high_qc: Option<QuorumCertificate2<T>>,
    /// Validated states keyed by view, for the anchor and every undecided leaf.
    ///
    /// NOTE(review): `apply_pre_cutover_seed` does not read this field;
    /// confirm where these states are consumed.
    pub validated_states: BTreeMap<ViewNumber, Arc<T::ValidatedState>>,
    /// `upgrade_cert.new_version_first_view`. `current_view`/`timeout_view`
    /// are advanced to `cutover_view - 1` so the new protocol's normal
    /// proposal/timeout machinery takes over at `cutover_view`.
    /// A genesis `cutover_view` leaves both counters untouched.
    pub cutover_view: ViewNumber,
}
75
/// Events fed into [`Consensus::apply`].
///
/// Each variant is either handled by a dedicated handler or recorded in the
/// consensus bookkeeping; see the per-variant notes.
#[derive(Eq, PartialEq, Debug, Clone)]
#[allow(clippy::large_enum_variant)]
pub enum ConsensusInput<T: NodeType> {
    /// A block payload was built for `view`. The payload is stored and a
    /// `RequestVidDisperse` output is emitted for it.
    BlockBuilt {
        view: ViewNumber,
        epoch: EpochNumber,
        payload: T::BlockPayload,
        metadata: <T::BlockPayload as BlockPayload<T>>::Metadata,
    },
    /// The block for the given view was reconstructed; the commitment is
    /// recorded for that view.
    BlockReconstructed(ViewNumber, VidCommitment2),
    /// A `Certificate1` to verify and store.
    Certificate1(Certificate1<T>),
    /// A `Certificate2` to verify and store.
    Certificate2(Certificate2<T>),
    /// Atomic pair emitted by the `EpochRootVoteCollector` for epoch-root views:
    /// a `Certificate1` and its matching `LightClientStateUpdateCertificateV2`.
    /// Consensus never sees an epoch-root Cert1 without the matching state_cert.
    EpochRootCertificates {
        cert1: Certificate1<T>,
        state_cert: LightClientStateUpdateCertificateV2<T>,
    },
    /// An epoch-change message, passed to the epoch-change handler.
    EpochChange(EpochChangeMessage<T>),
    /// A block header was created; it is stored under the given
    /// (view, leaf commitment) key.
    HeaderCreated(ViewNumber, Commitment<Leaf2<T>>, T::BlockHeader),
    /// A validated proposal message together with its sender's key and this
    /// node's VID share.
    ProposalWithVidShare(
        T::SignatureKey,
        ProposalMessage<T, Validated>,
        VidDisperseShare2<T>,
    ),
    /// State validation succeeded; the response's commitment is recorded for
    /// its view.
    StateValidated(StateResponse<T>),
    /// State validation failed. If the stored proposal for that view matches
    /// the response's commitment (or none is stored), the per-view proposal
    /// data is dropped. No further protocol steps run for this input.
    StateValidationFailed(StateResponse<T>),
    /// The given view timed out locally. Handled identically to
    /// `TimeoutOneHonest`.
    Timeout(ViewNumber, EpochNumber),
    /// A timeout certificate received or formed for some view.
    TimeoutCertificate(TimeoutCertificate2<T>),
    /// Timeout trigger handled by the same code path as `Timeout`.
    TimeoutOneHonest(ViewNumber, EpochNumber),
    /// VID dispersal finished for a payload; the payload commitment is
    /// recorded and the shares are sent out.
    VidDisperseCreated(ViewNumber, VidDisperse2<T>),
    /// The DRB result for an epoch became available. It carries no view, so
    /// it is processed against the node's current view and also triggers a
    /// retry of deferred certificates.
    DrbResult(EpochNumber, DrbResult),
}
110
/// Actions and notifications produced by [`Consensus::apply`] and pushed into
/// the caller-supplied outbox.
#[derive(Eq, PartialEq, Debug, Clone)]
pub enum ConsensusOutput<T: NodeType> {
    /// Ask for a block and header to propose in the requested view. Emitted
    /// by the proposal handler when this node leads the next view.
    RequestBlockAndHeader(BlockAndHeaderRequest<T>),
    /// Ask for state validation of an accepted proposal.
    RequestState(StateRequest<T>),
    /// Ask for the DRB result of an epoch that is not yet known locally.
    RequestDrbResult(EpochNumber),
    SendProposal(SignedProposal<T, Proposal<T>>),
    SendVidShares(Vec<VidShareMessage<T>>),
    SendCheckpointVote(CheckpointVote<T>),
    SendTimeoutVote(TimeoutVote2<T>, Option<Certificate1<T>>),
    SendVote1(Vote1<T>),
    SendVote2(Vote2<T>),
    SendTimeoutCertificate(TimeoutCertificate2<T>, ViewNumber, EpochNumber),
    SendCertificate1(Certificate1<T>),
    SendEpochChange(EpochChangeMessage<T>),
    /// Ask for VID dispersal of a freshly built payload. Emitted in response
    /// to `ConsensusInput::BlockBuilt`.
    RequestVidDisperse {
        view: ViewNumber,
        epoch: EpochNumber,
        payload: T::BlockPayload,
        metadata: <T::BlockPayload as BlockPayload<T>>::Metadata,
    },
    LeafDecided {
        leaves: Vec<Leaf2<T>>,
        /// Certificate1 (QC) that certifies the most recent (first) leaf in the chain.
        /// Each older leaf's cert1 is available as the next leaf's `justify_qc`.
        cert1: Certificate1<T>,
        cert2: Option<Certificate2<T>>,
        vid_shares: Vec<Option<SignedProposal<T, VidDisperseShare2<T>>>>,
    },
    LockUpdated(Certificate2<T>),
    ViewChanged(ViewNumber, EpochNumber),
    /// A received proposal passed the proposal handler's checks and was
    /// stored; carries the signed proposal and the key of its sender.
    ProposalValidated {
        proposal: SignedProposal<T, Proposal<T>>,
        sender: T::SignatureKey,
    },
    /// Emitted when a node has reconstructed a block payload from VID shares.
    /// Notifies downstream consumers (e.g. the query service) so they can store
    /// the payload even if the corresponding view has already been decided
    /// without a payload in the decide event.
    BlockPayloadReconstructed {
        view: ViewNumber,
        header: T::BlockHeader,
        payload: T::BlockPayload,
    },
}
155
/// Consensus state for a single node.
///
/// Inputs are fed through `apply`, which updates the bookkeeping below and
/// pushes resulting actions into an outbox. Most collections are keyed by
/// view and pruned by `gc`.
pub struct Consensus<T: NodeType> {
    /// Proposals keyed by view.
    proposals: BTreeMap<ViewNumber, Proposal<T>>,
    /// Signed form of received proposals, keyed by view.
    signed_proposals: BTreeMap<ViewNumber, SignedProposal<T, Proposal<T>>>,
    /// Views marked as already proposed (pre-cutover seed views are marked too).
    proposed_views: BTreeSet<ViewNumber>,
    /// VID share received alongside the proposal of each view.
    vid_shares: BTreeMap<ViewNumber, VidDisperseShare2<T>>,
    /// Leaf commitment reported by a successful state validation, per view.
    states_verified: BTreeMap<ViewNumber, Commitment<Leaf2<T>>>,
    /// Payload commitment of the reconstructed (or locally dispersed) block, per view.
    blocks_reconstructed: BTreeMap<ViewNumber, VidCommitment2>,
    /// Payloads built locally, per view.
    blocks: BTreeMap<ViewNumber, T::BlockPayload>,
    /// Verified (or seeded) Certificate1 per view.
    certs: BTreeMap<ViewNumber, Certificate1<T>>,
    /// Certificate2 per view.
    certs2: BTreeMap<ViewNumber, Certificate2<T>>,
    /// Timeout certificates, keyed by the view they advanced consensus into.
    timeout_certs: BTreeMap<ViewNumber, TimeoutCertificate2<T>>,
    /// Certificate this node is currently locked on, if any.
    locked_cert: Option<Certificate1<T>>,
    /// Created block headers keyed by (view, leaf commitment).
    headers: BTreeMap<(ViewNumber, Commitment<Leaf2<T>>), T::BlockHeader>,
    /// Leaves keyed by view; entries above `last_decided_view` are undecided.
    leaves: BTreeMap<ViewNumber, Leaf2<T>>,
    /// View of the most recently decided leaf.
    last_decided_view: ViewNumber,
    /// The most recently decided leaf.
    last_decided_leaf: Leaf2<T>,
    /// Known DRB results per epoch.
    drb_results: BTreeMap<EpochNumber, DrbResult>,

    /// Views marked as already voted on in the first voting round.
    voted_1_views: BTreeSet<ViewNumber>,
    /// Views marked as already voted on in the second voting round.
    voted_2_views: BTreeSet<ViewNumber>,

    /// Skipped by `maybe_vote_2_and_update_lock` (V1 AvidM dispersal).
    pre_cutover_views: BTreeSet<ViewNumber>,

    /// Certificates whose epoch membership was not yet available when they
    /// arrived.  They are retried when new epoch data becomes available.
    pending_certs1: BTreeMap<ViewNumber, Certificate1<T>>,
    pending_certs2: BTreeMap<ViewNumber, Certificate2<T>>,

    timeout_view: ViewNumber,
    /// View the node is currently on.
    current_view: ViewNumber,
    /// Epoch the node is currently in; `None` until seeded or set.
    current_epoch: Option<EpochNumber>,

    // TODO: We need a next epoch stake table to handle the transition
    // And a way to set these stake tables, probably an event from coordinator
    stake_table_coordinator: EpochMembershipCoordinator<T>,

    /// This node's public signing key.
    public_key: T::SignatureKey,
    /// This node's private signing key.
    private_key: <T::SignatureKey as SignatureKey>::PrivateKey,
    state_private_key: <T::StateSignatureKey as StateSignatureKey>::StatePrivateKey,
    stake_table_capacity: usize,
    // TODO: persist state_certs
    /// Light-client state update certificates per epoch.
    state_certs: BTreeMap<EpochNumber, LightClientStateUpdateCertificateV2<T>>,
    /// Log prefix derived from `public_key`.
    node_id: KeyPrefix,
    upgrade_lock: UpgradeLock<T>,

    garbage_collection_interval: BlockNumber,
    /// Number of blocks per epoch, used for epoch-boundary calculations.
    pub(crate) epoch_height: BlockNumber,
}
205
/// Protocol flow directive.
///
/// Returned by the input handlers so that `Consensus::apply` knows whether
/// to run its follow-up steps (voting, deciding, proposing) for the input.
enum Protocol {
    /// Stop with further protocol steps.
    Abort,
    /// Continue with protocol.
    Continue,
}
213
/// Result of attempting to verify a certificate's epoch membership.
///
/// Distinguishes a definitive failure from the case where verification
/// cannot be attempted yet, so callers can defer instead of rejecting.
enum CertVerification {
    /// Certificate is cryptographically valid.
    Valid,
    /// Certificate is cryptographically invalid (bad signature).
    Invalid,
    /// The epoch's stake table is not yet available (catchup in progress).
    /// Certificate handlers park such certificates for a later retry.
    EpochUnavailable,
}
223
224impl<T: NodeType> Consensus<T> {
225    #[allow(clippy::too_many_arguments)]
226    pub fn new<B>(
227        membership_coordinator: EpochMembershipCoordinator<T>,
228        public_key: T::SignatureKey,
229        private_key: <T::SignatureKey as SignatureKey>::PrivateKey,
230        state_private_key: <T::StateSignatureKey as StateSignatureKey>::StatePrivateKey,
231        stake_table_capacity: usize,
232        upgrade_lock: UpgradeLock<T>,
233        genesis_leaf: Leaf2<T>,
234        epoch_height: B,
235        garbage_collection_interval: B,
236    ) -> Self
237    where
238        B: Into<BlockNumber>,
239    {
240        Self {
241            proposals: BTreeMap::new(),
242            signed_proposals: BTreeMap::new(),
243            proposed_views: BTreeSet::new(),
244            blocks: BTreeMap::new(),
245            states_verified: BTreeMap::new(),
246            blocks_reconstructed: BTreeMap::new(),
247            certs: BTreeMap::new(),
248            certs2: BTreeMap::new(),
249            timeout_certs: BTreeMap::new(),
250            locked_cert: None,
251            leaves: BTreeMap::new(),
252            last_decided_view: ViewNumber::genesis(),
253            last_decided_leaf: genesis_leaf,
254            headers: BTreeMap::new(),
255            drb_results: BTreeMap::new(),
256            node_id: KeyPrefix::from(&public_key),
257            public_key,
258            timeout_view: ViewNumber::genesis(),
259            current_view: ViewNumber::genesis(),
260            current_epoch: None,
261            stake_table_coordinator: membership_coordinator,
262            voted_1_views: BTreeSet::new(),
263            voted_2_views: BTreeSet::new(),
264            pre_cutover_views: BTreeSet::new(),
265            pending_certs1: BTreeMap::new(),
266            pending_certs2: BTreeMap::new(),
267            private_key,
268            state_private_key,
269            stake_table_capacity,
270            state_certs: BTreeMap::new(),
271            upgrade_lock,
272            vid_shares: BTreeMap::new(),
273            garbage_collection_interval: garbage_collection_interval.into(),
274            epoch_height: epoch_height.into(),
275        }
276    }
277
278    /// Seed the genesis state so that the view-1 leader can propose without
279    /// any external bootstrap injection.
280    ///
281    /// Stores a genesis certificate and proposal at view 0, sets the locked
282    /// certificate, and sets the current epoch.  After calling this, a
283    /// subsequent `apply` that triggers `maybe_propose(view=1)` will find the
284    /// parent cert and proposal it needs.
285    pub fn seed_genesis(&mut self, genesis_cert1: Certificate1<T>, genesis_proposal: Proposal<T>) {
286        self.current_epoch = Some(genesis_proposal.epoch);
287        self.certs
288            .insert(ViewNumber::genesis(), genesis_cert1.clone());
289        self.locked_cert = Some(genesis_cert1);
290        self.proposals
291            .insert(ViewNumber::genesis(), genesis_proposal);
292    }
293
294    /// Apply a [`PreCutoverSeed`] to bridge legacy state into the new
295    /// protocol. Performs the four operations the seed describes
296    /// atomically: anchor the decided view, install the undecided
297    /// leaves so they can be decided via Cert2, register the legacy
298    /// high_qc, and advance `current_view`/`timeout_view` to the
299    /// pre-cutover frontier.
300    ///
301    /// Idempotent: calling with the same seed twice (or with an older
302    /// seed) does not regress decided/locked state.
303    pub fn apply_pre_cutover_seed(&mut self, seed: PreCutoverSeed<T>) {
304        let view = seed.decided_anchor.view_number();
305        if view > self.last_decided_view {
306            self.last_decided_view = view;
307            self.last_decided_leaf = seed.decided_anchor;
308        }
309
310        for leaf in seed.undecided {
311            let view = leaf.view_number();
312            let justify_qc = leaf.justify_qc().clone();
313            self.register_legacy_qc(&justify_qc);
314
315            let block_number = leaf.block_header().block_number();
316            let epoch = EpochNumber::new(epoch_from_block_number(block_number, *self.epoch_height));
317
318            let view_change_evidence = leaf.view_change_evidence.clone().and_then(|e| match e {
319                ViewChangeEvidence2::Timeout(tc) => Some(tc),
320                ViewChangeEvidence2::ViewSync(_) => None,
321            });
322            let proposal = Proposal {
323                block_header: leaf.block_header().clone(),
324                view_number: view,
325                epoch,
326                justify_qc,
327                next_epoch_justify_qc: None,
328                upgrade_certificate: leaf.upgrade_certificate().clone(),
329                view_change_evidence,
330                next_drb_result: leaf.next_drb_result,
331                state_cert: None,
332            };
333
334            self.leaves.insert(view, leaf);
335            self.proposals.insert(view, proposal);
336            self.pre_cutover_views.insert(view);
337
338            self.proposed_views.insert(view);
339            self.voted_1_views.insert(view);
340            self.voted_2_views.insert(view);
341        }
342
343        if let Some(high_qc) = &seed.high_qc {
344            self.register_legacy_qc(high_qc);
345        }
346
347        let cutover_view = seed.cutover_view;
348        if cutover_view == ViewNumber::genesis() {
349            return;
350        }
351        let last_pre_cutover = cutover_view - 1;
352        if last_pre_cutover > self.timeout_view {
353            self.timeout_view = last_pre_cutover;
354        }
355        if last_pre_cutover > self.current_view {
356            self.current_view = last_pre_cutover;
357        }
358    }
359
360    /// Register `justify_qc` as Cert1 for its parent view (idempotent)
361    /// and bump `locked_cert` if newer.
362    pub(crate) fn register_legacy_qc(&mut self, justify_qc: &Certificate1<T>) {
363        let parent_view = justify_qc.view_number();
364        self.certs
365            .entry(parent_view)
366            .or_insert_with(|| justify_qc.clone());
367        if self
368            .locked_cert
369            .as_ref()
370            .is_none_or(|locked| locked.view_number() < parent_view)
371        {
372            self.locked_cert = Some(justify_qc.clone());
373        }
374    }
375
    /// Return the proposal stored at the given view, if any.
    ///
    /// Yields `None` when no proposal is recorded for `view`.
    pub fn proposal_at(&self, view: ViewNumber) -> Option<&Proposal<T>> {
        self.proposals.get(&view)
    }
380
    /// Return the Certificate1 (QC) stored at the given view, if any.
    ///
    /// Only looks at stored certificates; certificates parked in
    /// `pending_certs1` are not returned.
    pub fn cert1_at(&self, view: ViewNumber) -> Option<&Certificate1<T>> {
        self.certs.get(&view)
    }
385
386    fn signed_vid_share(
387        &self,
388        view: ViewNumber,
389    ) -> Option<SignedProposal<T, VidDisperseShare2<T>>> {
390        self.vid_shares
391            .get(&view)?
392            .clone()
393            .to_proposal(&self.private_key)
394    }
395
    /// Build a proposal fetch request for `view`, created with this node's
    /// public and private keys.
    ///
    /// # Errors
    ///
    /// Returns the signature scheme's `SignError` if constructing the
    /// request fails.
    pub fn signed_proposal_fetch_request(
        &self,
        view: ViewNumber,
    ) -> Result<ProposalFetchRequest<T>, <T::SignatureKey as SignatureKey>::SignError> {
        ProposalFetchRequest::new(view, self.public_key.clone(), &self.private_key)
    }
402
    /// Return the Certificate2 stored at the given view, if any.
    ///
    /// Only looks at stored certificates; certificates parked in
    /// `pending_certs2` are not returned.
    pub fn cert2_at(&self, view: ViewNumber) -> Option<&Certificate2<T>> {
        self.certs2.get(&view)
    }
407
    /// Return the TimeoutCertificate2 that advanced consensus to `view`, if
    /// any. Keyed by the view it advanced *into* (i.e. one greater than the
    /// view it certified as timed out).
    ///
    /// Yields `None` when no timeout certificate is recorded under `view`.
    pub fn timeout_cert_at(&self, view: ViewNumber) -> Option<&TimeoutCertificate2<T>> {
        self.timeout_certs.get(&view)
    }
414
415    /// Return the view of the locked certificate, if set.
416    pub fn locked_view(&self) -> Option<ViewNumber> {
417        self.locked_cert.as_ref().map(|c| c.view_number())
418    }
419
420    /// Apply consensus to the given input and collect protocol outputs.
421    #[instrument(level = "debug", skip_all, fields(node = %self.node_id, view = %input.view_number()))]
422    pub fn apply(&mut self, input: ConsensusInput<T>, outbox: &mut Outbox<ConsensusOutput<T>>) {
423        let drb_epoch = match &input {
424            ConsensusInput::DrbResult(epoch, _) => Some(*epoch),
425            _ => None,
426        };
427        // DRB results arrive asynchronously with no specific view attached.
428        // Use `current_view` so that the post-apply retries (`maybe_propose`,
429        // `maybe_vote_*`) target the view the node is actually on — in
430        // particular, a leader blocked on `self.drb_results` for an
431        // epoch-transition proposal retries that proposal here.
432        let view = if matches!(&input, ConsensusInput::DrbResult(..)) {
433            self.current_view
434        } else {
435            input.view_number()
436        };
437        let proto = match input {
438            ConsensusInput::ProposalWithVidShare(sender, proposal, vid_share) => {
439                self.handle_proposal_with_vid_share(sender, proposal, vid_share, outbox)
440            },
441            ConsensusInput::Certificate1(certificate) => {
442                self.handle_certificate1(certificate, outbox)
443            },
444            ConsensusInput::Certificate2(certificate) => {
445                self.handle_certificate2(certificate, outbox)
446            },
447            ConsensusInput::EpochRootCertificates { cert1, state_cert } => {
448                // Store state_cert first so the subsequent Cert1 handler / leader
449                // proposer has it on hand. Atomicity invariant: this pair always
450                // arrives together; Consensus never sees the Cert1 alone.
451                self.state_certs.insert(state_cert.epoch, state_cert);
452                self.handle_certificate1(cert1, outbox)
453            },
454            ConsensusInput::TimeoutCertificate(certificate) => {
455                self.handle_timeout_certificate(certificate, outbox)
456            },
457            ConsensusInput::BlockReconstructed(view, vid_commitment) => {
458                self.blocks_reconstructed.insert(view, vid_commitment);
459                Protocol::Continue
460            },
461            ConsensusInput::StateValidated(state_response) => {
462                self.states_verified
463                    .insert(state_response.view, state_response.commitment);
464                Protocol::Continue
465            },
466            ConsensusInput::HeaderCreated(view, commitment, header) => {
467                self.headers.insert((view, commitment), header);
468                Protocol::Continue
469            },
470            ConsensusInput::StateValidationFailed(state_response) => {
471                if let Some(proposal) = self.proposals.get(&state_response.view)
472                    && proposal_commitment(proposal) != state_response.commitment
473                {
474                    return;
475                }
476                self.proposals.remove(&state_response.view);
477                self.leaves.remove(&state_response.view);
478                self.vid_shares.remove(&state_response.view);
479                return;
480            },
481            ConsensusInput::Timeout(view, epoch)
482            | ConsensusInput::TimeoutOneHonest(view, epoch) => {
483                self.handle_timeout(view, epoch, outbox)
484            },
485            ConsensusInput::BlockBuilt {
486                view,
487                epoch,
488                payload,
489                metadata,
490            } => {
491                outbox.push_back(ConsensusOutput::RequestVidDisperse {
492                    view,
493                    epoch,
494                    payload: payload.clone(),
495                    metadata,
496                });
497                self.blocks.insert(view, payload);
498                Protocol::Continue
499            },
500            ConsensusInput::VidDisperseCreated(view, vid_disperse) => {
501                // As leader we already have the payload; record the commitment so
502                // voting doesn't have to wait on the (skipped) reconstruction path.
503                self.blocks_reconstructed
504                    .insert(view, vid_disperse.payload_commitment);
505                self.send_vid_shares(&view, vid_disperse, outbox);
506                Protocol::Continue
507            },
508            ConsensusInput::DrbResult(epoch, drb_result) => {
509                self.drb_results.insert(epoch, drb_result);
510                Protocol::Continue
511            },
512            ConsensusInput::EpochChange(epoch_change) => {
513                self.handle_epoch_change(epoch_change, outbox)
514            },
515        };
516
517        if matches!(proto, Protocol::Abort) {
518            debug!("aborting protocol");
519            return;
520        }
521
522        self.maybe_vote_1(view, outbox);
523        self.maybe_vote_2_and_update_lock(view, outbox);
524        self.maybe_decide(view, outbox);
525        self.maybe_propose(view, outbox);
526        // An event from the current view or the previous view can trigger a propose
527        self.maybe_propose(view + 1, outbox);
528
529        // When new epoch data arrives (DRB result or epoch change), retry
530        // any pending certificates that were deferred because their epoch
531        // membership wasn't available yet.
532        if drb_epoch.is_some() {
533            self.retry_pending_certs(outbox);
534        }
535    }
536
    /// Return the view of the most recently decided leaf.
    pub fn last_decided_view(&self) -> ViewNumber {
        self.last_decided_view
    }
540
    /// Return the most recently decided leaf (the genesis leaf until a newer
    /// leaf is decided or seeded).
    pub fn last_decided_leaf(&self) -> &Leaf2<T> {
        &self.last_decided_leaf
    }
544
545    pub fn undecided_leaves(&self) -> impl Iterator<Item = &Leaf2<T>> {
546        self.leaves
547            .range((
548                std::ops::Bound::Excluded(self.last_decided_view),
549                std::ops::Bound::Unbounded,
550            ))
551            .map(|(_, leaf)| leaf)
552    }
553
    /// Return the view this node is currently on.
    pub fn current_view(&self) -> ViewNumber {
        self.current_view
    }
557
    /// Return the epoch this node is currently in, or `None` if no epoch has
    /// been set yet.
    pub fn current_epoch(&self) -> Option<EpochNumber> {
        self.current_epoch
    }
561
    /// Overwrite the current view and epoch unconditionally.
    ///
    /// No monotonicity check is performed and `timeout_view` is left as is.
    pub fn set_view(&mut self, view: ViewNumber, epoch: EpochNumber) {
        self.current_view = view;
        self.current_epoch = Some(epoch);
    }
566
567    pub fn wants_proposal_for_view(&self, view: &ViewNumber) -> bool {
568        let locked_too_new = self
569            .locked_cert
570            .as_ref()
571            .is_some_and(|l| l.view_number() > *view);
572        // A proposal may already be in `self.proposals` because we received
573        // an EpochChangeMessage for it (which carries the proposal but no
574        // vid_share and does not trigger state validation).  In that case we
575        // still want to process the real proposal message so handle_proposal
576        // runs — it populates vid_shares and emits RequestState.
577        let fully_processed =
578            self.proposals.contains_key(view) && self.vid_shares.contains_key(view);
579        !(locked_too_new || fully_processed)
580    }
581
    /// Return the signed proposal stored for `view`, if any.
    pub fn signed_proposal(&self, view: &ViewNumber) -> Option<&SignedProposal<T, Proposal<T>>> {
        self.signed_proposals.get(view)
    }
585
586    pub fn gc(&mut self, view: ViewNumber, _epoch: EpochNumber) {
587        self.proposed_views = self.proposed_views.split_off(&view);
588        self.states_verified = self.states_verified.split_off(&view);
589        self.blocks_reconstructed = self.blocks_reconstructed.split_off(&view);
590        self.blocks = self.blocks.split_off(&view);
591        self.certs = self.certs.split_off(&view);
592        self.certs2 = self.certs2.split_off(&view);
593        self.pending_certs1 = self.pending_certs1.split_off(&view);
594        self.pending_certs2 = self.pending_certs2.split_off(&view);
595        self.timeout_certs = self.timeout_certs.split_off(&view);
596        self.headers
597            .retain(|(header_view, _), _| *header_view >= view);
598        self.leaves = self.leaves.split_off(&view);
599        self.proposals = self.proposals.split_off(&view);
600        self.signed_proposals = self.signed_proposals.split_off(&view);
601        self.vid_shares = self.vid_shares.split_off(&view);
602        self.voted_1_views = self.voted_1_views.split_off(&view);
603        self.voted_2_views = self.voted_2_views.split_off(&view);
604    }
605
    /// Test-only: forcibly replace the proposal stored at `view`.
    ///
    /// Used to simulate the scenario where `self.proposals[parent_view]`
    /// diverges from `self.certs[parent_view].data.leaf_commit` (e.g. a
    /// byzantine leader sent two safe proposals at the same view, the cert
    /// formed for the first but a later overwrite landed in the proposals
    /// map).  No production code should ever do this.
    ///
    /// Only the `proposals` map is touched; the leaf, signed proposal and
    /// VID share stored for `view` are left unchanged.
    #[cfg(test)]
    pub(crate) fn force_set_proposal(&mut self, view: ViewNumber, proposal: Proposal<T>) {
        self.proposals.insert(view, proposal);
    }
617
    /// Process a validated proposal message together with this node's VID
    /// share for it.
    ///
    /// Aborts when the view is no longer wanted, the justify QC carries no
    /// epoch, the proposal is not safe, the first proposal of a new epoch
    /// lacks a valid next-epoch justify QC, or the proposal's next DRB result
    /// contradicts the locally known one.
    ///
    /// Otherwise the proposal, its signed form, the derived leaf and the VID
    /// share are stored for the view, and the following outputs are emitted:
    /// `RequestDrbResult` (only when the next epoch's DRB is needed but
    /// unknown), `RequestState`, `ProposalValidated`, and, when this node
    /// leads the next view, `RequestBlockAndHeader`.
    #[instrument(level = "debug", skip_all)]
    fn handle_proposal_with_vid_share(
        &mut self,
        sender: T::SignatureKey,
        proposal: ProposalMessage<T, Validated>,
        vid_share: VidDisperseShare2<T>,
        outbox: &mut Outbox<ConsensusOutput<T>>,
    ) -> Protocol {
        let view = proposal.view_number();

        // Rejects views below the lock as well as views that already have
        // both a proposal and a VID share stored.
        if !self.wants_proposal_for_view(&view) {
            warn!(%view, "proposal too old");
            return Protocol::Abort;
        }

        // Keep the signed envelope for storage/forwarding; work on the inner data.
        let signed_proposal = proposal.proposal.clone();
        let proposal = proposal.proposal.data;
        let epoch = proposal.epoch;
        // QC can be for a different epoch
        let Some(qc_epoch) = proposal.justify_qc.epoch() else {
            warn!(%view, "proposal has no epoch number");
            return Protocol::Abort;
        };
        let block_number = proposal.block_header.block_number();

        if !self.is_safe(&proposal) {
            debug!("proposal not safe");
            return Protocol::Abort;
        }

        // if the previous block is the last block of the epoch, this proposal is the first proposal of the new epoch
        if is_last_block(block_number.saturating_sub(1), *self.epoch_height) {
            let Some(cert2) = &proposal.next_epoch_justify_qc else {
                warn!(%epoch, "no next epoch justify QC");
                return Protocol::Abort;
            };
            // Both certificates must certify the same leaf.
            if cert2.data.leaf_commit != proposal.justify_qc.data().leaf_commit {
                warn!(%epoch, "next epoch justify QC does not match proposal");
                return Protocol::Abort;
            }
            if !self.verify_cert(cert2, qc_epoch) {
                warn!(%epoch, "next epoch justify QC not verified");
                return Protocol::Abort;
            }
        }

        // Read the size now; `vid_share` is moved into the map below.
        let payload_size = vid_share.payload_byte_len();

        // Store the proposal before the DRB check so it is not lost when
        // the DRB is not yet available (e.g. a node catching up after a
        // restart).  Voting is deferred to `maybe_vote_1` which verifies
        // the DRB before casting a vote.
        self.proposals.insert(view, proposal.clone());
        self.signed_proposals.insert(view, signed_proposal.clone());
        self.leaves.insert(view, proposal.clone().into());
        self.vid_shares.insert(view, vid_share);

        // Request the DRB if we don't have it yet.  A mismatching DRB is
        // a hard failure (invalid leader), but a missing DRB is
        // recoverable — the proposal is stored and voting will proceed
        // once the DRB arrives.  Same epoch guard as `maybe_propose`:
        // transitions in epoch >= 2 (`> genesis`) carry `next_drb_result`
        // (the successor epoch's DRB lives in this leaf, so the successor
        // epoch's catchup path unwraps `leaf.next_drb_result`).
        //
        // NOTE(review): on a DRB mismatch the proposal, leaf, signed
        // proposal and VID share inserted above stay stored after the
        // abort, so `wants_proposal_for_view` treats the view as fully
        // processed — confirm this is intended.
        if proposal.epoch > EpochNumber::genesis()
            && is_epoch_transition(block_number, *self.epoch_height)
        {
            if let Some(drb) = self.drb_results.get(&(epoch + 1)) {
                // A missing `next_drb_result` counts as a mismatch.
                if proposal
                    .next_drb_result
                    .is_none_or(|proposed_drb| drb != &proposed_drb)
                {
                    warn!(%epoch, "DRB result does not match proposal");
                    return Protocol::Abort;
                }
            } else {
                outbox.push_back(ConsensusOutput::RequestDrbResult(epoch + 1));
            }
        }

        outbox.push_back(ConsensusOutput::RequestState(StateRequest {
            view: proposal.view_number(),
            parent_view: proposal.justify_qc.view_number(),
            epoch,
            block: proposal.block_header.block_number().into(),
            proposal: proposal.clone(),
            parent_commitment: proposal.justify_qc.data().leaf_commit,
            payload_size,
        }));

        // The view after an epoch's last block belongs to the next epoch,
        // so the leader check below uses that epoch.
        let epoch = if is_last_block(block_number, *self.epoch_height) {
            epoch + 1
        } else {
            epoch
        };

        outbox.push_back(ConsensusOutput::ProposalValidated {
            proposal: signed_proposal,
            sender,
        });

        // If we lead the next view, start building on top of this proposal.
        if self.is_leader(view + 1, epoch) {
            outbox.push_back(ConsensusOutput::RequestBlockAndHeader(
                BlockAndHeaderRequest {
                    view: view + 1,
                    epoch,
                    parent_proposal: proposal,
                },
            ));
        }

        Protocol::Continue
    }
731
732    #[instrument(level = "debug", skip_all)]
733    fn handle_certificate1(
734        &mut self,
735        certificate: Certificate1<T>,
736        outbox: &mut Outbox<ConsensusOutput<T>>,
737    ) -> Protocol {
738        let view = certificate.view_number();
739        if self.certs.contains_key(&view) {
740            return Protocol::Continue;
741        }
742        let Some(certificate_epoch) = certificate.epoch() else {
743            warn!(%view, "certificate1 has no epoch number");
744            return Protocol::Abort;
745        };
746        // TODO: This signature check is slow (> 1ms).  We should consider
747        // if this should be done off the main thread.
748        match self.try_verify_cert(&certificate, certificate_epoch) {
749            CertVerification::Valid => {},
750            CertVerification::Invalid => {
751                warn!(%view, "certificate1 not verified");
752                return Protocol::Abort;
753            },
754            CertVerification::EpochUnavailable => {
755                debug!(%view, %certificate_epoch, "certificate1 deferred (epoch unavailable)");
756                self.pending_certs1.insert(view, certificate);
757                outbox.push_back(ConsensusOutput::RequestDrbResult(certificate_epoch));
758                return Protocol::Continue;
759            },
760        }
761        self.certs.insert(view, certificate);
762        Protocol::Continue
763    }
764
765    #[instrument(level = "debug", skip_all)]
766    fn handle_certificate2(
767        &mut self,
768        certificate: Certificate2<T>,
769        outbox: &mut Outbox<ConsensusOutput<T>>,
770    ) -> Protocol {
771        let view = certificate.view_number();
772        if self.certs2.contains_key(&view) {
773            return Protocol::Continue;
774        }
775        let Some(certificate_epoch) = certificate.epoch() else {
776            warn!(%view, "certificate2 has no epoch number");
777            return Protocol::Abort;
778        };
779        // TODO: This signature check is slow (> 1ms).  We should consider
780        // if this should be done off the main thread.
781        match self.try_verify_cert(&certificate, certificate_epoch) {
782            CertVerification::Valid => {},
783            CertVerification::Invalid => {
784                warn!(%view, "certificate2 not verified");
785                return Protocol::Abort;
786            },
787            CertVerification::EpochUnavailable => {
788                debug!(%view, %certificate_epoch, "certificate2 deferred (epoch unavailable)");
789                self.pending_certs2.insert(view, certificate);
790                outbox.push_back(ConsensusOutput::RequestDrbResult(certificate_epoch));
791                return Protocol::Continue;
792            },
793        }
794        self.certs2.insert(view, certificate);
795        Protocol::Continue
796    }
797
798    #[instrument(level = "debug", skip_all)]
799    fn handle_timeout(
800        &mut self,
801        view: ViewNumber,
802        epoch: EpochNumber,
803        outbox: &mut Outbox<ConsensusOutput<T>>,
804    ) -> Protocol {
805        self.timeout_view = max(self.timeout_view, view);
806        let data = TimeoutData2 {
807            view,
808            epoch: Some(epoch),
809        };
810        let vote = match SimpleVote::create_signed_vote(
811            data,
812            view,
813            &self.public_key,
814            &self.private_key,
815            &self.upgrade_lock,
816        ) {
817            Ok(vote) => vote,
818            Err(err) => {
819                warn!(%view, %err, "failed to create timeout vote");
820                return Protocol::Abort;
821            },
822        };
823        outbox.push_back(ConsensusOutput::SendTimeoutVote(
824            vote,
825            self.locked_cert.clone(),
826        ));
827        Protocol::Abort
828    }
829
830    #[instrument(level = "debug", skip_all)]
831    fn handle_timeout_certificate(
832        &mut self,
833        certificate: TimeoutCertificate2<T>,
834        outbox: &mut Outbox<ConsensusOutput<T>>,
835    ) -> Protocol {
836        let view = certificate.view_number() + 1;
837        if self.timeout_certs.contains_key(&view) {
838            return Protocol::Continue;
839        }
840        let Some(epoch) = certificate.epoch() else {
841            warn!(view = %certificate.view_number(), "timeout certificate has no epoch number");
842            return Protocol::Abort;
843        };
844        self.timeout_certs.insert(view, certificate.clone());
845        self.current_epoch = Some(epoch);
846        outbox.push_back(ConsensusOutput::ViewChanged(view, epoch));
847        outbox.push_back(ConsensusOutput::SendTimeoutCertificate(
848            certificate,
849            view,
850            epoch,
851        ));
852        if !self.is_leader(view, epoch) {
853            debug!(%epoch, "not leader");
854            return Protocol::Abort;
855        }
856
857        // If we are the leader of the next view, try to get a block to propose
858        // after forming the TC
859        let Some(locked_view) = self.locked_cert.as_ref().map(|cert| cert.view_number()) else {
860            debug!("locked certificate not available");
861            return Protocol::Abort;
862        };
863        let Some(proposal) = self.proposals.get(&locked_view) else {
864            debug!(%locked_view, "proposal not available");
865            return Protocol::Abort;
866        };
867        // Note: We don't handle epoch change on timeout certificate, because
868        // we can't change epoch after a timeout
869        outbox.push_back(ConsensusOutput::RequestBlockAndHeader(
870            BlockAndHeaderRequest {
871                view,
872                epoch,
873                parent_proposal: proposal.clone(),
874            },
875        ));
876        Protocol::Continue
877    }
878
879    #[instrument(level = "debug", skip_all)]
880    fn handle_epoch_change(
881        &mut self,
882        epoch_change: EpochChangeMessage<T>,
883        outbox: &mut Outbox<ConsensusOutput<T>>,
884    ) -> Protocol {
885        let EpochChangeMessage {
886            cert1,
887            cert2,
888            proposal,
889        } = epoch_change;
890        // Check if this epoch change is new
891        if self
892            .locked_cert
893            .as_ref()
894            .is_some_and(|locked_cert| locked_cert.view_number() > cert1.view_number())
895        {
896            warn!("locked certificate is newer than epoch change certificate1");
897            return Protocol::Abort;
898        }
899        // Check if the certificates match
900        if cert1.view_number() != cert2.view_number()
901            || cert1.epoch() != cert2.epoch()
902            || cert1.data.leaf_commit != cert2.data.leaf_commit
903        {
904            warn!("epoch change certificates do not match");
905            return Protocol::Abort;
906        }
907        // check if it's the last block for the correct epoch
908        if !is_last_block(cert2.data.block_number, *self.epoch_height) {
909            warn!("epoch change certificate2 is not the last block of the epoch");
910            return Protocol::Abort;
911        }
912        if cert2.data.block_number / *self.epoch_height != *cert2.data.epoch {
913            warn!("epoch change certificate2 is not for the correct epoch");
914            return Protocol::Abort;
915        }
916        // Check if the proposal matches the certificate1
917        if proposal_commitment(&proposal) != cert1.data.leaf_commit {
918            warn!("epoch change proposal commitment does not match certificate1 leaf commitment");
919            return Protocol::Abort;
920        }
921        // Verify the certificates
922        let Some(cert1_epoch) = cert1.epoch() else {
923            warn!("epoch change certificate1 has no epoch number");
924            return Protocol::Abort;
925        };
926        if !self.verify_cert(&cert1, cert1_epoch) {
927            warn!("epoch change certificate not verified");
928            return Protocol::Abort;
929        }
930        if !self.verify_cert(&cert2, cert2.data.epoch) {
931            warn!("epoch change certificate not verified");
932            return Protocol::Abort;
933        }
934        let next_view = cert2.view_number() + 1;
935        let next_epoch = cert2.data.epoch + 1;
936        // Change view to the first view of the next epoch
937        self.current_epoch = Some(next_epoch);
938        outbox.push_back(ConsensusOutput::ViewChanged(next_view, next_epoch));
939
940        // Request block and header if we're the first leader of the next epoch
941        if self.is_leader(next_view, next_epoch) {
942            outbox.push_back(ConsensusOutput::RequestBlockAndHeader(
943                BlockAndHeaderRequest {
944                    view: next_view,
945                    epoch: next_epoch,
946                    parent_proposal: proposal.clone(),
947                },
948            ));
949        }
950
951        self.proposals.insert(cert2.view_number(), proposal);
952        self.certs.insert(cert1.view_number(), cert1);
953        self.certs2.insert(cert2.view_number(), cert2);
954        Protocol::Continue
955    }
956
957    // The leader's own share is also unicast back to itself (cliquenet
958    // self-loopback): it is the only way the leader's `handle_proposal_and_vid_share`
959    // path runs for its own proposal, populating `proposals`, `leaves`,
960    // `states_verified`, and seeding the VID reconstructor's metadata.
961    #[instrument(level = "debug", skip_all)]
962    fn send_vid_shares(
963        &self,
964        view: &ViewNumber,
965        vid_disperse: VidDisperse2<T>,
966        outbox: &mut Outbox<ConsensusOutput<T>>,
967    ) {
968        let vid_messages = vid_disperse
969            .to_shares()
970            .into_iter()
971            .filter_map(|share| {
972                let Some(proposal) = share.to_proposal(&self.private_key) else {
973                    warn!(%view, "failed to sign VID share proposal");
974                    return None;
975                };
976                Some(proposal)
977            })
978            .collect::<Vec<_>>();
979        outbox.push_back(ConsensusOutput::SendVidShares(vid_messages));
980    }
981
    /// Builds, signs and queues a proposal for `view` once every input it
    /// needs is available locally.
    ///
    /// The method returns without proposing when any prerequisite is missing:
    /// the parent certificate, the parent proposal, the block header keyed by
    /// `(view, parent_commitment)`, the block for `view`, the next-epoch DRB
    /// result (transition blocks), the next-epoch justify QC (first proposal
    /// of an epoch) or the state certificate (epoch-root parent).  A view is
    /// proposed at most once: it is recorded in `proposed_views` right before
    /// the `SendProposal` output is queued.
    #[instrument(level = "debug", skip_all)]
    fn maybe_propose(&mut self, view: ViewNumber, outbox: &mut Outbox<ConsensusOutput<T>>) {
        if self.proposed_views.contains(&view) {
            return;
        }

        // A timeout certificate stored for `view` means the view was entered
        // via timeout; in that case the parent is the locked certificate
        // rather than the certificate of the immediately preceding view.
        let view_change_evidence = self.timeout_certs.get(&view).cloned();
        let parent_cert = if view_change_evidence.is_some() {
            let Some(cert) = &self.locked_cert else {
                debug!("no locked qc");
                return;
            };
            cert
        } else {
            let Some(cert) = self.certs.get(&ViewNumber::from(view.saturating_sub(1))) else {
                debug!("no parent certificate");
                return;
            };
            cert
        };
        let parent_view = parent_cert.view_number();
        let Some(proposal) = self.proposals.get(&parent_view) else {
            debug!(parent = %parent_view, "no proposal for parent view");
            return;
        };

        // Key the header lookup by the cert's `leaf_commit`, NOT
        // `proposal_commitment(proposal)`.  `self.proposals` is keyed by view
        // and can be overwritten by any later-arriving safe proposal at the
        // same view, so it may not match the cert.  Using the cert's
        // leaf_commit pins the lookup to the leaf the QC actually certified
        // and prevents the leader from grabbing a header that was built for a
        // different (same-view) parent and therefore carries a wrong
        // block_number.
        //
        // Genesis is the one special case: the synthetic genesis proposal
        // carries a non-null justify_qc, so the leaf derived from it has a
        // different commitment than the anchor leaf the genesis cert was
        // built over.  For view 1 we fall back to the proposal's commit.
        let parent_commitment = if parent_view == ViewNumber::genesis() {
            proposal_commitment(proposal)
        } else if proposal_commitment(proposal) != parent_cert.data.leaf_commit {
            warn!(
                %parent_view,
                "stored proposal at parent_view does not match parent cert's leaf_commit; \
                 refusing to propose with mismatched parent"
            );
            return;
        } else {
            parent_cert.data.leaf_commit
        };
        let Some(header) = self.headers.get(&(view, parent_commitment)) else {
            debug!("no block header");
            return;
        };
        if !self.blocks.contains_key(&view) {
            debug!("no block");
            return;
        };

        // The new block opens an epoch when its predecessor (block_number - 1)
        // is the last block of an epoch; the proposal then belongs to the
        // epoch after the parent proposal's epoch.
        let first_proposal_of_epoch =
            is_last_block(header.block_number().saturating_sub(1), *self.epoch_height);
        let proposal_epoch = if first_proposal_of_epoch {
            proposal.epoch + 1
        } else {
            proposal.epoch
        };
        if !self.is_leader(view, proposal_epoch) {
            warn!(epoch = %proposal_epoch, "not the leader for this view, we should not have a header");
            return;
        }

        // Epoch 1 is the genesis epoch and has no successor that needs a
        // DRB from a transition leaf — `set_first_epoch` pre-loads DRBs for
        // `first_epoch` and `first_epoch + 1`.  Every epoch beyond that
        // communicates its successor's DRB via `next_drb_result` on each
        // leaf in its transition zone; the successor epoch's catchup
        // unwraps that field, so leaving it `None` here would panic
        // peers that fetch this leaf.
        let next_drb_result = if proposal.epoch > EpochNumber::genesis()
            && is_epoch_transition(header.block_number(), *self.epoch_height)
        {
            let Some(drb) = self.drb_results.get(&EpochNumber::new(*proposal.epoch + 1)) else {
                debug!(%proposal.epoch, "no DRB result for epoch");
                // Keep retrying — the epoch manager dedups pending requests,
                // but if an earlier catchup failed (e.g. Leaf2Fetcher
                // timeout under CPU load) nothing else kicks the request.
                outbox.push_back(ConsensusOutput::RequestDrbResult(proposal.epoch + 1));
                return;
            };
            Some(*drb)
        } else {
            None
        };
        // The first proposal of an epoch additionally carries the second
        // certificate stored for the parent view as its next-epoch justify QC.
        let next_epoch_justify_qc = if first_proposal_of_epoch {
            let Some(next_epoch_justify_qc) = self.certs2.get(&parent_view) else {
                debug!("no next epoch justify QC");
                return;
            };
            Some(next_epoch_justify_qc.clone())
        } else {
            None
        };

        // If the parent QC is for an epoch-root block, attach the state_cert.
        // By the atomicity invariant (enforced by `EpochRootVoteCollector`),
        // if we hold the epoch-root Cert1 then `state_certs` also holds the
        // matching cert.
        // A parent certificate without a block number is treated as block 0.
        let parent_block_number = parent_cert.data.block_number.unwrap_or(0);
        let state_cert = if is_epoch_root(parent_block_number, *self.epoch_height) {
            let Some(parent_epoch) = parent_cert.data.epoch() else {
                warn!("epoch-root parent QC has no epoch; cannot propose");
                return;
            };
            let Some(sc) = self.state_certs.get(&parent_epoch).cloned() else {
                warn!(
                    %view,
                    "epoch-root parent QC without state_cert — atomicity invariant broken; skipping propose"
                );
                return;
            };
            if !check_qc_state_cert_correspondence(parent_cert, &sc, *self.epoch_height) {
                warn!(%view, "state_cert does not correspond to parent QC; skipping propose");
                return;
            }
            Some(sc)
        } else {
            None
        };

        let proposal = Proposal::<T> {
            block_header: header.clone(),
            view_number: view,
            epoch: proposal_epoch,
            justify_qc: parent_cert.clone(),
            next_epoch_justify_qc,
            upgrade_certificate: None,
            view_change_evidence,
            next_drb_result,
            state_cert,
        };

        // Sign the proposal
        // The signature covers the commitment of the leaf derived from the
        // proposal, not the proposal struct itself.
        let proposed_leaf: Leaf2<T> = proposal.clone().into();
        let signature =
            match T::SignatureKey::sign(&self.private_key, proposed_leaf.commit().as_ref()) {
                Ok(sig) => sig,
                Err(err) => {
                    warn!(%view, %err, "failed to sign proposal");
                    return;
                },
            };

        let message = SignedProposal {
            data: proposal,
            signature,
            _pd: PhantomData,
        };

        // Mark the view as proposed only once the proposal is fully built, so
        // every earlier bail-out leaves the view eligible for a retry.
        self.proposed_views.insert(view);
        outbox.push_back(ConsensusOutput::SendProposal(message));
    }
1144
1145    #[instrument(level = "debug", skip_all)]
1146    fn maybe_decide(&mut self, view: ViewNumber, outbox: &mut Outbox<ConsensusOutput<T>>) {
1147        if view <= self.last_decided_view {
1148            return;
1149        }
1150        let Some(cert2) = self.certs2.get(&view) else {
1151            debug!("cert2 not available");
1152            return;
1153        };
1154        let Some(proposal) = self.proposals.get(&view) else {
1155            debug!("proposal not available");
1156            return;
1157        };
1158        let proposal_commit = proposal_commitment(proposal);
1159        if cert2.data.leaf_commit != proposal_commit {
1160            debug!("cert2 commitment does not match proposal commitment");
1161            return;
1162        }
1163        // Handle Epoch Change by broadcasting the epoch change message if we have
1164        // all the data we need.
1165        if is_last_block(proposal.block_header.block_number(), *self.epoch_height)
1166            && let Some(cert1) = self.certs.get(&view)
1167            && cert1.data.leaf_commit == proposal_commit
1168        {
1169            let epoch_change = EpochChangeMessage {
1170                cert1: cert1.clone(),
1171                cert2: cert2.clone(),
1172                proposal: proposal.clone(),
1173            };
1174            outbox.push_back(ConsensusOutput::SendEpochChange(epoch_change));
1175        }
1176        // we have a second certificate, and matching proposal, it is decided.
1177        let mut leaf: Leaf2<T> = proposal.clone().into();
1178        if let Some(payload) = self.blocks.get(&view) {
1179            leaf.fill_block_payload_unchecked(payload.clone());
1180        }
1181        let new_decided_view = max(self.last_decided_view, leaf.view_number());
1182        let last_decided_leaf = leaf.clone();
1183        let mut gc = None;
1184        if leaf.block_header().block_number() % *self.garbage_collection_interval == 0 {
1185            gc = Some((leaf.view_number(), leaf.justify_qc().epoch()));
1186        }
1187        let mut decided = vec![leaf];
1188        let mut vid_shares = vec![self.signed_vid_share(view)];
1189
1190        let mut parent_view = proposal.justify_qc.view_number();
1191        let mut parent_commit = proposal.justify_qc.data.leaf_commit;
1192
1193        while parent_view > self.last_decided_view
1194            && let Some(proposal) = self.proposals.get(&parent_view)
1195        {
1196            let proposal_commit = proposal_commitment(proposal);
1197            if proposal_commit != parent_commit {
1198                break;
1199            }
1200            let mut leaf: Leaf2<T> = proposal.clone().into();
1201            if let Some(payload) = self.blocks.get(&parent_view) {
1202                leaf.fill_block_payload_unchecked(payload.clone());
1203            }
1204            if gc.is_none()
1205                && leaf.block_header().block_number() % *self.garbage_collection_interval == 0
1206            {
1207                gc = Some((leaf.view_number(), leaf.justify_qc().epoch()));
1208            }
1209            vid_shares.push(self.signed_vid_share(parent_view));
1210            decided.push(leaf);
1211            parent_view = proposal.justify_qc.view_number();
1212            parent_commit = proposal.justify_qc.data.leaf_commit;
1213        }
1214        self.last_decided_view = new_decided_view;
1215        self.last_decided_leaf = last_decided_leaf;
1216        let Some(cert1) = self.certs.get(&view).cloned() else {
1217            debug!(%view, "cert1 missing");
1218            return;
1219        };
1220        outbox.push_back(ConsensusOutput::LeafDecided {
1221            leaves: decided,
1222            cert1,
1223            cert2: Some(cert2.clone()),
1224            vid_shares,
1225        });
1226        if let Some(gc) = gc {
1227            let gc_data = CheckpointData {
1228                view: gc.0,
1229                epoch: gc.1.unwrap_or_default(),
1230            };
1231            let vote = match SimpleVote::create_signed_vote(
1232                gc_data,
1233                view,
1234                &self.public_key,
1235                &self.private_key,
1236                &self.upgrade_lock,
1237            ) {
1238                Ok(vote) => vote,
1239                Err(err) => {
1240                    warn!(%view, %err, "failed to create signed checkpoint vote");
1241                    return;
1242                },
1243            };
1244            outbox.push_back(ConsensusOutput::SendCheckpointVote(vote));
1245        }
1246    }
1247
1248    /// Build a `LightClientStateUpdateVote2` for an epoch-root leaf.
1249    ///
1250    /// Computes the `LightClientState` from the header, fetches the next-epoch
1251    /// stake-table commitment, and signs both the LCV2 (pre-upgrade, for
1252    /// backward compatibility with existing relay infrastructure) and LCV3
1253    /// (current) Schnorr signatures.
1254    fn build_state_vote(
1255        &self,
1256        proposal: &Proposal<T>,
1257    ) -> anyhow::Result<LightClientStateUpdateVote2<T>> {
1258        let view_number = proposal.view_number;
1259        let light_client_state = proposal
1260            .block_header
1261            .get_light_client_state(view_number)
1262            .map_err(|e| anyhow::anyhow!("failed to generate light client state: {e}"))?;
1263        let auth_root = proposal
1264            .block_header
1265            .auth_root()
1266            .map_err(|e| anyhow::anyhow!("failed to fetch auth root: {e}"))?;
1267        let membership = self
1268            .stake_table_coordinator
1269            .membership_for_epoch(Some(proposal.epoch))
1270            .map_err(|e| anyhow::anyhow!("membership lookup failed: {e}"))?;
1271        let next_stake_table = membership
1272            .next_epoch_stake_table()
1273            .map_err(|e| anyhow::anyhow!("next-epoch stake table lookup failed: {e}"))?;
1274        let next_stake_table_state = HSStakeTable::from_iter(next_stake_table.stake_table())
1275            .commitment(self.stake_table_capacity)
1276            .map_err(|e| anyhow::anyhow!("failed to compute stake table commitment: {e}"))?;
1277        let v2_signature = <T::StateSignatureKey as LCV2StateSignatureKey>::sign_state(
1278            &self.state_private_key,
1279            &light_client_state,
1280            &next_stake_table_state,
1281        )
1282        .map_err(|e| anyhow::anyhow!("failed to sign LCV2 state: {e}"))?;
1283        let signed_state_digest =
1284            derive_signed_state_digest(&light_client_state, &next_stake_table_state, &auth_root);
1285        let signature = <T::StateSignatureKey as LCV3StateSignatureKey>::sign_state(
1286            &self.state_private_key,
1287            signed_state_digest,
1288        )
1289        .map_err(|e| anyhow::anyhow!("failed to sign LCV3 state: {e}"))?;
1290        Ok(LightClientStateUpdateVote2 {
1291            epoch: proposal.epoch,
1292            light_client_state,
1293            next_stake_table_state,
1294            signature,
1295            v2_signature,
1296            auth_root,
1297            signed_state_digest,
1298        })
1299    }
1300
    /// Casts the first-round vote for `view` once every precondition holds.
    ///
    /// No vote is sent when `view` is at or below `timeout_view`, when a
    /// first-round vote was already recorded for `view`, or when any of the
    /// verified state commitment, the proposal or the VID share for `view` is
    /// missing.  Further guards: the DRB result check on epoch-transition
    /// blocks, stake in the proposal's epoch, the parent-chain check, and the
    /// verified state commitment matching the proposal commitment.  For an
    /// epoch-root block a light-client state vote is attached; failure to
    /// build it suppresses the vote.  On success `SendVote1` is queued and
    /// `view` is added to `voted_1_views`.
    #[instrument(level = "debug", skip_all)]
    fn maybe_vote_1(&mut self, view: ViewNumber, outbox: &mut Outbox<ConsensusOutput<T>>) {
        // Never vote in a view we have already timed out of.
        if view <= self.timeout_view {
            return;
        }
        if self.voted_1_views.contains(&view) {
            return;
        }

        let Some(state_commitment) = self.states_verified.get(&view) else {
            debug!("state commitment not available");
            return;
        };
        let Some(proposal) = self.proposals.get(&view) else {
            debug!("proposal not available");
            return;
        };
        let Some(vid_share) = self.vid_shares.get(&view) else {
            debug!("vid share not available");
            return;
        };

        // Don't vote for epoch-transition proposals until we can verify
        // the attached DRB result.  Same guard as `maybe_propose`:
        // transitions in epoch >= 2 must carry `next_drb_result`.
        let block_number = proposal.block_header.block_number();
        if proposal.epoch > EpochNumber::genesis()
            && is_epoch_transition(block_number, *self.epoch_height)
        {
            let Some(drb) = self.drb_results.get(&(proposal.epoch + 1)) else {
                debug!("DRB result not yet available, deferring vote");
                return;
            };
            // A missing `next_drb_result` is treated the same as a mismatch.
            if proposal
                .next_drb_result
                .is_none_or(|proposed_drb| drb != &proposed_drb)
            {
                warn!("DRB result does not match proposal, refusing to vote");
                return;
            }
        }

        if !self.staked_in_epoch(proposal.epoch) {
            return;
        }

        // Verify parent chain unless justify_qc is the genesis QC
        let parent_view = proposal.justify_qc.view_number();

        // Pre-cutover parents are V1 AvidM, not V2-reconstructable.
        let parent_is_pre_cutover = self.pre_cutover_views.contains(&parent_view);
        // The parent check is also skipped for the first block of an epoch
        // (the preceding block number is the last block of an epoch).
        if parent_view != ViewNumber::genesis()
            && !is_last_block(
                proposal.block_header.block_number().saturating_sub(1),
                *self.epoch_height,
            )
        {
            let Some(prev_proposal) = self.proposals.get(&parent_view) else {
                debug!(%parent_view, "proposal not available");
                return;
            };
            if !parent_is_pre_cutover {
                // Verify we have the block for the QC on this commitment
                let Some(block_commitment) = self.blocks_reconstructed.get(&parent_view) else {
                    debug!(%parent_view, "block commitment not available");
                    return;
                };
                let VidCommitment::V2(prev_block_commitment) =
                    prev_proposal.block_header.payload_commitment()
                else {
                    warn! {
                        %view,
                        %parent_view,
                        "prev. proposal payload commitment is not a V2 VID commitment"
                    }
                    return;
                };
                if block_commitment != &prev_block_commitment {
                    debug!(%parent_view, "parent block commitment does not match prev. block commitment");
                    return;
                }
            }

            // The justify QC must certify exactly the parent proposal we hold.
            if proposal.justify_qc.data().leaf_commit != proposal_commitment(prev_proposal) {
                debug!(%parent_view, "justify qc commitment does not match proposal commitment");
                return;
            }
        }

        let proposal_commit = proposal_commitment(proposal);

        // Verify the state commitment matches the proposal
        if state_commitment != &proposal_commit {
            debug!("state commitment does not match proposal commitment");
            return;
        }

        let inner_vote = match SimpleVote::create_signed_vote(
            QuorumData2 {
                leaf_commit: proposal_commit,
                epoch: proposal.epoch(),
                block_number: Some(proposal.block_header.block_number()),
            },
            view,
            &self.public_key,
            &self.private_key,
            &self.upgrade_lock,
        ) {
            Ok(vote) => vote,
            Err(err) => {
                warn!(%view, %err, "failed to created signed vote for proposal");
                return;
            },
        };

        // Epoch-root blocks additionally carry a light-client state vote; if
        // it cannot be built, no vote is sent for this view.
        let state_vote = if is_epoch_root(proposal.block_header.block_number(), *self.epoch_height)
        {
            match self.build_state_vote(proposal) {
                Ok(sv) => Some(sv),
                Err(err) => {
                    warn!(%view, %err, "failed to build state vote for epoch-root leaf; skipping vote1");
                    return;
                },
            }
        } else {
            None
        };

        let vote = Vote1 {
            vote: inner_vote,
            vid_share: vid_share.clone(),
            state_vote,
        };
        outbox.push_back(ConsensusOutput::SendVote1(vote));
        // Record the vote so this view is never voted on twice.
        self.voted_1_views.insert(view);
    }
1437
1438    #[instrument(level = "debug", skip_all)]
1439    fn maybe_vote_2_and_update_lock(
1440        &mut self,
1441        view: ViewNumber,
1442        outbox: &mut Outbox<ConsensusOutput<T>>,
1443    ) {
1444        // V1 AvidM dispersal cannot be re-voted under V2.
1445        if self.pre_cutover_views.contains(&view) {
1446            return;
1447        }
1448        if self.voted_2_views.contains(&view) {
1449            return;
1450        }
1451        if !self.blocks_reconstructed.contains_key(&view) {
1452            debug!("reconstructed block commitment not available");
1453            return;
1454        }
1455        let Some(cert1) = self.certs.get(&view) else {
1456            debug!("cert1 not available");
1457            return;
1458        };
1459        let Some(proposal) = self.proposals.get(&view) else {
1460            debug!("proposal not available");
1461            return;
1462        };
1463        let proposal_epoch = proposal.epoch;
1464
1465        let proposal_commit = proposal_commitment(proposal);
1466
1467        // The certificate must match the proposal
1468        if cert1.data.leaf_commit != proposal_commit {
1469            warn!(%view, "cert1 commitment does not match proposal commitment");
1470            return;
1471        }
1472        let reconstructed_block_commitment =
1473            self.blocks_reconstructed.get(&view).expect("checked above");
1474        let VidCommitment::V2(proposal_block_commitment) =
1475            proposal.block_header.payload_commitment()
1476        else {
1477            warn!(%view, "proposal payload commitment is not a V2 VID commitment");
1478            return;
1479        };
1480        if &proposal_block_commitment != reconstructed_block_commitment {
1481            warn!(%view, "proposal commitment does not match reconstructed block commitment");
1482            return;
1483        }
1484
1485        // We have a valid certificate, proposal, and reconstructed block
1486        // We can now update the lock, change view and vote
1487        if self
1488            .locked_cert
1489            .as_mut()
1490            .is_none_or(|locked_cert| locked_cert.view_number() < cert1.view_number())
1491        {
1492            self.locked_cert = Some(cert1.clone());
1493            self.current_epoch = Some(proposal_epoch);
1494            outbox.push_back(ConsensusOutput::ViewChanged(view + 1, proposal_epoch));
1495            outbox.push_back(ConsensusOutput::SendCertificate1(cert1.clone()));
1496        }
1497
1498        if !self.staked_in_epoch(proposal_epoch) {
1499            return;
1500        }
1501
1502        let vote = match SimpleVote::create_signed_vote(
1503            Vote2Data {
1504                leaf_commit: proposal_commit,
1505                epoch: proposal_epoch,
1506                block_number: proposal.block_header.block_number(),
1507            },
1508            view,
1509            &self.public_key,
1510            &self.private_key,
1511            &self.upgrade_lock,
1512        ) {
1513            Ok(vote) => vote,
1514            Err(err) => {
1515                warn!(%view, %err, "failed to created signed vote2");
1516                return;
1517            },
1518        };
1519        outbox.push_back(ConsensusOutput::SendVote2(vote));
1520        self.voted_2_views.insert(view);
1521    }
1522
1523    #[instrument(level = "trace", skip_all)]
1524    fn is_safe(&self, proposal: &Proposal<T>) -> bool {
1525        let Some(locked_cert) = self.locked_cert.as_ref() else {
1526            // Locked certificate is not set which means it is at genesis
1527            debug!("at genesis");
1528            return true;
1529        };
1530
1531        // cert1 + block arrived before proposal
1532        if locked_cert.view_number() == proposal.view_number() {
1533            return locked_cert.data.leaf_commit == proposal_commitment(proposal);
1534        }
1535
1536        let liveness_check = proposal.justify_qc.view_number() > locked_cert.view_number();
1537        let parent_commit = match proposal.justify_qc.data_commitment(&self.upgrade_lock) {
1538            Ok(c) => c,
1539            Err(err) => {
1540                warn!(%err, "failed to compute justify qc data commitment");
1541                return false;
1542            },
1543        };
1544        let locked_commit = match locked_cert.data_commitment(&self.upgrade_lock) {
1545            Ok(c) => c,
1546            Err(err) => {
1547                warn!(%err, "failed to compute locked certificate data");
1548                return false;
1549            },
1550        };
1551
1552        let safety_check = parent_commit == locked_commit;
1553
1554        safety_check || liveness_check
1555    }
1556
1557    #[instrument(level = "trace", skip_all)]
1558    fn verify_cert<A, C>(&self, cert: &C, epoch: EpochNumber) -> bool
1559    where
1560        C: vote::Certificate<T, A>,
1561    {
1562        match self
1563            .stake_table_coordinator
1564            .membership_for_epoch(Some(epoch))
1565        {
1566            Ok(stake_table) => {
1567                let entries = StakeTableEntries::from_iter(stake_table.stake_table()).0;
1568                let threshold = stake_table.success_threshold();
1569                match cert.is_valid_cert(&entries, threshold, &self.upgrade_lock) {
1570                    Ok(()) => true,
1571                    Err(err) => {
1572                        warn!(%epoch, %err, "invalid threshold signature");
1573                        false
1574                    },
1575                }
1576            },
1577            Err(err) => {
1578                warn!(%epoch, %err, "failed to get stake table");
1579                false
1580            },
1581        }
1582    }
1583
1584    /// Try to verify a certificate, distinguishing between "epoch not available"
1585    /// and "cryptographically invalid".
1586    #[instrument(level = "trace", skip_all)]
1587    fn try_verify_cert<A, C>(&self, cert: &C, epoch: EpochNumber) -> CertVerification
1588    where
1589        C: vote::Certificate<T, A>,
1590    {
1591        match self
1592            .stake_table_coordinator
1593            .membership_for_epoch(Some(epoch))
1594        {
1595            Ok(stake_table) => {
1596                let entries = StakeTableEntries::from_iter(stake_table.stake_table()).0;
1597                let threshold = stake_table.success_threshold();
1598                match cert.is_valid_cert(&entries, threshold, &self.upgrade_lock) {
1599                    Ok(()) => CertVerification::Valid,
1600                    Err(err) => {
1601                        warn!(%epoch, %err, "invalid threshold signature");
1602                        CertVerification::Invalid
1603                    },
1604                }
1605            },
1606            Err(_) => CertVerification::EpochUnavailable,
1607        }
1608    }
1609
1610    /// Retry verification of pending certificates whose epoch may now be available.
1611    fn retry_pending_certs(&mut self, outbox: &mut Outbox<ConsensusOutput<T>>) {
1612        // Retry pending cert1s.
1613        let pending = std::mem::take(&mut self.pending_certs1);
1614        for (view, cert) in pending {
1615            self.handle_certificate1(cert.clone(), outbox);
1616            self.maybe_vote_2_and_update_lock(view, outbox);
1617            self.maybe_propose(view, outbox);
1618        }
1619
1620        // Retry pending cert2s.
1621        let pending = std::mem::take(&mut self.pending_certs2);
1622        for (view, cert) in pending {
1623            self.handle_certificate2(cert.clone(), outbox);
1624            self.maybe_decide(view, outbox);
1625            self.maybe_propose(view, outbox);
1626        }
1627    }
1628
1629    #[instrument(level = "trace", skip_all)]
1630    fn is_leader(&self, view: ViewNumber, epoch: EpochNumber) -> bool {
1631        match self
1632            .stake_table_coordinator
1633            .membership_for_epoch(Some(epoch))
1634        {
1635            Ok(stake_table) => match stake_table.leader(view) {
1636                Ok(leader) => leader == self.public_key,
1637                Err(err) => {
1638                    warn!(%view, %epoch, %err, "failed to get leader from stake table");
1639                    false
1640                },
1641            },
1642            Err(err) => {
1643                warn!(%view, %epoch, %err, "failed to get stake table");
1644                false
1645            },
1646        }
1647    }
1648
1649    fn staked_in_epoch(&self, epoch: EpochNumber) -> bool {
1650        match self
1651            .stake_table_coordinator
1652            .membership_for_epoch(Some(epoch))
1653        {
1654            Ok(stake_table) => stake_table.has_stake(&self.public_key),
1655            Err(err) => {
1656                warn!(%epoch, %err, "failed to get stake table");
1657                false
1658            },
1659        }
1660    }
1661}
1662
1663impl<T: NodeType> ConsensusInput<T> {
1664    fn view_number(&self) -> ViewNumber {
1665        match self {
1666            ConsensusInput::BlockBuilt { view, .. } => *view,
1667            ConsensusInput::BlockReconstructed(view, _) => *view,
1668            ConsensusInput::Certificate1(cert) => cert.view_number(),
1669            ConsensusInput::Certificate2(cert) => cert.view_number(),
1670            ConsensusInput::EpochRootCertificates { cert1, .. } => cert1.view_number(),
1671            ConsensusInput::HeaderCreated(view, ..) => *view,
1672            ConsensusInput::ProposalWithVidShare(_, prop, _) => prop.view_number(),
1673            ConsensusInput::StateValidated(response) => response.view,
1674            ConsensusInput::StateValidationFailed(request) => request.view,
1675            ConsensusInput::Timeout(view, _) => *view,
1676            ConsensusInput::TimeoutOneHonest(view, _) => *view,
1677            ConsensusInput::TimeoutCertificate(cert) => {
1678                // Add one because we are moving to the next view so all event
1679                // processing is for the next view
1680                cert.view_number() + 1
1681            },
1682            ConsensusInput::VidDisperseCreated(view, _) => *view,
1683            // DRB results arrive asynchronously and don't belong to any
1684            // particular view; `apply` handles routing by using
1685            // `current_view` for this variant.
1686            ConsensusInput::DrbResult(..) => ViewNumber::genesis(),
1687            ConsensusInput::EpochChange(epoch_change) => epoch_change.cert1.view_number(),
1688        }
1689    }
1690}