Skip to main content

hotshot_task_impls/
helpers.rs

1// Copyright (c) 2021-2024 Espresso Systems (espressosys.com)
2// This file is part of the HotShot repository.
3
4// You should have received a copy of the MIT License
5// along with the HotShot repository. If not, see <https://mit-license.org/>.
6
7use std::{collections::HashSet, sync::Arc, time::Instant};
8
9use async_broadcast::{Receiver, SendError, Sender};
10use committable::{Commitment, Committable};
11use hotshot_task::dependency::{Dependency, EventDependency};
12use hotshot_types::{
13    consensus::OuterConsensus,
14    data::{
15        EpochNumber, Leaf2, QuorumProposalWrapper, VidDisperseShare, ViewChangeEvidence2,
16        ViewNumber,
17    },
18    drb::DrbResult,
19    epoch_membership::EpochMembershipCoordinator,
20    event::{Event, EventType, LeafInfo},
21    message::{Proposal, UpgradeLock},
22    request_response::ProposalRequestPayload,
23    simple_certificate::{
24        CertificatePair, DaCertificate2, NextEpochQuorumCertificate2, QuorumCertificate2,
25        UpgradeCertificate,
26    },
27    simple_vote::HasEpoch,
28    stake_table::StakeTableEntries,
29    traits::{
30        BlockPayload, ValidatedState,
31        block_contents::BlockHeader,
32        election::Membership,
33        node_implementation::{NodeImplementation, NodeType},
34        signature_key::SignatureKey,
35        storage::Storage,
36    },
37    utils::{
38        Terminator, View, ViewInner, epoch_from_block_number, is_epoch_root, is_epoch_transition,
39        is_transition_block, option_epoch_from_block_number,
40    },
41    vote::{Certificate, HasViewNumber},
42};
43use hotshot_utils::anytrace::*;
44use time::OffsetDateTime;
45use tokio::time::timeout;
46use tracing::instrument;
47use versions::EPOCH_VERSION;
48
49use crate::{events::HotShotEvent, quorum_proposal_recv::ValidationInfo, request::REQUEST_TIMEOUT};
50
51/// Trigger a request to the network for a proposal for a view and wait for the response or timeout.
52#[instrument(skip_all)]
53#[allow(clippy::too_many_arguments)]
54pub(crate) async fn fetch_proposal<TYPES: NodeType>(
55    qc: &QuorumCertificate2<TYPES>,
56    event_sender: Sender<Arc<HotShotEvent<TYPES>>>,
57    event_receiver: Receiver<Arc<HotShotEvent<TYPES>>>,
58    membership_coordinator: EpochMembershipCoordinator<TYPES>,
59    consensus: OuterConsensus<TYPES>,
60    sender_public_key: TYPES::SignatureKey,
61    sender_private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
62    upgrade_lock: &UpgradeLock<TYPES>,
63    epoch_height: u64,
64) -> Result<(Leaf2<TYPES>, View<TYPES>)> {
65    let view_number = qc.view_number();
66    let leaf_commit = qc.data.leaf_commit;
67    // We need to be able to sign this request before submitting it to the network. Compute the
68    // payload first.
69    let signed_proposal_request = ProposalRequestPayload {
70        view_number,
71        key: sender_public_key,
72    };
73
74    // Finally, compute the signature for the payload.
75    let signature = TYPES::SignatureKey::sign(
76        &sender_private_key,
77        signed_proposal_request.commit().as_ref(),
78    )
79    .wrap()
80    .context(error!("Failed to sign proposal. This should never happen."))?;
81
82    tracing::info!("Sending proposal request for view {view_number}");
83
84    // First, broadcast that we need a proposal to the current leader
85    broadcast_event(
86        HotShotEvent::QuorumProposalRequestSend(signed_proposal_request, signature).into(),
87        &event_sender,
88    )
89    .await;
90
91    let mut rx = event_receiver.clone();
92    // Make a background task to await the arrival of the event data.
93    let Ok(Some(proposal)) =
94        // We want to explicitly timeout here so we aren't waiting around for the data.
95        timeout(REQUEST_TIMEOUT, async move {
96            // We want to iterate until the proposal is not None, or until we reach the timeout.
97            while let Ok(event) = rx.recv_direct().await {
98                if let HotShotEvent::QuorumProposalResponseRecv(quorum_proposal) = event.as_ref() {
99                    let leaf = Leaf2::from_quorum_proposal(&quorum_proposal.data);
100                    if leaf.view_number() == view_number && leaf.commit() == leaf_commit {
101                        return Some(quorum_proposal.clone());
102                    }
103                }
104            }
105            None
106        })
107        .await
108    else {
109        bail!("Request for proposal failed");
110    };
111
112    let view_number = proposal.data.view_number();
113    let justify_qc = proposal.data.justify_qc().clone();
114
115    let justify_qc_epoch = justify_qc.data.epoch();
116
117    let epoch_membership = membership_coordinator.stake_table_for_epoch(justify_qc_epoch)?;
118    let membership_stake_table = StakeTableEntries::from_iter(epoch_membership.stake_table()).0;
119    let membership_success_threshold = epoch_membership.success_threshold();
120
121    justify_qc
122        .is_valid_cert(
123            &membership_stake_table,
124            membership_success_threshold,
125            upgrade_lock,
126        )
127        .context(|e| warn!("Invalid justify_qc in proposal for view {view_number}: {e}"))?;
128
129    let mut consensus_writer = consensus.write().await;
130    let leaf = Leaf2::from_quorum_proposal(&proposal.data);
131    let state = Arc::new(
132        <TYPES::ValidatedState as ValidatedState<TYPES>>::from_header(proposal.data.block_header()),
133    );
134
135    if let Err(e) = consensus_writer.update_leaf(leaf.clone(), Arc::clone(&state), None) {
136        tracing::trace!("{e:?}");
137    }
138    let view = View {
139        view_inner: ViewInner::Leaf {
140            leaf: leaf.commit(),
141            state,
142            delta: None,
143            epoch: leaf.epoch(epoch_height),
144        },
145    };
146    Ok((leaf, view))
147}
148pub async fn handle_drb_result<TYPES: NodeType, I: NodeImplementation<TYPES>>(
149    membership: &TYPES::Membership,
150    epoch: EpochNumber,
151    storage: &I::Storage,
152    drb_result: DrbResult,
153) {
154    tracing::debug!("Calling store_drb_result for epoch {epoch}");
155    if let Err(e) = storage.store_drb_result(epoch, drb_result).await {
156        tracing::error!("Failed to store drb result for epoch {epoch}: {e}");
157    }
158
159    membership.add_drb_result(epoch, drb_result)
160}
161
162/// Verify the DRB result from the proposal for the next epoch if this is the last block of the
163/// current epoch.
164///
165/// Uses the result from `start_drb_task`.
166///
167/// Returns an error if we should not vote.
168pub(crate) async fn verify_drb_result<TYPES: NodeType, I: NodeImplementation<TYPES>>(
169    proposal: &QuorumProposalWrapper<TYPES>,
170    validation_info: &ValidationInfo<TYPES, I>,
171) -> Result<()> {
172    // Skip if this is not the expected block.
173    if validation_info.epoch_height == 0
174        || !is_epoch_transition(
175            proposal.block_header().block_number(),
176            validation_info.epoch_height,
177        )
178    {
179        tracing::debug!("Skipping DRB result verification");
180        return Ok(());
181    }
182
183    // #3967 REVIEW NOTE: Check if this is the right way to decide if we're doing epochs
184    // Alternatively, should we just return Err() if epochs aren't happening here? Or can we assume
185    // that epochs are definitely happening by virtue of getting here?
186    let epoch = option_epoch_from_block_number(
187        validation_info
188            .upgrade_lock
189            .epochs_enabled(proposal.view_number()),
190        proposal.block_header().block_number(),
191        validation_info.epoch_height,
192    );
193
194    let proposal_result = proposal
195        .next_drb_result()
196        .context(info!("Proposal is missing the next epoch's DRB result."))?;
197
198    if let Some(epoch_val) = epoch {
199        let current_epoch_membership = validation_info
200            .membership
201            .coordinator
202            .stake_table_for_epoch(epoch)
203            .context(warn!("No stake table for epoch {}", epoch_val))?;
204
205        let has_stake_current_epoch =
206            current_epoch_membership.has_stake(&validation_info.public_key);
207
208        if has_stake_current_epoch {
209            let computed_result = current_epoch_membership
210                .next_epoch()
211                .context(warn!("No stake table for epoch {}", epoch_val + 1))?
212                .get_epoch_drb()
213                .await
214                .clone()
215                .context(warn!("DRB result not found"))?;
216
217            ensure!(
218                proposal_result == computed_result,
219                warn!(
220                    "Our calculated DRB result is {computed_result:?}, which does not match the \
221                     proposed DRB result of {proposal_result:?}"
222                )
223            );
224        }
225
226        Ok(())
227    } else {
228        Err(error!("Epochs are not available"))
229    }
230}
231
232/// Handles calling add_epoch_root and sync_l1 on Membership if necessary.
233async fn decide_epoch_root<TYPES: NodeType, I: NodeImplementation<TYPES>>(
234    decided_leaf: &Leaf2<TYPES>,
235    epoch_height: u64,
236    membership: &EpochMembershipCoordinator<TYPES>,
237    storage: &I::Storage,
238    consensus: &OuterConsensus<TYPES>,
239) {
240    let decided_leaf = decided_leaf.clone();
241    let decided_block_number = decided_leaf.block_header().block_number();
242
243    // Skip if this is not the expected block.
244    if epoch_height != 0 && is_epoch_root(decided_block_number, epoch_height) {
245        let next_epoch_number =
246            EpochNumber::new(epoch_from_block_number(decided_block_number, epoch_height) + 2);
247
248        let start = Instant::now();
249        if let Err(e) = storage
250            .store_epoch_root(next_epoch_number, decided_leaf.block_header().clone())
251            .await
252        {
253            tracing::error!("Failed to store epoch root for epoch {next_epoch_number}: {e}");
254        }
255        tracing::info!("Time taken to store epoch root: {:?}", start.elapsed());
256
257        let membership = membership.clone();
258        let decided_block_header = decided_leaf.block_header().clone();
259        let storage = storage.clone();
260        let consensus = consensus.clone();
261
262        let consensus_reader = consensus.read().await;
263
264        drop(consensus_reader);
265
266        tokio::spawn(async move {
267            let membership_clone = membership.clone();
268
269            // First add the epoch root to `membership`
270            {
271                let start = Instant::now();
272                if let Err(e) = membership_clone
273                    .membership()
274                    .add_epoch_root(decided_block_header)
275                    .await
276                {
277                    tracing::error!("Failed to add epoch root for epoch {next_epoch_number}: {e}");
278                }
279                tracing::info!("Time taken to add epoch root: {:?}", start.elapsed());
280            }
281
282            let membership_clone = membership.clone();
283
284            let drb_result = membership_clone
285                .compute_drb_result(next_epoch_number, decided_leaf.clone())
286                .await;
287
288            let drb_result = match drb_result {
289                Ok(result) => result,
290                Err(e) => {
291                    tracing::error!("Failed to compute DRB result from decide: {e}");
292                    return;
293                },
294            };
295
296            let start = Instant::now();
297            handle_drb_result::<TYPES, I>(
298                membership.membership(),
299                next_epoch_number,
300                &storage,
301                drb_result,
302            )
303            .await;
304            tracing::info!("Time taken to handle drb result: {:?}", start.elapsed());
305        });
306    }
307}
308
/// Helper type that gives names to the output values of the leaf chain traversal operation.
///
/// Every field starts out empty (see the `Default` impl) and is filled in as far as the
/// traversal gets.
#[derive(Debug)]
pub struct LeafChainTraversalOutcome<TYPES: NodeType> {
    /// The new locked view obtained from a 2 chain starting from the proposal's parent.
    pub new_locked_view_number: Option<ViewNumber>,

    /// The new decided view obtained from a 3 chain starting from the proposal's parent.
    pub new_decided_view_number: Option<ViewNumber>,

    /// The QC signing the latest new leaf, causing it to become committed.
    ///
    /// Only present if a new leaf chain has been decided.
    pub committing_qc: Option<CertificatePair<TYPES>>,

    /// A second QC extending the committing QC, causing the new leaf chain to become decided.
    ///
    /// This is only applicable in HotStuff2, and will be [`None`] prior to HotShot version 0.3.
    /// HotStuff1 (HotShot < 0.3) uses a different commit rule, which is not captured in this type.
    pub deciding_qc: Option<CertificatePair<TYPES>>,

    /// The decided leaves with corresponding validated state and VID info.
    pub leaf_views: Vec<LeafInfo<TYPES>>,

    /// The transactions in the block payload for each leaf.
    pub included_txns: Option<HashSet<Commitment<<TYPES as NodeType>::Transaction>>>,

    /// The most recent upgrade certificate from one of the leaves.
    pub decided_upgrade_cert: Option<UpgradeCertificate<TYPES>>,
}
338
339/// We need Default to be implemented because the leaf ascension has very few failure branches,
340/// and when they *do* happen, we still return intermediate states. Default makes the burden
341/// of filling values easier.
342impl<TYPES: NodeType + Default> Default for LeafChainTraversalOutcome<TYPES> {
343    /// The default method for this type is to set all of the returned values to `None`.
344    fn default() -> Self {
345        Self {
346            new_locked_view_number: None,
347            new_decided_view_number: None,
348            committing_qc: None,
349            deciding_qc: None,
350            leaf_views: Vec::new(),
351            included_txns: None,
352            decided_upgrade_cert: None,
353        }
354    }
355}
356
357async fn update_metrics<TYPES: NodeType>(
358    consensus: &OuterConsensus<TYPES>,
359    leaf_views: &[LeafInfo<TYPES>],
360) {
361    let consensus_reader = consensus.read().await;
362    let now = OffsetDateTime::now_utc().unix_timestamp() as u64;
363
364    for leaf_view in leaf_views {
365        let proposal_timestamp = leaf_view.leaf.block_header().timestamp();
366
367        let Some(proposal_to_decide_time) = now.checked_sub(proposal_timestamp) else {
368            tracing::error!("Failed to calculate proposal to decide time: {proposal_timestamp}");
369            continue;
370        };
371        consensus_reader
372            .metrics
373            .proposal_to_decide_time
374            .add_point(proposal_to_decide_time as f64);
375        if let Some(txn_bytes) = leaf_view.leaf.block_payload().map(|p| p.txn_bytes()) {
376            consensus_reader
377                .metrics
378                .finalized_bytes
379                .add_point(txn_bytes as f64);
380        }
381    }
382}
383
384/// calculate the new decided leaf chain based on the rules of HotStuff 2
385///
386/// # Panics
387/// If the leaf chain contains no decided leaf while reaching a decided view, which should be
388/// impossible.
389#[allow(clippy::too_many_arguments)]
390pub async fn decide_from_proposal_2<TYPES: NodeType, I: NodeImplementation<TYPES>>(
391    proposal: &QuorumProposalWrapper<TYPES>,
392    consensus: OuterConsensus<TYPES>,
393    upgrade_lock: &UpgradeLock<TYPES>,
394    public_key: &TYPES::SignatureKey,
395    with_epochs: bool,
396    membership: &EpochMembershipCoordinator<TYPES>,
397    storage: &I::Storage,
398) -> LeafChainTraversalOutcome<TYPES> {
399    let mut res = LeafChainTraversalOutcome::default();
400    let consensus_reader = consensus.read().await;
401    let proposed_leaf = Leaf2::from_quorum_proposal(proposal);
402    res.new_locked_view_number = Some(proposed_leaf.justify_qc().view_number());
403
404    // If we don't have the proposals parent return early
405    let Some(parent_info) = consensus_reader
406        .parent_leaf_info(&proposed_leaf, public_key)
407        .await
408    else {
409        return res;
410    };
411    // Get the parents parent and check if it's consecutive in view to the parent, if so we can decided
412    // the grandparents view.  If not we're done.
413    let Some(grand_parent_info) = consensus_reader
414        .parent_leaf_info(&parent_info.leaf, public_key)
415        .await
416    else {
417        return res;
418    };
419    if grand_parent_info.leaf.view_number() + 1 != parent_info.leaf.view_number() {
420        return res;
421    }
422    res.committing_qc = Some(CertificatePair::for_parent(&parent_info.leaf));
423    res.deciding_qc = Some(CertificatePair::for_parent(&proposed_leaf));
424    let decided_view_number = grand_parent_info.leaf.view_number();
425    res.new_decided_view_number = Some(decided_view_number);
426    // We've reached decide, now get the leaf chain all the way back to the last decided view, not including it.
427    let old_anchor_view = consensus_reader.last_decided_view();
428    let mut current_leaf_info = Some(grand_parent_info);
429    let existing_upgrade_cert_reader = upgrade_lock.decided_upgrade_cert();
430    let mut txns = HashSet::new();
431    while current_leaf_info
432        .as_ref()
433        .is_some_and(|info| info.leaf.view_number() > old_anchor_view)
434    {
435        // unwrap is safe, we just checked that he option is some
436        let info = &mut current_leaf_info.unwrap();
437        // Check if there's a new upgrade certificate available.
438        if let Some(cert) = info.leaf.upgrade_certificate()
439            && info.leaf.upgrade_certificate() != existing_upgrade_cert_reader
440        {
441            if cert.data.decide_by < decided_view_number {
442                tracing::warn!("Failed to decide an upgrade certificate in time. Ignoring.");
443            } else {
444                tracing::info!("Reached decide on upgrade certificate: {cert:?}");
445                res.decided_upgrade_cert = Some(cert.clone());
446            }
447        }
448
449        // If the block payload is available for this leaf, include it in
450        // the leaf chain that we send to the client.
451        if let Some(payload) = consensus_reader
452            .saved_payloads()
453            .get(&info.leaf.view_number())
454        {
455            info.leaf
456                .fill_block_payload_unchecked(payload.as_ref().payload.clone());
457        }
458
459        if let Some(ref payload) = info.leaf.block_payload() {
460            for txn in payload.transaction_commitments(info.leaf.block_header().metadata()) {
461                txns.insert(txn);
462            }
463        }
464
465        current_leaf_info = consensus_reader
466            .parent_leaf_info(&info.leaf, public_key)
467            .await;
468        res.leaf_views.push(info.clone());
469    }
470
471    if !txns.is_empty() {
472        res.included_txns = Some(txns);
473    }
474
475    if with_epochs && res.new_decided_view_number.is_some() {
476        let Some(first_leaf) = res.leaf_views.first() else {
477            return res;
478        };
479        let epoch_height = consensus_reader.epoch_height;
480        consensus_reader
481            .metrics
482            .last_synced_block_height
483            .set(usize::try_from(first_leaf.leaf.height()).unwrap_or(0));
484        drop(consensus_reader);
485
486        for decided_leaf_info in &res.leaf_views {
487            decide_epoch_root::<TYPES, I>(
488                &decided_leaf_info.leaf,
489                epoch_height,
490                membership,
491                storage,
492                &consensus,
493            )
494            .await;
495        }
496        update_metrics(&consensus, &res.leaf_views).await;
497    }
498
499    res
500}
501
502/// Ascends the leaf chain by traversing through the parent commitments of the proposal. We begin
503/// by obtaining the parent view, and if we are in a chain (i.e. the next view from the parent is
504/// one view newer), then we begin attempting to form the chain. This is a direct impl from
505/// [HotStuff](https://arxiv.org/pdf/1803.05069) section 5:
506///
507/// > When a node b* carries a QC that refers to a direct parent, i.e., b*.justify.node = b*.parent,
508/// > we say that it forms a One-Chain. Denote by b'' = b*.justify.node. Node b* forms a Two-Chain,
509/// > if in addition to forming a One-Chain, b''.justify.node = b''.parent.
510/// > It forms a Three-Chain, if b'' forms a Two-Chain.
511///
512/// We follow this exact logic to determine if we are able to reach a commit and a decide. A commit
513/// is reached when we have a two chain, and a decide is reached when we have a three chain.
514///
515/// # Example
516/// Suppose we have a decide for view 1, and we then move on to get undecided views 2, 3, and 4. Further,
517/// suppose that our *next* proposal is for view 5, but this leader did not see info for view 4, so the
518/// justify qc of the proposal points to view 3. This is fine, and the undecided chain now becomes
519/// 2-3-5.
520///
521/// Assuming we continue with honest leaders, we then eventually could get a chain like: 2-3-5-6-7-8. This
522/// will prompt a decide event to occur (this code), where the `proposal` is for view 8. Now, since the
523/// lowest value in the 3-chain here would be 5 (excluding 8 since we only walk the parents), we begin at
524/// the first link in the chain, and walk back through all undecided views, making our new anchor view 5,
525/// and out new locked view will be 6.
526///
527/// Upon receipt then of a proposal for view 9, assuming it is valid, this entire process will repeat, and
528/// the anchor view will be set to view 6, with the locked view as view 7.
529///
530/// # Panics
531/// If the leaf chain contains no decided leaf while reaching a decided view, which should be
532/// impossible.
533#[allow(clippy::too_many_arguments)]
534pub async fn decide_from_proposal<TYPES: NodeType, I: NodeImplementation<TYPES>>(
535    proposal: &QuorumProposalWrapper<TYPES>,
536    consensus: OuterConsensus<TYPES>,
537    upgrade_lock: &UpgradeLock<TYPES>,
538    public_key: &TYPES::SignatureKey,
539    with_epochs: bool,
540    membership: &EpochMembershipCoordinator<TYPES>,
541    storage: &I::Storage,
542    epoch_height: u64,
543) -> LeafChainTraversalOutcome<TYPES> {
544    let consensus_reader = consensus.read().await;
545    let existing_upgrade_cert_reader = upgrade_lock.decided_upgrade_cert();
546    let view_number = proposal.view_number();
547    let parent_view_number = proposal.justify_qc().view_number();
548    let old_anchor_view = consensus_reader.last_decided_view();
549
550    let mut last_view_number_visited = view_number;
551    let mut current_chain_length = 0usize;
552    let mut res = LeafChainTraversalOutcome::default();
553
554    if let Err(e) = consensus_reader.visit_leaf_ancestors(
555        parent_view_number,
556        Terminator::Exclusive(old_anchor_view),
557        true,
558        |leaf, state, delta| {
559            // This is the core paper logic. We're implementing the chain in chained hotstuff.
560            if res.new_decided_view_number.is_none() {
561                // If the last view number is the child of the leaf we've moved to...
562                if last_view_number_visited == leaf.view_number() + 1 {
563                    last_view_number_visited = leaf.view_number();
564
565                    // The chain grows by one
566                    current_chain_length += 1;
567
568                    // We emit a locked view when the chain length is 2
569                    if current_chain_length == 2 {
570                        res.new_locked_view_number = Some(leaf.view_number());
571                        // The next leaf in the chain, if there is one, is decided, so this
572                        // leaf's justify_qc would become the QC for the decided chain.
573                        res.committing_qc = Some(CertificatePair::for_parent(leaf));
574                    } else if current_chain_length == 3 {
575                        // And we decide when the chain length is 3.
576                        res.new_decided_view_number = Some(leaf.view_number());
577                    }
578                } else {
579                    // There isn't a new chain extension available, so we signal to the callback
580                    // owner that we can exit for now.
581                    return false;
582                }
583            }
584
585            // Now, if we *have* reached a decide, we need to do some state updates.
586            if let Some(new_decided_view) = res.new_decided_view_number {
587                // First, get a mutable reference to the provided leaf.
588                let mut leaf = leaf.clone();
589
590                // Update the metrics
591                if leaf.view_number() == new_decided_view {
592                    consensus_reader
593                        .metrics
594                        .last_synced_block_height
595                        .set(usize::try_from(leaf.height()).unwrap_or(0));
596                }
597
598                // Check if there's a new upgrade certificate available.
599                if let Some(cert) = leaf.upgrade_certificate()
600                    && leaf.upgrade_certificate() != existing_upgrade_cert_reader
601                {
602                    if cert.data.decide_by < view_number {
603                        tracing::warn!(
604                            "Failed to decide an upgrade certificate in time. Ignoring."
605                        );
606                    } else {
607                        tracing::info!("Reached decide on upgrade certificate: {cert:?}");
608                        res.decided_upgrade_cert = Some(cert.clone());
609                    }
610                }
611                // If the block payload is available for this leaf, include it in
612                // the leaf chain that we send to the client.
613                if let Some(payload) = consensus_reader.saved_payloads().get(&leaf.view_number()) {
614                    leaf.fill_block_payload_unchecked(payload.as_ref().payload.clone());
615                }
616
617                // Get the VID share at the leaf's view number, corresponding to our key
618                // (if one exists)
619                let vid_share = consensus_reader
620                    .vid_shares()
621                    .get(&leaf.view_number())
622                    .and_then(|key_map| key_map.get(public_key))
623                    .and_then(|epoch_map| epoch_map.get(&leaf.epoch(epoch_height)))
624                    .map(|prop| prop.data.clone());
625
626                let state_cert = if leaf.with_epoch
627                    && is_epoch_root(
628                        leaf.block_header().block_number(),
629                        consensus_reader.epoch_height,
630                    ) {
631                    match consensus_reader.state_cert() {
632                        // Sanity check that the state cert is for the same view as the decided leaf
633                        Some(state_cert)
634                            if state_cert.light_client_state.view_number
635                                == leaf.view_number().u64() =>
636                        {
637                            Some(state_cert.clone())
638                        },
639                        _ => None,
640                    }
641                } else {
642                    None
643                };
644
645                // Add our data into a new `LeafInfo`
646                res.leaf_views.push(LeafInfo::new(
647                    leaf.clone(),
648                    Arc::clone(&state),
649                    delta.clone(),
650                    vid_share,
651                    state_cert,
652                ));
653                if let Some(ref payload) = leaf.block_payload() {
654                    res.included_txns = Some(
655                        payload
656                            .transaction_commitments(leaf.block_header().metadata())
657                            .into_iter()
658                            .collect::<HashSet<_>>(),
659                    );
660                }
661            }
662            true
663        },
664    ) {
665        tracing::debug!("Leaf ascension failed; error={e}");
666    }
667
668    let epoch_height = consensus_reader.epoch_height;
669    drop(consensus_reader);
670
671    if with_epochs && res.new_decided_view_number.is_some() {
672        for decided_leaf_info in &res.leaf_views {
673            decide_epoch_root::<TYPES, I>(
674                &decided_leaf_info.leaf,
675                epoch_height,
676                membership,
677                storage,
678                &consensus,
679            )
680            .await;
681        }
682    }
683
684    res
685}
686
687/// Gets the parent leaf and state from the parent of a proposal, returning an [`utils::anytrace::Error`] if not.
688#[instrument(skip_all)]
689#[allow(clippy::too_many_arguments)]
690pub(crate) async fn parent_leaf_and_state<TYPES: NodeType>(
691    event_sender: &Sender<Arc<HotShotEvent<TYPES>>>,
692    event_receiver: &Receiver<Arc<HotShotEvent<TYPES>>>,
693    membership: EpochMembershipCoordinator<TYPES>,
694    public_key: TYPES::SignatureKey,
695    private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
696    consensus: OuterConsensus<TYPES>,
697    upgrade_lock: &UpgradeLock<TYPES>,
698    parent_qc: &QuorumCertificate2<TYPES>,
699    epoch_height: u64,
700) -> Result<(Leaf2<TYPES>, Arc<<TYPES as NodeType>::ValidatedState>)> {
701    let consensus_reader = consensus.read().await;
702    let vsm_contains_parent_view = consensus_reader
703        .validated_state_map()
704        .contains_key(&parent_qc.view_number());
705    drop(consensus_reader);
706
707    if !vsm_contains_parent_view {
708        let _ = fetch_proposal(
709            parent_qc,
710            event_sender.clone(),
711            event_receiver.clone(),
712            membership,
713            consensus.clone(),
714            public_key.clone(),
715            private_key.clone(),
716            upgrade_lock,
717            epoch_height,
718        )
719        .await
720        .context(info!("Failed to fetch proposal"))?;
721    }
722
723    let consensus_reader = consensus.read().await;
724    let parent_view = consensus_reader
725        .validated_state_map()
726        .get(&parent_qc.view_number())
727        .context(debug!(
728            "Couldn't find parent view in state map, waiting for replica to see proposal; \
729             parent_view_number: {}",
730            *parent_qc.view_number()
731        ))?;
732
733    let (leaf_commitment, state) = parent_view.leaf_and_state().context(info!(
734        "Parent of high QC points to a view without a proposal; parent_view_number: {}, \
735         parent_view {:?}",
736        *parent_qc.view_number(),
737        parent_view
738    ))?;
739
740    if leaf_commitment != consensus_reader.high_qc().data().leaf_commit {
741        // NOTE: This happens on the genesis block
742        tracing::debug!(
743            "They don't equal: {:?}   {:?}",
744            leaf_commitment,
745            consensus_reader.high_qc().data().leaf_commit
746        );
747    }
748
749    let leaf = consensus_reader
750        .saved_leaves()
751        .get(&leaf_commitment)
752        .context(info!("Failed to find high QC of parent"))?;
753
754    Ok((leaf.clone(), Arc::clone(state)))
755}
756
757pub(crate) async fn update_high_qc<TYPES: NodeType, I: NodeImplementation<TYPES>>(
758    proposal: &Proposal<TYPES, QuorumProposalWrapper<TYPES>>,
759    validation_info: &ValidationInfo<TYPES, I>,
760) -> Result<()> {
761    let in_transition_epoch = proposal
762        .data
763        .justify_qc()
764        .data
765        .block_number
766        .is_some_and(|bn| {
767            !is_transition_block(bn, validation_info.epoch_height)
768                && is_epoch_transition(bn, validation_info.epoch_height)
769                && bn % validation_info.epoch_height != 0
770        });
771    let justify_qc = proposal.data.justify_qc();
772    let maybe_next_epoch_justify_qc = proposal.data.next_epoch_justify_qc();
773    if !in_transition_epoch {
774        tracing::debug!(
775            "Storing high QC for view {:?} and height {:?}",
776            justify_qc.view_number(),
777            justify_qc.data.block_number
778        );
779        if let Err(e) = validation_info
780            .storage
781            .update_high_qc2(justify_qc.clone())
782            .await
783        {
784            bail!("Failed to store High QC, not voting; error = {e:?}");
785        }
786        if justify_qc
787            .data
788            .block_number
789            .is_some_and(|bn| is_epoch_root(bn, validation_info.epoch_height))
790        {
791            let Some(state_cert) = proposal.data.state_cert() else {
792                bail!("Epoch root QC has no state cert, not voting!");
793            };
794            if let Err(e) = validation_info
795                .storage
796                .update_state_cert(state_cert.clone())
797                .await
798            {
799                bail!(
800                    "Failed to store the light client state update certificate, not voting; error \
801                     = {:?}",
802                    e
803                );
804            }
805            validation_info
806                .consensus
807                .write()
808                .await
809                .update_state_cert(state_cert.clone())?;
810        }
811        if let Some(next_epoch_justify_qc) = maybe_next_epoch_justify_qc
812            && let Err(e) = validation_info
813                .storage
814                .update_next_epoch_high_qc2(next_epoch_justify_qc.clone())
815                .await
816        {
817            bail!("Failed to store next epoch High QC, not voting; error = {e:?}");
818        }
819    }
820    let mut consensus_writer = validation_info.consensus.write().await;
821    if let Some(next_epoch_justify_qc) = maybe_next_epoch_justify_qc {
822        if justify_qc
823            .data
824            .block_number
825            .is_some_and(|bn| is_transition_block(bn, validation_info.epoch_height))
826        {
827            consensus_writer.reset_high_qc(justify_qc.clone(), next_epoch_justify_qc.clone())?;
828            consensus_writer
829                .update_transition_qc(justify_qc.clone(), next_epoch_justify_qc.clone());
830            return Ok(());
831        }
832        consensus_writer.update_next_epoch_high_qc(next_epoch_justify_qc.clone())?;
833    }
834    consensus_writer.update_high_qc(justify_qc.clone())?;
835
836    Ok(())
837}
838
839async fn transition_qc<TYPES: NodeType, I: NodeImplementation<TYPES>>(
840    validation_info: &ValidationInfo<TYPES, I>,
841) -> Option<(
842    QuorumCertificate2<TYPES>,
843    NextEpochQuorumCertificate2<TYPES>,
844)> {
845    validation_info
846        .consensus
847        .read()
848        .await
849        .transition_qc()
850        .cloned()
851}
852
853pub(crate) async fn validate_epoch_transition_qc<TYPES: NodeType, I: NodeImplementation<TYPES>>(
854    proposal: &Proposal<TYPES, QuorumProposalWrapper<TYPES>>,
855    validation_info: &ValidationInfo<TYPES, I>,
856) -> Result<()> {
857    let proposed_qc = proposal.data.justify_qc();
858    let Some(qc_block_number) = proposed_qc.data().block_number else {
859        bail!("Justify QC has no block number");
860    };
861    if !is_epoch_transition(qc_block_number, validation_info.epoch_height)
862        || qc_block_number % validation_info.epoch_height == 0
863    {
864        return Ok(());
865    }
866    let Some(next_epoch_qc) = proposal.data.next_epoch_justify_qc() else {
867        bail!("Next epoch justify QC is not present");
868    };
869    ensure!(
870        next_epoch_qc.data.leaf_commit == proposed_qc.data().leaf_commit,
871        "Next epoch QC has different leaf commit to justify QC"
872    );
873
874    if is_transition_block(qc_block_number, validation_info.epoch_height) {
875        // Height is epoch height - 2
876        ensure!(
877            transition_qc(validation_info)
878                .await
879                .is_none_or(|(qc, _)| qc.view_number() <= proposed_qc.view_number()),
880            "Proposed transition qc must have view number greater than or equal to previous \
881             transition QC"
882        );
883
884        validation_info
885            .consensus
886            .write()
887            .await
888            .update_transition_qc(proposed_qc.clone(), next_epoch_qc.clone());
889        // reset the high qc to the transition qc
890        update_high_qc(proposal, validation_info).await?;
891    } else {
892        // Height is either epoch height - 1 or epoch height
893        ensure!(
894            transition_qc(validation_info)
895                .await
896                .is_none_or(|(qc, _)| qc.view_number() < proposed_qc.view_number()),
897            "Transition block must have view number greater than previous transition QC"
898        );
899        ensure!(
900            proposal.data.view_change_evidence().is_none(),
901            "Second to last block and last block of epoch must directly extend previous block, Qc \
902             Block number: {qc_block_number}, Proposal Block number: {}",
903            proposal.data.block_header().block_number()
904        );
905        ensure!(
906            proposed_qc.view_number() + 1 == proposal.data.view_number()
907                || transition_qc(validation_info)
908                    .await
909                    .is_some_and(|(qc, _)| &qc == proposed_qc),
910            "Transition proposals must extend the previous view directly, or extend the previous \
911             transition block"
912        );
913    }
914    Ok(())
915}
916
917/// Validate the state and safety and liveness of a proposal then emit
918/// a `QuorumProposalValidated` event.
919///
920///
921/// # Errors
922/// If any validation or state update fails.
923#[allow(clippy::too_many_lines)]
924#[instrument(skip_all, fields(id = validation_info.id, view = *proposal.data.view_number()))]
925pub(crate) async fn validate_proposal_safety_and_liveness<
926    TYPES: NodeType,
927    I: NodeImplementation<TYPES>,
928>(
929    proposal: Proposal<TYPES, QuorumProposalWrapper<TYPES>>,
930    parent_leaf: Leaf2<TYPES>,
931    validation_info: &ValidationInfo<TYPES, I>,
932    event_stream: Sender<Arc<HotShotEvent<TYPES>>>,
933    sender: TYPES::SignatureKey,
934) -> Result<()> {
935    let view_number = proposal.data.view_number();
936
937    let mut valid_epoch_transition = false;
938    if validation_info
939        .upgrade_lock
940        .version(proposal.data.justify_qc().view_number())
941        .is_ok_and(|v| v >= EPOCH_VERSION)
942    {
943        let Some(block_number) = proposal.data.justify_qc().data.block_number else {
944            bail!("Quorum Proposal has no block number but it's after the epoch upgrade");
945        };
946        if is_epoch_transition(block_number, validation_info.epoch_height) {
947            validate_epoch_transition_qc(&proposal, validation_info).await?;
948            valid_epoch_transition = true;
949        }
950    }
951
952    let proposed_leaf = Leaf2::from_quorum_proposal(&proposal.data);
953    ensure!(
954        proposed_leaf.parent_commitment() == parent_leaf.commit(),
955        "Proposed leaf does not extend the parent leaf."
956    );
957    let proposal_epoch = option_epoch_from_block_number(
958        validation_info.upgrade_lock.epochs_enabled(view_number),
959        proposed_leaf.height(),
960        validation_info.epoch_height,
961    );
962
963    let state = Arc::new(
964        <TYPES::ValidatedState as ValidatedState<TYPES>>::from_header(proposal.data.block_header()),
965    );
966
967    {
968        let mut consensus_writer = validation_info.consensus.write().await;
969        if let Err(e) = consensus_writer.update_leaf(proposed_leaf.clone(), state, None) {
970            tracing::trace!("{e:?}");
971        }
972
973        // Update our internal storage of the proposal. The proposal is valid, so
974        // we swallow this error and just log if it occurs.
975        if let Err(e) = consensus_writer.update_proposed_view(proposal.clone()) {
976            tracing::debug!("Internal proposal update failed; error = {e:#}");
977        };
978    }
979
980    UpgradeCertificate::validate(
981        proposal.data.upgrade_certificate(),
982        &validation_info.membership,
983        proposal_epoch,
984        &validation_info.upgrade_lock,
985    )
986    .await?;
987
988    // Validate that the upgrade certificate is re-attached, if we saw one on the parent
989    proposed_leaf.extends_upgrade(&parent_leaf, &validation_info.upgrade_lock)?;
990
991    let justify_qc = proposal.data.justify_qc().clone();
992    // Create a positive vote if either liveness or safety check
993    // passes.
994
995    {
996        let consensus_reader = validation_info.consensus.read().await;
997        // Epoch safety check:
998        // The proposal is safe if
999        // 1. the proposed block and the justify QC block belong to the same epoch or
1000        // 2. the justify QC is the eQC for the previous block
1001        let justify_qc_epoch = option_epoch_from_block_number(
1002            validation_info.upgrade_lock.epochs_enabled(view_number),
1003            parent_leaf.height(),
1004            validation_info.epoch_height,
1005        );
1006        ensure!(
1007            proposal_epoch == justify_qc_epoch
1008                || consensus_reader.check_eqc(&proposed_leaf, &parent_leaf),
1009            {
1010                error!(
1011                    "Failed epoch safety check \n Proposed leaf is {proposed_leaf:?} \n justify \
1012                     QC leaf is {parent_leaf:?}"
1013                )
1014            }
1015        );
1016
1017        // Make sure that the epoch transition proposal includes the next epoch QC
1018        if is_epoch_transition(parent_leaf.height(), validation_info.epoch_height)
1019            && validation_info.upgrade_lock.epochs_enabled(view_number)
1020        {
1021            ensure!(
1022                proposal.data.next_epoch_justify_qc().is_some(),
1023                "Epoch transition proposal does not include the next epoch justify QC. Do not \
1024                 vote!"
1025            );
1026        }
1027
1028        // Liveness check.
1029        let liveness_check =
1030            justify_qc.view_number() > consensus_reader.locked_view() || valid_epoch_transition;
1031
1032        // Safety check.
1033        // Check if proposal extends from the locked leaf.
1034        let outcome = consensus_reader.visit_leaf_ancestors(
1035            justify_qc.view_number(),
1036            Terminator::Inclusive(consensus_reader.locked_view()),
1037            false,
1038            |leaf, _, _| {
1039                // if leaf view no == locked view no then we're done, report success by
1040                // returning true
1041                leaf.view_number() != consensus_reader.locked_view()
1042            },
1043        );
1044        let safety_check = outcome.is_ok();
1045
1046        ensure!(safety_check || liveness_check, {
1047            if let Err(e) = outcome {
1048                broadcast_event(
1049                    Event {
1050                        view_number,
1051                        event: EventType::Error { error: Arc::new(e) },
1052                    },
1053                    &validation_info.output_event_stream,
1054                )
1055                .await;
1056            }
1057
1058            error!(
1059                "Failed safety and liveness check \n High QC is {:?}  Proposal QC is {:?}  Locked \
1060                 view is {:?}",
1061                consensus_reader.high_qc(),
1062                proposal.data,
1063                consensus_reader.locked_view()
1064            )
1065        });
1066    }
1067
1068    // We accept the proposal, notify the application layer
1069    broadcast_event(
1070        Event {
1071            view_number,
1072            event: EventType::QuorumProposal {
1073                proposal: proposal.clone(),
1074                sender,
1075            },
1076        },
1077        &validation_info.output_event_stream,
1078    )
1079    .await;
1080
1081    // Notify other tasks
1082    broadcast_event(
1083        Arc::new(HotShotEvent::QuorumProposalValidated(
1084            proposal.clone(),
1085            parent_leaf,
1086        )),
1087        &event_stream,
1088    )
1089    .await;
1090
1091    Ok(())
1092}
1093
1094/// Validates, from a given `proposal` that the view that it is being submitted for is valid when
1095/// compared to `cur_view` which is the highest proposed view (so far) for the caller. If the proposal
1096/// is for a view that's later than expected, that the proposal includes a timeout or view sync certificate.
1097///
1098/// # Errors
1099/// If any validation or view number check fails.
1100pub(crate) async fn validate_proposal_view_and_certs<
1101    TYPES: NodeType,
1102    I: NodeImplementation<TYPES>,
1103>(
1104    proposal: &Proposal<TYPES, QuorumProposalWrapper<TYPES>>,
1105    validation_info: &ValidationInfo<TYPES, I>,
1106) -> Result<()> {
1107    let view_number = proposal.data.view_number();
1108    ensure!(
1109        view_number + 1 >= validation_info.consensus.read().await.cur_view(),
1110        "Proposal is from an older view {:?}",
1111        proposal.data
1112    );
1113
1114    // Validate the proposal's signature. This should also catch if the leaf_commitment does not equal our calculated parent commitment
1115    let mut membership = validation_info.membership.clone();
1116    proposal.validate_signature(&membership)?;
1117
1118    // Verify a timeout certificate OR a view sync certificate exists and is valid.
1119    if proposal.data.justify_qc().view_number() != view_number - 1 {
1120        let received_proposal_cert =
1121            proposal
1122                .data
1123                .view_change_evidence()
1124                .clone()
1125                .context(debug!(
1126                    "Quorum proposal for view {view_number} needed a timeout or view sync \
1127                     certificate, but did not have one",
1128                ))?;
1129
1130        match received_proposal_cert {
1131            ViewChangeEvidence2::Timeout(timeout_cert) => {
1132                ensure!(
1133                    timeout_cert.data().view == view_number - 1,
1134                    "Timeout certificate for view {view_number} was not for the immediately \
1135                     preceding view"
1136                );
1137                let timeout_cert_epoch = timeout_cert.data().epoch();
1138                membership = membership.get_new_epoch(timeout_cert_epoch)?;
1139
1140                let membership_stake_table =
1141                    StakeTableEntries::from_iter(membership.stake_table()).0;
1142                let membership_success_threshold = membership.success_threshold();
1143
1144                timeout_cert
1145                    .is_valid_cert(
1146                        &membership_stake_table,
1147                        membership_success_threshold,
1148                        &validation_info.upgrade_lock,
1149                    )
1150                    .context(|e| {
1151                        warn!("Timeout certificate for view {view_number} was invalid: {e}")
1152                    })?;
1153            },
1154            ViewChangeEvidence2::ViewSync(view_sync_cert) => {
1155                ensure!(
1156                    view_sync_cert.view_number == view_number,
1157                    "View sync cert view number {:?} does not match proposal view number {:?}",
1158                    view_sync_cert.view_number,
1159                    view_number
1160                );
1161
1162                let view_sync_cert_epoch = view_sync_cert.data().epoch();
1163                membership = membership.get_new_epoch(view_sync_cert_epoch)?;
1164
1165                let membership_stake_table =
1166                    StakeTableEntries::from_iter(membership.stake_table()).0;
1167                let membership_success_threshold = membership.success_threshold();
1168
1169                // View sync certs must also be valid.
1170                view_sync_cert
1171                    .is_valid_cert(
1172                        &membership_stake_table,
1173                        membership_success_threshold,
1174                        &validation_info.upgrade_lock,
1175                    )
1176                    .context(|e| warn!("Invalid view sync finalize cert provided: {e}"))?;
1177            },
1178        }
1179    }
1180
1181    // Validate the upgrade certificate -- this is just a signature validation.
1182    // Note that we don't do anything with the certificate directly if this passes; it eventually gets stored as part of the leaf if nothing goes wrong.
1183    {
1184        let epoch = option_epoch_from_block_number(
1185            proposal.data.epoch().is_some(),
1186            proposal.data.block_header().block_number(),
1187            validation_info.epoch_height,
1188        );
1189        UpgradeCertificate::validate(
1190            proposal.data.upgrade_certificate(),
1191            &validation_info.membership,
1192            epoch,
1193            &validation_info.upgrade_lock,
1194        )
1195        .await?;
1196    }
1197
1198    Ok(())
1199}
1200
1201/// Helper function to send events and log errors
1202pub async fn broadcast_event<E: Clone + std::fmt::Debug>(event: E, sender: &Sender<E>) {
1203    match sender.broadcast_direct(event).await {
1204        Ok(None) => (),
1205        Ok(Some(overflowed)) => {
1206            tracing::error!(
1207                "Event sender queue overflow, Oldest event removed form queue: {overflowed:?}"
1208            );
1209        },
1210        Err(SendError(e)) => {
1211            tracing::warn!("Event: {e:?}\n Sending failed, event stream probably shutdown");
1212        },
1213    }
1214}
1215
1216/// Validates qc's signatures and, if provided, validates next_epoch_qc's signatures and whether it
1217/// corresponds to the provided high_qc.
1218pub async fn validate_qc_and_next_epoch_qc<TYPES: NodeType>(
1219    qc: &QuorumCertificate2<TYPES>,
1220    maybe_next_epoch_qc: Option<&NextEpochQuorumCertificate2<TYPES>>,
1221    consensus: &OuterConsensus<TYPES>,
1222    membership_coordinator: &EpochMembershipCoordinator<TYPES>,
1223    upgrade_lock: &UpgradeLock<TYPES>,
1224    epoch_height: u64,
1225) -> Result<()> {
1226    let cert = CertificatePair::new(qc.clone(), maybe_next_epoch_qc.cloned());
1227
1228    let mut epoch_membership = membership_coordinator.stake_table_for_epoch(cert.epoch())?;
1229
1230    let membership_stake_table = StakeTableEntries::from_iter(epoch_membership.stake_table()).0;
1231    let membership_success_threshold = epoch_membership.success_threshold();
1232
1233    if let Err(e) = cert.qc().is_valid_cert(
1234        &membership_stake_table,
1235        membership_success_threshold,
1236        upgrade_lock,
1237    ) {
1238        consensus.read().await.metrics.invalid_qc.update(1);
1239        return Err(warn!("Invalid certificate: {e}"));
1240    }
1241
1242    // Check the next epoch QC if required.
1243    if upgrade_lock.epochs_enabled(cert.view_number())
1244        && let Some(next_epoch_qc) = cert.verify_next_epoch_qc(epoch_height)?
1245    {
1246        epoch_membership = epoch_membership.next_epoch_stake_table()?;
1247        let membership_next_stake_table =
1248            StakeTableEntries::from_iter(epoch_membership.stake_table()).0;
1249        let membership_next_success_threshold = epoch_membership.success_threshold();
1250        next_epoch_qc
1251            .is_valid_cert(
1252                &membership_next_stake_table,
1253                membership_next_success_threshold,
1254                upgrade_lock,
1255            )
1256            .context(|e| warn!("Invalid next epoch certificate: {e}"))?;
1257    }
1258
1259    Ok(())
1260}
1261
1262/// Gets the second VID share, the current or the next epoch accordingly, from the shared consensus state;
1263/// makes sure it corresponds to the given DA certificate;
1264/// if it's not yet available, waits for it with the given timeout.
1265pub async fn wait_for_second_vid_share<TYPES: NodeType>(
1266    target_epoch: Option<EpochNumber>,
1267    vid_share: &Proposal<TYPES, VidDisperseShare<TYPES>>,
1268    da_cert: &DaCertificate2<TYPES>,
1269    consensus: &OuterConsensus<TYPES>,
1270    receiver: &Receiver<Arc<HotShotEvent<TYPES>>>,
1271    cancel_receiver: Receiver<()>,
1272    id: u64,
1273) -> Result<Proposal<TYPES, VidDisperseShare<TYPES>>> {
1274    tracing::debug!("getting the second VID share for epoch {:?}", target_epoch);
1275    let maybe_second_vid_share = consensus
1276        .read()
1277        .await
1278        .vid_shares()
1279        .get(&vid_share.data.view_number())
1280        .and_then(|key_map| key_map.get(vid_share.data.recipient_key()))
1281        .and_then(|epoch_map| epoch_map.get(&target_epoch))
1282        .cloned();
1283    if let Some(second_vid_share) = maybe_second_vid_share
1284        && ((target_epoch == da_cert.epoch()
1285            && second_vid_share.data.payload_commitment() == da_cert.data().payload_commit)
1286            || (target_epoch != da_cert.epoch()
1287                && Some(second_vid_share.data.payload_commitment())
1288                    == da_cert.data().next_epoch_payload_commit))
1289    {
1290        return Ok(second_vid_share);
1291    }
1292
1293    let receiver = receiver.clone();
1294    let da_cert_clone = da_cert.clone();
1295    let Some(event) = EventDependency::new(
1296        receiver,
1297        cancel_receiver,
1298        format!(
1299            "VoteDependency Second VID share for view {:?}, my id {:?}",
1300            vid_share.data.view_number(),
1301            id
1302        ),
1303        Box::new(move |event| {
1304            let event = event.as_ref();
1305            if let HotShotEvent::VidShareValidated(second_vid_share) = event {
1306                if target_epoch == da_cert_clone.epoch() {
1307                    second_vid_share.data.payload_commitment()
1308                        == da_cert_clone.data().payload_commit
1309                } else {
1310                    Some(second_vid_share.data.payload_commitment())
1311                        == da_cert_clone.data().next_epoch_payload_commit
1312                }
1313            } else {
1314                false
1315            }
1316        }),
1317    )
1318    .completed()
1319    .await
1320    else {
1321        return Err(warn!("Error while waiting for the second VID share."));
1322    };
1323    let HotShotEvent::VidShareValidated(second_vid_share) = event.as_ref() else {
1324        // this shouldn't happen
1325        return Err(warn!(
1326            "Received event is not VidShareValidated but we checked it earlier. Shouldn't be \
1327             possible."
1328        ));
1329    };
1330    Ok(second_vid_share.clone())
1331}
1332
1333pub async fn broadcast_view_change<TYPES: NodeType>(
1334    sender: &Sender<Arc<HotShotEvent<TYPES>>>,
1335    new_view_number: ViewNumber,
1336    epoch: Option<EpochNumber>,
1337    first_epoch: Option<(ViewNumber, EpochNumber)>,
1338) {
1339    let mut broadcast_epoch = epoch;
1340    if let Some((first_epoch_view, first_epoch)) = first_epoch
1341        && new_view_number == first_epoch_view
1342        && broadcast_epoch != Some(first_epoch)
1343    {
1344        broadcast_epoch = Some(first_epoch);
1345    }
1346    tracing::trace!("Sending ViewChange for view {new_view_number} and epoch {broadcast_epoch:?}");
1347    broadcast_event(
1348        Arc::new(HotShotEvent::ViewChange(new_view_number, broadcast_epoch)),
1349        sender,
1350    )
1351    .await
1352}