Skip to main content

hotshot_task_impls/quorum_proposal/
handlers.rs

1// Copyright (c) 2021-2024 Espresso Systems (espressosys.com)
2// This file is part of the HotShot repository.
3
4// You should have received a copy of the MIT License
5// along with the HotShot repository. If not, see <https://mit-license.org/>.
6
7//! This module holds the dependency task for the QuorumProposalTask. It is spawned whenever an event that could
8//! initiate a proposal occurs.
9
10use std::{
11    marker::PhantomData,
12    sync::Arc,
13    time::{Duration, Instant},
14};
15
16use async_broadcast::{Receiver, Sender};
17use committable::{Commitment, Committable};
18use hotshot_contract_adapter::light_client::validate_light_client_state_update_certificate;
19use hotshot_task::dependency_task::HandleDepOutput;
20use hotshot_types::{
21    consensus::{CommitmentAndMetadata, OuterConsensus},
22    data::{
23        Leaf2, QuorumProposal2, QuorumProposalWrapper, VidDisperse, ViewChangeEvidence2, ViewNumber,
24    },
25    epoch_membership::EpochMembership,
26    message::Proposal,
27    simple_certificate::{
28        LightClientStateUpdateCertificateV2, NextEpochQuorumCertificate2, QuorumCertificate2,
29        UpgradeCertificate, check_qc_state_cert_correspondence,
30    },
31    traits::{
32        BlockPayload,
33        block_contents::BlockHeader,
34        node_implementation::{NodeImplementation, NodeType},
35        signature_key::SignatureKey,
36        storage::Storage,
37    },
38    utils::{
39        epoch_from_block_number, is_epoch_root, is_epoch_transition, is_last_block,
40        is_transition_block, option_epoch_from_block_number,
41    },
42    vote::HasViewNumber,
43};
44use hotshot_utils::anytrace::*;
45use tracing::instrument;
46use versions::EPOCH_VERSION;
47
48use crate::{
49    events::HotShotEvent,
50    helpers::{broadcast_event, parent_leaf_and_state, validate_qc_and_next_epoch_qc},
51    quorum_proposal::{QuorumProposalTaskState, UpgradeLock},
52};
53
/// The kinds of dependency a proposal can be waiting on. Each variant is tied to the event
/// (or event branch) that precipitates a proposal.
#[derive(Debug, PartialEq)]
pub(crate) enum ProposalDependency {
    /// Tied to the `SendPayloadCommitmentAndMetadata` event.
    PayloadAndMetadata,

    /// Tied to the `Qc2Formed`, `ExtendedQc2Formed`, and `EpochRootQcFormed` events.
    Qc,

    /// Tied to the `ViewSyncFinalizeCertificateRecv` event.
    ViewSyncCert,

    /// Tied to the timeout branch of the `Qc2Formed`, `ExtendedQc2Formed`, and
    /// `EpochRootQcFormed` events.
    TimeoutCert,

    /// Tied to the `QuorumProposalRecv` event.
    Proposal,

    /// Tied to the `VidShareValidated` event.
    VidShare,
}
75
76/// Handler for the proposal dependency
77pub struct ProposalDependencyHandle<TYPES: NodeType> {
78    /// Latest view number that has been proposed for (proxy for cur_view).
79    pub latest_proposed_view: ViewNumber,
80
81    /// The view number to propose for.
82    pub view_number: ViewNumber,
83
84    /// The event sender.
85    pub sender: Sender<Arc<HotShotEvent<TYPES>>>,
86
87    /// The event receiver.
88    pub receiver: Receiver<Arc<HotShotEvent<TYPES>>>,
89
90    /// Immutable instance state
91    pub instance_state: Arc<TYPES::InstanceState>,
92
93    /// Membership for Quorum Certs/votes
94    pub membership: EpochMembership<TYPES>,
95
96    /// Our public key
97    pub public_key: TYPES::SignatureKey,
98
99    /// Our Private Key
100    pub private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
101
102    /// Shared consensus task state
103    pub consensus: OuterConsensus<TYPES>,
104
105    /// View timeout from config.
106    pub timeout: u64,
107
108    /// The most recent upgrade certificate this node formed.
109    /// Note: this is ONLY for certificates that have been formed internally,
110    /// so that we can propose with them.
111    ///
112    /// Certificates received from other nodes will get reattached regardless of this fields,
113    /// since they will be present in the leaf we propose off of.
114    pub formed_upgrade_certificate: Option<UpgradeCertificate<TYPES>>,
115
116    /// Lock for a decided upgrade
117    pub upgrade_lock: UpgradeLock<TYPES>,
118
119    /// The node's id
120    pub id: u64,
121
122    /// The time this view started
123    pub view_start_time: Instant,
124
125    /// Number of blocks in an epoch, zero means there are no epochs
126    pub epoch_height: u64,
127
128    pub cancel_receiver: Receiver<()>,
129}
130
131impl<TYPES: NodeType> ProposalDependencyHandle<TYPES> {
132    /// Return the next HighQc we get from the event stream
133    async fn wait_for_qc_event(
134        &self,
135        mut rx: Receiver<Arc<HotShotEvent<TYPES>>>,
136    ) -> Option<(
137        QuorumCertificate2<TYPES>,
138        Option<NextEpochQuorumCertificate2<TYPES>>,
139        Option<LightClientStateUpdateCertificateV2<TYPES>>,
140    )> {
141        while let Ok(event) = rx.recv_direct().await {
142            let (qc, maybe_next_epoch_qc, mut maybe_state_cert) = match event.as_ref() {
143                HotShotEvent::HighQcRecv(qc, maybe_next_epoch_qc, _sender) => {
144                    (qc, maybe_next_epoch_qc, None)
145                },
146                HotShotEvent::EpochRootQcRecv(root_qc, _sender) => {
147                    (&root_qc.qc, &None, Some(root_qc.state_cert.clone()))
148                },
149                _ => continue,
150            };
151            if validate_qc_and_next_epoch_qc(
152                qc,
153                maybe_next_epoch_qc.as_ref(),
154                &self.consensus,
155                &self.membership.coordinator,
156                &self.upgrade_lock,
157                self.epoch_height,
158            )
159            .await
160            .is_ok()
161            {
162                if qc
163                    .data
164                    .block_number
165                    .is_some_and(|bn| is_epoch_root(bn, self.epoch_height))
166                {
167                    // Validate the state cert
168                    if let Some(state_cert) = &maybe_state_cert {
169                        if validate_light_client_state_update_certificate(
170                            state_cert,
171                            &self.membership.coordinator,
172                            &self.upgrade_lock,
173                        )
174                        .await
175                        .is_err()
176                            || !check_qc_state_cert_correspondence(
177                                qc,
178                                state_cert,
179                                self.epoch_height,
180                            )
181                        {
182                            tracing::error!("Failed to validate state cert");
183                            return None;
184                        }
185                    } else {
186                        tracing::error!(
187                            "Received an epoch root QC but we don't have the corresponding state \
188                             cert."
189                        );
190                        return None;
191                    }
192                } else {
193                    maybe_state_cert = None;
194                }
195                return Some((qc.clone(), maybe_next_epoch_qc.clone(), maybe_state_cert));
196            }
197        }
198        None
199    }
200
201    async fn wait_for_transition_qc(
202        &self,
203    ) -> Result<
204        Option<(
205            QuorumCertificate2<TYPES>,
206            NextEpochQuorumCertificate2<TYPES>,
207        )>,
208    > {
209        ensure!(
210            self.upgrade_lock.epochs_enabled(self.view_number),
211            error!("Epochs are not enabled yet we tried to wait for Highest QC.")
212        );
213
214        let mut transition_qc = self.consensus.read().await.transition_qc().cloned();
215
216        let wait_duration = Duration::from_millis(self.timeout / 2);
217
218        let mut rx = self.receiver.clone();
219
220        // drain any qc off the queue
221        // We don't watch for EpochRootQcRecv events here because it's not in transition.
222        while let Ok(event) = rx.try_recv() {
223            if let HotShotEvent::HighQcRecv(qc, maybe_next_epoch_qc, _sender) = event.as_ref() {
224                if let Some(block_number) = qc.data.block_number {
225                    if !is_transition_block(block_number, self.epoch_height) {
226                        continue;
227                    }
228                } else {
229                    continue;
230                }
231                let Some(next_epoch_qc) = maybe_next_epoch_qc else {
232                    continue;
233                };
234                if validate_qc_and_next_epoch_qc(
235                    qc,
236                    Some(next_epoch_qc),
237                    &self.consensus,
238                    &self.membership.coordinator,
239                    &self.upgrade_lock,
240                    self.epoch_height,
241                )
242                .await
243                .is_ok()
244                    && transition_qc
245                        .as_ref()
246                        .is_none_or(|tqc| qc.view_number() > tqc.0.view_number())
247                {
248                    transition_qc = Some((qc.clone(), next_epoch_qc.clone()));
249                }
250            }
251        }
252        // TODO configure timeout
253        while self.view_start_time.elapsed() < wait_duration {
254            let time_spent = Instant::now()
255                .checked_duration_since(self.view_start_time)
256                .ok_or(error!(
257                    "Time elapsed since the start of the task is negative. This should never \
258                     happen."
259                ))?;
260            let time_left = wait_duration
261                .checked_sub(time_spent)
262                .ok_or(info!("No time left"))?;
263            let Ok(Ok(event)) = tokio::time::timeout(time_left, rx.recv_direct()).await else {
264                return Ok(transition_qc);
265            };
266            if let HotShotEvent::HighQcRecv(qc, maybe_next_epoch_qc, _sender) = event.as_ref() {
267                if let Some(block_number) = qc.data.block_number {
268                    if !is_transition_block(block_number, self.epoch_height) {
269                        continue;
270                    }
271                } else {
272                    continue;
273                }
274                let Some(next_epoch_qc) = maybe_next_epoch_qc else {
275                    continue;
276                };
277                if validate_qc_and_next_epoch_qc(
278                    qc,
279                    Some(next_epoch_qc),
280                    &self.consensus,
281                    &self.membership.coordinator,
282                    &self.upgrade_lock,
283                    self.epoch_height,
284                )
285                .await
286                .is_ok()
287                    && transition_qc
288                        .as_ref()
289                        .is_none_or(|tqc| qc.view_number() > tqc.0.view_number())
290                {
291                    transition_qc = Some((qc.clone(), next_epoch_qc.clone()));
292                }
293            }
294        }
295        Ok(transition_qc)
296    }
297    /// Waits for the configured timeout for nodes to send HighQc messages to us.  We'll
298    /// then propose with the highest QC from among these proposals. A light client state
299    /// update certificate is also returned if the highest QC is an epoch root QC.
300    async fn wait_for_highest_qc(
301        &self,
302    ) -> Result<(
303        QuorumCertificate2<TYPES>,
304        Option<NextEpochQuorumCertificate2<TYPES>>,
305        Option<LightClientStateUpdateCertificateV2<TYPES>>,
306    )> {
307        tracing::debug!("waiting for QC");
308        // If we haven't upgraded to Hotstuff 2 just return the high qc right away
309        ensure!(
310            self.upgrade_lock.epochs_enabled(self.view_number),
311            error!("Epochs are not enabled yet we tried to wait for Highest QC.")
312        );
313
314        let consensus_reader = self.consensus.read().await;
315        let mut highest_qc = consensus_reader.high_qc().clone();
316        let mut state_cert = if highest_qc
317            .data
318            .block_number
319            .is_some_and(|bn| is_epoch_root(bn, self.epoch_height))
320        {
321            consensus_reader.state_cert().cloned()
322        } else {
323            None
324        };
325        let mut next_epoch_qc = if highest_qc
326            .data
327            .block_number
328            .is_some_and(|bn| is_last_block(bn, self.epoch_height))
329        {
330            let maybe_neqc = consensus_reader.next_epoch_high_qc().cloned();
331            if maybe_neqc
332                .as_ref()
333                .is_some_and(|neqc| neqc.data.leaf_commit == highest_qc.data.leaf_commit)
334            {
335                maybe_neqc
336            } else {
337                None
338            }
339        } else {
340            None
341        };
342        drop(consensus_reader);
343
344        let wait_duration = Duration::from_millis(self.timeout / 2);
345
346        let mut rx = self.receiver.clone();
347
348        // drain any qc off the queue
349        while let Ok(event) = rx.try_recv() {
350            let (qc, maybe_next_epoch_qc, mut maybe_state_cert) = match event.as_ref() {
351                HotShotEvent::HighQcRecv(qc, maybe_next_epoch_qc, _sender) => {
352                    (qc, maybe_next_epoch_qc, None)
353                },
354                HotShotEvent::EpochRootQcRecv(root_qc, _sender) => {
355                    (&root_qc.qc, &None, Some(root_qc.state_cert.clone()))
356                },
357                _ => continue,
358            };
359            if validate_qc_and_next_epoch_qc(
360                qc,
361                maybe_next_epoch_qc.as_ref(),
362                &self.consensus,
363                &self.membership.coordinator,
364                &self.upgrade_lock,
365                self.epoch_height,
366            )
367            .await
368            .is_ok()
369            {
370                if qc
371                    .data
372                    .block_number
373                    .is_some_and(|bn| is_epoch_root(bn, self.epoch_height))
374                {
375                    // Validate the state cert
376                    if let Some(state_cert) = &maybe_state_cert {
377                        if validate_light_client_state_update_certificate(
378                            state_cert,
379                            &self.membership.coordinator,
380                            &self.upgrade_lock,
381                        )
382                        .await
383                        .is_err()
384                            || !check_qc_state_cert_correspondence(
385                                qc,
386                                state_cert,
387                                self.epoch_height,
388                            )
389                        {
390                            tracing::error!("Failed to validate state cert");
391                            continue;
392                        }
393                    } else {
394                        tracing::error!(
395                            "Received an epoch root QC but we don't have the corresponding state \
396                             cert."
397                        );
398                        continue;
399                    }
400                } else {
401                    maybe_state_cert = None;
402                }
403                if qc.view_number() > highest_qc.view_number() {
404                    highest_qc = qc.clone();
405                    next_epoch_qc = maybe_next_epoch_qc.clone();
406                    state_cert = maybe_state_cert;
407                }
408            }
409        }
410
411        // TODO configure timeout
412        while self.view_start_time.elapsed() < wait_duration {
413            let time_spent = Instant::now()
414                .checked_duration_since(self.view_start_time)
415                .ok_or(error!(
416                    "Time elapsed since the start of the task is negative. This should never \
417                     happen."
418                ))?;
419            let time_left = wait_duration
420                .checked_sub(time_spent)
421                .ok_or(info!("No time left"))?;
422            let Ok(maybe_qc_state_cert) =
423                tokio::time::timeout(time_left, self.wait_for_qc_event(rx.clone())).await
424            else {
425                tracing::info!(
426                    "Some nodes did not respond with their HighQc in time. Continuing with the \
427                     highest QC that we received: {highest_qc:?}"
428                );
429                return Ok((highest_qc, next_epoch_qc, state_cert));
430            };
431            let Some((qc, maybe_next_epoch_qc, maybe_state_cert)) = maybe_qc_state_cert else {
432                continue;
433            };
434            if qc.view_number() > highest_qc.view_number() {
435                highest_qc = qc;
436                next_epoch_qc = maybe_next_epoch_qc;
437                state_cert = maybe_state_cert;
438            }
439        }
440        Ok((highest_qc, next_epoch_qc, state_cert))
441    }
442    /// Publishes a proposal given the [`CommitmentAndMetadata`], [`VidDisperse`]
443    /// and high qc [`hotshot_types::simple_certificate::QuorumCertificate`],
444    /// with optional [`ViewChangeEvidence`].
445    #[allow(clippy::too_many_arguments)]
446    #[instrument(skip_all, fields(id = self.id, view_number = *self.view_number, latest_proposed_view = *self.latest_proposed_view))]
447    async fn publish_proposal(
448        &self,
449        commitment_and_metadata: CommitmentAndMetadata<TYPES>,
450        _vid_share: Proposal<TYPES, VidDisperse<TYPES>>,
451        view_change_evidence: Option<ViewChangeEvidence2<TYPES>>,
452        formed_upgrade_certificate: Option<UpgradeCertificate<TYPES>>,
453        parent_qc: QuorumCertificate2<TYPES>,
454        maybe_next_epoch_qc: Option<NextEpochQuorumCertificate2<TYPES>>,
455        maybe_state_cert: Option<LightClientStateUpdateCertificateV2<TYPES>>,
456    ) -> Result<()> {
457        let (parent_leaf, state) = parent_leaf_and_state(
458            &self.sender,
459            &self.receiver,
460            self.membership.coordinator.clone(),
461            self.public_key.clone(),
462            self.private_key.clone(),
463            OuterConsensus::new(Arc::clone(&self.consensus.inner_consensus)),
464            &self.upgrade_lock,
465            &parent_qc,
466            self.epoch_height,
467        )
468        .await?;
469
470        // In order of priority, we should try to attach:
471        //   - the parent certificate if it exists, or
472        //   - our own certificate that we formed.
473        // In either case, we need to ensure that the certificate is still relevant.
474        //
475        // Note: once we reach a point of potentially propose with our formed upgrade certificate,
476        // we will ALWAYS drop it. If we cannot immediately use it for whatever reason, we choose
477        // to discard it.
478        //
479        // It is possible that multiple nodes form separate upgrade certificates for the some
480        // upgrade if we are not careful about voting. But this shouldn't bother us: the first
481        // leader to propose is the one whose certificate will be used. And if that fails to reach
482        // a decide for whatever reason, we may lose our own certificate, but something will likely
483        // have gone wrong there anyway.
484        let mut upgrade_certificate = parent_leaf
485            .upgrade_certificate()
486            .or(formed_upgrade_certificate);
487
488        if let Some(cert) = upgrade_certificate.clone()
489            && cert.is_relevant(self.view_number).await.is_err()
490        {
491            upgrade_certificate = None;
492        }
493
494        let proposal_certificate = view_change_evidence
495            .as_ref()
496            .filter(|cert| cert.is_valid_for_view(&self.view_number))
497            .cloned();
498
499        ensure!(
500            commitment_and_metadata.block_view == self.view_number,
501            "Cannot propose because our VID payload commitment and metadata is for an older view."
502        );
503
504        let version = self.upgrade_lock.version(self.view_number)?;
505
506        let builder_commitment = commitment_and_metadata.builder_commitment.clone();
507        let metadata = commitment_and_metadata.metadata.clone();
508
509        if version >= EPOCH_VERSION
510            && parent_qc.view_number()
511                > self
512                    .upgrade_lock
513                    .upgrade_view()
514                    .unwrap_or(ViewNumber::new(0))
515        {
516            let Some(parent_block_number) = parent_qc.data.block_number else {
517                tracing::error!("Parent QC does not have a block number. Do not propose.");
518                return Ok(());
519            };
520            if is_epoch_transition(parent_block_number, self.epoch_height)
521                && !is_last_block(parent_block_number, self.epoch_height)
522            {
523                let (empty_payload, empty_metadata) = <TYPES as NodeType>::BlockPayload::empty();
524                tracing::info!("Reached end of epoch.");
525                ensure!(
526                    builder_commitment == empty_payload.builder_commitment(&metadata)
527                        && metadata == empty_metadata,
528                    "We're trying to propose non empty block in the epoch transition. Do not \
529                     propose. View number: {}. Parent Block number: {}",
530                    self.view_number,
531                    parent_block_number,
532                );
533            }
534            if is_epoch_root(parent_block_number, self.epoch_height) {
535                ensure!(
536                    maybe_state_cert.as_ref().is_some_and(|state_cert| {
537                        check_qc_state_cert_correspondence(
538                            &parent_qc,
539                            state_cert,
540                            self.epoch_height,
541                        )
542                    }),
543                    "We are proposing with parent epoch root QC but we don't have the \
544                     corresponding state cert."
545                );
546            }
547        }
548        let block_header = TYPES::BlockHeader::new(
549            state.as_ref(),
550            self.instance_state.as_ref(),
551            &parent_leaf,
552            commitment_and_metadata.commitment,
553            builder_commitment,
554            metadata,
555            commitment_and_metadata.fees.first().clone(),
556            version,
557            *self.view_number,
558        )
559        .await
560        .wrap()
561        .context(warn!("Failed to construct block header"))?;
562        let epoch = option_epoch_from_block_number(
563            version >= EPOCH_VERSION,
564            block_header.block_number(),
565            self.epoch_height,
566        );
567
568        let epoch_membership = self.membership.coordinator.membership_for_epoch(epoch)?;
569        // Make sure we are the leader for the view and epoch.
570        // We might have ended up here because we were in the epoch transition.
571        if epoch_membership.leader(self.view_number)? != self.public_key {
572            tracing::warn!(
573                "We are not the leader in the epoch for which we are about to propose. Do not \
574                 send the quorum proposal."
575            );
576            return Ok(());
577        }
578        let is_high_qc_for_transition_block = parent_qc
579            .data
580            .block_number
581            .is_some_and(|block_number| is_epoch_transition(block_number, self.epoch_height));
582        let next_epoch_qc = if self.upgrade_lock.epochs_enabled(self.view_number)
583            && is_high_qc_for_transition_block
584        {
585            ensure!(
586                maybe_next_epoch_qc
587                    .as_ref()
588                    .is_some_and(|neqc| neqc.data.leaf_commit == parent_qc.data.leaf_commit),
589                "Justify QC on our proposal is for an epoch transition block but we don't have \
590                 the corresponding next epoch QC. Do not propose."
591            );
592            maybe_next_epoch_qc
593        } else {
594            None
595        };
596        let next_drb_result = if is_epoch_transition(block_header.block_number(), self.epoch_height)
597        {
598            if let Some(epoch_val) = &epoch {
599                let drb_result = epoch_membership
600                    .next_epoch()
601                    .context(warn!("No stake table for epoch {}", *epoch_val + 1))?
602                    .get_epoch_drb()
603                    .await
604                    .clone()
605                    .context(warn!("No DRB result for epoch {}", *epoch_val + 1))?;
606
607                Some(drb_result)
608            } else {
609                None
610            }
611        } else {
612            None
613        };
614
615        let proposal = QuorumProposalWrapper {
616            proposal: QuorumProposal2 {
617                block_header,
618                view_number: self.view_number,
619                epoch,
620                justify_qc: parent_qc,
621                next_epoch_justify_qc: next_epoch_qc,
622                upgrade_certificate,
623                view_change_evidence: proposal_certificate,
624                next_drb_result,
625                state_cert: maybe_state_cert,
626            },
627        };
628
629        let proposed_leaf = Leaf2::from_quorum_proposal(&proposal);
630        ensure!(
631            proposed_leaf.parent_commitment() == parent_leaf.commit(),
632            "Proposed leaf parent does not equal high qc"
633        );
634
635        let signature =
636            TYPES::SignatureKey::sign(&self.private_key, proposed_leaf.commit().as_ref())
637                .wrap()
638                .context(error!("Failed to compute proposed_leaf.commit()"))?;
639
640        let message = Proposal {
641            data: proposal,
642            signature,
643            _pd: PhantomData,
644        };
645        tracing::info!(
646            "Sending proposal for view {}, height {}, justify_qc view: {}",
647            proposed_leaf.view_number(),
648            proposed_leaf.height(),
649            proposed_leaf.justify_qc().view_number()
650        );
651
652        broadcast_event(
653            Arc::new(HotShotEvent::QuorumProposalSend(
654                message.clone(),
655                self.public_key.clone(),
656            )),
657            &self.sender,
658        )
659        .await;
660
661        Ok(())
662    }
663
664    fn print_proposal_events(&self, res: &[Vec<Vec<Arc<HotShotEvent<TYPES>>>>]) {
665        let events: Vec<_> = res.iter().flatten().flatten().map(Arc::as_ref).collect();
666        tracing::warn!("Failed to propose, events: {:#?}", events);
667    }
668
669    async fn handle_proposal_deps(&self, res: &[Vec<Vec<Arc<HotShotEvent<TYPES>>>>]) -> Result<()> {
670        let mut commit_and_metadata: Option<CommitmentAndMetadata<TYPES>> = None;
671        let mut timeout_certificate = None;
672        let mut view_sync_finalize_cert = None;
673        let mut vid_share = None;
674        let mut parent_qc = None;
675        let mut next_epoch_qc = None;
676        let mut state_cert = None;
677        for event in res.iter().flatten().flatten() {
678            match event.as_ref() {
679                HotShotEvent::SendPayloadCommitmentAndMetadata(
680                    payload_commitment,
681                    builder_commitment,
682                    metadata,
683                    view,
684                    fees,
685                ) => {
686                    commit_and_metadata = Some(CommitmentAndMetadata {
687                        commitment: *payload_commitment,
688                        builder_commitment: builder_commitment.clone(),
689                        metadata: metadata.clone(),
690                        fees: fees.clone(),
691                        block_view: *view,
692                    });
693                },
694                HotShotEvent::Qc2Formed(cert) => match cert {
695                    either::Right(timeout) => {
696                        timeout_certificate = Some(timeout.clone());
697                    },
698                    either::Left(qc) => {
699                        parent_qc = Some(qc.clone());
700                    },
701                },
702                HotShotEvent::EpochRootQcFormed(root_qc) => {
703                    parent_qc = Some(root_qc.qc.clone());
704                    state_cert = Some(root_qc.state_cert.clone());
705                },
706                HotShotEvent::ViewSyncFinalizeCertificateRecv(cert) => {
707                    view_sync_finalize_cert = Some(cert.clone());
708                },
709                HotShotEvent::VidDisperseSend(share, _) => {
710                    vid_share = Some(share.clone());
711                },
712                HotShotEvent::NextEpochQc2Formed(either::Left(qc)) => {
713                    next_epoch_qc = Some(qc.clone());
714                },
715                _ => {},
716            }
717        }
718
719        let Ok(version) = self.upgrade_lock.version(self.view_number) else {
720            bail!(error!(
721                "Failed to get version for view {:?}, not proposing",
722                self.view_number
723            ));
724        };
725
726        let mut maybe_epoch = None;
727        let proposal_cert = if let Some(view_sync_cert) = view_sync_finalize_cert {
728            maybe_epoch = view_sync_cert.data.epoch;
729            Some(ViewChangeEvidence2::ViewSync(view_sync_cert))
730        } else {
731            match timeout_certificate {
732                Some(timeout_cert) => {
733                    maybe_epoch = timeout_cert.data.epoch;
734                    Some(ViewChangeEvidence2::Timeout(timeout_cert))
735                },
736                None => None,
737            }
738        };
739
740        let (parent_qc, maybe_next_epoch_qc, maybe_state_cert) = if let Some(qc) = parent_qc {
741            if qc
742                .data
743                .block_number
744                .is_some_and(|bn| is_transition_block(bn, self.epoch_height))
745                && next_epoch_qc
746                    .as_ref()
747                    .is_none_or(|neqc| neqc.data.leaf_commit != qc.data.leaf_commit)
748            {
749                bail!(error!(
750                    "We've formed a transition QC but we haven't formed the corresponding next \
751                     epoch QC. Do not propose."
752                ));
753            }
754            (qc, next_epoch_qc, state_cert)
755        } else if version < EPOCH_VERSION {
756            (self.consensus.read().await.high_qc().clone(), None, None)
757        } else if proposal_cert.is_some() {
758            // If we have a view change evidence, we need to wait to propose with the transition QC
759            if let Ok(Some((qc, next_epoch_qc))) = self.wait_for_transition_qc().await {
760                let Some(epoch) = maybe_epoch else {
761                    bail!(error!(
762                        "No epoch found on view change evidence, but we are in epoch mode"
763                    ));
764                };
765                if qc
766                    .data
767                    .block_number
768                    .is_some_and(|bn| epoch_from_block_number(bn, self.epoch_height) == *epoch)
769                {
770                    (qc, Some(next_epoch_qc), None)
771                } else {
772                    match self.wait_for_highest_qc().await {
773                        Ok((qc, maybe_next_epoch_qc, maybe_state_cert)) => {
774                            (qc, maybe_next_epoch_qc, maybe_state_cert)
775                        },
776                        Err(e) => {
777                            bail!(error!("Error while waiting for highest QC: {e:?}"));
778                        },
779                    }
780                }
781            } else {
782                let Ok((qc, maybe_next_epoch_qc, maybe_state_cert)) =
783                    self.wait_for_highest_qc().await
784                else {
785                    bail!(error!("Error while waiting for highest QC"));
786                };
787                if qc.data.block_number.is_some_and(|bn| {
788                    is_epoch_transition(bn, self.epoch_height)
789                        && !is_last_block(bn, self.epoch_height)
790                }) {
791                    bail!(error!(
792                        "High is in transition but we need to propose with transition QC, do \
793                         nothing"
794                    ));
795                }
796                (qc, maybe_next_epoch_qc, maybe_state_cert)
797            }
798        } else {
799            match self.wait_for_highest_qc().await {
800                Ok((qc, maybe_next_epoch_qc, maybe_state_cert)) => {
801                    (qc, maybe_next_epoch_qc, maybe_state_cert)
802                },
803                Err(e) => {
804                    bail!(error!("Error while waiting for highest QC: {e:?}"));
805                },
806            }
807        };
808
809        ensure!(
810            commit_and_metadata.is_some(),
811            error!(
812                "Somehow completed the proposal dependency task without a commitment and metadata"
813            )
814        );
815
816        ensure!(
817            vid_share.is_some(),
818            error!("Somehow completed the proposal dependency task without a VID share")
819        );
820
821        self.publish_proposal(
822            commit_and_metadata.unwrap(),
823            vid_share.unwrap(),
824            proposal_cert,
825            self.formed_upgrade_certificate.clone(),
826            parent_qc,
827            maybe_next_epoch_qc,
828            maybe_state_cert,
829        )
830        .await
831        .inspect_err(|e| tracing::error!("Failed to publish proposal: {e:?}"))
832    }
833}
834
835impl<TYPES: NodeType> HandleDepOutput for ProposalDependencyHandle<TYPES> {
836    type Output = Vec<Vec<Vec<Arc<HotShotEvent<TYPES>>>>>;
837
838    #[allow(clippy::no_effect_underscore_binding, clippy::too_many_lines)]
839    #[instrument(skip_all, fields(id = self.id, view_number = *self.view_number, latest_proposed_view = *self.latest_proposed_view))]
840    async fn handle_dep_result(self, res: Self::Output) {
841        let mut cancel_receiver = self.cancel_receiver.clone();
842        let result = tokio::select! {
843            result = self.handle_proposal_deps(&res) => {
844                result
845            }
846            _ = cancel_receiver.recv() => {
847                tracing::warn!("Proposal dependency task cancelled");
848                return;
849            }
850        };
851        if result.is_err() {
852            log!(result);
853            self.print_proposal_events(&res)
854        }
855    }
856}
857
858pub(super) async fn handle_eqc_formed<TYPES: NodeType, I: NodeImplementation<TYPES>>(
859    cert_view: ViewNumber,
860    leaf_commit: Commitment<Leaf2<TYPES>>,
861    block_number: Option<u64>,
862    task_state: &mut QuorumProposalTaskState<TYPES, I>,
863    event_sender: &Sender<Arc<HotShotEvent<TYPES>>>,
864) {
865    if !task_state.upgrade_lock.epochs_enabled(cert_view) {
866        tracing::debug!("QC2 formed but epochs not enabled. Do nothing");
867        return;
868    }
869    if !block_number.is_some_and(|bn| is_last_block(bn, task_state.epoch_height)) {
870        tracing::debug!("We formed QC but not eQC. Do nothing");
871        return;
872    }
873
874    let Some(current_epoch_qc) = task_state.formed_quorum_certificates.get(&cert_view) else {
875        tracing::debug!("We formed the eQC but we don't have the current epoch QC at all.");
876        return;
877    };
878    if current_epoch_qc.view_number() != cert_view
879        || current_epoch_qc.data.leaf_commit != leaf_commit
880    {
881        tracing::debug!("We haven't yet formed the eQC. Do nothing");
882        return;
883    }
884    let Some(next_epoch_qc) = task_state
885        .formed_next_epoch_quorum_certificates
886        .get(&cert_view)
887    else {
888        tracing::debug!("We formed the eQC but we don't have the next epoch eQC at all.");
889        return;
890    };
891    if current_epoch_qc.view_number() != cert_view || current_epoch_qc.data != *next_epoch_qc.data {
892        tracing::debug!(
893            "We formed the eQC but the current and next epoch QCs do not correspond to each other."
894        );
895        return;
896    }
897    let current_epoch_qc_clone = current_epoch_qc.clone();
898
899    let mut consensus_writer = task_state.consensus.write().await;
900    let _ = consensus_writer.update_high_qc(current_epoch_qc_clone.clone());
901    let _ = consensus_writer.update_next_epoch_high_qc(next_epoch_qc.clone());
902    drop(consensus_writer);
903
904    if let Err(e) = task_state
905        .storage
906        .update_eqc(current_epoch_qc.clone(), next_epoch_qc.clone())
907        .await
908    {
909        tracing::error!("Failed to store EQC: {}", e);
910    }
911
912    task_state.formed_quorum_certificates =
913        task_state.formed_quorum_certificates.split_off(&cert_view);
914    task_state.formed_next_epoch_quorum_certificates = task_state
915        .formed_next_epoch_quorum_certificates
916        .split_off(&cert_view);
917
918    broadcast_event(
919        Arc::new(HotShotEvent::ExtendedQc2Formed(current_epoch_qc_clone)),
920        event_sender,
921    )
922    .await;
923}