1use std::{
8 collections::{BTreeMap, HashMap, btree_map::Entry},
9 fmt::Debug,
10 future::Future,
11 marker::PhantomData,
12 sync::Arc,
13};
14
15use async_broadcast::Sender;
16use async_trait::async_trait;
17use either::Either::{Left, Right};
18use hotshot_types::{
19 data::{EpochNumber, ViewNumber},
20 epoch_membership::EpochMembership,
21 message::UpgradeLock,
22 simple_certificate::{
23 DaCertificate2, EpochRootQuorumCertificateV2, NextEpochQuorumCertificate2,
24 QuorumCertificate, QuorumCertificate2, TimeoutCertificate2, UpgradeCertificate,
25 ViewSyncCommitCertificate2, ViewSyncFinalizeCertificate2, ViewSyncPreCommitCertificate2,
26 },
27 simple_vote::{
28 DaVote2, EpochRootQuorumVote2, NextEpochQuorumVote2, QuorumVote, QuorumVote2, TimeoutVote2,
29 UpgradeVote, ViewSyncCommitVote2, ViewSyncFinalizeVote2, ViewSyncPreCommitVote2,
30 },
31 traits::node_implementation::NodeType,
32 utils::EpochTransitionIndicator,
33 vote::{
34 Certificate, HasViewNumber, LightClientStateUpdateVoteAccumulator, Vote, VoteAccumulator,
35 },
36};
37use hotshot_utils::anytrace::*;
38
39use crate::{events::HotShotEvent, helpers::broadcast_event};
40
41pub type VoteCollectorsMap<TYPES, VOTE, CERT> =
43 BTreeMap<ViewNumber, VoteCollectionTaskState<TYPES, VOTE, CERT>>;
44
45pub struct VoteCollectionTaskState<
47 TYPES: NodeType,
48 VOTE: Vote<TYPES>,
49 CERT: Certificate<TYPES, VOTE::Commitment, Voteable = VOTE::Commitment> + Debug,
50> {
51 pub public_key: TYPES::SignatureKey,
53
54 pub membership: EpochMembership<TYPES>,
56
57 pub accumulator: Option<VoteAccumulator<TYPES, VOTE, CERT>>,
59
60 pub view: ViewNumber,
62
63 pub id: u64,
65
66 pub transition_indicator: EpochTransitionIndicator,
68}
69
70pub trait AggregatableVote<
72 TYPES: NodeType,
73 VOTE: Vote<TYPES>,
74 CERT: Certificate<TYPES, VOTE::Commitment, Voteable = VOTE::Commitment>,
75>
76{
77 fn leader(
82 &self,
83 membership: &EpochMembership<TYPES>,
84 ) -> impl Future<Output = Result<TYPES::SignatureKey>>;
85
86 fn make_cert_event(certificate: CERT, key: &TYPES::SignatureKey) -> HotShotEvent<TYPES>;
88}
89
90impl<
91 TYPES: NodeType,
92 VOTE: Vote<TYPES> + AggregatableVote<TYPES, VOTE, CERT>,
93 CERT: Certificate<TYPES, VOTE::Commitment, Voteable = VOTE::Commitment> + Clone + Debug,
94> VoteCollectionTaskState<TYPES, VOTE, CERT>
95{
96 #[allow(clippy::question_mark)]
102 pub async fn accumulate_vote(
103 &mut self,
104 vote: &VOTE,
105 event_stream: &Sender<Arc<HotShotEvent<TYPES>>>,
106 ) -> Result<Option<CERT>> {
107 ensure!(
109 matches!(
110 self.transition_indicator,
111 EpochTransitionIndicator::InTransition
112 ) || vote.leader(&self.membership).await? == self.public_key,
113 info!("Received vote for a view in which we were not the leader.")
114 );
115
116 ensure!(
117 vote.view_number() == self.view,
118 error!(
119 "Vote view does not match! vote view is {} current view is {}. This vote should \
120 not have been passed to this accumulator.",
121 *vote.view_number(),
122 *self.view
123 )
124 );
125
126 let accumulator = self.accumulator.as_mut().context(warn!(
127 "No accumulator to handle vote with. This shouldn't happen."
128 ))?;
129
130 match accumulator.accumulate(vote, self.membership.clone()) {
131 None => Ok(None),
132 Some(cert) => {
133 tracing::debug!("Certificate Formed! {cert:?}");
134
135 broadcast_event(
136 Arc::new(VOTE::make_cert_event(cert.clone(), &self.public_key)),
137 event_stream,
138 )
139 .await;
140 self.accumulator = None;
141
142 Ok(Some(cert))
143 },
144 }
145 }
146}
147
148#[async_trait]
150pub trait HandleVoteEvent<TYPES, VOTE, CERT>
151where
152 TYPES: NodeType,
153 VOTE: Vote<TYPES> + AggregatableVote<TYPES, VOTE, CERT>,
154 CERT: Certificate<TYPES, VOTE::Commitment, Voteable = VOTE::Commitment> + Debug,
155{
156 async fn handle_vote_event(
161 &mut self,
162 event: Arc<HotShotEvent<TYPES>>,
163 sender: &Sender<Arc<HotShotEvent<TYPES>>>,
164 ) -> Result<Option<CERT>>;
165
166 fn filter(event: Arc<HotShotEvent<TYPES>>) -> bool;
168}
169
170pub struct AccumulatorInfo<TYPES: NodeType> {
172 pub public_key: TYPES::SignatureKey,
174
175 pub membership: EpochMembership<TYPES>,
177
178 pub view: ViewNumber,
180
181 pub id: u64,
183}
184
185pub async fn create_vote_accumulator<TYPES, VOTE, CERT>(
193 info: &AccumulatorInfo<TYPES>,
194 event: Arc<HotShotEvent<TYPES>>,
195 sender: &Sender<Arc<HotShotEvent<TYPES>>>,
196 upgrade_lock: UpgradeLock<TYPES>,
197 transition_indicator: EpochTransitionIndicator,
198) -> Result<VoteCollectionTaskState<TYPES, VOTE, CERT>>
199where
200 TYPES: NodeType,
201 VOTE: Vote<TYPES>
202 + AggregatableVote<TYPES, VOTE, CERT>
203 + std::marker::Send
204 + std::marker::Sync
205 + 'static,
206 CERT: Certificate<TYPES, VOTE::Commitment, Voteable = VOTE::Commitment>
207 + Debug
208 + std::marker::Send
209 + std::marker::Sync
210 + 'static,
211 VoteCollectionTaskState<TYPES, VOTE, CERT>: HandleVoteEvent<TYPES, VOTE, CERT>,
212{
213 let new_accumulator = VoteAccumulator {
214 vote_outcomes: HashMap::new(),
215 signers: HashMap::new(),
216 phantom: PhantomData,
217 upgrade_lock,
218 };
219
220 let mut state = VoteCollectionTaskState::<TYPES, VOTE, CERT> {
221 membership: info.membership.clone(),
222 public_key: info.public_key.clone(),
223 accumulator: Some(new_accumulator),
224 view: info.view,
225 id: info.id,
226 transition_indicator,
227 };
228
229 state.handle_vote_event(Arc::clone(&event), sender).await?;
230
231 Ok(state)
232}
233
234#[allow(clippy::too_many_arguments)]
239pub async fn handle_vote<
240 TYPES: NodeType,
241 VOTE: Vote<TYPES> + AggregatableVote<TYPES, VOTE, CERT> + Send + Sync + 'static,
242 CERT: Certificate<TYPES, VOTE::Commitment, Voteable = VOTE::Commitment>
243 + Debug
244 + Send
245 + Sync
246 + 'static,
247>(
248 collectors: &mut VoteCollectorsMap<TYPES, VOTE, CERT>,
249 vote: &VOTE,
250 public_key: TYPES::SignatureKey,
251 membership: &EpochMembership<TYPES>,
252 id: u64,
253 event: &Arc<HotShotEvent<TYPES>>,
254 event_stream: &Sender<Arc<HotShotEvent<TYPES>>>,
255 upgrade_lock: &UpgradeLock<TYPES>,
256 transition_indicator: EpochTransitionIndicator,
257) -> Result<()>
258where
259 VoteCollectionTaskState<TYPES, VOTE, CERT>: HandleVoteEvent<TYPES, VOTE, CERT>,
260{
261 match collectors.entry(vote.view_number()) {
262 Entry::Vacant(entry) => {
263 tracing::debug!("Starting vote handle for view {:?}", vote.view_number());
264 let info = AccumulatorInfo {
265 public_key,
266 membership: membership.clone(),
267 view: vote.view_number(),
268 id,
269 };
270 let collector = create_vote_accumulator(
271 &info,
272 Arc::clone(event),
273 event_stream,
274 upgrade_lock.clone(),
275 transition_indicator,
276 )
277 .await?;
278
279 entry.insert(collector);
280
281 Ok(())
282 },
283 Entry::Occupied(mut entry) => {
284 if entry
286 .get_mut()
287 .handle_vote_event(Arc::clone(event), event_stream)
288 .await?
289 .is_some()
290 {
291 entry.remove();
292 *collectors = collectors.split_off(&vote.view_number());
293 }
294
295 Ok(())
296 },
297 }
298}
299
300type QuorumVoteState<TYPES> =
302 VoteCollectionTaskState<TYPES, QuorumVote2<TYPES>, QuorumCertificate2<TYPES>>;
303type NextEpochQuorumVoteState<TYPES> =
305 VoteCollectionTaskState<TYPES, NextEpochQuorumVote2<TYPES>, NextEpochQuorumCertificate2<TYPES>>;
306type DaVoteState<TYPES> = VoteCollectionTaskState<TYPES, DaVote2<TYPES>, DaCertificate2<TYPES>>;
308type TimeoutVoteState<TYPES> =
310 VoteCollectionTaskState<TYPES, TimeoutVote2<TYPES>, TimeoutCertificate2<TYPES>>;
311type UpgradeVoteState<TYPES> =
313 VoteCollectionTaskState<TYPES, UpgradeVote<TYPES>, UpgradeCertificate<TYPES>>;
314type ViewSyncPreCommitState<TYPES> = VoteCollectionTaskState<
316 TYPES,
317 ViewSyncPreCommitVote2<TYPES>,
318 ViewSyncPreCommitCertificate2<TYPES>,
319>;
320type ViewSyncCommitVoteState<TYPES> =
322 VoteCollectionTaskState<TYPES, ViewSyncCommitVote2<TYPES>, ViewSyncCommitCertificate2<TYPES>>;
323type ViewSyncFinalizeVoteState<TYPES> = VoteCollectionTaskState<
325 TYPES,
326 ViewSyncFinalizeVote2<TYPES>,
327 ViewSyncFinalizeCertificate2<TYPES>,
328>;
329
330impl<TYPES: NodeType> AggregatableVote<TYPES, QuorumVote<TYPES>, QuorumCertificate<TYPES>>
331 for QuorumVote<TYPES>
332{
333 async fn leader(&self, membership: &EpochMembership<TYPES>) -> Result<TYPES::SignatureKey> {
334 membership.leader(self.view_number() + 1)
335 }
336 fn make_cert_event(
337 certificate: QuorumCertificate<TYPES>,
338 _key: &TYPES::SignatureKey,
339 ) -> HotShotEvent<TYPES> {
340 HotShotEvent::QcFormed(Left(certificate))
341 }
342}
343
344impl<TYPES: NodeType> AggregatableVote<TYPES, QuorumVote2<TYPES>, QuorumCertificate2<TYPES>>
345 for QuorumVote2<TYPES>
346{
347 async fn leader(&self, membership: &EpochMembership<TYPES>) -> Result<TYPES::SignatureKey> {
348 membership.leader(self.view_number() + 1)
349 }
350 fn make_cert_event(
351 certificate: QuorumCertificate2<TYPES>,
352 _key: &TYPES::SignatureKey,
353 ) -> HotShotEvent<TYPES> {
354 HotShotEvent::Qc2Formed(Left(certificate))
355 }
356}
357
358impl<TYPES: NodeType>
359 AggregatableVote<TYPES, NextEpochQuorumVote2<TYPES>, NextEpochQuorumCertificate2<TYPES>>
360 for NextEpochQuorumVote2<TYPES>
361{
362 async fn leader(&self, membership: &EpochMembership<TYPES>) -> Result<TYPES::SignatureKey> {
363 let epoch = membership
364 .epoch()
365 .map(|e| EpochNumber::new(e.saturating_sub(1)));
366 membership
367 .get_new_epoch(epoch)?
368 .leader(self.view_number() + 1)
369 }
370 fn make_cert_event(
371 certificate: NextEpochQuorumCertificate2<TYPES>,
372 _key: &TYPES::SignatureKey,
373 ) -> HotShotEvent<TYPES> {
374 HotShotEvent::NextEpochQc2Formed(Left(certificate))
375 }
376}
377
378impl<TYPES: NodeType> AggregatableVote<TYPES, UpgradeVote<TYPES>, UpgradeCertificate<TYPES>>
379 for UpgradeVote<TYPES>
380{
381 async fn leader(&self, membership: &EpochMembership<TYPES>) -> Result<TYPES::SignatureKey> {
382 membership.leader(self.view_number())
383 }
384 fn make_cert_event(
385 certificate: UpgradeCertificate<TYPES>,
386 _key: &TYPES::SignatureKey,
387 ) -> HotShotEvent<TYPES> {
388 HotShotEvent::UpgradeCertificateFormed(certificate)
389 }
390}
391
392impl<TYPES: NodeType> AggregatableVote<TYPES, DaVote2<TYPES>, DaCertificate2<TYPES>>
393 for DaVote2<TYPES>
394{
395 async fn leader(&self, membership: &EpochMembership<TYPES>) -> Result<TYPES::SignatureKey> {
396 membership.leader(self.view_number())
397 }
398 fn make_cert_event(
399 certificate: DaCertificate2<TYPES>,
400 key: &TYPES::SignatureKey,
401 ) -> HotShotEvent<TYPES> {
402 HotShotEvent::DacSend(certificate, key.clone())
403 }
404}
405
406impl<TYPES: NodeType> AggregatableVote<TYPES, TimeoutVote2<TYPES>, TimeoutCertificate2<TYPES>>
407 for TimeoutVote2<TYPES>
408{
409 async fn leader(&self, membership: &EpochMembership<TYPES>) -> Result<TYPES::SignatureKey> {
410 membership.leader(self.view_number() + 1)
411 }
412 fn make_cert_event(
413 certificate: TimeoutCertificate2<TYPES>,
414 _key: &TYPES::SignatureKey,
415 ) -> HotShotEvent<TYPES> {
416 HotShotEvent::Qc2Formed(Right(certificate))
417 }
418}
419
420impl<TYPES: NodeType>
421 AggregatableVote<TYPES, ViewSyncCommitVote2<TYPES>, ViewSyncCommitCertificate2<TYPES>>
422 for ViewSyncCommitVote2<TYPES>
423{
424 async fn leader(&self, membership: &EpochMembership<TYPES>) -> Result<TYPES::SignatureKey> {
425 membership.leader(self.date().round + self.date().relay)
426 }
427 fn make_cert_event(
428 certificate: ViewSyncCommitCertificate2<TYPES>,
429 key: &TYPES::SignatureKey,
430 ) -> HotShotEvent<TYPES> {
431 HotShotEvent::ViewSyncCommitCertificateSend(certificate, key.clone())
432 }
433}
434
435impl<TYPES: NodeType>
436 AggregatableVote<TYPES, ViewSyncPreCommitVote2<TYPES>, ViewSyncPreCommitCertificate2<TYPES>>
437 for ViewSyncPreCommitVote2<TYPES>
438{
439 async fn leader(&self, membership: &EpochMembership<TYPES>) -> Result<TYPES::SignatureKey> {
440 membership.leader(self.date().round + self.date().relay)
441 }
442 fn make_cert_event(
443 certificate: ViewSyncPreCommitCertificate2<TYPES>,
444 key: &TYPES::SignatureKey,
445 ) -> HotShotEvent<TYPES> {
446 HotShotEvent::ViewSyncPreCommitCertificateSend(certificate, key.clone())
447 }
448}
449
450impl<TYPES: NodeType>
451 AggregatableVote<TYPES, ViewSyncFinalizeVote2<TYPES>, ViewSyncFinalizeCertificate2<TYPES>>
452 for ViewSyncFinalizeVote2<TYPES>
453{
454 async fn leader(&self, membership: &EpochMembership<TYPES>) -> Result<TYPES::SignatureKey> {
455 membership.leader(self.date().round + self.date().relay)
456 }
457 fn make_cert_event(
458 certificate: ViewSyncFinalizeCertificate2<TYPES>,
459 key: &TYPES::SignatureKey,
460 ) -> HotShotEvent<TYPES> {
461 HotShotEvent::ViewSyncFinalizeCertificateSend(certificate, key.clone())
462 }
463}
464
465#[async_trait]
467impl<TYPES: NodeType> HandleVoteEvent<TYPES, QuorumVote2<TYPES>, QuorumCertificate2<TYPES>>
468 for QuorumVoteState<TYPES>
469{
470 async fn handle_vote_event(
471 &mut self,
472 event: Arc<HotShotEvent<TYPES>>,
473 sender: &Sender<Arc<HotShotEvent<TYPES>>>,
474 ) -> Result<Option<QuorumCertificate2<TYPES>>> {
475 match event.as_ref() {
476 HotShotEvent::QuorumVoteRecv(vote) => self.accumulate_vote(vote, sender).await,
477 _ => Ok(None),
478 }
479 }
480 fn filter(event: Arc<HotShotEvent<TYPES>>) -> bool {
481 matches!(event.as_ref(), HotShotEvent::QuorumVoteRecv(_))
482 }
483}
484
485#[async_trait]
487impl<TYPES: NodeType>
488 HandleVoteEvent<TYPES, NextEpochQuorumVote2<TYPES>, NextEpochQuorumCertificate2<TYPES>>
489 for NextEpochQuorumVoteState<TYPES>
490{
491 async fn handle_vote_event(
492 &mut self,
493 event: Arc<HotShotEvent<TYPES>>,
494 sender: &Sender<Arc<HotShotEvent<TYPES>>>,
495 ) -> Result<Option<NextEpochQuorumCertificate2<TYPES>>> {
496 match event.as_ref() {
497 HotShotEvent::QuorumVoteRecv(vote) => {
498 self.accumulate_vote(&vote.clone().into(), sender).await
500 },
501 _ => Ok(None),
502 }
503 }
504 fn filter(event: Arc<HotShotEvent<TYPES>>) -> bool {
505 matches!(event.as_ref(), HotShotEvent::QuorumVoteRecv(_))
506 }
507}
508
509#[async_trait]
511impl<TYPES: NodeType> HandleVoteEvent<TYPES, UpgradeVote<TYPES>, UpgradeCertificate<TYPES>>
512 for UpgradeVoteState<TYPES>
513{
514 async fn handle_vote_event(
515 &mut self,
516 event: Arc<HotShotEvent<TYPES>>,
517 sender: &Sender<Arc<HotShotEvent<TYPES>>>,
518 ) -> Result<Option<UpgradeCertificate<TYPES>>> {
519 match event.as_ref() {
520 HotShotEvent::UpgradeVoteRecv(vote) => self.accumulate_vote(vote, sender).await,
521 _ => Ok(None),
522 }
523 }
524 fn filter(event: Arc<HotShotEvent<TYPES>>) -> bool {
525 matches!(event.as_ref(), HotShotEvent::UpgradeVoteRecv(_))
526 }
527}
528
529#[async_trait]
530impl<TYPES: NodeType> HandleVoteEvent<TYPES, DaVote2<TYPES>, DaCertificate2<TYPES>>
531 for DaVoteState<TYPES>
532{
533 async fn handle_vote_event(
534 &mut self,
535 event: Arc<HotShotEvent<TYPES>>,
536 sender: &Sender<Arc<HotShotEvent<TYPES>>>,
537 ) -> Result<Option<DaCertificate2<TYPES>>> {
538 match event.as_ref() {
539 HotShotEvent::DaVoteRecv(vote) => self.accumulate_vote(vote, sender).await,
540 _ => Ok(None),
541 }
542 }
543 fn filter(event: Arc<HotShotEvent<TYPES>>) -> bool {
544 matches!(event.as_ref(), HotShotEvent::DaVoteRecv(_))
545 }
546}
547
548#[async_trait]
549impl<TYPES: NodeType> HandleVoteEvent<TYPES, TimeoutVote2<TYPES>, TimeoutCertificate2<TYPES>>
550 for TimeoutVoteState<TYPES>
551{
552 async fn handle_vote_event(
553 &mut self,
554 event: Arc<HotShotEvent<TYPES>>,
555 sender: &Sender<Arc<HotShotEvent<TYPES>>>,
556 ) -> Result<Option<TimeoutCertificate2<TYPES>>> {
557 match event.as_ref() {
558 HotShotEvent::TimeoutVoteRecv(vote) => self.accumulate_vote(vote, sender).await,
559 _ => Ok(None),
560 }
561 }
562 fn filter(event: Arc<HotShotEvent<TYPES>>) -> bool {
563 matches!(event.as_ref(), HotShotEvent::TimeoutVoteRecv(_))
564 }
565}
566
567#[async_trait]
568impl<TYPES: NodeType>
569 HandleVoteEvent<TYPES, ViewSyncPreCommitVote2<TYPES>, ViewSyncPreCommitCertificate2<TYPES>>
570 for ViewSyncPreCommitState<TYPES>
571{
572 async fn handle_vote_event(
573 &mut self,
574 event: Arc<HotShotEvent<TYPES>>,
575 sender: &Sender<Arc<HotShotEvent<TYPES>>>,
576 ) -> Result<Option<ViewSyncPreCommitCertificate2<TYPES>>> {
577 match event.as_ref() {
578 HotShotEvent::ViewSyncPreCommitVoteRecv(vote) => {
579 self.accumulate_vote(vote, sender).await
580 },
581 _ => Ok(None),
582 }
583 }
584 fn filter(event: Arc<HotShotEvent<TYPES>>) -> bool {
585 matches!(event.as_ref(), HotShotEvent::ViewSyncPreCommitVoteRecv(_))
586 }
587}
588
589#[async_trait]
590impl<TYPES: NodeType>
591 HandleVoteEvent<TYPES, ViewSyncCommitVote2<TYPES>, ViewSyncCommitCertificate2<TYPES>>
592 for ViewSyncCommitVoteState<TYPES>
593{
594 async fn handle_vote_event(
595 &mut self,
596 event: Arc<HotShotEvent<TYPES>>,
597 sender: &Sender<Arc<HotShotEvent<TYPES>>>,
598 ) -> Result<Option<ViewSyncCommitCertificate2<TYPES>>> {
599 match event.as_ref() {
600 HotShotEvent::ViewSyncCommitVoteRecv(vote) => self.accumulate_vote(vote, sender).await,
601 _ => Ok(None),
602 }
603 }
604 fn filter(event: Arc<HotShotEvent<TYPES>>) -> bool {
605 matches!(event.as_ref(), HotShotEvent::ViewSyncCommitVoteRecv(_))
606 }
607}
608
609#[async_trait]
610impl<TYPES: NodeType>
611 HandleVoteEvent<TYPES, ViewSyncFinalizeVote2<TYPES>, ViewSyncFinalizeCertificate2<TYPES>>
612 for ViewSyncFinalizeVoteState<TYPES>
613{
614 async fn handle_vote_event(
615 &mut self,
616 event: Arc<HotShotEvent<TYPES>>,
617 sender: &Sender<Arc<HotShotEvent<TYPES>>>,
618 ) -> Result<Option<ViewSyncFinalizeCertificate2<TYPES>>> {
619 match event.as_ref() {
620 HotShotEvent::ViewSyncFinalizeVoteRecv(vote) => {
621 self.accumulate_vote(vote, sender).await
622 },
623 _ => Ok(None),
624 }
625 }
626 fn filter(event: Arc<HotShotEvent<TYPES>>) -> bool {
627 matches!(event.as_ref(), HotShotEvent::ViewSyncFinalizeVoteRecv(_))
628 }
629}
630
631pub type EpochRootVoteCollectorsMap<TYPES> =
633 BTreeMap<ViewNumber, EpochRootVoteCollectionTaskState<TYPES>>;
634
635pub struct EpochRootVoteCollectionTaskState<TYPES: NodeType> {
636 pub public_key: TYPES::SignatureKey,
638
639 pub membership: EpochMembership<TYPES>,
641
642 pub accumulator: Option<VoteAccumulator<TYPES, QuorumVote2<TYPES>, QuorumCertificate2<TYPES>>>,
644
645 pub state_vote_accumulator: Option<LightClientStateUpdateVoteAccumulator<TYPES>>,
647
648 pub view: ViewNumber,
650
651 pub epoch: Option<EpochNumber>,
653
654 pub id: u64,
656}
657
658impl<TYPES: NodeType> EpochRootVoteCollectionTaskState<TYPES> {
660 async fn handle_vote_event(
662 &mut self,
663 event: Arc<HotShotEvent<TYPES>>,
664 sender: &Sender<Arc<HotShotEvent<TYPES>>>,
665 ) -> Result<Option<EpochRootQuorumCertificateV2<TYPES>>> {
666 match event.as_ref() {
667 HotShotEvent::EpochRootQuorumVoteRecv(vote) => self.accumulate_vote(vote, sender).await,
668 _ => Ok(None),
669 }
670 }
671
672 async fn accumulate_vote(
674 &mut self,
675 vote: &EpochRootQuorumVote2<TYPES>,
676 event_stream: &Sender<Arc<HotShotEvent<TYPES>>>,
677 ) -> Result<Option<EpochRootQuorumCertificateV2<TYPES>>> {
678 let EpochRootQuorumVote2 { vote, state_vote } = vote;
679 ensure!(
680 vote.view_number() == self.view,
681 error!(
682 "Vote view does not match! vote view is {} current view is {}. This vote should \
683 not have been passed to this accumulator.",
684 *vote.view_number(),
685 *self.view
686 )
687 );
688
689 let accumulator = self.accumulator.as_mut().context(warn!(
690 "No accumulator to handle extended quorum vote with. This shouldn't happen."
691 ))?;
692
693 let state_vote_accumulator = self.state_vote_accumulator.as_mut().context(warn!(
694 "No accumulator to handle light client state update vote with. This shouldn't happen."
695 ))?;
696
697 match (
698 accumulator.accumulate(vote, self.membership.clone()),
699 state_vote_accumulator.accumulate(&vote.signing_key(), state_vote, &self.membership),
700 ) {
701 (None, None) => Ok(None),
702 (Some(cert), Some(state_cert)) => {
703 let root_qc = EpochRootQuorumCertificateV2 {
704 qc: cert,
705 state_cert,
706 };
707
708 tracing::debug!("Certificate Formed! {root_qc:?}");
709
710 broadcast_event(
711 Arc::new(HotShotEvent::EpochRootQcFormed(root_qc.clone())),
712 event_stream,
713 )
714 .await;
715 self.accumulator = None;
716
717 Ok(Some(root_qc))
718 },
719 _ => Err(error!(
720 "Only one certificate formed for the epoch root, this should not happen."
721 )),
722 }
723 }
724}
725
726async fn create_epoch_root_vote_collection_task_state<TYPES: NodeType>(
727 info: &AccumulatorInfo<TYPES>,
728 event: Arc<HotShotEvent<TYPES>>,
729 sender: &Sender<Arc<HotShotEvent<TYPES>>>,
730 upgrade_lock: UpgradeLock<TYPES>,
731) -> Result<EpochRootVoteCollectionTaskState<TYPES>> {
732 let new_accumulator = VoteAccumulator::<TYPES, QuorumVote2<TYPES>, QuorumCertificate2<TYPES>> {
733 vote_outcomes: HashMap::new(),
734 signers: HashMap::new(),
735 phantom: PhantomData,
736 upgrade_lock: upgrade_lock.clone(),
737 };
738 let state_vote_accumulator = LightClientStateUpdateVoteAccumulator {
739 vote_outcomes: HashMap::new(),
740 upgrade_lock,
741 };
742
743 let mut state = EpochRootVoteCollectionTaskState::<TYPES> {
744 membership: info.membership.clone(),
745 public_key: info.public_key.clone(),
746 accumulator: Some(new_accumulator),
747 state_vote_accumulator: Some(state_vote_accumulator),
748 view: info.view,
749 epoch: info.membership.epoch(),
750 id: info.id,
751 };
752
753 state.handle_vote_event(Arc::clone(&event), sender).await?;
754
755 Ok(state)
756}
757
758#[allow(clippy::too_many_arguments)]
763pub async fn handle_epoch_root_vote<TYPES: NodeType>(
764 collectors: &mut EpochRootVoteCollectorsMap<TYPES>,
765 vote: &EpochRootQuorumVote2<TYPES>,
766 public_key: TYPES::SignatureKey,
767 membership: &EpochMembership<TYPES>,
768 id: u64,
769 event: &Arc<HotShotEvent<TYPES>>,
770 event_stream: &Sender<Arc<HotShotEvent<TYPES>>>,
771 upgrade_lock: &UpgradeLock<TYPES>,
772) -> Result<()> {
773 match collectors.entry(vote.view_number()) {
774 Entry::Vacant(entry) => {
775 tracing::debug!(
776 "Starting epoch root quorum vote handle for view {:?}",
777 vote.view_number()
778 );
779 let info = AccumulatorInfo {
780 public_key,
781 membership: membership.clone(),
782 view: vote.view_number(),
783 id,
784 };
785 let collector = create_epoch_root_vote_collection_task_state(
786 &info,
787 Arc::clone(event),
788 event_stream,
789 upgrade_lock.clone(),
790 )
791 .await?;
792
793 entry.insert(collector);
794
795 Ok(())
796 },
797 Entry::Occupied(mut entry) => {
798 if entry
800 .get_mut()
801 .handle_vote_event(Arc::clone(event), event_stream)
802 .await?
803 .is_some()
804 {
805 entry.remove();
806 *collectors = collectors.split_off(&vote.view_number());
807 }
808
809 Ok(())
810 },
811 }
812}