hotshot_task_impls/
helpers.rs

1// Copyright (c) 2021-2024 Espresso Systems (espressosys.com)
2// This file is part of the HotShot repository.
3
4// You should have received a copy of the MIT License
5// along with the HotShot repository. If not, see <https://mit-license.org/>.
6
7use std::{
8    collections::{HashMap, HashSet},
9    sync::Arc,
10    time::Instant,
11};
12
13use alloy::{
14    primitives::{FixedBytes, U256},
15    sol_types::SolValue,
16};
17use ark_ff::PrimeField;
18use async_broadcast::{Receiver, SendError, Sender};
19use async_lock::RwLock;
20use committable::{Commitment, Committable};
21use hotshot_contract_adapter::sol_types::{LightClientStateSol, StakeTableStateSol};
22use hotshot_task::dependency::{Dependency, EventDependency};
23use hotshot_types::{
24    consensus::OuterConsensus,
25    data::{
26        EpochNumber, Leaf2, QuorumProposalWrapper, VidDisperseShare, ViewChangeEvidence2,
27        ViewNumber,
28    },
29    drb::DrbResult,
30    epoch_membership::EpochMembershipCoordinator,
31    event::{Event, EventType, LeafInfo},
32    light_client::{CircuitField, LightClientState, StakeTableState},
33    message::{Proposal, UpgradeLock},
34    request_response::ProposalRequestPayload,
35    simple_certificate::{
36        CertificatePair, DaCertificate2, LightClientStateUpdateCertificateV2,
37        NextEpochQuorumCertificate2, QuorumCertificate2, UpgradeCertificate,
38    },
39    simple_vote::HasEpoch,
40    stake_table::StakeTableEntries,
41    traits::{
42        BlockPayload, ValidatedState,
43        block_contents::BlockHeader,
44        election::Membership,
45        node_implementation::{NodeImplementation, NodeType},
46        signature_key::{
47            LCV2StateSignatureKey, LCV3StateSignatureKey, SignatureKey, StakeTableEntryType,
48        },
49        storage::Storage,
50    },
51    utils::{
52        Terminator, View, ViewInner, epoch_from_block_number, is_epoch_root, is_epoch_transition,
53        is_transition_block, option_epoch_from_block_number,
54    },
55    vote::{Certificate, HasViewNumber},
56};
57use hotshot_utils::anytrace::*;
58use time::OffsetDateTime;
59use tokio::time::timeout;
60use tracing::instrument;
61use versions::EPOCH_VERSION;
62
63use crate::{events::HotShotEvent, quorum_proposal_recv::ValidationInfo, request::REQUEST_TIMEOUT};
64
/// Trigger a request to the network for a proposal for a view and wait for the response or timeout.
///
/// Broadcasts a signed `QuorumProposalRequestSend` event for `qc`'s view, then listens on
/// `event_receiver` for a matching `QuorumProposalResponseRecv` until `REQUEST_TIMEOUT` elapses.
/// On receipt, the proposal's justify QC is validated against the stake table for its epoch, the
/// resulting leaf is inserted into the consensus state map, and the `(leaf, view)` pair is
/// returned.
///
/// # Errors
/// Returns an error if signing the request fails, the request times out, the stake table for the
/// justify QC's epoch cannot be obtained, or the justify QC is invalid.
#[instrument(skip_all)]
#[allow(clippy::too_many_arguments)]
pub(crate) async fn fetch_proposal<TYPES: NodeType>(
    qc: &QuorumCertificate2<TYPES>,
    event_sender: Sender<Arc<HotShotEvent<TYPES>>>,
    event_receiver: Receiver<Arc<HotShotEvent<TYPES>>>,
    membership_coordinator: EpochMembershipCoordinator<TYPES>,
    consensus: OuterConsensus<TYPES>,
    sender_public_key: TYPES::SignatureKey,
    sender_private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
    upgrade_lock: &UpgradeLock<TYPES>,
    epoch_height: u64,
) -> Result<(Leaf2<TYPES>, View<TYPES>)> {
    let view_number = qc.view_number();
    let leaf_commit = qc.data.leaf_commit;
    // We need to be able to sign this request before submitting it to the network. Compute the
    // payload first.
    let signed_proposal_request = ProposalRequestPayload {
        view_number,
        key: sender_public_key,
    };

    // Finally, compute the signature for the payload.
    let signature = TYPES::SignatureKey::sign(
        &sender_private_key,
        signed_proposal_request.commit().as_ref(),
    )
    .wrap()
    .context(error!("Failed to sign proposal. This should never happen."))?;

    tracing::info!("Sending proposal request for view {view_number}");

    // First, broadcast that we need a proposal to the current leader
    broadcast_event(
        HotShotEvent::QuorumProposalRequestSend(signed_proposal_request, signature).into(),
        &event_sender,
    )
    .await;

    let mut rx = event_receiver.clone();
    // Make a background task to await the arrival of the event data.
    let Ok(Some(proposal)) =
        // We want to explicitly timeout here so we aren't waiting around for the data.
        timeout(REQUEST_TIMEOUT, async move {
            // We want to iterate until the proposal is not None, or until we reach the timeout.
            while let Ok(event) = rx.recv_direct().await {
                if let HotShotEvent::QuorumProposalResponseRecv(quorum_proposal) = event.as_ref() {
                    // Only accept a response that matches both the requested view and the leaf
                    // commitment signed by the QC — anything else keeps the loop waiting.
                    let leaf = Leaf2::from_quorum_proposal(&quorum_proposal.data);
                    if leaf.view_number() == view_number && leaf.commit() == leaf_commit {
                        return Some(quorum_proposal.clone());
                    }
                }
            }
            None
        })
        .await
    else {
        bail!("Request for proposal failed");
    };

    let view_number = proposal.data.view_number();
    let justify_qc = proposal.data.justify_qc().clone();

    let justify_qc_epoch = justify_qc.data.epoch();

    // Validate the justify QC against the stake table of its own epoch before trusting the
    // fetched proposal.
    let epoch_membership = membership_coordinator
        .stake_table_for_epoch(justify_qc_epoch)
        .await?;
    let membership_stake_table = epoch_membership.stake_table().await;
    let membership_success_threshold = epoch_membership.success_threshold().await;

    justify_qc
        .is_valid_cert(
            &StakeTableEntries::<TYPES>::from(membership_stake_table).0,
            membership_success_threshold,
            upgrade_lock,
        )
        .context(|e| warn!("Invalid justify_qc in proposal for view {view_number}: {e}"))?;

    // Record the fetched leaf and its (header-derived) state in the consensus state map.
    let mut consensus_writer = consensus.write().await;
    let leaf = Leaf2::from_quorum_proposal(&proposal.data);
    let state = Arc::new(
        <TYPES::ValidatedState as ValidatedState<TYPES>>::from_header(proposal.data.block_header()),
    );

    // An update failure here is non-fatal (e.g. the leaf may already be known); just trace it.
    if let Err(e) = consensus_writer.update_leaf(leaf.clone(), Arc::clone(&state), None) {
        tracing::trace!("{e:?}");
    }
    let view = View {
        view_inner: ViewInner::Leaf {
            leaf: leaf.commit(),
            state,
            delta: None,
            epoch: leaf.epoch(epoch_height),
        },
    };
    Ok((leaf, view))
}
164pub async fn handle_drb_result<TYPES: NodeType, I: NodeImplementation<TYPES>>(
165    membership: &Arc<RwLock<TYPES::Membership>>,
166    epoch: EpochNumber,
167    storage: &I::Storage,
168    drb_result: DrbResult,
169) {
170    tracing::debug!("Calling store_drb_result for epoch {epoch}");
171    if let Err(e) = storage.store_drb_result(epoch, drb_result).await {
172        tracing::error!("Failed to store drb result for epoch {epoch}: {e}");
173    }
174
175    membership.write().await.add_drb_result(epoch, drb_result)
176}
177
/// Verify the DRB result from the proposal for the next epoch if this is the last block of the
/// current epoch.
///
/// Uses the result from `start_drb_task`.
///
/// Returns an error if we should not vote.
pub(crate) async fn verify_drb_result<TYPES: NodeType, I: NodeImplementation<TYPES>>(
    proposal: &QuorumProposalWrapper<TYPES>,
    validation_info: &ValidationInfo<TYPES, I>,
) -> Result<()> {
    // Skip if this is not the expected block.
    // An `epoch_height` of 0 means epochs are disabled entirely.
    if validation_info.epoch_height == 0
        || !is_epoch_transition(
            proposal.block_header().block_number(),
            validation_info.epoch_height,
        )
    {
        tracing::debug!("Skipping DRB result verification");
        return Ok(());
    }

    // #3967 REVIEW NOTE: Check if this is the right way to decide if we're doing epochs
    // Alternatively, should we just return Err() if epochs aren't happening here? Or can we assume
    // that epochs are definitely happening by virtue of getting here?
    let epoch = option_epoch_from_block_number(
        validation_info
            .upgrade_lock
            .epochs_enabled(proposal.view_number()),
        proposal.block_header().block_number(),
        validation_info.epoch_height,
    );

    // The proposal must carry the next epoch's DRB result for us to compare against.
    let proposal_result = proposal
        .next_drb_result()
        .context(info!("Proposal is missing the next epoch's DRB result."))?;

    if let Some(epoch_val) = epoch {
        let current_epoch_membership = validation_info
            .membership
            .coordinator
            .stake_table_for_epoch(epoch)
            .await
            .context(warn!("No stake table for epoch {}", epoch_val))?;

        let has_stake_current_epoch = current_epoch_membership
            .has_stake(&validation_info.public_key)
            .await;

        // Only nodes staked in the current epoch are expected to have computed the next
        // epoch's DRB result locally; others skip the comparison.
        if has_stake_current_epoch {
            let computed_result = current_epoch_membership
                .next_epoch()
                .await
                .context(warn!("No stake table for epoch {}", epoch_val + 1))?
                .get_epoch_drb()
                .await
                .clone()
                .context(warn!("DRB result not found"))?;

            // A mismatch means the proposal's DRB result disagrees with our own computation,
            // so we must not vote.
            ensure!(
                proposal_result == computed_result,
                warn!(
                    "Our calculated DRB result is {computed_result:?}, which does not match the \
                     proposed DRB result of {proposal_result:?}"
                )
            );
        }

        Ok(())
    } else {
        Err(error!("Epochs are not available"))
    }
}
250
251/// Handles calling add_epoch_root and sync_l1 on Membership if necessary.
252async fn decide_epoch_root<TYPES: NodeType, I: NodeImplementation<TYPES>>(
253    decided_leaf: &Leaf2<TYPES>,
254    epoch_height: u64,
255    membership: &EpochMembershipCoordinator<TYPES>,
256    storage: &I::Storage,
257    consensus: &OuterConsensus<TYPES>,
258) {
259    let decided_leaf = decided_leaf.clone();
260    let decided_block_number = decided_leaf.block_header().block_number();
261
262    // Skip if this is not the expected block.
263    if epoch_height != 0 && is_epoch_root(decided_block_number, epoch_height) {
264        let next_epoch_number =
265            EpochNumber::new(epoch_from_block_number(decided_block_number, epoch_height) + 2);
266
267        let start = Instant::now();
268        if let Err(e) = storage
269            .store_epoch_root(next_epoch_number, decided_leaf.block_header().clone())
270            .await
271        {
272            tracing::error!("Failed to store epoch root for epoch {next_epoch_number}: {e}");
273        }
274        tracing::info!("Time taken to store epoch root: {:?}", start.elapsed());
275
276        let membership = membership.clone();
277        let decided_block_header = decided_leaf.block_header().clone();
278        let storage = storage.clone();
279        let consensus = consensus.clone();
280
281        let consensus_reader = consensus.read().await;
282
283        drop(consensus_reader);
284
285        tokio::spawn(async move {
286            let membership_clone = membership.clone();
287
288            // First add the epoch root to `membership`
289            {
290                let start = Instant::now();
291                if let Err(e) = Membership::add_epoch_root(
292                    Arc::clone(membership_clone.membership()),
293                    decided_block_header,
294                )
295                .await
296                {
297                    tracing::error!("Failed to add epoch root for epoch {next_epoch_number}: {e}");
298                }
299                tracing::info!("Time taken to add epoch root: {:?}", start.elapsed());
300            }
301
302            let membership_clone = membership.clone();
303
304            let drb_result = membership_clone
305                .compute_drb_result(next_epoch_number, decided_leaf.clone())
306                .await;
307
308            let drb_result = match drb_result {
309                Ok(result) => result,
310                Err(e) => {
311                    tracing::error!("Failed to compute DRB result from decide: {e}");
312                    return;
313                },
314            };
315
316            let start = Instant::now();
317            handle_drb_result::<TYPES, I>(
318                membership.membership(),
319                next_epoch_number,
320                &storage,
321                drb_result,
322            )
323            .await;
324            tracing::info!("Time taken to handle drb result: {:?}", start.elapsed());
325        });
326    }
327}
328
/// Helper type to give names to the output values of the leaf chain traversal operation.
#[derive(Debug)]
pub struct LeafChainTraversalOutcome<TYPES: NodeType> {
    /// The new locked view obtained from a 2 chain starting from the proposal's parent.
    pub new_locked_view_number: Option<ViewNumber>,

    /// The new decided view obtained from a 3 chain starting from the proposal's parent.
    pub new_decided_view_number: Option<ViewNumber>,

    /// The QC signing the latest new leaf, causing it to become committed.
    ///
    /// Only present if a new leaf chain has been decided.
    pub committing_qc: Option<CertificatePair<TYPES>>,

    /// A second QC extending the committing QC, causing the new leaf chain to become decided.
    ///
    /// This is only applicable in HotStuff2, and will be [`None`] prior to HotShot version 0.3.
    /// HotStuff1 (HotShot < 0.3) uses a different commit rule, which is not captured in this type.
    pub deciding_qc: Option<CertificatePair<TYPES>>,

    /// The decided leaves with corresponding validated state and VID info.
    pub leaf_views: Vec<LeafInfo<TYPES>>,

    /// The transactions in the block payload for each leaf.
    pub included_txns: Option<HashSet<Commitment<<TYPES as NodeType>::Transaction>>>,

    /// The most recent upgrade certificate from one of the leaves.
    pub decided_upgrade_cert: Option<UpgradeCertificate<TYPES>>,
}
358
359/// We need Default to be implemented because the leaf ascension has very few failure branches,
360/// and when they *do* happen, we still return intermediate states. Default makes the burden
361/// of filling values easier.
362impl<TYPES: NodeType + Default> Default for LeafChainTraversalOutcome<TYPES> {
363    /// The default method for this type is to set all of the returned values to `None`.
364    fn default() -> Self {
365        Self {
366            new_locked_view_number: None,
367            new_decided_view_number: None,
368            committing_qc: None,
369            deciding_qc: None,
370            leaf_views: Vec::new(),
371            included_txns: None,
372            decided_upgrade_cert: None,
373        }
374    }
375}
376
377async fn update_metrics<TYPES: NodeType>(
378    consensus: &OuterConsensus<TYPES>,
379    leaf_views: &[LeafInfo<TYPES>],
380) {
381    let consensus_reader = consensus.read().await;
382    let now = OffsetDateTime::now_utc().unix_timestamp() as u64;
383
384    for leaf_view in leaf_views {
385        let proposal_timestamp = leaf_view.leaf.block_header().timestamp();
386
387        let Some(proposal_to_decide_time) = now.checked_sub(proposal_timestamp) else {
388            tracing::error!("Failed to calculate proposal to decide time: {proposal_timestamp}");
389            continue;
390        };
391        consensus_reader
392            .metrics
393            .proposal_to_decide_time
394            .add_point(proposal_to_decide_time as f64);
395        if let Some(txn_bytes) = leaf_view.leaf.block_payload().map(|p| p.txn_bytes()) {
396            consensus_reader
397                .metrics
398                .finalized_bytes
399                .add_point(txn_bytes as f64);
400        }
401    }
402}
403
/// calculate the new decided leaf chain based on the rules of HotStuff 2
///
/// The proposal's justify QC becomes the new locked view. If the proposal's parent and
/// grandparent are in consecutive views, the grandparent's view is decided, and every leaf from
/// the grandparent back to (but excluding) the previous anchor view is collected into
/// `leaf_views`, along with any decided upgrade certificate and the decided transaction
/// commitments. When `with_epochs` is set, epoch-root handling runs for each decided leaf.
///
/// # Panics
/// If the leaf chain contains no decided leaf while reaching a decided view, which should be
/// impossible.
#[allow(clippy::too_many_arguments)]
pub async fn decide_from_proposal_2<TYPES: NodeType, I: NodeImplementation<TYPES>>(
    proposal: &QuorumProposalWrapper<TYPES>,
    consensus: OuterConsensus<TYPES>,
    upgrade_lock: &UpgradeLock<TYPES>,
    public_key: &TYPES::SignatureKey,
    with_epochs: bool,
    membership: &EpochMembershipCoordinator<TYPES>,
    storage: &I::Storage,
) -> LeafChainTraversalOutcome<TYPES> {
    let mut res = LeafChainTraversalOutcome::default();
    let consensus_reader = consensus.read().await;
    let proposed_leaf = Leaf2::from_quorum_proposal(proposal);
    res.new_locked_view_number = Some(proposed_leaf.justify_qc().view_number());

    // If we don't have the proposals parent return early
    let Some(parent_info) = consensus_reader
        .parent_leaf_info(&proposed_leaf, public_key)
        .await
    else {
        return res;
    };
    // Get the parents parent and check if it's consecutive in view to the parent, if so we can decide
    // the grandparents view.  If not we're done.
    let Some(grand_parent_info) = consensus_reader
        .parent_leaf_info(&parent_info.leaf, public_key)
        .await
    else {
        return res;
    };
    if grand_parent_info.leaf.view_number() + 1 != parent_info.leaf.view_number() {
        return res;
    }
    res.committing_qc = Some(CertificatePair::for_parent(&parent_info.leaf));
    res.deciding_qc = Some(CertificatePair::for_parent(&proposed_leaf));
    let decided_view_number = grand_parent_info.leaf.view_number();
    res.new_decided_view_number = Some(decided_view_number);
    // We've reached decide, now get the leaf chain all the way back to the last decided view, not including it.
    let old_anchor_view = consensus_reader.last_decided_view();
    let mut current_leaf_info = Some(grand_parent_info);
    let existing_upgrade_cert_reader = upgrade_lock.decided_upgrade_cert();
    let mut txns = HashSet::new();
    while current_leaf_info
        .as_ref()
        .is_some_and(|info| info.leaf.view_number() > old_anchor_view)
    {
        // unwrap is safe, we just checked that the option is some
        let info = &mut current_leaf_info.unwrap();
        // Check if there's a new upgrade certificate available.
        if let Some(cert) = info.leaf.upgrade_certificate()
            && info.leaf.upgrade_certificate() != existing_upgrade_cert_reader
        {
            if cert.data.decide_by < decided_view_number {
                tracing::warn!("Failed to decide an upgrade certificate in time. Ignoring.");
            } else {
                tracing::info!("Reached decide on upgrade certificate: {cert:?}");
                res.decided_upgrade_cert = Some(cert.clone());
            }
        }

        // If the block payload is available for this leaf, include it in
        // the leaf chain that we send to the client.
        if let Some(payload) = consensus_reader
            .saved_payloads()
            .get(&info.leaf.view_number())
        {
            info.leaf
                .fill_block_payload_unchecked(payload.as_ref().payload.clone());
        }

        // Accumulate the decided transaction commitments across the whole chain.
        if let Some(ref payload) = info.leaf.block_payload() {
            for txn in payload.transaction_commitments(info.leaf.block_header().metadata()) {
                txns.insert(txn);
            }
        }

        // Step to the next-older leaf; the loop condition stops at the old anchor view.
        current_leaf_info = consensus_reader
            .parent_leaf_info(&info.leaf, public_key)
            .await;
        res.leaf_views.push(info.clone());
    }

    if !txns.is_empty() {
        res.included_txns = Some(txns);
    }

    if with_epochs && res.new_decided_view_number.is_some() {
        let Some(first_leaf) = res.leaf_views.first() else {
            return res;
        };
        let epoch_height = consensus_reader.epoch_height;
        consensus_reader
            .metrics
            .last_synced_block_height
            .set(usize::try_from(first_leaf.leaf.height()).unwrap_or(0));
        // Release the read lock before epoch-root handling, which takes its own locks.
        drop(consensus_reader);

        for decided_leaf_info in &res.leaf_views {
            decide_epoch_root::<TYPES, I>(
                &decided_leaf_info.leaf,
                epoch_height,
                membership,
                storage,
                &consensus,
            )
            .await;
        }
        update_metrics(&consensus, &res.leaf_views).await;
    }

    res
}
521
/// Ascends the leaf chain by traversing through the parent commitments of the proposal. We begin
/// by obtaining the parent view, and if we are in a chain (i.e. the next view from the parent is
/// one view newer), then we begin attempting to form the chain. This is a direct impl from
/// [HotStuff](https://arxiv.org/pdf/1803.05069) section 5:
///
/// > When a node b* carries a QC that refers to a direct parent, i.e., b*.justify.node = b*.parent,
/// > we say that it forms a One-Chain. Denote by b'' = b*.justify.node. Node b* forms a Two-Chain,
/// > if in addition to forming a One-Chain, b''.justify.node = b''.parent.
/// > It forms a Three-Chain, if b'' forms a Two-Chain.
///
/// We follow this exact logic to determine if we are able to reach a commit and a decide. A commit
/// is reached when we have a two chain, and a decide is reached when we have a three chain.
///
/// # Example
/// Suppose we have a decide for view 1, and we then move on to get undecided views 2, 3, and 4. Further,
/// suppose that our *next* proposal is for view 5, but this leader did not see info for view 4, so the
/// justify qc of the proposal points to view 3. This is fine, and the undecided chain now becomes
/// 2-3-5.
///
/// Assuming we continue with honest leaders, we then eventually could get a chain like: 2-3-5-6-7-8. This
/// will prompt a decide event to occur (this code), where the `proposal` is for view 8. Now, since the
/// lowest value in the 3-chain here would be 5 (excluding 8 since we only walk the parents), we begin at
/// the first link in the chain, and walk back through all undecided views, making our new anchor view 5,
/// and our new locked view will be 6.
///
/// Upon receipt then of a proposal for view 9, assuming it is valid, this entire process will repeat, and
/// the anchor view will be set to view 6, with the locked view as view 7.
///
/// # Panics
/// If the leaf chain contains no decided leaf while reaching a decided view, which should be
/// impossible.
#[allow(clippy::too_many_arguments)]
pub async fn decide_from_proposal<TYPES: NodeType, I: NodeImplementation<TYPES>>(
    proposal: &QuorumProposalWrapper<TYPES>,
    consensus: OuterConsensus<TYPES>,
    upgrade_lock: &UpgradeLock<TYPES>,
    public_key: &TYPES::SignatureKey,
    with_epochs: bool,
    membership: &EpochMembershipCoordinator<TYPES>,
    storage: &I::Storage,
    epoch_height: u64,
) -> LeafChainTraversalOutcome<TYPES> {
    let consensus_reader = consensus.read().await;
    let existing_upgrade_cert_reader = upgrade_lock.decided_upgrade_cert();
    let view_number = proposal.view_number();
    let parent_view_number = proposal.justify_qc().view_number();
    let old_anchor_view = consensus_reader.last_decided_view();

    let mut last_view_number_visited = view_number;
    let mut current_chain_length = 0usize;
    let mut res = LeafChainTraversalOutcome::default();

    // Walk the ancestors from the proposal's parent down to (but excluding) the old anchor view,
    // growing the chain while views are consecutive.
    if let Err(e) = consensus_reader.visit_leaf_ancestors(
        parent_view_number,
        Terminator::Exclusive(old_anchor_view),
        true,
        |leaf, state, delta| {
            // This is the core paper logic. We're implementing the chain in chained hotstuff.
            if res.new_decided_view_number.is_none() {
                // If the last view number is the child of the leaf we've moved to...
                if last_view_number_visited == leaf.view_number() + 1 {
                    last_view_number_visited = leaf.view_number();

                    // The chain grows by one
                    current_chain_length += 1;

                    // We emit a locked view when the chain length is 2
                    if current_chain_length == 2 {
                        res.new_locked_view_number = Some(leaf.view_number());
                        // The next leaf in the chain, if there is one, is decided, so this
                        // leaf's justify_qc would become the QC for the decided chain.
                        res.committing_qc = Some(CertificatePair::for_parent(leaf));
                    } else if current_chain_length == 3 {
                        // And we decide when the chain length is 3.
                        res.new_decided_view_number = Some(leaf.view_number());
                    }
                } else {
                    // There isn't a new chain extension available, so we signal to the callback
                    // owner that we can exit for now.
                    return false;
                }
            }

            // Now, if we *have* reached a decide, we need to do some state updates.
            if let Some(new_decided_view) = res.new_decided_view_number {
                // First, get a mutable reference to the provided leaf.
                let mut leaf = leaf.clone();

                // Update the metrics
                if leaf.view_number() == new_decided_view {
                    consensus_reader
                        .metrics
                        .last_synced_block_height
                        .set(usize::try_from(leaf.height()).unwrap_or(0));
                }

                // Check if there's a new upgrade certificate available.
                if let Some(cert) = leaf.upgrade_certificate()
                    && leaf.upgrade_certificate() != existing_upgrade_cert_reader
                {
                    if cert.data.decide_by < view_number {
                        tracing::warn!(
                            "Failed to decide an upgrade certificate in time. Ignoring."
                        );
                    } else {
                        tracing::info!("Reached decide on upgrade certificate: {cert:?}");
                        res.decided_upgrade_cert = Some(cert.clone());
                    }
                }
                // If the block payload is available for this leaf, include it in
                // the leaf chain that we send to the client.
                if let Some(payload) = consensus_reader.saved_payloads().get(&leaf.view_number()) {
                    leaf.fill_block_payload_unchecked(payload.as_ref().payload.clone());
                }

                // Get the VID share at the leaf's view number, corresponding to our key
                // (if one exists)
                let vid_share = consensus_reader
                    .vid_shares()
                    .get(&leaf.view_number())
                    .and_then(|key_map| key_map.get(public_key))
                    .and_then(|epoch_map| epoch_map.get(&leaf.epoch(epoch_height)))
                    .map(|prop| prop.data.clone());

                // Attach the light-client state certificate only for an epoch-root leaf, and
                // only when it matches the leaf's view.
                let state_cert = if leaf.with_epoch
                    && is_epoch_root(
                        leaf.block_header().block_number(),
                        consensus_reader.epoch_height,
                    ) {
                    match consensus_reader.state_cert() {
                        // Sanity check that the state cert is for the same view as the decided leaf
                        Some(state_cert)
                            if state_cert.light_client_state.view_number
                                == leaf.view_number().u64() =>
                        {
                            Some(state_cert.clone())
                        },
                        _ => None,
                    }
                } else {
                    None
                };

                // Add our data into a new `LeafInfo`
                res.leaf_views.push(LeafInfo::new(
                    leaf.clone(),
                    Arc::clone(&state),
                    delta.clone(),
                    vid_share,
                    state_cert,
                ));
                if let Some(ref payload) = leaf.block_payload() {
                    res.included_txns = Some(
                        payload
                            .transaction_commitments(leaf.block_header().metadata())
                            .into_iter()
                            .collect::<HashSet<_>>(),
                    );
                }
            }
            true
        },
    ) {
        tracing::debug!("Leaf ascension failed; error={e}");
    }

    // Note: this shadows the `epoch_height` parameter with the consensus-configured value for
    // the epoch-root handling below.
    let epoch_height = consensus_reader.epoch_height;
    drop(consensus_reader);

    if with_epochs && res.new_decided_view_number.is_some() {
        for decided_leaf_info in &res.leaf_views {
            decide_epoch_root::<TYPES, I>(
                &decided_leaf_info.leaf,
                epoch_height,
                membership,
                storage,
                &consensus,
            )
            .await;
        }
    }

    res
}
706
/// Gets the parent leaf and state from the parent of a proposal, returning an [`utils::anytrace::Error`] if not.
///
/// If the parent view is not in the validated state map, a network fetch of the parent proposal
/// is attempted first (see [`fetch_proposal`]).
///
/// # Errors
/// Returns an error if the parent proposal cannot be fetched, the parent view is still missing
/// from the state map, the parent view carries no leaf/state, or the leaf for the parent
/// commitment is not among the saved leaves.
#[instrument(skip_all)]
#[allow(clippy::too_many_arguments)]
pub(crate) async fn parent_leaf_and_state<TYPES: NodeType>(
    event_sender: &Sender<Arc<HotShotEvent<TYPES>>>,
    event_receiver: &Receiver<Arc<HotShotEvent<TYPES>>>,
    membership: EpochMembershipCoordinator<TYPES>,
    public_key: TYPES::SignatureKey,
    private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
    consensus: OuterConsensus<TYPES>,
    upgrade_lock: &UpgradeLock<TYPES>,
    parent_qc: &QuorumCertificate2<TYPES>,
    epoch_height: u64,
) -> Result<(Leaf2<TYPES>, Arc<<TYPES as NodeType>::ValidatedState>)> {
    // Check for the parent under a short-lived read lock; release it before any fetch, since
    // `fetch_proposal` takes the write lock itself.
    let consensus_reader = consensus.read().await;
    let vsm_contains_parent_view = consensus_reader
        .validated_state_map()
        .contains_key(&parent_qc.view_number());
    drop(consensus_reader);

    if !vsm_contains_parent_view {
        // Best-effort fetch; the state-map lookup below is the authoritative check.
        let _ = fetch_proposal(
            parent_qc,
            event_sender.clone(),
            event_receiver.clone(),
            membership,
            consensus.clone(),
            public_key.clone(),
            private_key.clone(),
            upgrade_lock,
            epoch_height,
        )
        .await
        .context(info!("Failed to fetch proposal"))?;
    }

    let consensus_reader = consensus.read().await;
    let parent_view = consensus_reader
        .validated_state_map()
        .get(&parent_qc.view_number())
        .context(debug!(
            "Couldn't find parent view in state map, waiting for replica to see proposal; \
             parent_view_number: {}",
            *parent_qc.view_number()
        ))?;

    let (leaf_commitment, state) = parent_view.leaf_and_state().context(info!(
        "Parent of high QC points to a view without a proposal; parent_view_number: {}, \
         parent_view {:?}",
        *parent_qc.view_number(),
        parent_view
    ))?;

    if leaf_commitment != consensus_reader.high_qc().data().leaf_commit {
        // NOTE: This happens on the genesis block
        tracing::debug!(
            "They don't equal: {:?}   {:?}",
            leaf_commitment,
            consensus_reader.high_qc().data().leaf_commit
        );
    }

    let leaf = consensus_reader
        .saved_leaves()
        .get(&leaf_commitment)
        .context(info!("Failed to find high QC of parent"))?;

    Ok((leaf.clone(), Arc::clone(state)))
}
776
/// Updates the high QC (and, when present, the next-epoch high QC and the light client
/// state update certificate) from the proposal's justify QC, both in persistent storage
/// and in the in-memory consensus state.
///
/// # Errors
/// Returns an error if any storage or consensus-state update fails; callers treat this as
/// "do not vote".
pub(crate) async fn update_high_qc<TYPES: NodeType, I: NodeImplementation<TYPES>>(
    proposal: &Proposal<TYPES, QuorumProposalWrapper<TYPES>>,
    validation_info: &ValidationInfo<TYPES, I>,
) -> Result<()> {
    // True when the justify QC's block is inside the epoch-transition window but is neither
    // the transition block itself nor exactly at the epoch boundary. In that window the
    // storage updates below are skipped.
    let in_transition_epoch = proposal
        .data
        .justify_qc()
        .data
        .block_number
        .is_some_and(|bn| {
            !is_transition_block(bn, validation_info.epoch_height)
                && is_epoch_transition(bn, validation_info.epoch_height)
                && bn % validation_info.epoch_height != 0
        });
    let justify_qc = proposal.data.justify_qc();
    let maybe_next_epoch_justify_qc = proposal.data.next_epoch_justify_qc();
    if !in_transition_epoch {
        tracing::debug!(
            "Storing high QC for view {:?} and height {:?}",
            justify_qc.view_number(),
            justify_qc.data.block_number
        );
        if let Err(e) = validation_info
            .storage
            .update_high_qc2(justify_qc.clone())
            .await
        {
            bail!("Failed to store High QC, not voting; error = {e:?}");
        }
        // An epoch-root QC must carry a light client state certificate; persist it and
        // mirror it into the in-memory consensus state.
        if justify_qc
            .data
            .block_number
            .is_some_and(|bn| is_epoch_root(bn, validation_info.epoch_height))
        {
            let Some(state_cert) = proposal.data.state_cert() else {
                bail!("Epoch root QC has no state cert, not voting!");
            };
            if let Err(e) = validation_info
                .storage
                .update_state_cert(state_cert.clone())
                .await
            {
                bail!(
                    "Failed to store the light client state update certificate, not voting; error \
                     = {:?}",
                    e
                );
            }
            validation_info
                .consensus
                .write()
                .await
                .update_state_cert(state_cert.clone())?;
        }
        // Persist the next-epoch high QC when the proposal carries one.
        if let Some(next_epoch_justify_qc) = maybe_next_epoch_justify_qc
            && let Err(e) = validation_info
                .storage
                .update_next_epoch_high_qc2(next_epoch_justify_qc.clone())
                .await
        {
            bail!("Failed to store next epoch High QC, not voting; error = {e:?}");
        }
    }
    let mut consensus_writer = validation_info.consensus.write().await;
    if let Some(next_epoch_justify_qc) = maybe_next_epoch_justify_qc {
        // At the transition block itself, the QC pair both resets the high QC and becomes
        // the stored transition QC; no further high-QC update is needed after that.
        if justify_qc
            .data
            .block_number
            .is_some_and(|bn| is_transition_block(bn, validation_info.epoch_height))
        {
            consensus_writer.reset_high_qc(justify_qc.clone(), next_epoch_justify_qc.clone())?;
            consensus_writer
                .update_transition_qc(justify_qc.clone(), next_epoch_justify_qc.clone());
            return Ok(());
        }
        consensus_writer.update_next_epoch_high_qc(next_epoch_justify_qc.clone())?;
    }
    consensus_writer.update_high_qc(justify_qc.clone())?;

    Ok(())
}
858
859async fn transition_qc<TYPES: NodeType, I: NodeImplementation<TYPES>>(
860    validation_info: &ValidationInfo<TYPES, I>,
861) -> Option<(
862    QuorumCertificate2<TYPES>,
863    NextEpochQuorumCertificate2<TYPES>,
864)> {
865    validation_info
866        .consensus
867        .read()
868        .await
869        .transition_qc()
870        .cloned()
871}
872
/// Validates the justify QC of a proposal whose QC block height lies in the epoch-transition
/// window, updating the stored transition QC (and high QC) where appropriate.
///
/// # Errors
/// Returns an error if the QC / next-epoch QC pairing or view numbers violate the
/// epoch-transition rules; callers treat this as "do not vote".
pub(crate) async fn validate_epoch_transition_qc<TYPES: NodeType, I: NodeImplementation<TYPES>>(
    proposal: &Proposal<TYPES, QuorumProposalWrapper<TYPES>>,
    validation_info: &ValidationInfo<TYPES, I>,
) -> Result<()> {
    let proposed_qc = proposal.data.justify_qc();
    let Some(qc_block_number) = proposed_qc.data().block_number else {
        bail!("Justify QC has no block number");
    };
    // Nothing to enforce unless the QC block is in the transition window (and not exactly
    // at the epoch boundary).
    if !is_epoch_transition(qc_block_number, validation_info.epoch_height)
        || qc_block_number % validation_info.epoch_height == 0
    {
        return Ok(());
    }
    // In the transition window the block must also be certified by the next epoch, over the
    // same leaf.
    let Some(next_epoch_qc) = proposal.data.next_epoch_justify_qc() else {
        bail!("Next epoch justify QC is not present");
    };
    ensure!(
        next_epoch_qc.data.leaf_commit == proposed_qc.data().leaf_commit,
        "Next epoch QC has different leaf commit to justify QC"
    );

    if is_transition_block(qc_block_number, validation_info.epoch_height) {
        // Height is epoch height - 2
        ensure!(
            transition_qc(validation_info)
                .await
                .is_none_or(|(qc, _)| qc.view_number() <= proposed_qc.view_number()),
            "Proposed transition qc must have view number greater than or equal to previous \
             transition QC"
        );

        // Record this QC pair as the new transition QC.
        validation_info
            .consensus
            .write()
            .await
            .update_transition_qc(proposed_qc.clone(), next_epoch_qc.clone());
        // reset the high qc to the transition qc
        update_high_qc(proposal, validation_info).await?;
    } else {
        // Height is either epoch height - 1 or epoch height
        ensure!(
            transition_qc(validation_info)
                .await
                .is_none_or(|(qc, _)| qc.view_number() < proposed_qc.view_number()),
            "Transition block must have view number greater than previous transition QC"
        );
        // These blocks may not rely on timeout / view sync evidence.
        ensure!(
            proposal.data.view_change_evidence().is_none(),
            "Second to last block and last block of epoch must directly extend previous block, Qc \
             Block number: {qc_block_number}, Proposal Block number: {}",
            proposal.data.block_header().block_number()
        );
        // Either extend the immediately preceding view, or extend the stored transition QC.
        ensure!(
            proposed_qc.view_number() + 1 == proposal.data.view_number()
                || transition_qc(validation_info)
                    .await
                    .is_some_and(|(qc, _)| &qc == proposed_qc),
            "Transition proposals must extend the previous view directly, or extend the previous \
             transition block"
        );
    }
    Ok(())
}
936
/// Validate the state and safety and liveness of a proposal then emit
/// a `QuorumProposalValidated` event.
///
/// Checks, in order: epoch-transition QC rules (when epochs are active), parent extension,
/// upgrade-certificate validity, epoch safety, and finally the safety/liveness voting rule.
///
/// # Errors
/// If any validation or state update fails.
#[allow(clippy::too_many_lines)]
#[instrument(skip_all, fields(id = validation_info.id, view = *proposal.data.view_number()))]
pub(crate) async fn validate_proposal_safety_and_liveness<
    TYPES: NodeType,
    I: NodeImplementation<TYPES>,
>(
    proposal: Proposal<TYPES, QuorumProposalWrapper<TYPES>>,
    parent_leaf: Leaf2<TYPES>,
    validation_info: &ValidationInfo<TYPES, I>,
    event_stream: Sender<Arc<HotShotEvent<TYPES>>>,
    sender: TYPES::SignatureKey,
) -> Result<()> {
    let view_number = proposal.data.view_number();

    // If the epoch upgrade is active for the justify QC's view and its block is in the
    // transition window, enforce the extra transition-QC rules. A valid transition also
    // relaxes the liveness check below.
    let mut valid_epoch_transition = false;
    if validation_info
        .upgrade_lock
        .version(proposal.data.justify_qc().view_number())
        .is_ok_and(|v| v >= EPOCH_VERSION)
    {
        let Some(block_number) = proposal.data.justify_qc().data.block_number else {
            bail!("Quorum Proposal has no block number but it's after the epoch upgrade");
        };
        if is_epoch_transition(block_number, validation_info.epoch_height) {
            validate_epoch_transition_qc(&proposal, validation_info).await?;
            valid_epoch_transition = true;
        }
    }

    // The proposed leaf must directly extend the supplied parent leaf.
    let proposed_leaf = Leaf2::from_quorum_proposal(&proposal.data);
    ensure!(
        proposed_leaf.parent_commitment() == parent_leaf.commit(),
        "Proposed leaf does not extend the parent leaf."
    );
    let proposal_epoch = option_epoch_from_block_number(
        validation_info.upgrade_lock.epochs_enabled(view_number),
        proposed_leaf.height(),
        validation_info.epoch_height,
    );

    // Build the validated state implied by the proposal's block header.
    let state = Arc::new(
        <TYPES::ValidatedState as ValidatedState<TYPES>>::from_header(proposal.data.block_header()),
    );

    {
        // Record the leaf and proposal in shared consensus state; failures here are
        // non-fatal (logged only).
        let mut consensus_writer = validation_info.consensus.write().await;
        if let Err(e) = consensus_writer.update_leaf(proposed_leaf.clone(), state, None) {
            tracing::trace!("{e:?}");
        }

        // Update our internal storage of the proposal. The proposal is valid, so
        // we swallow this error and just log if it occurs.
        if let Err(e) = consensus_writer.update_proposed_view(proposal.clone()) {
            tracing::debug!("Internal proposal update failed; error = {e:#}");
        };
    }

    UpgradeCertificate::validate(
        proposal.data.upgrade_certificate(),
        &validation_info.membership,
        proposal_epoch,
        &validation_info.upgrade_lock,
    )
    .await?;

    // Validate that the upgrade certificate is re-attached, if we saw one on the parent
    proposed_leaf.extends_upgrade(&parent_leaf, &validation_info.upgrade_lock)?;

    let justify_qc = proposal.data.justify_qc().clone();
    // Create a positive vote if either liveness or safety check
    // passes.

    {
        let consensus_reader = validation_info.consensus.read().await;
        // Epoch safety check:
        // The proposal is safe if
        // 1. the proposed block and the justify QC block belong to the same epoch or
        // 2. the justify QC is the eQC for the previous block
        let justify_qc_epoch = option_epoch_from_block_number(
            validation_info.upgrade_lock.epochs_enabled(view_number),
            parent_leaf.height(),
            validation_info.epoch_height,
        );
        ensure!(
            proposal_epoch == justify_qc_epoch
                || consensus_reader.check_eqc(&proposed_leaf, &parent_leaf),
            {
                error!(
                    "Failed epoch safety check \n Proposed leaf is {proposed_leaf:?} \n justify \
                     QC leaf is {parent_leaf:?}"
                )
            }
        );

        // Make sure that the epoch transition proposal includes the next epoch QC
        if is_epoch_transition(parent_leaf.height(), validation_info.epoch_height)
            && validation_info.upgrade_lock.epochs_enabled(view_number)
        {
            ensure!(
                proposal.data.next_epoch_justify_qc().is_some(),
                "Epoch transition proposal does not include the next epoch justify QC. Do not \
                 vote!"
            );
        }

        // Liveness check.
        // Passes when the justify QC is newer than our locked view, or when we validated an
        // epoch transition above.
        let liveness_check =
            justify_qc.view_number() > consensus_reader.locked_view() || valid_epoch_transition;

        // Safety check.
        // Check if proposal extends from the locked leaf.
        let outcome = consensus_reader.visit_leaf_ancestors(
            justify_qc.view_number(),
            Terminator::Inclusive(consensus_reader.locked_view()),
            false,
            |leaf, _, _| {
                // if leaf view no == locked view no then we're done, report success by
                // returning true
                leaf.view_number() != consensus_reader.locked_view()
            },
        );
        let safety_check = outcome.is_ok();

        // Either check passing suffices; if both fail, surface the ancestor-walk error to
        // the application layer and abort.
        ensure!(safety_check || liveness_check, {
            if let Err(e) = outcome {
                broadcast_event(
                    Event {
                        view_number,
                        event: EventType::Error { error: Arc::new(e) },
                    },
                    &validation_info.output_event_stream,
                )
                .await;
            }

            error!(
                "Failed safety and liveness check \n High QC is {:?}  Proposal QC is {:?}  Locked \
                 view is {:?}",
                consensus_reader.high_qc(),
                proposal.data,
                consensus_reader.locked_view()
            )
        });
    }

    // We accept the proposal, notify the application layer
    broadcast_event(
        Event {
            view_number,
            event: EventType::QuorumProposal {
                proposal: proposal.clone(),
                sender,
            },
        },
        &validation_info.output_event_stream,
    )
    .await;

    // Notify other tasks
    broadcast_event(
        Arc::new(HotShotEvent::QuorumProposalValidated(
            proposal.clone(),
            parent_leaf,
        )),
        &event_stream,
    )
    .await;

    Ok(())
}
1113
/// Validates, from a given `proposal` that the view that it is being submitted for is valid when
/// compared to `cur_view` which is the highest proposed view (so far) for the caller. If the proposal
/// is for a view that's later than expected, that the proposal includes a timeout or view sync certificate.
///
/// # Errors
/// If any validation or view number check fails.
pub(crate) async fn validate_proposal_view_and_certs<
    TYPES: NodeType,
    I: NodeImplementation<TYPES>,
>(
    proposal: &Proposal<TYPES, QuorumProposalWrapper<TYPES>>,
    validation_info: &ValidationInfo<TYPES, I>,
) -> Result<()> {
    let view_number = proposal.data.view_number();
    // Reject proposals more than one view behind our current view.
    ensure!(
        view_number + 1 >= validation_info.consensus.read().await.cur_view(),
        "Proposal is from an older view {:?}",
        proposal.data
    );

    // Validate the proposal's signature. This should also catch if the leaf_commitment does not equal our calculated parent commitment
    let mut membership = validation_info.membership.clone();
    proposal.validate_signature(&membership).await?;

    // Verify a timeout certificate OR a view sync certificate exists and is valid.
    // (Only required when the justify QC does not come from the immediately preceding view.)
    if proposal.data.justify_qc().view_number() != view_number - 1 {
        let received_proposal_cert =
            proposal
                .data
                .view_change_evidence()
                .clone()
                .context(debug!(
                    "Quorum proposal for view {view_number} needed a timeout or view sync \
                     certificate, but did not have one",
                ))?;

        match received_proposal_cert {
            ViewChangeEvidence2::Timeout(timeout_cert) => {
                ensure!(
                    timeout_cert.data().view == view_number - 1,
                    "Timeout certificate for view {view_number} was not for the immediately \
                     preceding view"
                );
                // Validate the certificate against the stake table of its own epoch.
                let timeout_cert_epoch = timeout_cert.data().epoch();
                membership = membership.get_new_epoch(timeout_cert_epoch).await?;

                let membership_stake_table = membership.stake_table().await;
                let membership_success_threshold = membership.success_threshold().await;

                timeout_cert
                    .is_valid_cert(
                        &StakeTableEntries::<TYPES>::from(membership_stake_table).0,
                        membership_success_threshold,
                        &validation_info.upgrade_lock,
                    )
                    .context(|e| {
                        warn!("Timeout certificate for view {view_number} was invalid: {e}")
                    })?;
            },
            ViewChangeEvidence2::ViewSync(view_sync_cert) => {
                ensure!(
                    view_sync_cert.view_number == view_number,
                    "View sync cert view number {:?} does not match proposal view number {:?}",
                    view_sync_cert.view_number,
                    view_number
                );

                // Validate the certificate against the stake table of its own epoch.
                let view_sync_cert_epoch = view_sync_cert.data().epoch();
                membership = membership.get_new_epoch(view_sync_cert_epoch).await?;

                let membership_stake_table = membership.stake_table().await;
                let membership_success_threshold = membership.success_threshold().await;

                // View sync certs must also be valid.
                view_sync_cert
                    .is_valid_cert(
                        &StakeTableEntries::<TYPES>::from(membership_stake_table).0,
                        membership_success_threshold,
                        &validation_info.upgrade_lock,
                    )
                    .context(|e| warn!("Invalid view sync finalize cert provided: {e}"))?;
            },
        }
    }

    // Validate the upgrade certificate -- this is just a signature validation.
    // Note that we don't do anything with the certificate directly if this passes; it eventually gets stored as part of the leaf if nothing goes wrong.
    {
        let epoch = option_epoch_from_block_number(
            proposal.data.epoch().is_some(),
            proposal.data.block_header().block_number(),
            validation_info.epoch_height,
        );
        UpgradeCertificate::validate(
            proposal.data.upgrade_certificate(),
            &validation_info.membership,
            epoch,
            &validation_info.upgrade_lock,
        )
        .await?;
    }

    Ok(())
}
1218
1219/// Helper function to send events and log errors
1220pub async fn broadcast_event<E: Clone + std::fmt::Debug>(event: E, sender: &Sender<E>) {
1221    match sender.broadcast_direct(event).await {
1222        Ok(None) => (),
1223        Ok(Some(overflowed)) => {
1224            tracing::error!(
1225                "Event sender queue overflow, Oldest event removed form queue: {overflowed:?}"
1226            );
1227        },
1228        Err(SendError(e)) => {
1229            tracing::warn!("Event: {e:?}\n Sending failed, event stream probably shutdown");
1230        },
1231    }
1232}
1233
/// Validates qc's signatures and, if provided, validates next_epoch_qc's signatures and whether it
/// corresponds to the provided high_qc.
///
/// # Errors
/// Returns an error if either certificate fails validation against the relevant epoch's
/// stake table, or if the next-epoch QC does not correspond to the QC.
pub async fn validate_qc_and_next_epoch_qc<TYPES: NodeType>(
    qc: &QuorumCertificate2<TYPES>,
    maybe_next_epoch_qc: Option<&NextEpochQuorumCertificate2<TYPES>>,
    consensus: &OuterConsensus<TYPES>,
    membership_coordinator: &EpochMembershipCoordinator<TYPES>,
    upgrade_lock: &UpgradeLock<TYPES>,
    epoch_height: u64,
) -> Result<()> {
    // Pair the two certificates so epoch/view accessors and cross-checks live in one place.
    let cert = CertificatePair::new(qc.clone(), maybe_next_epoch_qc.cloned());

    let mut epoch_membership = membership_coordinator
        .stake_table_for_epoch(cert.epoch())
        .await?;

    let membership_stake_table = epoch_membership.stake_table().await;
    let membership_success_threshold = epoch_membership.success_threshold().await;

    // Validate the main QC against its epoch's stake table; count failures in metrics.
    if let Err(e) = cert.qc().is_valid_cert(
        &StakeTableEntries::<TYPES>::from(membership_stake_table).0,
        membership_success_threshold,
        upgrade_lock,
    ) {
        consensus.read().await.metrics.invalid_qc.update(1);
        return Err(warn!("Invalid certificate: {e}"));
    }

    // Check the next epoch QC if required.
    // `verify_next_epoch_qc` yields the next-epoch QC to check, if there is one to check.
    if upgrade_lock.epochs_enabled(cert.view_number())
        && let Some(next_epoch_qc) = cert.verify_next_epoch_qc(epoch_height)?
    {
        // The next-epoch QC must be valid against the NEXT epoch's stake table.
        epoch_membership = epoch_membership.next_epoch_stake_table().await?;
        let membership_next_stake_table = epoch_membership.stake_table().await;
        let membership_next_success_threshold = epoch_membership.success_threshold().await;
        next_epoch_qc
            .is_valid_cert(
                &StakeTableEntries::<TYPES>::from(membership_next_stake_table).0,
                membership_next_success_threshold,
                upgrade_lock,
            )
            .context(|e| warn!("Invalid next epoch certificate: {e}"))?;
    }

    Ok(())
}
1280
/// Validates the light client state update certificate
///
/// Verifies each signature in the certificate against the epoch's registered state keys and
/// requires the combined stake of the signers to meet the epoch's success threshold.
pub async fn validate_light_client_state_update_certificate<TYPES: NodeType>(
    state_cert: &LightClientStateUpdateCertificateV2<TYPES>,
    membership_coordinator: &EpochMembershipCoordinator<TYPES>,
    upgrade_lock: &UpgradeLock<TYPES>,
) -> Result<()> {
    tracing::debug!("Validating light client state update certificate");

    let epoch_membership = membership_coordinator
        .membership_for_epoch(state_cert.epoch())
        .await?;

    let membership_stake_table = epoch_membership.stake_table().await;
    let membership_success_threshold = epoch_membership.success_threshold().await;

    // Map each member's state verification key to its stake so signatures can be weighted.
    let mut state_key_map = HashMap::new();
    membership_stake_table.into_iter().for_each(|config| {
        state_key_map.insert(
            config.state_ver_key.clone(),
            config.stake_table_entry.stake(),
        );
    });

    let mut accumulated_stake = U256::from(0);
    // Digest over (light client state, next stake table state, auth root) that the
    // V3-style signatures are checked against.
    let signed_state_digest = derive_signed_state_digest(
        &state_cert.light_client_state,
        &state_cert.next_stake_table_state,
        &state_cert.auth_root,
    );
    for (key, sig, sig_v2) in state_cert.signatures.iter() {
        if let Some(stake) = state_key_map.get(key) {
            accumulated_stake += *stake;
            #[allow(clippy::collapsible_else_if)]
            // We only perform the second signature check prior to the DrbAndHeaderUpgrade
            // NOTE(review): as written, when `proposal2_version` is false only the V2
            // signature is verified, and when true BOTH the V3 and V2 signatures must
            // verify — confirm this matches the intended upgrade gating in the comment above.
            if !upgrade_lock
                .proposal2_version(ViewNumber::new(state_cert.light_client_state.view_number))
            {
                if !<TYPES::StateSignatureKey as LCV2StateSignatureKey>::verify_state_sig(
                    key,
                    sig_v2,
                    &state_cert.light_client_state,
                    &state_cert.next_stake_table_state,
                ) {
                    bail!("Invalid light client state update certificate signature");
                }
            } else {
                if !<TYPES::StateSignatureKey as LCV3StateSignatureKey>::verify_state_sig(
                    key,
                    sig,
                    signed_state_digest,
                ) || !<TYPES::StateSignatureKey as LCV2StateSignatureKey>::verify_state_sig(
                    key,
                    sig_v2,
                    &state_cert.light_client_state,
                    &state_cert.next_stake_table_state,
                ) {
                    bail!("Invalid light client state update certificate signature");
                }
            }
        } else {
            // Signer is not in this epoch's stake table.
            bail!("Invalid light client state update certificate signature");
        }
    }
    // The certificate is valid only if the signers' combined stake meets the threshold.
    if accumulated_stake < membership_success_threshold {
        bail!("Light client state update certificate does not meet the success threshold");
    }

    Ok(())
}
1350
1351pub(crate) fn check_qc_state_cert_correspondence<TYPES: NodeType>(
1352    qc: &QuorumCertificate2<TYPES>,
1353    state_cert: &LightClientStateUpdateCertificateV2<TYPES>,
1354    epoch_height: u64,
1355) -> bool {
1356    qc.data
1357        .block_number
1358        .is_some_and(|bn| is_epoch_root(bn, epoch_height))
1359        && Some(state_cert.epoch) == qc.data.epoch()
1360        && qc.view_number().u64() == state_cert.light_client_state.view_number
1361}
1362
/// Gets the second VID share, the current or the next epoch accordingly, from the shared consensus state;
/// makes sure it corresponds to the given DA certificate;
/// if it's not yet available, waits for it on the event stream (cancellable via `cancel_receiver`).
pub async fn wait_for_second_vid_share<TYPES: NodeType>(
    target_epoch: Option<EpochNumber>,
    vid_share: &Proposal<TYPES, VidDisperseShare<TYPES>>,
    da_cert: &DaCertificate2<TYPES>,
    consensus: &OuterConsensus<TYPES>,
    receiver: &Receiver<Arc<HotShotEvent<TYPES>>>,
    cancel_receiver: Receiver<()>,
    id: u64,
) -> Result<Proposal<TYPES, VidDisperseShare<TYPES>>> {
    tracing::debug!("getting the second VID share for epoch {:?}", target_epoch);
    // Fast path: look for an already-stored share keyed by (view, recipient, target epoch).
    let maybe_second_vid_share = consensus
        .read()
        .await
        .vid_shares()
        .get(&vid_share.data.view_number())
        .and_then(|key_map| key_map.get(vid_share.data.recipient_key()))
        .and_then(|epoch_map| epoch_map.get(&target_epoch))
        .cloned();
    // Accept the stored share only if its payload commitment matches the DA cert: the
    // current-epoch commitment when target_epoch equals the cert's epoch, otherwise the
    // next-epoch commitment.
    if let Some(second_vid_share) = maybe_second_vid_share
        && ((target_epoch == da_cert.epoch()
            && second_vid_share.data.payload_commitment() == da_cert.data().payload_commit)
            || (target_epoch != da_cert.epoch()
                && Some(second_vid_share.data.payload_commitment())
                    == da_cert.data().next_epoch_payload_commit))
    {
        return Ok(second_vid_share);
    }

    // Slow path: wait on the event stream for a matching `VidShareValidated` event.
    let receiver = receiver.clone();
    let da_cert_clone = da_cert.clone();
    let Some(event) = EventDependency::new(
        receiver,
        cancel_receiver,
        format!(
            "VoteDependency Second VID share for view {:?}, my id {:?}",
            vid_share.data.view_number(),
            id
        ),
        Box::new(move |event| {
            let event = event.as_ref();
            if let HotShotEvent::VidShareValidated(second_vid_share) = event {
                // Same commitment-matching rule as the fast path above.
                if target_epoch == da_cert_clone.epoch() {
                    second_vid_share.data.payload_commitment()
                        == da_cert_clone.data().payload_commit
                } else {
                    Some(second_vid_share.data.payload_commitment())
                        == da_cert_clone.data().next_epoch_payload_commit
                }
            } else {
                false
            }
        }),
    )
    .completed()
    .await
    else {
        // The dependency was cancelled or the stream closed before a match arrived.
        return Err(warn!("Error while waiting for the second VID share."));
    };
    let HotShotEvent::VidShareValidated(second_vid_share) = event.as_ref() else {
        // this shouldn't happen
        return Err(warn!(
            "Received event is not VidShareValidated but we checked it earlier. Shouldn't be \
             possible."
        ));
    };
    Ok(second_vid_share.clone())
}
1433
1434pub async fn broadcast_view_change<TYPES: NodeType>(
1435    sender: &Sender<Arc<HotShotEvent<TYPES>>>,
1436    new_view_number: ViewNumber,
1437    epoch: Option<EpochNumber>,
1438    first_epoch: Option<(ViewNumber, EpochNumber)>,
1439) {
1440    let mut broadcast_epoch = epoch;
1441    if let Some((first_epoch_view, first_epoch)) = first_epoch
1442        && new_view_number == first_epoch_view
1443        && broadcast_epoch != Some(first_epoch)
1444    {
1445        broadcast_epoch = Some(first_epoch);
1446    }
1447    tracing::trace!("Sending ViewChange for view {new_view_number} and epoch {broadcast_epoch:?}");
1448    broadcast_event(
1449        Arc::new(HotShotEvent::ViewChange(new_view_number, broadcast_epoch)),
1450        sender,
1451    )
1452    .await
1453}
1454
1455pub fn derive_signed_state_digest(
1456    lc_state: &LightClientState,
1457    next_stake_state: &StakeTableState,
1458    auth_root: &FixedBytes<32>,
1459) -> CircuitField {
1460    let lc_state_sol: LightClientStateSol = (*lc_state).into();
1461    let stake_st_sol: StakeTableStateSol = (*next_stake_state).into();
1462
1463    let res = alloy::primitives::keccak256(
1464        (
1465            lc_state_sol.abi_encode(),
1466            stake_st_sol.abi_encode(),
1467            auth_root.abi_encode(),
1468        )
1469            .abi_encode_packed(),
1470    );
1471    CircuitField::from_be_bytes_mod_order(res.as_ref())
1472}