1use std::{collections::BTreeMap, sync::Arc, time::Instant};
8
9use async_broadcast::{InactiveReceiver, Receiver, Sender, broadcast};
10use async_trait::async_trait;
11use committable::Committable;
12use hotshot_task::{
13 dependency::{AndDependency, EventDependency},
14 dependency_task::{DependencyTask, HandleDepOutput},
15 task::TaskState,
16};
17use hotshot_types::{
18 VersionedDaCommittee,
19 consensus::{ConsensusMetricsValue, OuterConsensus},
20 data::{EpochNumber, Leaf2, ViewNumber, vid_disperse::vid_total_weight},
21 epoch_membership::EpochMembershipCoordinator,
22 event::Event,
23 message::UpgradeLock,
24 simple_vote::HasEpoch,
25 stake_table::StakeTableEntries,
26 storage_metrics::StorageMetricsValue,
27 traits::{
28 block_contents::BlockHeader,
29 node_implementation::{NodeImplementation, NodeType},
30 signature_key::{SignatureKey, StateSignatureKey},
31 storage::Storage,
32 },
33 utils::{is_epoch_root, is_epoch_transition, is_last_block, option_epoch_from_block_number},
34 vote::{Certificate, HasViewNumber},
35};
36use hotshot_utils::anytrace::*;
37use tracing::instrument;
38
39use crate::{
40 events::HotShotEvent,
41 helpers::{broadcast_event, broadcast_view_change, wait_for_second_vid_share},
42 quorum_vote::handlers::{handle_quorum_proposal_validated, submit_vote, update_shared_state},
43};
44
45mod handlers;
47
48#[derive(Debug, PartialEq)]
50enum VoteDependency {
51 QuorumProposal,
53 Dac,
55 Vid,
57}
58
59pub struct VoteDependencyHandle<TYPES: NodeType, I: NodeImplementation<TYPES>> {
61 pub public_key: TYPES::SignatureKey,
63
64 pub private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
66
67 pub consensus: OuterConsensus<TYPES>,
69
70 pub instance_state: Arc<TYPES::InstanceState>,
72
73 pub membership_coordinator: EpochMembershipCoordinator<TYPES>,
75
76 pub storage: I::Storage,
78
79 pub storage_metrics: Arc<StorageMetricsValue>,
81
82 pub view_number: ViewNumber,
84
85 pub sender: Sender<Arc<HotShotEvent<TYPES>>>,
87
88 pub receiver: InactiveReceiver<Arc<HotShotEvent<TYPES>>>,
90
91 pub upgrade_lock: UpgradeLock<TYPES>,
93
94 pub consensus_metrics: Arc<ConsensusMetricsValue>,
96
97 pub id: u64,
99
100 pub epoch_height: u64,
102
103 pub state_private_key: <TYPES::StateSignatureKey as StateSignatureKey>::StatePrivateKey,
105
106 pub first_epoch: Option<(ViewNumber, EpochNumber)>,
108
109 pub stake_table_capacity: usize,
111
112 pub cancel_receiver: Receiver<()>,
113}
114
115impl<TYPES: NodeType, I: NodeImplementation<TYPES> + 'static> HandleDepOutput
116 for VoteDependencyHandle<TYPES, I>
117{
118 type Output = Vec<Arc<HotShotEvent<TYPES>>>;
119
120 #[allow(clippy::too_many_lines)]
121 #[instrument(skip_all, fields(id = self.id, view = *self.view_number))]
122 async fn handle_dep_result(self, res: Self::Output) {
123 let mut cancel_receiver = self.cancel_receiver.clone();
124 let result = tokio::select! { result = self.handle_vote_deps(&res) => {
125 result
126 }
127 _ = cancel_receiver.recv() => {
128 tracing::warn!("Vote dependency task cancelled");
129 return;
130 }
131 };
132 if result.is_err() {
133 log!(result);
134 self.print_vote_events(&res)
135 }
136 }
137}
138
139impl<TYPES: NodeType, I: NodeImplementation<TYPES> + 'static> VoteDependencyHandle<TYPES, I> {
140 fn print_vote_events(&self, res: &[Arc<HotShotEvent<TYPES>>]) {
141 let events: Vec<_> = res.iter().map(Arc::as_ref).collect();
142 tracing::warn!("Failed to vote, events: {:?}", events);
143 }
144
145 async fn handle_vote_deps(&self, res: &[Arc<HotShotEvent<TYPES>>]) -> Result<()> {
146 let mut payload_commitment = None;
147 let mut next_epoch_payload_commitment = None;
148 let mut leaf = None;
149 let mut vid_share = None;
150 let mut da_cert = None;
151 let mut parent_view_number = None;
152 for event in res.iter() {
153 match event.as_ref() {
154 #[allow(unused_assignments)]
155 HotShotEvent::QuorumProposalValidated(proposal, parent_leaf) => {
156 let proposal_payload_comm = proposal.data.block_header().payload_commitment();
157 let parent_commitment = parent_leaf.commit();
158 let proposed_leaf = Leaf2::from_quorum_proposal(&proposal.data);
159
160 if let Some(ref comm) = payload_commitment {
161 ensure!(
162 proposal_payload_comm == *comm,
163 error!(
164 "Quorum proposal has inconsistent payload commitment with DAC or \
165 VID."
166 )
167 );
168 } else {
169 payload_commitment = Some(proposal_payload_comm);
170 }
171
172 ensure!(
173 proposed_leaf.parent_commitment() == parent_commitment,
174 warn!(
175 "Proposed leaf parent commitment does not match parent leaf payload \
176 commitment. Aborting vote."
177 )
178 );
179
180 let now = Instant::now();
181 self.storage
184 .append_proposal_wrapper(proposal)
185 .await
186 .map_err(|e| {
187 error!("failed to store proposal, not voting. error = {e:#}")
188 })?;
189 self.storage_metrics
190 .append_quorum_duration
191 .add_point(now.elapsed().as_secs_f64());
192
193 leaf = Some(proposed_leaf);
194 parent_view_number = Some(parent_leaf.view_number());
195 },
196 HotShotEvent::DaCertificateValidated(cert) => {
197 let cert_payload_comm = &cert.data().payload_commit;
198 let next_epoch_cert_payload_comm = cert.data().next_epoch_payload_commit;
199 if let Some(ref comm) = payload_commitment {
200 ensure!(
201 cert_payload_comm == comm,
202 error!(
203 "DAC has inconsistent payload commitment with quorum proposal or \
204 VID."
205 )
206 );
207 } else {
208 payload_commitment = Some(*cert_payload_comm);
209 }
210 if next_epoch_payload_commitment.is_some()
211 && next_epoch_payload_commitment != next_epoch_cert_payload_comm
212 {
213 bail!(error!(
214 "DAC has inconsistent next epoch payload commitment with VID."
215 ));
216 } else {
217 next_epoch_payload_commitment = next_epoch_cert_payload_comm;
218 }
219 da_cert = Some(cert.clone());
220 },
221 HotShotEvent::VidShareValidated(share) => {
222 let vid_payload_commitment = &share.data.payload_commitment();
223 vid_share = Some(share.clone());
224 let is_next_epoch_vid = share.data.epoch() != share.data.target_epoch();
225 if is_next_epoch_vid {
226 if let Some(ref comm) = next_epoch_payload_commitment {
227 ensure!(
228 vid_payload_commitment == comm,
229 error!(
230 "VID has inconsistent next epoch payload commitment with DAC."
231 )
232 );
233 } else {
234 next_epoch_payload_commitment = Some(*vid_payload_commitment);
235 }
236 } else if let Some(ref comm) = payload_commitment {
237 ensure!(
238 vid_payload_commitment == comm,
239 error!(
240 "VID has inconsistent payload commitment with quorum proposal or \
241 DAC."
242 )
243 );
244 } else {
245 payload_commitment = Some(*vid_payload_commitment);
246 }
247 },
248 _ => {},
249 }
250 }
251
252 let Some(vid_share) = vid_share else {
253 bail!(error!(
254 "We don't have the VID share for this view {}, but we should, because the vote \
255 dependencies have completed.",
256 self.view_number
257 ));
258 };
259
260 let Some(leaf) = leaf else {
261 bail!(error!(
262 "We don't have the leaf for this view {}, but we should, because the vote \
263 dependencies have completed.",
264 self.view_number
265 ));
266 };
267
268 let Some(da_cert) = da_cert else {
269 bail!(error!(
270 "We don't have the DA cert for this view {}, but we should, because the vote \
271 dependencies have completed.",
272 self.view_number
273 ));
274 };
275
276 let mut maybe_current_epoch_vid_share = None;
277 if self.upgrade_lock.epochs_enabled(leaf.view_number())
279 && is_epoch_transition(leaf.block_header().block_number(), self.epoch_height)
280 {
281 let current_epoch = option_epoch_from_block_number(
282 leaf.with_epoch,
283 leaf.block_header().block_number(),
284 self.epoch_height,
285 );
286 let next_epoch = current_epoch.map(|e| e + 1);
287
288 let Ok(current_epoch_membership) = self
289 .membership_coordinator
290 .stake_table_for_epoch(current_epoch)
291 else {
292 bail!(warn!(
293 "Couldn't acquire current epoch membership. Do not vote!"
294 ));
295 };
296 let Ok(next_epoch_membership) = self
297 .membership_coordinator
298 .stake_table_for_epoch(next_epoch)
299 else {
300 bail!(warn!(
301 "Couldn't acquire next epoch membership. Do not vote!"
302 ));
303 };
304
305 if current_epoch_membership.has_stake(&self.public_key)
307 && next_epoch_membership.has_stake(&self.public_key)
308 {
309 let other_target_epoch = if vid_share.data.target_epoch() == current_epoch {
310 maybe_current_epoch_vid_share = Some(vid_share.clone());
311 next_epoch
312 } else {
313 current_epoch
314 };
315 match wait_for_second_vid_share(
316 other_target_epoch,
317 &vid_share,
318 &da_cert,
319 &self.consensus,
320 &self.receiver.activate_cloned(),
321 self.cancel_receiver.clone(),
322 self.id,
323 )
324 .await
325 {
326 Ok(other_vid_share) => {
327 if maybe_current_epoch_vid_share.is_none() {
328 maybe_current_epoch_vid_share = Some(other_vid_share);
329 }
330 ensure!(
331 leaf.block_header().payload_commitment()
332 == maybe_current_epoch_vid_share
333 .as_ref()
334 .unwrap()
335 .data
336 .payload_commitment(),
337 error!(
338 "We have both epochs vid shares but the leaf's vid commit doesn't \
339 match the old epoch vid share's commit. It should never happen."
340 )
341 );
342 },
343 Err(e) => {
344 bail!(warn!(
345 "This is an epoch transition block, we are in both epochs but we \
346 received only one VID share. Do not vote! Error: {e:?}"
347 ));
348 },
349 }
350 }
351 }
352
353 update_shared_state::<TYPES>(
355 OuterConsensus::new(Arc::clone(&self.consensus.inner_consensus)),
356 self.sender.clone(),
357 self.receiver.clone(),
358 self.membership_coordinator.clone(),
359 self.public_key.clone(),
360 self.private_key.clone(),
361 self.upgrade_lock.clone(),
362 self.view_number,
363 Arc::clone(&self.instance_state),
364 &leaf,
365 maybe_current_epoch_vid_share.as_ref().unwrap_or(&vid_share),
366 parent_view_number,
367 self.epoch_height,
368 )
369 .await
370 .context(error!("Failed to update shared consensus state"))?;
371
372 let cur_epoch =
373 option_epoch_from_block_number(leaf.with_epoch, leaf.height(), self.epoch_height);
374
375 let now = Instant::now();
376 let epoch_membership = self
380 .membership_coordinator
381 .membership_for_epoch(cur_epoch)?;
382
383 let duration = now.elapsed();
384 tracing::info!("membership_for_epoch time: {duration:?}");
385
386 let is_vote_leaf_extended = is_last_block(leaf.height(), self.epoch_height);
387 let is_vote_epoch_root = is_epoch_root(leaf.height(), self.epoch_height);
388 if cur_epoch.is_none() || !is_vote_leaf_extended {
389 broadcast_view_change(
393 &self.sender,
394 leaf.view_number() + 1,
395 cur_epoch,
396 self.first_epoch,
397 )
398 .await;
399 }
400
401 let leader = epoch_membership.leader(self.view_number);
402 if let (Ok(leader_key), Some(cur_epoch)) = (leader, cur_epoch) {
403 self.consensus
404 .write()
405 .await
406 .update_validator_participation(leader_key, cur_epoch, true);
407 }
408
409 submit_vote::<TYPES, I>(
410 self.sender.clone(),
411 epoch_membership,
412 self.public_key.clone(),
413 self.private_key.clone(),
414 self.upgrade_lock.clone(),
415 self.view_number,
416 self.storage.clone(),
417 Arc::clone(&self.storage_metrics),
418 leaf,
419 maybe_current_epoch_vid_share.unwrap_or(vid_share),
420 is_vote_leaf_extended,
421 is_vote_epoch_root,
422 self.epoch_height,
423 &self.state_private_key,
424 self.stake_table_capacity,
425 )
426 .await
427 }
428}
429
430pub struct QuorumVoteTaskState<TYPES: NodeType, I: NodeImplementation<TYPES>> {
434 pub public_key: TYPES::SignatureKey,
436
437 pub private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
439
440 pub consensus: OuterConsensus<TYPES>,
442
443 pub instance_state: Arc<TYPES::InstanceState>,
445
446 pub latest_voted_view: ViewNumber,
448
449 pub vote_dependencies: BTreeMap<ViewNumber, Sender<()>>,
451
452 pub network: Arc<I::Network>,
454
455 pub membership: EpochMembershipCoordinator<TYPES>,
457
458 pub output_event_stream: async_broadcast::Sender<Event<TYPES>>,
460
461 pub id: u64,
463
464 pub consensus_metrics: Arc<ConsensusMetricsValue>,
466
467 pub storage: I::Storage,
469
470 pub storage_metrics: Arc<StorageMetricsValue>,
472
473 pub upgrade_lock: UpgradeLock<TYPES>,
475
476 pub epoch_height: u64,
478
479 pub state_private_key: <TYPES::StateSignatureKey as StateSignatureKey>::StatePrivateKey,
481
482 pub first_epoch: Option<(ViewNumber, EpochNumber)>,
484
485 pub stake_table_capacity: usize,
487
488 pub da_committees: Vec<VersionedDaCommittee<TYPES>>,
490}
491
492impl<TYPES: NodeType, I: NodeImplementation<TYPES>> QuorumVoteTaskState<TYPES, I> {
493 #[instrument(skip_all, fields(id = self.id, latest_voted_view = *self.latest_voted_view), name = "Quorum vote create event dependency", level = "error")]
495 fn create_event_dependency(
496 &self,
497 dependency_type: VoteDependency,
498 view_number: ViewNumber,
499 event_receiver: Receiver<Arc<HotShotEvent<TYPES>>>,
500 cancel_receiver: Receiver<()>,
501 ) -> EventDependency<Arc<HotShotEvent<TYPES>>> {
502 let id = self.id;
503 EventDependency::new(
504 event_receiver,
505 cancel_receiver,
506 format!(
507 "VoteDependency::{:?} for view {:?}, my id {:?}",
508 dependency_type, view_number, self.id
509 ),
510 Box::new(move |event| {
511 let event = event.as_ref();
512 let event_view = match dependency_type {
513 VoteDependency::QuorumProposal => {
514 if let HotShotEvent::QuorumProposalValidated(proposal, _) = event {
515 proposal.data.view_number()
516 } else {
517 return false;
518 }
519 },
520 VoteDependency::Dac => {
521 if let HotShotEvent::DaCertificateValidated(cert) = event {
522 cert.view_number
523 } else {
524 return false;
525 }
526 },
527 VoteDependency::Vid => {
528 if let HotShotEvent::VidShareValidated(disperse) = event {
529 disperse.data.view_number()
530 } else {
531 return false;
532 }
533 },
534 };
535 if event_view == view_number {
536 tracing::debug!(
537 "Vote dependency {dependency_type:?} completed for view {view_number}, my \
538 id is {id}"
539 );
540 return true;
541 }
542 false
543 }),
544 )
545 }
546
547 #[instrument(skip_all, fields(id = self.id, latest_voted_view = *self.latest_voted_view), name = "Quorum vote create dependency task if new", level = "error")]
550 fn create_dependency_task_if_new(
551 &mut self,
552 view_number: ViewNumber,
553 event_receiver: Receiver<Arc<HotShotEvent<TYPES>>>,
554 event_sender: &Sender<Arc<HotShotEvent<TYPES>>>,
555 event: Arc<HotShotEvent<TYPES>>,
556 ) {
557 tracing::debug!(
558 "Attempting to make dependency task for view {view_number} and event {event:?}"
559 );
560
561 if self.vote_dependencies.contains_key(&view_number) {
562 return;
563 }
564
565 let (cancel_sender, cancel_receiver) = broadcast(1);
566
567 let mut quorum_proposal_dependency = self.create_event_dependency(
568 VoteDependency::QuorumProposal,
569 view_number,
570 event_receiver.clone(),
571 cancel_receiver.clone(),
572 );
573 let dac_dependency = self.create_event_dependency(
574 VoteDependency::Dac,
575 view_number,
576 event_receiver.clone(),
577 cancel_receiver.clone(),
578 );
579 let vid_dependency = self.create_event_dependency(
580 VoteDependency::Vid,
581 view_number,
582 event_receiver.clone(),
583 cancel_receiver.clone(),
584 );
585 if let HotShotEvent::QuorumProposalValidated(..) = event.as_ref() {
587 quorum_proposal_dependency.mark_as_completed(event);
588 }
589
590 let deps = vec![quorum_proposal_dependency, dac_dependency, vid_dependency];
591
592 let dependency_chain = AndDependency::from_deps(deps);
593
594 let dependency_task = DependencyTask::new(
595 dependency_chain,
596 VoteDependencyHandle::<TYPES, I> {
597 public_key: self.public_key.clone(),
598 private_key: self.private_key.clone(),
599 consensus: OuterConsensus::new(Arc::clone(&self.consensus.inner_consensus)),
600 instance_state: Arc::clone(&self.instance_state),
601 membership_coordinator: self.membership.clone(),
602 storage: self.storage.clone(),
603 storage_metrics: Arc::clone(&self.storage_metrics),
604 view_number,
605 sender: event_sender.clone(),
606 receiver: event_receiver.clone().deactivate(),
607 upgrade_lock: self.upgrade_lock.clone(),
608 id: self.id,
609 epoch_height: self.epoch_height,
610 consensus_metrics: Arc::clone(&self.consensus_metrics),
611 state_private_key: self.state_private_key.clone(),
612 first_epoch: self.first_epoch,
613 stake_table_capacity: self.stake_table_capacity,
614 cancel_receiver,
615 },
616 );
617 self.vote_dependencies.insert(view_number, cancel_sender);
618
619 dependency_task.run();
620 }
621
622 #[instrument(skip_all, fields(id = self.id, latest_voted_view = *self.latest_voted_view), name = "Quorum vote update latest voted view", level = "error")]
624 async fn update_latest_voted_view(&mut self, new_view: ViewNumber) -> bool {
625 if *self.latest_voted_view < *new_view {
626 tracing::debug!(
627 "Updating next vote view from {} to {} in the quorum vote task",
628 *self.latest_voted_view,
629 *new_view
630 );
631
632 for view in *self.latest_voted_view..(*new_view) {
634 let maybe_cancel_sender = self.vote_dependencies.remove(&ViewNumber::new(view));
635 if maybe_cancel_sender.as_ref().is_some_and(|s| !s.is_closed()) {
636 tracing::warn!("Aborting vote dependency task for view {view}");
637 let _ = maybe_cancel_sender.unwrap().try_broadcast(());
638 }
639 }
640
641 if let Ok(last_voted_view_usize) = usize::try_from(*new_view) {
643 self.consensus_metrics
644 .last_voted_view
645 .set(last_voted_view_usize);
646 } else {
647 tracing::warn!("Failed to convert last voted view to a usize: {new_view}");
648 }
649
650 self.latest_voted_view = new_view;
651
652 return true;
653 }
654 false
655 }
656
657 #[instrument(skip_all, fields(id = self.id, latest_voted_view = *self.latest_voted_view), name = "Quorum vote handle", level = "error", target = "QuorumVoteTaskState")]
659 pub async fn handle(
660 &mut self,
661 event: Arc<HotShotEvent<TYPES>>,
662 event_receiver: Receiver<Arc<HotShotEvent<TYPES>>>,
663 event_sender: Sender<Arc<HotShotEvent<TYPES>>>,
664 ) -> Result<()> {
665 match event.as_ref() {
666 HotShotEvent::QuorumProposalValidated(proposal, _parent_leaf) => {
667 tracing::trace!(
668 "Received Proposal for view {}",
669 *proposal.data.view_number()
670 );
671
672 if let Err(e) =
674 handle_quorum_proposal_validated(&proposal.data, self, &event_sender).await
675 {
676 tracing::debug!(
677 "Failed to handle QuorumProposalValidated event; error = {e:#}"
678 );
679 }
680
681 ensure!(
682 proposal.data.view_number() > self.latest_voted_view,
683 "We have already voted for this view"
684 );
685
686 self.create_dependency_task_if_new(
687 proposal.data.view_number(),
688 event_receiver,
689 &event_sender,
690 Arc::clone(&event),
691 );
692 },
693 HotShotEvent::DaCertificateRecv(cert) => {
694 let view = cert.view_number;
695
696 tracing::trace!("Received DAC for view {view}");
697 ensure!(
699 view > self.latest_voted_view,
700 "Received DAC for an older view."
701 );
702
703 let cert_epoch = cert.data.epoch;
704
705 let epoch_membership = self.membership.stake_table_for_epoch(cert_epoch)?;
706 let membership_da_stake_table =
707 StakeTableEntries::from_iter(epoch_membership.da_stake_table()).0;
708 let membership_da_success_threshold = epoch_membership.da_success_threshold();
709
710 cert.is_valid_cert(
712 &membership_da_stake_table,
713 membership_da_success_threshold,
714 &self.upgrade_lock,
715 )
716 .context(|e| warn!("Invalid DAC: {e}"))?;
717
718 self.consensus
720 .write()
721 .await
722 .update_saved_da_certs(view, cert.clone());
723
724 broadcast_event(
725 Arc::new(HotShotEvent::DaCertificateValidated(cert.clone())),
726 &event_sender.clone(),
727 )
728 .await;
729 self.create_dependency_task_if_new(
730 view,
731 event_receiver,
732 &event_sender,
733 Arc::clone(&event),
734 );
735 },
736 HotShotEvent::VidShareRecv(sender, share) => {
737 let view = share.data.view_number();
738 tracing::trace!("Received VID share for view {view}");
740 ensure!(
741 view > self.latest_voted_view,
742 "Received VID share for an older view."
743 );
744
745 let payload_commitment = share.data.payload_commitment_ref();
747
748 ensure!(
750 sender.validate(&share.signature, payload_commitment.as_ref()),
751 warn!(
752 "VID share signature is invalid, sender: {}, signature: {:?}, \
753 payload_commitment: {:?}",
754 sender, share.signature, payload_commitment
755 )
756 );
757
758 let vid_epoch = share.data.epoch();
759 let target_epoch = share.data.target_epoch();
760 let membership_reader = self.membership.membership_for_epoch(vid_epoch)?;
761 ensure!(
763 membership_reader
764 .da_committee_members(view)
765 .any(|k| k == sender)
766 || *sender == membership_reader.leader(view)?,
767 "VID share was not sent by a DA member or the view leader."
768 );
769
770 let membership = self.membership.membership_for_epoch(target_epoch)?;
771 let total_weight = vid_total_weight(membership.stake_table(), target_epoch);
772
773 if !share.data.verify(total_weight) {
774 bail!("Failed to verify VID share");
775 }
776
777 self.consensus
778 .write()
779 .await
780 .update_vid_shares(view, share.clone());
781
782 ensure!(
783 *share.data.recipient_key() == self.public_key,
784 "Got a Valid VID share but it's not for our key"
785 );
786
787 broadcast_event(
788 Arc::new(HotShotEvent::VidShareValidated(share.clone())),
789 &event_sender.clone(),
790 )
791 .await;
792 self.create_dependency_task_if_new(
793 view,
794 event_receiver,
795 &event_sender,
796 Arc::clone(&event),
797 );
798 },
799 HotShotEvent::Timeout(view, ..) => {
800 let view = ViewNumber::new(view.saturating_sub(1));
801 let current_tasks = self.vote_dependencies.split_off(&view);
803 while let Some((view, cancel_sender)) = self.vote_dependencies.pop_last() {
804 if !cancel_sender.is_closed() {
805 tracing::error!("Aborting vote dependency task for view {view}");
806 let _ = cancel_sender.try_broadcast(());
807 }
808 }
809 self.vote_dependencies = current_tasks;
810 },
811 &HotShotEvent::ViewChange(mut view, _) => {
812 view = ViewNumber::new(view.saturating_sub(1));
813 if !self.update_latest_voted_view(view).await {
814 tracing::debug!("view not updated");
815 }
816 let current_tasks = self.vote_dependencies.split_off(&view);
818 while let Some((view, cancel_sender)) = self.vote_dependencies.pop_last() {
819 if !cancel_sender.is_closed() {
820 tracing::error!("Aborting vote dependency task for view {view}");
821 let _ = cancel_sender.try_broadcast(());
822 }
823 }
824 self.vote_dependencies = current_tasks;
825 },
826 HotShotEvent::SetFirstEpoch(view, epoch) => {
827 self.first_epoch = Some((*view, *epoch));
828 },
829 _ => {},
830 }
831 Ok(())
832 }
833}
834
835#[async_trait]
836impl<TYPES: NodeType, I: NodeImplementation<TYPES>> TaskState for QuorumVoteTaskState<TYPES, I> {
837 type Event = HotShotEvent<TYPES>;
838
839 async fn handle_event(
840 &mut self,
841 event: Arc<Self::Event>,
842 sender: &Sender<Arc<Self::Event>>,
843 receiver: &Receiver<Arc<Self::Event>>,
844 ) -> Result<()> {
845 if self
846 .upgrade_lock
847 .new_protocol_active(self.latest_voted_view)
848 {
849 return Ok(());
850 }
851 self.handle(event, receiver.clone(), sender.clone()).await
852 }
853
854 fn cancel_subtasks(&mut self) {
855 while let Some((view, cancel_sender)) = self.vote_dependencies.pop_last() {
856 if !cancel_sender.is_closed() {
857 tracing::error!("Aborting vote dependency task for view {view}");
858 let _ = cancel_sender.try_broadcast(());
859 }
860 }
861 }
862}