Skip to main content

hotshot_task_impls/quorum_vote/
mod.rs

1// Copyright (c) 2021-2024 Espresso Systems (espressosys.com)
2// This file is part of the HotShot repository.
3
4// You should have received a copy of the MIT License
5// along with the HotShot repository. If not, see <https://mit-license.org/>.
6
7use std::{collections::BTreeMap, sync::Arc, time::Instant};
8
9use async_broadcast::{InactiveReceiver, Receiver, Sender, broadcast};
10use async_trait::async_trait;
11use committable::Committable;
12use hotshot_task::{
13    dependency::{AndDependency, EventDependency},
14    dependency_task::{DependencyTask, HandleDepOutput},
15    task::TaskState,
16};
17use hotshot_types::{
18    VersionedDaCommittee,
19    consensus::{ConsensusMetricsValue, OuterConsensus},
20    data::{EpochNumber, Leaf2, ViewNumber, vid_disperse::vid_total_weight},
21    epoch_membership::EpochMembershipCoordinator,
22    event::Event,
23    message::UpgradeLock,
24    simple_vote::HasEpoch,
25    stake_table::StakeTableEntries,
26    storage_metrics::StorageMetricsValue,
27    traits::{
28        block_contents::BlockHeader,
29        node_implementation::{NodeImplementation, NodeType},
30        signature_key::{SignatureKey, StateSignatureKey},
31        storage::Storage,
32    },
33    utils::{is_epoch_root, is_epoch_transition, is_last_block, option_epoch_from_block_number},
34    vote::{Certificate, HasViewNumber},
35};
36use hotshot_utils::anytrace::*;
37use tracing::instrument;
38
39use crate::{
40    events::HotShotEvent,
41    helpers::{broadcast_event, broadcast_view_change, wait_for_second_vid_share},
42    quorum_vote::handlers::{handle_quorum_proposal_validated, submit_vote, update_shared_state},
43};
44
45/// Event handlers for `QuorumProposalValidated`.
46mod handlers;
47
48/// Vote dependency types.
49#[derive(Debug, PartialEq)]
50enum VoteDependency {
51    /// For the `QuorumProposalValidated` event after validating `QuorumProposalRecv`.
52    QuorumProposal,
53    /// For the `DaCertificateRecv` event.
54    Dac,
55    /// For the `VidShareRecv` event.
56    Vid,
57}
58
59/// Handler for the vote dependency.
60pub struct VoteDependencyHandle<TYPES: NodeType, I: NodeImplementation<TYPES>> {
61    /// Public key.
62    pub public_key: TYPES::SignatureKey,
63
64    /// Private Key.
65    pub private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
66
67    /// Reference to consensus. The replica will require a write lock on this.
68    pub consensus: OuterConsensus<TYPES>,
69
70    /// Immutable instance state
71    pub instance_state: Arc<TYPES::InstanceState>,
72
73    /// Membership for Quorum certs/votes.
74    pub membership_coordinator: EpochMembershipCoordinator<TYPES>,
75
76    /// Reference to the storage.
77    pub storage: I::Storage,
78
79    /// Storage metrics
80    pub storage_metrics: Arc<StorageMetricsValue>,
81
82    /// View number to vote on.
83    pub view_number: ViewNumber,
84
85    /// Event sender.
86    pub sender: Sender<Arc<HotShotEvent<TYPES>>>,
87
88    /// Event receiver.
89    pub receiver: InactiveReceiver<Arc<HotShotEvent<TYPES>>>,
90
91    /// Lock for a decided upgrade
92    pub upgrade_lock: UpgradeLock<TYPES>,
93
94    /// The consensus metrics
95    pub consensus_metrics: Arc<ConsensusMetricsValue>,
96
97    /// The node's id
98    pub id: u64,
99
100    /// Number of blocks in an epoch, zero means there are no epochs
101    pub epoch_height: u64,
102
103    /// Signature key for light client state
104    pub state_private_key: <TYPES::StateSignatureKey as StateSignatureKey>::StatePrivateKey,
105
106    /// First view in which epoch version takes effect
107    pub first_epoch: Option<(ViewNumber, EpochNumber)>,
108
109    /// Stake table capacity for light client use
110    pub stake_table_capacity: usize,
111
112    pub cancel_receiver: Receiver<()>,
113}
114
115impl<TYPES: NodeType, I: NodeImplementation<TYPES> + 'static> HandleDepOutput
116    for VoteDependencyHandle<TYPES, I>
117{
118    type Output = Vec<Arc<HotShotEvent<TYPES>>>;
119
120    #[allow(clippy::too_many_lines)]
121    #[instrument(skip_all, fields(id = self.id, view = *self.view_number))]
122    async fn handle_dep_result(self, res: Self::Output) {
123        let mut cancel_receiver = self.cancel_receiver.clone();
124        let result = tokio::select! { result = self.handle_vote_deps(&res) => {
125            result
126        }
127        _ = cancel_receiver.recv() => {
128            tracing::warn!("Vote dependency task cancelled");
129            return;
130        }
131        };
132        if result.is_err() {
133            log!(result);
134            self.print_vote_events(&res)
135        }
136    }
137}
138
139impl<TYPES: NodeType, I: NodeImplementation<TYPES> + 'static> VoteDependencyHandle<TYPES, I> {
140    fn print_vote_events(&self, res: &[Arc<HotShotEvent<TYPES>>]) {
141        let events: Vec<_> = res.iter().map(Arc::as_ref).collect();
142        tracing::warn!("Failed to vote, events: {:?}", events);
143    }
144
145    async fn handle_vote_deps(&self, res: &[Arc<HotShotEvent<TYPES>>]) -> Result<()> {
146        let mut payload_commitment = None;
147        let mut next_epoch_payload_commitment = None;
148        let mut leaf = None;
149        let mut vid_share = None;
150        let mut da_cert = None;
151        let mut parent_view_number = None;
152        for event in res.iter() {
153            match event.as_ref() {
154                #[allow(unused_assignments)]
155                HotShotEvent::QuorumProposalValidated(proposal, parent_leaf) => {
156                    let proposal_payload_comm = proposal.data.block_header().payload_commitment();
157                    let parent_commitment = parent_leaf.commit();
158                    let proposed_leaf = Leaf2::from_quorum_proposal(&proposal.data);
159
160                    if let Some(ref comm) = payload_commitment {
161                        ensure!(
162                            proposal_payload_comm == *comm,
163                            error!(
164                                "Quorum proposal has inconsistent payload commitment with DAC or \
165                                 VID."
166                            )
167                        );
168                    } else {
169                        payload_commitment = Some(proposal_payload_comm);
170                    }
171
172                    ensure!(
173                        proposed_leaf.parent_commitment() == parent_commitment,
174                        warn!(
175                            "Proposed leaf parent commitment does not match parent leaf payload \
176                             commitment. Aborting vote."
177                        )
178                    );
179
180                    let now = Instant::now();
181                    // Update our persistent storage of the proposal. If we cannot store the proposal return
182                    // and error so we don't vote
183                    self.storage
184                        .append_proposal_wrapper(proposal)
185                        .await
186                        .map_err(|e| {
187                            error!("failed to store proposal, not voting.  error = {e:#}")
188                        })?;
189                    self.storage_metrics
190                        .append_quorum_duration
191                        .add_point(now.elapsed().as_secs_f64());
192
193                    leaf = Some(proposed_leaf);
194                    parent_view_number = Some(parent_leaf.view_number());
195                },
196                HotShotEvent::DaCertificateValidated(cert) => {
197                    let cert_payload_comm = &cert.data().payload_commit;
198                    let next_epoch_cert_payload_comm = cert.data().next_epoch_payload_commit;
199                    if let Some(ref comm) = payload_commitment {
200                        ensure!(
201                            cert_payload_comm == comm,
202                            error!(
203                                "DAC has inconsistent payload commitment with quorum proposal or \
204                                 VID."
205                            )
206                        );
207                    } else {
208                        payload_commitment = Some(*cert_payload_comm);
209                    }
210                    if next_epoch_payload_commitment.is_some()
211                        && next_epoch_payload_commitment != next_epoch_cert_payload_comm
212                    {
213                        bail!(error!(
214                            "DAC has inconsistent next epoch payload commitment with VID."
215                        ));
216                    } else {
217                        next_epoch_payload_commitment = next_epoch_cert_payload_comm;
218                    }
219                    da_cert = Some(cert.clone());
220                },
221                HotShotEvent::VidShareValidated(share) => {
222                    let vid_payload_commitment = &share.data.payload_commitment();
223                    vid_share = Some(share.clone());
224                    let is_next_epoch_vid = share.data.epoch() != share.data.target_epoch();
225                    if is_next_epoch_vid {
226                        if let Some(ref comm) = next_epoch_payload_commitment {
227                            ensure!(
228                                vid_payload_commitment == comm,
229                                error!(
230                                    "VID has inconsistent next epoch payload commitment with DAC."
231                                )
232                            );
233                        } else {
234                            next_epoch_payload_commitment = Some(*vid_payload_commitment);
235                        }
236                    } else if let Some(ref comm) = payload_commitment {
237                        ensure!(
238                            vid_payload_commitment == comm,
239                            error!(
240                                "VID has inconsistent payload commitment with quorum proposal or \
241                                 DAC."
242                            )
243                        );
244                    } else {
245                        payload_commitment = Some(*vid_payload_commitment);
246                    }
247                },
248                _ => {},
249            }
250        }
251
252        let Some(vid_share) = vid_share else {
253            bail!(error!(
254                "We don't have the VID share for this view {}, but we should, because the vote \
255                 dependencies have completed.",
256                self.view_number
257            ));
258        };
259
260        let Some(leaf) = leaf else {
261            bail!(error!(
262                "We don't have the leaf for this view {}, but we should, because the vote \
263                 dependencies have completed.",
264                self.view_number
265            ));
266        };
267
268        let Some(da_cert) = da_cert else {
269            bail!(error!(
270                "We don't have the DA cert for this view {}, but we should, because the vote \
271                 dependencies have completed.",
272                self.view_number
273            ));
274        };
275
276        let mut maybe_current_epoch_vid_share = None;
277        // If this is an epoch transition block, we might need two VID shares.
278        if self.upgrade_lock.epochs_enabled(leaf.view_number())
279            && is_epoch_transition(leaf.block_header().block_number(), self.epoch_height)
280        {
281            let current_epoch = option_epoch_from_block_number(
282                leaf.with_epoch,
283                leaf.block_header().block_number(),
284                self.epoch_height,
285            );
286            let next_epoch = current_epoch.map(|e| e + 1);
287
288            let Ok(current_epoch_membership) = self
289                .membership_coordinator
290                .stake_table_for_epoch(current_epoch)
291            else {
292                bail!(warn!(
293                    "Couldn't acquire current epoch membership. Do not vote!"
294                ));
295            };
296            let Ok(next_epoch_membership) = self
297                .membership_coordinator
298                .stake_table_for_epoch(next_epoch)
299            else {
300                bail!(warn!(
301                    "Couldn't acquire next epoch membership. Do not vote!"
302                ));
303            };
304
305            // If we belong to both epochs, we require VID shares from both epochs.
306            if current_epoch_membership.has_stake(&self.public_key)
307                && next_epoch_membership.has_stake(&self.public_key)
308            {
309                let other_target_epoch = if vid_share.data.target_epoch() == current_epoch {
310                    maybe_current_epoch_vid_share = Some(vid_share.clone());
311                    next_epoch
312                } else {
313                    current_epoch
314                };
315                match wait_for_second_vid_share(
316                    other_target_epoch,
317                    &vid_share,
318                    &da_cert,
319                    &self.consensus,
320                    &self.receiver.activate_cloned(),
321                    self.cancel_receiver.clone(),
322                    self.id,
323                )
324                .await
325                {
326                    Ok(other_vid_share) => {
327                        if maybe_current_epoch_vid_share.is_none() {
328                            maybe_current_epoch_vid_share = Some(other_vid_share);
329                        }
330                        ensure!(
331                            leaf.block_header().payload_commitment()
332                                == maybe_current_epoch_vid_share
333                                    .as_ref()
334                                    .unwrap()
335                                    .data
336                                    .payload_commitment(),
337                            error!(
338                                "We have both epochs vid shares but the leaf's vid commit doesn't \
339                                 match the old epoch vid share's commit. It should never happen."
340                            )
341                        );
342                    },
343                    Err(e) => {
344                        bail!(warn!(
345                            "This is an epoch transition block, we are in both epochs but we \
346                             received only one VID share. Do not vote! Error: {e:?}"
347                        ));
348                    },
349                }
350            }
351        }
352
353        // Update internal state
354        update_shared_state::<TYPES>(
355            OuterConsensus::new(Arc::clone(&self.consensus.inner_consensus)),
356            self.sender.clone(),
357            self.receiver.clone(),
358            self.membership_coordinator.clone(),
359            self.public_key.clone(),
360            self.private_key.clone(),
361            self.upgrade_lock.clone(),
362            self.view_number,
363            Arc::clone(&self.instance_state),
364            &leaf,
365            maybe_current_epoch_vid_share.as_ref().unwrap_or(&vid_share),
366            parent_view_number,
367            self.epoch_height,
368        )
369        .await
370        .context(error!("Failed to update shared consensus state"))?;
371
372        let cur_epoch =
373            option_epoch_from_block_number(leaf.with_epoch, leaf.height(), self.epoch_height);
374
375        let now = Instant::now();
376        // We use this `epoch_membership` to vote,
377        // meaning that we must know the leader for the current view in the current epoch
378        // and must therefore perform the full DRB catchup.
379        let epoch_membership = self
380            .membership_coordinator
381            .membership_for_epoch(cur_epoch)?;
382
383        let duration = now.elapsed();
384        tracing::info!("membership_for_epoch time: {duration:?}");
385
386        let is_vote_leaf_extended = is_last_block(leaf.height(), self.epoch_height);
387        let is_vote_epoch_root = is_epoch_root(leaf.height(), self.epoch_height);
388        if cur_epoch.is_none() || !is_vote_leaf_extended {
389            // We're voting for the proposal that will probably form the eQC. We don't want to change
390            // the view here because we will probably change it when we form the eQC.
391            // The main reason is to handle view change event only once in the transaction task.
392            broadcast_view_change(
393                &self.sender,
394                leaf.view_number() + 1,
395                cur_epoch,
396                self.first_epoch,
397            )
398            .await;
399        }
400
401        let leader = epoch_membership.leader(self.view_number);
402        if let (Ok(leader_key), Some(cur_epoch)) = (leader, cur_epoch) {
403            self.consensus
404                .write()
405                .await
406                .update_validator_participation(leader_key, cur_epoch, true);
407        }
408
409        submit_vote::<TYPES, I>(
410            self.sender.clone(),
411            epoch_membership,
412            self.public_key.clone(),
413            self.private_key.clone(),
414            self.upgrade_lock.clone(),
415            self.view_number,
416            self.storage.clone(),
417            Arc::clone(&self.storage_metrics),
418            leaf,
419            maybe_current_epoch_vid_share.unwrap_or(vid_share),
420            is_vote_leaf_extended,
421            is_vote_epoch_root,
422            self.epoch_height,
423            &self.state_private_key,
424            self.stake_table_capacity,
425        )
426        .await
427    }
428}
429
430/// The state for the quorum vote task.
431///
432/// Contains all of the information for the quorum vote.
433pub struct QuorumVoteTaskState<TYPES: NodeType, I: NodeImplementation<TYPES>> {
434    /// Public key.
435    pub public_key: TYPES::SignatureKey,
436
437    /// Private Key.
438    pub private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
439
440    /// Reference to consensus. The replica will require a write lock on this.
441    pub consensus: OuterConsensus<TYPES>,
442
443    /// Immutable instance state
444    pub instance_state: Arc<TYPES::InstanceState>,
445
446    /// Latest view number that has been voted for.
447    pub latest_voted_view: ViewNumber,
448
449    /// Table for the in-progress dependency tasks.
450    pub vote_dependencies: BTreeMap<ViewNumber, Sender<()>>,
451
452    /// The underlying network
453    pub network: Arc<I::Network>,
454
455    /// Membership for Quorum certs/votes and DA committee certs/votes.
456    pub membership: EpochMembershipCoordinator<TYPES>,
457
458    /// Output events to application
459    pub output_event_stream: async_broadcast::Sender<Event<TYPES>>,
460
461    /// The node's id
462    pub id: u64,
463
464    /// The consensus metrics
465    pub consensus_metrics: Arc<ConsensusMetricsValue>,
466
467    /// Reference to the storage.
468    pub storage: I::Storage,
469
470    /// Storage metrics
471    pub storage_metrics: Arc<StorageMetricsValue>,
472
473    /// Lock for a decided upgrade
474    pub upgrade_lock: UpgradeLock<TYPES>,
475
476    /// Number of blocks in an epoch, zero means there are no epochs
477    pub epoch_height: u64,
478
479    /// Signature key for light client state
480    pub state_private_key: <TYPES::StateSignatureKey as StateSignatureKey>::StatePrivateKey,
481
482    /// First view in which epoch version takes effect
483    pub first_epoch: Option<(ViewNumber, EpochNumber)>,
484
485    /// Stake table capacity for light client use
486    pub stake_table_capacity: usize,
487
488    /// DA committees from HotShotConfig, to apply when an upgrade is decided
489    pub da_committees: Vec<VersionedDaCommittee<TYPES>>,
490}
491
492impl<TYPES: NodeType, I: NodeImplementation<TYPES>> QuorumVoteTaskState<TYPES, I> {
493    /// Create an event dependency.
494    #[instrument(skip_all, fields(id = self.id, latest_voted_view = *self.latest_voted_view), name = "Quorum vote create event dependency", level = "error")]
495    fn create_event_dependency(
496        &self,
497        dependency_type: VoteDependency,
498        view_number: ViewNumber,
499        event_receiver: Receiver<Arc<HotShotEvent<TYPES>>>,
500        cancel_receiver: Receiver<()>,
501    ) -> EventDependency<Arc<HotShotEvent<TYPES>>> {
502        let id = self.id;
503        EventDependency::new(
504            event_receiver,
505            cancel_receiver,
506            format!(
507                "VoteDependency::{:?} for view {:?}, my id {:?}",
508                dependency_type, view_number, self.id
509            ),
510            Box::new(move |event| {
511                let event = event.as_ref();
512                let event_view = match dependency_type {
513                    VoteDependency::QuorumProposal => {
514                        if let HotShotEvent::QuorumProposalValidated(proposal, _) = event {
515                            proposal.data.view_number()
516                        } else {
517                            return false;
518                        }
519                    },
520                    VoteDependency::Dac => {
521                        if let HotShotEvent::DaCertificateValidated(cert) = event {
522                            cert.view_number
523                        } else {
524                            return false;
525                        }
526                    },
527                    VoteDependency::Vid => {
528                        if let HotShotEvent::VidShareValidated(disperse) = event {
529                            disperse.data.view_number()
530                        } else {
531                            return false;
532                        }
533                    },
534                };
535                if event_view == view_number {
536                    tracing::debug!(
537                        "Vote dependency {dependency_type:?} completed for view {view_number}, my \
538                         id is {id}"
539                    );
540                    return true;
541                }
542                false
543            }),
544        )
545    }
546
547    /// Create and store an [`AndDependency`] combining [`EventDependency`]s associated with the
548    /// given view number if it doesn't exist.
549    #[instrument(skip_all, fields(id = self.id, latest_voted_view = *self.latest_voted_view), name = "Quorum vote create dependency task if new", level = "error")]
550    fn create_dependency_task_if_new(
551        &mut self,
552        view_number: ViewNumber,
553        event_receiver: Receiver<Arc<HotShotEvent<TYPES>>>,
554        event_sender: &Sender<Arc<HotShotEvent<TYPES>>>,
555        event: Arc<HotShotEvent<TYPES>>,
556    ) {
557        tracing::debug!(
558            "Attempting to make dependency task for view {view_number} and event {event:?}"
559        );
560
561        if self.vote_dependencies.contains_key(&view_number) {
562            return;
563        }
564
565        let (cancel_sender, cancel_receiver) = broadcast(1);
566
567        let mut quorum_proposal_dependency = self.create_event_dependency(
568            VoteDependency::QuorumProposal,
569            view_number,
570            event_receiver.clone(),
571            cancel_receiver.clone(),
572        );
573        let dac_dependency = self.create_event_dependency(
574            VoteDependency::Dac,
575            view_number,
576            event_receiver.clone(),
577            cancel_receiver.clone(),
578        );
579        let vid_dependency = self.create_event_dependency(
580            VoteDependency::Vid,
581            view_number,
582            event_receiver.clone(),
583            cancel_receiver.clone(),
584        );
585        // If we have an event provided to us
586        if let HotShotEvent::QuorumProposalValidated(..) = event.as_ref() {
587            quorum_proposal_dependency.mark_as_completed(event);
588        }
589
590        let deps = vec![quorum_proposal_dependency, dac_dependency, vid_dependency];
591
592        let dependency_chain = AndDependency::from_deps(deps);
593
594        let dependency_task = DependencyTask::new(
595            dependency_chain,
596            VoteDependencyHandle::<TYPES, I> {
597                public_key: self.public_key.clone(),
598                private_key: self.private_key.clone(),
599                consensus: OuterConsensus::new(Arc::clone(&self.consensus.inner_consensus)),
600                instance_state: Arc::clone(&self.instance_state),
601                membership_coordinator: self.membership.clone(),
602                storage: self.storage.clone(),
603                storage_metrics: Arc::clone(&self.storage_metrics),
604                view_number,
605                sender: event_sender.clone(),
606                receiver: event_receiver.clone().deactivate(),
607                upgrade_lock: self.upgrade_lock.clone(),
608                id: self.id,
609                epoch_height: self.epoch_height,
610                consensus_metrics: Arc::clone(&self.consensus_metrics),
611                state_private_key: self.state_private_key.clone(),
612                first_epoch: self.first_epoch,
613                stake_table_capacity: self.stake_table_capacity,
614                cancel_receiver,
615            },
616        );
617        self.vote_dependencies.insert(view_number, cancel_sender);
618
619        dependency_task.run();
620    }
621
622    /// Update the latest voted view number.
623    #[instrument(skip_all, fields(id = self.id, latest_voted_view = *self.latest_voted_view), name = "Quorum vote update latest voted view", level = "error")]
624    async fn update_latest_voted_view(&mut self, new_view: ViewNumber) -> bool {
625        if *self.latest_voted_view < *new_view {
626            tracing::debug!(
627                "Updating next vote view from {} to {} in the quorum vote task",
628                *self.latest_voted_view,
629                *new_view
630            );
631
632            // Cancel the old dependency tasks.
633            for view in *self.latest_voted_view..(*new_view) {
634                let maybe_cancel_sender = self.vote_dependencies.remove(&ViewNumber::new(view));
635                if maybe_cancel_sender.as_ref().is_some_and(|s| !s.is_closed()) {
636                    tracing::warn!("Aborting vote dependency task for view {view}");
637                    let _ = maybe_cancel_sender.unwrap().try_broadcast(());
638                }
639            }
640
641            // Update the metric for the last voted view
642            if let Ok(last_voted_view_usize) = usize::try_from(*new_view) {
643                self.consensus_metrics
644                    .last_voted_view
645                    .set(last_voted_view_usize);
646            } else {
647                tracing::warn!("Failed to convert last voted view to a usize: {new_view}");
648            }
649
650            self.latest_voted_view = new_view;
651
652            return true;
653        }
654        false
655    }
656
657    /// Handle a vote dependent event received on the event stream
658    #[instrument(skip_all, fields(id = self.id, latest_voted_view = *self.latest_voted_view), name = "Quorum vote handle", level = "error", target = "QuorumVoteTaskState")]
659    pub async fn handle(
660        &mut self,
661        event: Arc<HotShotEvent<TYPES>>,
662        event_receiver: Receiver<Arc<HotShotEvent<TYPES>>>,
663        event_sender: Sender<Arc<HotShotEvent<TYPES>>>,
664    ) -> Result<()> {
665        match event.as_ref() {
666            HotShotEvent::QuorumProposalValidated(proposal, _parent_leaf) => {
667                tracing::trace!(
668                    "Received Proposal for view {}",
669                    *proposal.data.view_number()
670                );
671
672                // Handle the event before creating the dependency task.
673                if let Err(e) =
674                    handle_quorum_proposal_validated(&proposal.data, self, &event_sender).await
675                {
676                    tracing::debug!(
677                        "Failed to handle QuorumProposalValidated event; error = {e:#}"
678                    );
679                }
680
681                ensure!(
682                    proposal.data.view_number() > self.latest_voted_view,
683                    "We have already voted for this view"
684                );
685
686                self.create_dependency_task_if_new(
687                    proposal.data.view_number(),
688                    event_receiver,
689                    &event_sender,
690                    Arc::clone(&event),
691                );
692            },
693            HotShotEvent::DaCertificateRecv(cert) => {
694                let view = cert.view_number;
695
696                tracing::trace!("Received DAC for view {view}");
697                // Do nothing if the DAC is old
698                ensure!(
699                    view > self.latest_voted_view,
700                    "Received DAC for an older view."
701                );
702
703                let cert_epoch = cert.data.epoch;
704
705                let epoch_membership = self.membership.stake_table_for_epoch(cert_epoch)?;
706                let membership_da_stake_table =
707                    StakeTableEntries::from_iter(epoch_membership.da_stake_table()).0;
708                let membership_da_success_threshold = epoch_membership.da_success_threshold();
709
710                // Validate the DAC.
711                cert.is_valid_cert(
712                    &membership_da_stake_table,
713                    membership_da_success_threshold,
714                    &self.upgrade_lock,
715                )
716                .context(|e| warn!("Invalid DAC: {e}"))?;
717
718                // Add to the storage.
719                self.consensus
720                    .write()
721                    .await
722                    .update_saved_da_certs(view, cert.clone());
723
724                broadcast_event(
725                    Arc::new(HotShotEvent::DaCertificateValidated(cert.clone())),
726                    &event_sender.clone(),
727                )
728                .await;
729                self.create_dependency_task_if_new(
730                    view,
731                    event_receiver,
732                    &event_sender,
733                    Arc::clone(&event),
734                );
735            },
736            HotShotEvent::VidShareRecv(sender, share) => {
737                let view = share.data.view_number();
738                // Do nothing if the VID share is old
739                tracing::trace!("Received VID share for view {view}");
740                ensure!(
741                    view > self.latest_voted_view,
742                    "Received VID share for an older view."
743                );
744
745                // Validate the VID share.
746                let payload_commitment = share.data.payload_commitment_ref();
747
748                // Check that the signature is valid
749                ensure!(
750                    sender.validate(&share.signature, payload_commitment.as_ref()),
751                    warn!(
752                        "VID share signature is invalid, sender: {}, signature: {:?}, \
753                         payload_commitment: {:?}",
754                        sender, share.signature, payload_commitment
755                    )
756                );
757
758                let vid_epoch = share.data.epoch();
759                let target_epoch = share.data.target_epoch();
760                let membership_reader = self.membership.membership_for_epoch(vid_epoch)?;
761                // ensure that the VID share was sent by a DA member OR the view leader
762                ensure!(
763                    membership_reader
764                        .da_committee_members(view)
765                        .any(|k| k == sender)
766                        || *sender == membership_reader.leader(view)?,
767                    "VID share was not sent by a DA member or the view leader."
768                );
769
770                let membership = self.membership.membership_for_epoch(target_epoch)?;
771                let total_weight = vid_total_weight(membership.stake_table(), target_epoch);
772
773                if !share.data.verify(total_weight) {
774                    bail!("Failed to verify VID share");
775                }
776
777                self.consensus
778                    .write()
779                    .await
780                    .update_vid_shares(view, share.clone());
781
782                ensure!(
783                    *share.data.recipient_key() == self.public_key,
784                    "Got a Valid VID share but it's not for our key"
785                );
786
787                broadcast_event(
788                    Arc::new(HotShotEvent::VidShareValidated(share.clone())),
789                    &event_sender.clone(),
790                )
791                .await;
792                self.create_dependency_task_if_new(
793                    view,
794                    event_receiver,
795                    &event_sender,
796                    Arc::clone(&event),
797                );
798            },
799            HotShotEvent::Timeout(view, ..) => {
800                let view = ViewNumber::new(view.saturating_sub(1));
801                // cancel old tasks
802                let current_tasks = self.vote_dependencies.split_off(&view);
803                while let Some((view, cancel_sender)) = self.vote_dependencies.pop_last() {
804                    if !cancel_sender.is_closed() {
805                        tracing::error!("Aborting vote dependency task for view {view}");
806                        let _ = cancel_sender.try_broadcast(());
807                    }
808                }
809                self.vote_dependencies = current_tasks;
810            },
811            &HotShotEvent::ViewChange(mut view, _) => {
812                view = ViewNumber::new(view.saturating_sub(1));
813                if !self.update_latest_voted_view(view).await {
814                    tracing::debug!("view not updated");
815                }
816                // cancel old tasks
817                let current_tasks = self.vote_dependencies.split_off(&view);
818                while let Some((view, cancel_sender)) = self.vote_dependencies.pop_last() {
819                    if !cancel_sender.is_closed() {
820                        tracing::error!("Aborting vote dependency task for view {view}");
821                        let _ = cancel_sender.try_broadcast(());
822                    }
823                }
824                self.vote_dependencies = current_tasks;
825            },
826            HotShotEvent::SetFirstEpoch(view, epoch) => {
827                self.first_epoch = Some((*view, *epoch));
828            },
829            _ => {},
830        }
831        Ok(())
832    }
833}
834
835#[async_trait]
836impl<TYPES: NodeType, I: NodeImplementation<TYPES>> TaskState for QuorumVoteTaskState<TYPES, I> {
837    type Event = HotShotEvent<TYPES>;
838
839    async fn handle_event(
840        &mut self,
841        event: Arc<Self::Event>,
842        sender: &Sender<Arc<Self::Event>>,
843        receiver: &Receiver<Arc<Self::Event>>,
844    ) -> Result<()> {
845        if self
846            .upgrade_lock
847            .new_protocol_active(self.latest_voted_view)
848        {
849            return Ok(());
850        }
851        self.handle(event, receiver.clone(), sender.clone()).await
852    }
853
854    fn cancel_subtasks(&mut self) {
855        while let Some((view, cancel_sender)) = self.vote_dependencies.pop_last() {
856            if !cancel_sender.is_closed() {
857                tracing::error!("Aborting vote dependency task for view {view}");
858                let _ = cancel_sender.try_broadcast(());
859            }
860        }
861    }
862}