Skip to main content

hotshot_task_impls/quorum_proposal/
mod.rs

1// Copyright (c) 2021-2024 Espresso Systems (espressosys.com)
2// This file is part of the HotShot repository.
3
4// You should have received a copy of the MIT License
5// along with the HotShot repository. If not, see <https://mit-license.org/>.
6
7use std::{collections::BTreeMap, sync::Arc, time::Instant};
8
9use async_broadcast::{Receiver, Sender, broadcast};
10use async_trait::async_trait;
11use either::Either;
12use hotshot_task::{
13    dependency::{AndDependency, EventDependency, OrDependency},
14    dependency_task::DependencyTask,
15    task::TaskState,
16};
17use hotshot_types::{
18    consensus::OuterConsensus,
19    data::{EpochNumber, ViewNumber},
20    epoch_membership::EpochMembershipCoordinator,
21    message::UpgradeLock,
22    simple_certificate::{
23        EpochRootQuorumCertificateV2, LightClientStateUpdateCertificateV2,
24        NextEpochQuorumCertificate2, QuorumCertificate2, UpgradeCertificate,
25    },
26    stake_table::StakeTableEntries,
27    traits::{
28        node_implementation::{NodeImplementation, NodeType},
29        signature_key::SignatureKey,
30        storage::Storage,
31    },
32    utils::{EpochTransitionIndicator, is_epoch_transition, is_last_block},
33    vote::{Certificate, HasViewNumber},
34};
35use hotshot_utils::anytrace::*;
36use tracing::instrument;
37
38use self::handlers::{ProposalDependency, ProposalDependencyHandle};
39use crate::{
40    events::HotShotEvent, helpers::broadcast_view_change,
41    quorum_proposal::handlers::handle_eqc_formed,
42};
43
44mod handlers;
45
/// The state for the quorum proposal task.
///
/// Holds the bookkeeping needed to decide whether a proposal dependency task should be spawned
/// for a view (the latest proposed view and the table of in-progress tasks), the certificates
/// this node has formed so far, and the keys and shared handles that are handed to each spawned
/// task.
pub struct QuorumProposalTaskState<TYPES: NodeType, I: NodeImplementation<TYPES>> {
    /// Latest view number that has been proposed for.
    ///
    /// Dependency tasks are only created for views strictly greater than this one.
    pub latest_proposed_view: ViewNumber,

    /// Current epoch. Updated when a `ViewChange` event carries a greater epoch.
    pub cur_epoch: Option<EpochNumber>,

    /// Table for the in-progress proposal dependency tasks.
    ///
    /// Maps the view a task would propose for to the sender used to cancel that task.
    pub proposal_dependencies: BTreeMap<ViewNumber, Sender<()>>,

    /// Formed QCs, keyed by the view number of the QC.
    pub formed_quorum_certificates: BTreeMap<ViewNumber, QuorumCertificate2<TYPES>>,

    /// Formed QCs for the next epoch, keyed by the view number of the QC.
    pub formed_next_epoch_quorum_certificates:
        BTreeMap<ViewNumber, NextEpochQuorumCertificate2<TYPES>>,

    /// Immutable instance state
    pub instance_state: Arc<TYPES::InstanceState>,

    /// Membership for Quorum Certs/votes
    pub membership_coordinator: EpochMembershipCoordinator<TYPES>,

    /// Our public key
    pub public_key: TYPES::SignatureKey,

    /// Our Private Key
    pub private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,

    /// View timeout from config.
    pub timeout: u64,

    /// This node's storage ref
    pub storage: I::Storage,

    /// Shared consensus task state
    pub consensus: OuterConsensus<TYPES>,

    /// The node's id
    pub id: u64,

    /// The most recent upgrade certificate this node formed.
    /// Note: this is ONLY for certificates that have been formed internally,
    /// so that we can propose with them.
    ///
    /// Certificates received from other nodes will get reattached regardless of this field,
    /// since they will be present in the leaf we propose off of.
    pub formed_upgrade_certificate: Option<UpgradeCertificate<TYPES>>,

    /// Lock for a decided upgrade
    pub upgrade_lock: UpgradeLock<TYPES>,

    /// Number of blocks in an epoch, zero means there are no epochs
    pub epoch_height: u64,

    /// Formed light client state update certificates, keyed by the epoch of the certificate.
    pub formed_state_cert: BTreeMap<EpochNumber, LightClientStateUpdateCertificateV2<TYPES>>,

    /// First view in which epoch version takes effect, together with the corresponding epoch.
    /// Set when a `SetFirstEpoch` event is handled.
    pub first_epoch: Option<(ViewNumber, EpochNumber)>,
}
108
109impl<TYPES: NodeType, I: NodeImplementation<TYPES>> QuorumProposalTaskState<TYPES, I> {
110    /// Create an event dependency
111    #[instrument(skip_all, fields(id = self.id, latest_proposed_view = *self.latest_proposed_view), name = "Create event dependency", level = "info")]
112    fn create_event_dependency(
113        &self,
114        dependency_type: ProposalDependency,
115        view_number: ViewNumber,
116        event_receiver: Receiver<Arc<HotShotEvent<TYPES>>>,
117        cancel_receiver: Receiver<()>,
118    ) -> EventDependency<Arc<HotShotEvent<TYPES>>> {
119        let id = self.id;
120        EventDependency::new(
121            event_receiver,
122            cancel_receiver,
123            format!(
124                "ProposalDependency::{:?} for view {:?}, my id {:?}",
125                dependency_type, view_number, self.id
126            ),
127            Box::new(move |event| {
128                let event = event.as_ref();
129                let event_view = match dependency_type {
130                    ProposalDependency::Qc => {
131                        if let HotShotEvent::Qc2Formed(either::Left(qc)) = event {
132                            qc.view_number() + 1
133                        } else if let HotShotEvent::EpochRootQcFormed(root_qc) = event {
134                            root_qc.view_number() + 1
135                        } else {
136                            return false;
137                        }
138                    },
139                    ProposalDependency::TimeoutCert => {
140                        if let HotShotEvent::Qc2Formed(either::Right(timeout)) = event {
141                            timeout.view_number() + 1
142                        } else {
143                            return false;
144                        }
145                    },
146                    ProposalDependency::ViewSyncCert => {
147                        if let HotShotEvent::ViewSyncFinalizeCertificateRecv(view_sync_cert) = event
148                        {
149                            view_sync_cert.view_number()
150                        } else {
151                            return false;
152                        }
153                    },
154                    ProposalDependency::Proposal => {
155                        if let HotShotEvent::QuorumProposalPreliminarilyValidated(proposal) = event
156                        {
157                            proposal.data.view_number() + 1
158                        } else {
159                            return false;
160                        }
161                    },
162                    ProposalDependency::PayloadAndMetadata => {
163                        if let HotShotEvent::SendPayloadCommitmentAndMetadata(
164                            _payload_commitment,
165                            _builder_commitment,
166                            _metadata,
167                            view_number,
168                            _fee,
169                        ) = event
170                        {
171                            *view_number
172                        } else {
173                            return false;
174                        }
175                    },
176                    ProposalDependency::VidShare => {
177                        if let HotShotEvent::VidDisperseSend(vid_disperse, _) = event {
178                            vid_disperse.data.view_number()
179                        } else {
180                            return false;
181                        }
182                    },
183                };
184                let valid = event_view == view_number;
185                if valid {
186                    tracing::debug!(
187                        "Dependency {dependency_type:?} is complete for view {event_view:?}, my \
188                         id is {id:?}!",
189                    );
190                }
191                valid
192            }),
193        )
194    }
195
196    /// Creates the requisite dependencies for the Quorum Proposal task. It also handles any event forwarding.
197    fn create_and_complete_dependencies(
198        &self,
199        view_number: ViewNumber,
200        event_receiver: &Receiver<Arc<HotShotEvent<TYPES>>>,
201        event: Arc<HotShotEvent<TYPES>>,
202        cancel_receiver: &Receiver<()>,
203    ) -> AndDependency<Vec<Vec<Arc<HotShotEvent<TYPES>>>>> {
204        let mut proposal_dependency = self.create_event_dependency(
205            ProposalDependency::Proposal,
206            view_number,
207            event_receiver.clone(),
208            cancel_receiver.clone(),
209        );
210
211        let mut qc_dependency = self.create_event_dependency(
212            ProposalDependency::Qc,
213            view_number,
214            event_receiver.clone(),
215            cancel_receiver.clone(),
216        );
217
218        let mut view_sync_dependency = self.create_event_dependency(
219            ProposalDependency::ViewSyncCert,
220            view_number,
221            event_receiver.clone(),
222            cancel_receiver.clone(),
223        );
224
225        let mut timeout_dependency = self.create_event_dependency(
226            ProposalDependency::TimeoutCert,
227            view_number,
228            event_receiver.clone(),
229            cancel_receiver.clone(),
230        );
231
232        let mut payload_commitment_dependency = self.create_event_dependency(
233            ProposalDependency::PayloadAndMetadata,
234            view_number,
235            event_receiver.clone(),
236            cancel_receiver.clone(),
237        );
238
239        let mut vid_share_dependency = self.create_event_dependency(
240            ProposalDependency::VidShare,
241            view_number,
242            event_receiver.clone(),
243            cancel_receiver.clone(),
244        );
245
246        let epoch_height = self.epoch_height;
247
248        // Next epoch QC dependency is fulfilled if we get the next epoch QC or
249        // form a current qc that isn't during transition
250        let mut next_epoch_qc_dependency = EventDependency::new(
251            event_receiver.clone(),
252            cancel_receiver.clone(),
253            format!(
254                "ProposalDependency Next epoch QC for view {:?}, my id {:?}",
255                view_number, self.id
256            ),
257            Box::new(move |event| {
258                if let HotShotEvent::NextEpochQc2Formed(Either::Left(next_epoch_qc)) =
259                    event.as_ref()
260                {
261                    return next_epoch_qc.view_number() + 1 == view_number;
262                }
263                if let HotShotEvent::EpochRootQcFormed(..) = event.as_ref() {
264                    // Epoch root QC is always not in epoch transition
265                    return true;
266                }
267                if let HotShotEvent::Qc2Formed(Either::Left(qc)) = event.as_ref()
268                    && qc.view_number() + 1 == view_number
269                {
270                    return qc
271                        .data
272                        .block_number
273                        .is_none_or(|bn| !is_epoch_transition(bn, epoch_height));
274                }
275                false
276            }),
277        );
278
279        match event.as_ref() {
280            HotShotEvent::SendPayloadCommitmentAndMetadata(..) => {
281                payload_commitment_dependency.mark_as_completed(Arc::clone(&event));
282            },
283            HotShotEvent::QuorumProposalPreliminarilyValidated(..) => {
284                proposal_dependency.mark_as_completed(event);
285            },
286            HotShotEvent::Qc2Formed(quorum_certificate) => match quorum_certificate {
287                Either::Right(_) => timeout_dependency.mark_as_completed(event),
288                Either::Left(qc) => {
289                    if qc
290                        .data
291                        .block_number
292                        .is_none_or(|bn| !is_epoch_transition(bn, epoch_height))
293                    {
294                        next_epoch_qc_dependency.mark_as_completed(event.clone());
295                    }
296                    qc_dependency.mark_as_completed(event);
297                },
298            },
299            HotShotEvent::EpochRootQcFormed(..) => {
300                // Epoch root QC is always not in epoch transition
301                next_epoch_qc_dependency.mark_as_completed(event.clone());
302                qc_dependency.mark_as_completed(event);
303            },
304            HotShotEvent::ViewSyncFinalizeCertificateRecv(_) => {
305                view_sync_dependency.mark_as_completed(event);
306            },
307            HotShotEvent::VidDisperseSend(..) => {
308                vid_share_dependency.mark_as_completed(event);
309            },
310            HotShotEvent::NextEpochQc2Formed(Either::Left(_)) => {
311                next_epoch_qc_dependency.mark_as_completed(event);
312            },
313            _ => {},
314        };
315
316        // We have three cases to consider:
317        let mut secondary_deps = vec![
318            // 1. A timeout cert was received
319            AndDependency::from_deps(vec![timeout_dependency]),
320            // 2. A view sync cert was received.
321            AndDependency::from_deps(vec![view_sync_dependency]),
322        ];
323        // 3. A `Qc2Formed`` event (and `QuorumProposalRecv` event)
324        if *view_number > 1 {
325            secondary_deps.push(AndDependency::from_deps(vec![
326                qc_dependency,
327                proposal_dependency,
328                next_epoch_qc_dependency,
329            ]));
330        } else {
331            secondary_deps.push(AndDependency::from_deps(vec![qc_dependency]));
332        }
333
334        let primary_deps = vec![payload_commitment_dependency, vid_share_dependency];
335
336        AndDependency::from_deps(vec![OrDependency::from_deps(vec![
337            AndDependency::from_deps(vec![
338                OrDependency::from_deps(vec![AndDependency::from_deps(primary_deps)]),
339                OrDependency::from_deps(secondary_deps),
340            ]),
341        ])])
342    }
343
344    /// Create and store an [`AndDependency`] combining [`EventDependency`]s associated with the
345    /// given view number if it doesn't exist. Also takes in the received `event` to seed a
346    /// dependency as already completed. This allows for the task to receive a proposable event
347    /// without losing the data that it received, as the dependency task would otherwise have no
348    /// ability to receive the event and, thus, would never propose.
349    #[instrument(skip_all, fields(id = self.id, latest_proposed_view = *self.latest_proposed_view), name = "Create dependency task", level = "error")]
350    async fn create_dependency_task_if_new(
351        &mut self,
352        view_number: ViewNumber,
353        epoch_number: Option<EpochNumber>,
354        event_receiver: Receiver<Arc<HotShotEvent<TYPES>>>,
355        event_sender: Sender<Arc<HotShotEvent<TYPES>>>,
356        event: Arc<HotShotEvent<TYPES>>,
357        epoch_transition_indicator: EpochTransitionIndicator,
358    ) -> Result<()> {
359        let epoch_membership = self
360            .membership_coordinator
361            .membership_for_epoch(epoch_number)?;
362        let leader_in_current_epoch = epoch_membership.leader(view_number)? == self.public_key;
363        // If we are in the epoch transition and we are the leader in the next epoch,
364        // we might want to start collecting dependencies for our next epoch proposal.
365
366        let leader_in_next_epoch = !leader_in_current_epoch
367            && epoch_number.is_some()
368            && matches!(
369                epoch_transition_indicator,
370                EpochTransitionIndicator::InTransition
371            )
372            && epoch_membership
373                .next_epoch()
374                .context(warn!(
375                    "Missing the randomized stake table for epoch {}",
376                    epoch_number.unwrap() + 1
377                ))?
378                .leader(view_number)?
379                == self.public_key;
380
381        // Don't even bother making the task if we are not entitled to propose anyway.
382        ensure!(
383            leader_in_current_epoch || leader_in_next_epoch,
384            debug!("We are not the leader of the next view")
385        );
386
387        // Don't try to propose twice for the same view.
388        ensure!(
389            view_number > self.latest_proposed_view,
390            "We have already proposed for this view"
391        );
392
393        tracing::debug!(
394            "Attempting to make dependency task for view {view_number} and event {event:?}"
395        );
396
397        ensure!(
398            !self.proposal_dependencies.contains_key(&view_number),
399            "Task already exists"
400        );
401
402        let (cancel_sender, cancel_receiver) = broadcast(1);
403
404        let dependency_chain = self.create_and_complete_dependencies(
405            view_number,
406            &event_receiver,
407            event,
408            &cancel_receiver,
409        );
410
411        let dependency_task = DependencyTask::new(
412            dependency_chain,
413            ProposalDependencyHandle {
414                latest_proposed_view: self.latest_proposed_view,
415                view_number,
416                sender: event_sender,
417                receiver: event_receiver,
418                membership: epoch_membership,
419                public_key: self.public_key.clone(),
420                private_key: self.private_key.clone(),
421                instance_state: Arc::clone(&self.instance_state),
422                consensus: OuterConsensus::new(Arc::clone(&self.consensus.inner_consensus)),
423                timeout: self.timeout,
424                formed_upgrade_certificate: self.formed_upgrade_certificate.clone(),
425                upgrade_lock: self.upgrade_lock.clone(),
426                id: self.id,
427                view_start_time: Instant::now(),
428                epoch_height: self.epoch_height,
429                cancel_receiver,
430            },
431        );
432        self.proposal_dependencies
433            .insert(view_number, cancel_sender);
434
435        dependency_task.run();
436
437        Ok(())
438    }
439
440    /// Update the latest proposed view number.
441    #[instrument(skip_all, fields(id = self.id, latest_proposed_view = *self.latest_proposed_view), name = "Update latest proposed view", level = "error")]
442    fn update_latest_proposed_view(&mut self, new_view: ViewNumber) -> bool {
443        if *self.latest_proposed_view < *new_view {
444            tracing::debug!(
445                "Updating latest proposed view from {} to {}",
446                *self.latest_proposed_view,
447                *new_view
448            );
449
450            // Cancel the old dependency tasks.
451            for view in (*self.latest_proposed_view + 1)..=(*new_view) {
452                let maybe_cancel_sender = self.proposal_dependencies.remove(&ViewNumber::new(view));
453                if maybe_cancel_sender.as_ref().is_some_and(|s| !s.is_closed()) {
454                    tracing::debug!("Aborting proposal dependency task for view {view}");
455                    let _ = maybe_cancel_sender.unwrap().try_broadcast(());
456                }
457            }
458
459            self.latest_proposed_view = new_view;
460
461            return true;
462        }
463        false
464    }
465
466    /// Handles a consensus event received on the event stream
467    #[instrument(skip_all, fields(id = self.id, latest_proposed_view = *self.latest_proposed_view, epoch = self.cur_epoch.map(|x| *x)), name = "handle method", level = "error", target = "QuorumProposalTaskState")]
468    pub async fn handle(
469        &mut self,
470        event: Arc<HotShotEvent<TYPES>>,
471        event_receiver: Receiver<Arc<HotShotEvent<TYPES>>>,
472        event_sender: Sender<Arc<HotShotEvent<TYPES>>>,
473    ) -> Result<()> {
474        let epoch_number = self.cur_epoch;
475        let maybe_high_qc_block_number = self.consensus.read().await.high_qc().data.block_number;
476        let epoch_transition_indicator = if maybe_high_qc_block_number.is_some_and(|bn| {
477            is_epoch_transition(bn, self.epoch_height) && !is_last_block(bn, self.epoch_height)
478        }) {
479            EpochTransitionIndicator::InTransition
480        } else {
481            EpochTransitionIndicator::NotInTransition
482        };
483        match event.as_ref() {
484            HotShotEvent::UpgradeCertificateFormed(cert) => {
485                tracing::debug!(
486                    "Upgrade certificate received for view {}!",
487                    *cert.view_number
488                );
489                // Update our current upgrade_cert as long as we still have a chance of reaching a decide on it in time.
490                if cert.data.decide_by >= self.latest_proposed_view + 3 {
491                    tracing::debug!("Updating current formed_upgrade_certificate");
492
493                    self.formed_upgrade_certificate = Some(cert.clone());
494                }
495            },
496            HotShotEvent::Qc2Formed(cert) => match cert.clone() {
497                either::Right(timeout_cert) => {
498                    let view_number = timeout_cert.view_number + 1;
499                    self.create_dependency_task_if_new(
500                        view_number,
501                        epoch_number,
502                        event_receiver,
503                        event_sender,
504                        Arc::clone(&event),
505                        epoch_transition_indicator,
506                    )
507                    .await?;
508                },
509                either::Left(qc) => {
510                    // Only update if the qc is from a newer view
511                    if qc.view_number <= self.consensus.read().await.high_qc().view_number {
512                        tracing::trace!(
513                            "Received a QC for a view that was not > than our current high QC"
514                        );
515                    }
516
517                    self.formed_quorum_certificates
518                        .insert(qc.view_number(), qc.clone());
519
520                    handle_eqc_formed(
521                        qc.view_number(),
522                        qc.data.leaf_commit,
523                        qc.data.block_number,
524                        self,
525                        &event_sender,
526                    )
527                    .await;
528
529                    let view_number = qc.view_number() + 1;
530                    if !qc
531                        .data
532                        .block_number
533                        .is_some_and(|bn| is_last_block(bn, self.epoch_height))
534                    {
535                        broadcast_view_change(
536                            &event_sender,
537                            view_number,
538                            qc.data.epoch,
539                            self.first_epoch,
540                        )
541                        .await;
542                    }
543                    self.create_dependency_task_if_new(
544                        view_number,
545                        epoch_number,
546                        event_receiver,
547                        event_sender,
548                        Arc::clone(&event),
549                        epoch_transition_indicator,
550                    )
551                    .await?;
552                },
553            },
554
555            HotShotEvent::EpochRootQcFormed(EpochRootQuorumCertificateV2 { qc, state_cert }) => {
556                // Only update if the qc is from a newer view
557                if qc.view_number() <= self.consensus.read().await.high_qc().view_number {
558                    tracing::trace!(
559                        "Received a QC for a view that was not > than our current high QC"
560                    );
561                }
562
563                self.formed_quorum_certificates
564                    .insert(qc.view_number(), qc.clone());
565                self.formed_state_cert
566                    .insert(state_cert.epoch, state_cert.clone());
567
568                self.storage
569                    .update_high_qc2_and_state_cert(qc.clone(), state_cert.clone())
570                    .await
571                    .wrap()
572                    .context(error!(
573                        "Failed to update the epoch root QC and state cert in storage!"
574                    ))?;
575
576                let view_number = qc.view_number() + 1;
577                broadcast_view_change(&event_sender, view_number, qc.data.epoch, self.first_epoch)
578                    .await;
579                self.create_dependency_task_if_new(
580                    view_number,
581                    epoch_number,
582                    event_receiver,
583                    event_sender,
584                    Arc::clone(&event),
585                    epoch_transition_indicator,
586                )
587                .await?;
588            },
589            HotShotEvent::SendPayloadCommitmentAndMetadata(
590                _payload_commitment,
591                _builder_commitment,
592                _metadata,
593                view_number,
594                _fee,
595            ) => {
596                let view_number = *view_number;
597
598                self.create_dependency_task_if_new(
599                    view_number,
600                    epoch_number,
601                    event_receiver,
602                    event_sender,
603                    Arc::clone(&event),
604                    epoch_transition_indicator,
605                )
606                .await?;
607            },
608            HotShotEvent::ViewSyncFinalizeCertificateRecv(certificate) => {
609                let epoch_number = certificate.data.epoch;
610                let epoch_membership = self
611                    .membership_coordinator
612                    .stake_table_for_epoch(epoch_number)
613                    .context(warn!("No Stake Table for Epoch = {epoch_number:?}"))?;
614
615                let membership_stake_table =
616                    StakeTableEntries::from_iter(epoch_membership.stake_table()).0;
617                let membership_success_threshold = epoch_membership.success_threshold();
618
619                certificate
620                    .is_valid_cert(
621                        &membership_stake_table,
622                        membership_success_threshold,
623                        &self.upgrade_lock,
624                    )
625                    .context(|e| {
626                        warn!(
627                            "View Sync Finalize certificate {:?} was invalid: {}",
628                            certificate.data(),
629                            e
630                        )
631                    })?;
632
633                let view_number = certificate.view_number;
634
635                self.create_dependency_task_if_new(
636                    view_number,
637                    epoch_number,
638                    event_receiver,
639                    event_sender,
640                    event,
641                    epoch_transition_indicator,
642                )
643                .await?;
644            },
645            HotShotEvent::QuorumProposalPreliminarilyValidated(proposal) => {
646                let view_number = proposal.data.view_number();
647                // All nodes get the latest proposed view as a proxy of `cur_view` of old.
648                if !self.update_latest_proposed_view(view_number) {
649                    tracing::trace!("Failed to update latest proposed view");
650                }
651
652                self.create_dependency_task_if_new(
653                    view_number + 1,
654                    epoch_number,
655                    event_receiver,
656                    event_sender,
657                    Arc::clone(&event),
658                    epoch_transition_indicator,
659                )
660                .await?;
661            },
662            HotShotEvent::QuorumProposalSend(proposal, _) => {
663                let view = proposal.data.view_number();
664
665                ensure!(
666                    self.update_latest_proposed_view(view),
667                    "Failed to update latest proposed view"
668                );
669            },
670            HotShotEvent::VidDisperseSend(vid_disperse, _) => {
671                let view_number = vid_disperse.data.view_number();
672                self.create_dependency_task_if_new(
673                    view_number,
674                    epoch_number,
675                    event_receiver,
676                    event_sender,
677                    Arc::clone(&event),
678                    epoch_transition_indicator,
679                )
680                .await?;
681            },
682            HotShotEvent::ViewChange(view, epoch) => {
683                if epoch > &self.cur_epoch {
684                    self.cur_epoch = *epoch;
685                }
686                let keep_view = ViewNumber::new(view.saturating_sub(1));
687                self.cancel_tasks(keep_view);
688            },
689            HotShotEvent::Timeout(view, ..) => {
690                let keep_view = ViewNumber::new(view.saturating_sub(1));
691                self.cancel_tasks(keep_view);
692            },
693            HotShotEvent::NextEpochQc2Formed(Either::Left(next_epoch_qc)) => {
694                // Only update if the qc is from a newer view
695                let current_next_epoch_qc =
696                    self.consensus.read().await.next_epoch_high_qc().cloned();
697                ensure!(
698                    current_next_epoch_qc.is_none()
699                        || next_epoch_qc.view_number > current_next_epoch_qc.unwrap().view_number,
700                    debug!(
701                        "Received a next epoch QC for a view that was not > than our current next \
702                         epoch high QC"
703                    )
704                );
705
706                self.formed_next_epoch_quorum_certificates
707                    .insert(next_epoch_qc.view_number(), next_epoch_qc.clone());
708
709                handle_eqc_formed(
710                    next_epoch_qc.view_number(),
711                    next_epoch_qc.data.leaf_commit,
712                    next_epoch_qc.data.block_number,
713                    self,
714                    &event_sender,
715                )
716                .await;
717
718                let view_number = next_epoch_qc.view_number() + 1;
719                self.create_dependency_task_if_new(
720                    view_number,
721                    epoch_number,
722                    event_receiver,
723                    event_sender,
724                    Arc::clone(&event),
725                    epoch_transition_indicator,
726                )
727                .await?;
728            },
729            HotShotEvent::SetFirstEpoch(view, epoch) => {
730                self.first_epoch = Some((*view, *epoch));
731            },
732            _ => {},
733        }
734        Ok(())
735    }
736
737    /// Cancel all tasks the consensus tasks has spawned before the given view
738    pub fn cancel_tasks(&mut self, view: ViewNumber) {
739        let keep = self.proposal_dependencies.split_off(&view);
740        while let Some((view, cancel_sender)) = self.proposal_dependencies.pop_first() {
741            if !cancel_sender.is_closed() {
742                tracing::debug!("Aborting proposal dependency task for view {view}");
743                let _ = cancel_sender.try_broadcast(());
744            }
745        }
746        self.proposal_dependencies = keep;
747    }
748}
749
750#[async_trait]
751impl<TYPES: NodeType, I: NodeImplementation<TYPES>> TaskState
752    for QuorumProposalTaskState<TYPES, I>
753{
754    type Event = HotShotEvent<TYPES>;
755
756    async fn handle_event(
757        &mut self,
758        event: Arc<Self::Event>,
759        sender: &Sender<Arc<Self::Event>>,
760        receiver: &Receiver<Arc<Self::Event>>,
761    ) -> Result<()> {
762        if self
763            .upgrade_lock
764            .new_protocol_active(self.latest_proposed_view)
765        {
766            return Ok(());
767        }
768        self.handle(event, receiver.clone(), sender.clone()).await
769    }
770
771    fn cancel_subtasks(&mut self) {
772        while let Some((view, cancel_sender)) = self.proposal_dependencies.pop_first() {
773            if !cancel_sender.is_closed() {
774                tracing::debug!("Aborting proposal dependency task for view {view}");
775                let _ = cancel_sender.try_broadcast(());
776            }
777        }
778    }
779}