Skip to main content

hotshot_task_impls/
vote_collection.rs

1// Copyright (c) 2021-2024 Espresso Systems (espressosys.com)
2// This file is part of the HotShot repository.
3
4// You should have received a copy of the MIT License
5// along with the HotShot repository. If not, see <https://mit-license.org/>.
6
7use std::{
8    collections::{BTreeMap, HashMap, btree_map::Entry},
9    fmt::Debug,
10    future::Future,
11    marker::PhantomData,
12    sync::Arc,
13};
14
15use async_broadcast::Sender;
16use async_trait::async_trait;
17use either::Either::{Left, Right};
18use hotshot_types::{
19    data::{EpochNumber, ViewNumber},
20    epoch_membership::EpochMembership,
21    message::UpgradeLock,
22    simple_certificate::{
23        DaCertificate2, EpochRootQuorumCertificateV2, NextEpochQuorumCertificate2,
24        QuorumCertificate, QuorumCertificate2, TimeoutCertificate2, UpgradeCertificate,
25        ViewSyncCommitCertificate2, ViewSyncFinalizeCertificate2, ViewSyncPreCommitCertificate2,
26    },
27    simple_vote::{
28        DaVote2, EpochRootQuorumVote2, NextEpochQuorumVote2, QuorumVote, QuorumVote2, TimeoutVote2,
29        UpgradeVote, ViewSyncCommitVote2, ViewSyncFinalizeVote2, ViewSyncPreCommitVote2,
30    },
31    traits::node_implementation::NodeType,
32    utils::EpochTransitionIndicator,
33    vote::{
34        Certificate, HasViewNumber, LightClientStateUpdateVoteAccumulator, Vote, VoteAccumulator,
35    },
36};
37use hotshot_utils::anytrace::*;
38
39use crate::{events::HotShotEvent, helpers::broadcast_event};
40
41/// Alias for a map of Vote Collectors
42pub type VoteCollectorsMap<TYPES, VOTE, CERT> =
43    BTreeMap<ViewNumber, VoteCollectionTaskState<TYPES, VOTE, CERT>>;
44
45/// Task state for collecting votes of one type and emitting a certificate
46pub struct VoteCollectionTaskState<
47    TYPES: NodeType,
48    VOTE: Vote<TYPES>,
49    CERT: Certificate<TYPES, VOTE::Commitment, Voteable = VOTE::Commitment> + Debug,
50> {
51    /// Public key for this node.
52    pub public_key: TYPES::SignatureKey,
53
54    /// Membership for voting
55    pub membership: EpochMembership<TYPES>,
56
57    /// accumulator handles aggregating the votes
58    pub accumulator: Option<VoteAccumulator<TYPES, VOTE, CERT>>,
59
60    /// The view which we are collecting votes for
61    pub view: ViewNumber,
62
63    /// Node id
64    pub id: u64,
65
66    /// Whether we should check if we are the leader when handling a vote
67    pub transition_indicator: EpochTransitionIndicator,
68}
69
70/// Describes the functions a vote must implement for it to be aggregatable by the generic vote collection task
71pub trait AggregatableVote<
72    TYPES: NodeType,
73    VOTE: Vote<TYPES>,
74    CERT: Certificate<TYPES, VOTE::Commitment, Voteable = VOTE::Commitment>,
75>
76{
77    /// return the leader for this votes
78    ///
79    /// # Errors
80    /// if the leader cannot be calculated
81    fn leader(
82        &self,
83        membership: &EpochMembership<TYPES>,
84    ) -> impl Future<Output = Result<TYPES::SignatureKey>>;
85
86    /// return the Hotshot event for the completion of this CERT
87    fn make_cert_event(certificate: CERT, key: &TYPES::SignatureKey) -> HotShotEvent<TYPES>;
88}
89
90impl<
91    TYPES: NodeType,
92    VOTE: Vote<TYPES> + AggregatableVote<TYPES, VOTE, CERT>,
93    CERT: Certificate<TYPES, VOTE::Commitment, Voteable = VOTE::Commitment> + Clone + Debug,
94> VoteCollectionTaskState<TYPES, VOTE, CERT>
95{
96    /// Take one vote and accumulate it. Returns either the cert or the updated state
97    /// after the vote is accumulated
98    ///
99    /// # Errors
100    /// If are unable to accumulate the vote
101    #[allow(clippy::question_mark)]
102    pub async fn accumulate_vote(
103        &mut self,
104        vote: &VOTE,
105        event_stream: &Sender<Arc<HotShotEvent<TYPES>>>,
106    ) -> Result<Option<CERT>> {
107        // TODO create this only once
108        ensure!(
109            matches!(
110                self.transition_indicator,
111                EpochTransitionIndicator::InTransition
112            ) || vote.leader(&self.membership).await? == self.public_key,
113            info!("Received vote for a view in which we were not the leader.")
114        );
115
116        ensure!(
117            vote.view_number() == self.view,
118            error!(
119                "Vote view does not match! vote view is {} current view is {}. This vote should \
120                 not have been passed to this accumulator.",
121                *vote.view_number(),
122                *self.view
123            )
124        );
125
126        let accumulator = self.accumulator.as_mut().context(warn!(
127            "No accumulator to handle vote with. This shouldn't happen."
128        ))?;
129
130        match accumulator.accumulate(vote, self.membership.clone()) {
131            None => Ok(None),
132            Some(cert) => {
133                tracing::debug!("Certificate Formed! {cert:?}");
134
135                broadcast_event(
136                    Arc::new(VOTE::make_cert_event(cert.clone(), &self.public_key)),
137                    event_stream,
138                )
139                .await;
140                self.accumulator = None;
141
142                Ok(Some(cert))
143            },
144        }
145    }
146}
147
148/// Trait for types which will handle a vote event.
149#[async_trait]
150pub trait HandleVoteEvent<TYPES, VOTE, CERT>
151where
152    TYPES: NodeType,
153    VOTE: Vote<TYPES> + AggregatableVote<TYPES, VOTE, CERT>,
154    CERT: Certificate<TYPES, VOTE::Commitment, Voteable = VOTE::Commitment> + Debug,
155{
156    /// Handle a vote event
157    ///
158    /// # Errors
159    /// Returns an error if we fail to handle the vote
160    async fn handle_vote_event(
161        &mut self,
162        event: Arc<HotShotEvent<TYPES>>,
163        sender: &Sender<Arc<HotShotEvent<TYPES>>>,
164    ) -> Result<Option<CERT>>;
165
166    /// Event filter to use for this event
167    fn filter(event: Arc<HotShotEvent<TYPES>>) -> bool;
168}
169
170/// Info needed to create a vote accumulator task
171pub struct AccumulatorInfo<TYPES: NodeType> {
172    /// This nodes Pub Key
173    pub public_key: TYPES::SignatureKey,
174
175    /// Membership we are accumulation votes for
176    pub membership: EpochMembership<TYPES>,
177
178    /// View of the votes we are collecting
179    pub view: ViewNumber,
180
181    /// This nodes id
182    pub id: u64,
183}
184
185/// Generic function for spawning a vote task.  Returns the event stream id of the spawned task if created
186///
187/// # Errors
188/// If we failed to create the accumulator
189///
190/// # Panics
191/// Calls unwrap but should never panic.
192pub async fn create_vote_accumulator<TYPES, VOTE, CERT>(
193    info: &AccumulatorInfo<TYPES>,
194    event: Arc<HotShotEvent<TYPES>>,
195    sender: &Sender<Arc<HotShotEvent<TYPES>>>,
196    upgrade_lock: UpgradeLock<TYPES>,
197    transition_indicator: EpochTransitionIndicator,
198) -> Result<VoteCollectionTaskState<TYPES, VOTE, CERT>>
199where
200    TYPES: NodeType,
201    VOTE: Vote<TYPES>
202        + AggregatableVote<TYPES, VOTE, CERT>
203        + std::marker::Send
204        + std::marker::Sync
205        + 'static,
206    CERT: Certificate<TYPES, VOTE::Commitment, Voteable = VOTE::Commitment>
207        + Debug
208        + std::marker::Send
209        + std::marker::Sync
210        + 'static,
211    VoteCollectionTaskState<TYPES, VOTE, CERT>: HandleVoteEvent<TYPES, VOTE, CERT>,
212{
213    let new_accumulator = VoteAccumulator {
214        vote_outcomes: HashMap::new(),
215        signers: HashMap::new(),
216        phantom: PhantomData,
217        upgrade_lock,
218    };
219
220    let mut state = VoteCollectionTaskState::<TYPES, VOTE, CERT> {
221        membership: info.membership.clone(),
222        public_key: info.public_key.clone(),
223        accumulator: Some(new_accumulator),
224        view: info.view,
225        id: info.id,
226        transition_indicator,
227    };
228
229    state.handle_vote_event(Arc::clone(&event), sender).await?;
230
231    Ok(state)
232}
233
234/// A helper function that handles a vote regardless whether it's the first vote in the view or not.
235///
236/// # Errors
237/// If we fail to handle the vote
238#[allow(clippy::too_many_arguments)]
239pub async fn handle_vote<
240    TYPES: NodeType,
241    VOTE: Vote<TYPES> + AggregatableVote<TYPES, VOTE, CERT> + Send + Sync + 'static,
242    CERT: Certificate<TYPES, VOTE::Commitment, Voteable = VOTE::Commitment>
243        + Debug
244        + Send
245        + Sync
246        + 'static,
247>(
248    collectors: &mut VoteCollectorsMap<TYPES, VOTE, CERT>,
249    vote: &VOTE,
250    public_key: TYPES::SignatureKey,
251    membership: &EpochMembership<TYPES>,
252    id: u64,
253    event: &Arc<HotShotEvent<TYPES>>,
254    event_stream: &Sender<Arc<HotShotEvent<TYPES>>>,
255    upgrade_lock: &UpgradeLock<TYPES>,
256    transition_indicator: EpochTransitionIndicator,
257) -> Result<()>
258where
259    VoteCollectionTaskState<TYPES, VOTE, CERT>: HandleVoteEvent<TYPES, VOTE, CERT>,
260{
261    match collectors.entry(vote.view_number()) {
262        Entry::Vacant(entry) => {
263            tracing::debug!("Starting vote handle for view {:?}", vote.view_number());
264            let info = AccumulatorInfo {
265                public_key,
266                membership: membership.clone(),
267                view: vote.view_number(),
268                id,
269            };
270            let collector = create_vote_accumulator(
271                &info,
272                Arc::clone(event),
273                event_stream,
274                upgrade_lock.clone(),
275                transition_indicator,
276            )
277            .await?;
278
279            entry.insert(collector);
280
281            Ok(())
282        },
283        Entry::Occupied(mut entry) => {
284            // handle the vote, and garbage collect if the vote collector is finished
285            if entry
286                .get_mut()
287                .handle_vote_event(Arc::clone(event), event_stream)
288                .await?
289                .is_some()
290            {
291                entry.remove();
292                *collectors = collectors.split_off(&vote.view_number());
293            }
294
295            Ok(())
296        },
297    }
298}
299
300/// Alias for Quorum vote accumulator
301type QuorumVoteState<TYPES> =
302    VoteCollectionTaskState<TYPES, QuorumVote2<TYPES>, QuorumCertificate2<TYPES>>;
303/// Alias for Quorum vote accumulator
304type NextEpochQuorumVoteState<TYPES> =
305    VoteCollectionTaskState<TYPES, NextEpochQuorumVote2<TYPES>, NextEpochQuorumCertificate2<TYPES>>;
306/// Alias for DA vote accumulator
307type DaVoteState<TYPES> = VoteCollectionTaskState<TYPES, DaVote2<TYPES>, DaCertificate2<TYPES>>;
308/// Alias for Timeout vote accumulator
309type TimeoutVoteState<TYPES> =
310    VoteCollectionTaskState<TYPES, TimeoutVote2<TYPES>, TimeoutCertificate2<TYPES>>;
311/// Alias for upgrade vote accumulator
312type UpgradeVoteState<TYPES> =
313    VoteCollectionTaskState<TYPES, UpgradeVote<TYPES>, UpgradeCertificate<TYPES>>;
314/// Alias for View Sync Pre Commit vote accumulator
315type ViewSyncPreCommitState<TYPES> = VoteCollectionTaskState<
316    TYPES,
317    ViewSyncPreCommitVote2<TYPES>,
318    ViewSyncPreCommitCertificate2<TYPES>,
319>;
320/// Alias for View Sync Commit vote accumulator
321type ViewSyncCommitVoteState<TYPES> =
322    VoteCollectionTaskState<TYPES, ViewSyncCommitVote2<TYPES>, ViewSyncCommitCertificate2<TYPES>>;
323/// Alias for View Sync Finalize vote accumulator
324type ViewSyncFinalizeVoteState<TYPES> = VoteCollectionTaskState<
325    TYPES,
326    ViewSyncFinalizeVote2<TYPES>,
327    ViewSyncFinalizeCertificate2<TYPES>,
328>;
329
330impl<TYPES: NodeType> AggregatableVote<TYPES, QuorumVote<TYPES>, QuorumCertificate<TYPES>>
331    for QuorumVote<TYPES>
332{
333    async fn leader(&self, membership: &EpochMembership<TYPES>) -> Result<TYPES::SignatureKey> {
334        membership.leader(self.view_number() + 1)
335    }
336    fn make_cert_event(
337        certificate: QuorumCertificate<TYPES>,
338        _key: &TYPES::SignatureKey,
339    ) -> HotShotEvent<TYPES> {
340        HotShotEvent::QcFormed(Left(certificate))
341    }
342}
343
344impl<TYPES: NodeType> AggregatableVote<TYPES, QuorumVote2<TYPES>, QuorumCertificate2<TYPES>>
345    for QuorumVote2<TYPES>
346{
347    async fn leader(&self, membership: &EpochMembership<TYPES>) -> Result<TYPES::SignatureKey> {
348        membership.leader(self.view_number() + 1)
349    }
350    fn make_cert_event(
351        certificate: QuorumCertificate2<TYPES>,
352        _key: &TYPES::SignatureKey,
353    ) -> HotShotEvent<TYPES> {
354        HotShotEvent::Qc2Formed(Left(certificate))
355    }
356}
357
358impl<TYPES: NodeType>
359    AggregatableVote<TYPES, NextEpochQuorumVote2<TYPES>, NextEpochQuorumCertificate2<TYPES>>
360    for NextEpochQuorumVote2<TYPES>
361{
362    async fn leader(&self, membership: &EpochMembership<TYPES>) -> Result<TYPES::SignatureKey> {
363        let epoch = membership
364            .epoch()
365            .map(|e| EpochNumber::new(e.saturating_sub(1)));
366        membership
367            .get_new_epoch(epoch)?
368            .leader(self.view_number() + 1)
369    }
370    fn make_cert_event(
371        certificate: NextEpochQuorumCertificate2<TYPES>,
372        _key: &TYPES::SignatureKey,
373    ) -> HotShotEvent<TYPES> {
374        HotShotEvent::NextEpochQc2Formed(Left(certificate))
375    }
376}
377
378impl<TYPES: NodeType> AggregatableVote<TYPES, UpgradeVote<TYPES>, UpgradeCertificate<TYPES>>
379    for UpgradeVote<TYPES>
380{
381    async fn leader(&self, membership: &EpochMembership<TYPES>) -> Result<TYPES::SignatureKey> {
382        membership.leader(self.view_number())
383    }
384    fn make_cert_event(
385        certificate: UpgradeCertificate<TYPES>,
386        _key: &TYPES::SignatureKey,
387    ) -> HotShotEvent<TYPES> {
388        HotShotEvent::UpgradeCertificateFormed(certificate)
389    }
390}
391
392impl<TYPES: NodeType> AggregatableVote<TYPES, DaVote2<TYPES>, DaCertificate2<TYPES>>
393    for DaVote2<TYPES>
394{
395    async fn leader(&self, membership: &EpochMembership<TYPES>) -> Result<TYPES::SignatureKey> {
396        membership.leader(self.view_number())
397    }
398    fn make_cert_event(
399        certificate: DaCertificate2<TYPES>,
400        key: &TYPES::SignatureKey,
401    ) -> HotShotEvent<TYPES> {
402        HotShotEvent::DacSend(certificate, key.clone())
403    }
404}
405
406impl<TYPES: NodeType> AggregatableVote<TYPES, TimeoutVote2<TYPES>, TimeoutCertificate2<TYPES>>
407    for TimeoutVote2<TYPES>
408{
409    async fn leader(&self, membership: &EpochMembership<TYPES>) -> Result<TYPES::SignatureKey> {
410        membership.leader(self.view_number() + 1)
411    }
412    fn make_cert_event(
413        certificate: TimeoutCertificate2<TYPES>,
414        _key: &TYPES::SignatureKey,
415    ) -> HotShotEvent<TYPES> {
416        HotShotEvent::Qc2Formed(Right(certificate))
417    }
418}
419
420impl<TYPES: NodeType>
421    AggregatableVote<TYPES, ViewSyncCommitVote2<TYPES>, ViewSyncCommitCertificate2<TYPES>>
422    for ViewSyncCommitVote2<TYPES>
423{
424    async fn leader(&self, membership: &EpochMembership<TYPES>) -> Result<TYPES::SignatureKey> {
425        membership.leader(self.date().round + self.date().relay)
426    }
427    fn make_cert_event(
428        certificate: ViewSyncCommitCertificate2<TYPES>,
429        key: &TYPES::SignatureKey,
430    ) -> HotShotEvent<TYPES> {
431        HotShotEvent::ViewSyncCommitCertificateSend(certificate, key.clone())
432    }
433}
434
435impl<TYPES: NodeType>
436    AggregatableVote<TYPES, ViewSyncPreCommitVote2<TYPES>, ViewSyncPreCommitCertificate2<TYPES>>
437    for ViewSyncPreCommitVote2<TYPES>
438{
439    async fn leader(&self, membership: &EpochMembership<TYPES>) -> Result<TYPES::SignatureKey> {
440        membership.leader(self.date().round + self.date().relay)
441    }
442    fn make_cert_event(
443        certificate: ViewSyncPreCommitCertificate2<TYPES>,
444        key: &TYPES::SignatureKey,
445    ) -> HotShotEvent<TYPES> {
446        HotShotEvent::ViewSyncPreCommitCertificateSend(certificate, key.clone())
447    }
448}
449
450impl<TYPES: NodeType>
451    AggregatableVote<TYPES, ViewSyncFinalizeVote2<TYPES>, ViewSyncFinalizeCertificate2<TYPES>>
452    for ViewSyncFinalizeVote2<TYPES>
453{
454    async fn leader(&self, membership: &EpochMembership<TYPES>) -> Result<TYPES::SignatureKey> {
455        membership.leader(self.date().round + self.date().relay)
456    }
457    fn make_cert_event(
458        certificate: ViewSyncFinalizeCertificate2<TYPES>,
459        key: &TYPES::SignatureKey,
460    ) -> HotShotEvent<TYPES> {
461        HotShotEvent::ViewSyncFinalizeCertificateSend(certificate, key.clone())
462    }
463}
464
465// Handlers for all vote accumulators
466#[async_trait]
467impl<TYPES: NodeType> HandleVoteEvent<TYPES, QuorumVote2<TYPES>, QuorumCertificate2<TYPES>>
468    for QuorumVoteState<TYPES>
469{
470    async fn handle_vote_event(
471        &mut self,
472        event: Arc<HotShotEvent<TYPES>>,
473        sender: &Sender<Arc<HotShotEvent<TYPES>>>,
474    ) -> Result<Option<QuorumCertificate2<TYPES>>> {
475        match event.as_ref() {
476            HotShotEvent::QuorumVoteRecv(vote) => self.accumulate_vote(vote, sender).await,
477            _ => Ok(None),
478        }
479    }
480    fn filter(event: Arc<HotShotEvent<TYPES>>) -> bool {
481        matches!(event.as_ref(), HotShotEvent::QuorumVoteRecv(_))
482    }
483}
484
485// Handlers for all vote accumulators
486#[async_trait]
487impl<TYPES: NodeType>
488    HandleVoteEvent<TYPES, NextEpochQuorumVote2<TYPES>, NextEpochQuorumCertificate2<TYPES>>
489    for NextEpochQuorumVoteState<TYPES>
490{
491    async fn handle_vote_event(
492        &mut self,
493        event: Arc<HotShotEvent<TYPES>>,
494        sender: &Sender<Arc<HotShotEvent<TYPES>>>,
495    ) -> Result<Option<NextEpochQuorumCertificate2<TYPES>>> {
496        match event.as_ref() {
497            HotShotEvent::QuorumVoteRecv(vote) => {
498                // #3967 REVIEW NOTE: Should we error if self.epoch is None?
499                self.accumulate_vote(&vote.clone().into(), sender).await
500            },
501            _ => Ok(None),
502        }
503    }
504    fn filter(event: Arc<HotShotEvent<TYPES>>) -> bool {
505        matches!(event.as_ref(), HotShotEvent::QuorumVoteRecv(_))
506    }
507}
508
509// Handlers for all vote accumulators
510#[async_trait]
511impl<TYPES: NodeType> HandleVoteEvent<TYPES, UpgradeVote<TYPES>, UpgradeCertificate<TYPES>>
512    for UpgradeVoteState<TYPES>
513{
514    async fn handle_vote_event(
515        &mut self,
516        event: Arc<HotShotEvent<TYPES>>,
517        sender: &Sender<Arc<HotShotEvent<TYPES>>>,
518    ) -> Result<Option<UpgradeCertificate<TYPES>>> {
519        match event.as_ref() {
520            HotShotEvent::UpgradeVoteRecv(vote) => self.accumulate_vote(vote, sender).await,
521            _ => Ok(None),
522        }
523    }
524    fn filter(event: Arc<HotShotEvent<TYPES>>) -> bool {
525        matches!(event.as_ref(), HotShotEvent::UpgradeVoteRecv(_))
526    }
527}
528
529#[async_trait]
530impl<TYPES: NodeType> HandleVoteEvent<TYPES, DaVote2<TYPES>, DaCertificate2<TYPES>>
531    for DaVoteState<TYPES>
532{
533    async fn handle_vote_event(
534        &mut self,
535        event: Arc<HotShotEvent<TYPES>>,
536        sender: &Sender<Arc<HotShotEvent<TYPES>>>,
537    ) -> Result<Option<DaCertificate2<TYPES>>> {
538        match event.as_ref() {
539            HotShotEvent::DaVoteRecv(vote) => self.accumulate_vote(vote, sender).await,
540            _ => Ok(None),
541        }
542    }
543    fn filter(event: Arc<HotShotEvent<TYPES>>) -> bool {
544        matches!(event.as_ref(), HotShotEvent::DaVoteRecv(_))
545    }
546}
547
548#[async_trait]
549impl<TYPES: NodeType> HandleVoteEvent<TYPES, TimeoutVote2<TYPES>, TimeoutCertificate2<TYPES>>
550    for TimeoutVoteState<TYPES>
551{
552    async fn handle_vote_event(
553        &mut self,
554        event: Arc<HotShotEvent<TYPES>>,
555        sender: &Sender<Arc<HotShotEvent<TYPES>>>,
556    ) -> Result<Option<TimeoutCertificate2<TYPES>>> {
557        match event.as_ref() {
558            HotShotEvent::TimeoutVoteRecv(vote) => self.accumulate_vote(vote, sender).await,
559            _ => Ok(None),
560        }
561    }
562    fn filter(event: Arc<HotShotEvent<TYPES>>) -> bool {
563        matches!(event.as_ref(), HotShotEvent::TimeoutVoteRecv(_))
564    }
565}
566
567#[async_trait]
568impl<TYPES: NodeType>
569    HandleVoteEvent<TYPES, ViewSyncPreCommitVote2<TYPES>, ViewSyncPreCommitCertificate2<TYPES>>
570    for ViewSyncPreCommitState<TYPES>
571{
572    async fn handle_vote_event(
573        &mut self,
574        event: Arc<HotShotEvent<TYPES>>,
575        sender: &Sender<Arc<HotShotEvent<TYPES>>>,
576    ) -> Result<Option<ViewSyncPreCommitCertificate2<TYPES>>> {
577        match event.as_ref() {
578            HotShotEvent::ViewSyncPreCommitVoteRecv(vote) => {
579                self.accumulate_vote(vote, sender).await
580            },
581            _ => Ok(None),
582        }
583    }
584    fn filter(event: Arc<HotShotEvent<TYPES>>) -> bool {
585        matches!(event.as_ref(), HotShotEvent::ViewSyncPreCommitVoteRecv(_))
586    }
587}
588
589#[async_trait]
590impl<TYPES: NodeType>
591    HandleVoteEvent<TYPES, ViewSyncCommitVote2<TYPES>, ViewSyncCommitCertificate2<TYPES>>
592    for ViewSyncCommitVoteState<TYPES>
593{
594    async fn handle_vote_event(
595        &mut self,
596        event: Arc<HotShotEvent<TYPES>>,
597        sender: &Sender<Arc<HotShotEvent<TYPES>>>,
598    ) -> Result<Option<ViewSyncCommitCertificate2<TYPES>>> {
599        match event.as_ref() {
600            HotShotEvent::ViewSyncCommitVoteRecv(vote) => self.accumulate_vote(vote, sender).await,
601            _ => Ok(None),
602        }
603    }
604    fn filter(event: Arc<HotShotEvent<TYPES>>) -> bool {
605        matches!(event.as_ref(), HotShotEvent::ViewSyncCommitVoteRecv(_))
606    }
607}
608
609#[async_trait]
610impl<TYPES: NodeType>
611    HandleVoteEvent<TYPES, ViewSyncFinalizeVote2<TYPES>, ViewSyncFinalizeCertificate2<TYPES>>
612    for ViewSyncFinalizeVoteState<TYPES>
613{
614    async fn handle_vote_event(
615        &mut self,
616        event: Arc<HotShotEvent<TYPES>>,
617        sender: &Sender<Arc<HotShotEvent<TYPES>>>,
618    ) -> Result<Option<ViewSyncFinalizeCertificate2<TYPES>>> {
619        match event.as_ref() {
620            HotShotEvent::ViewSyncFinalizeVoteRecv(vote) => {
621                self.accumulate_vote(vote, sender).await
622            },
623            _ => Ok(None),
624        }
625    }
626    fn filter(event: Arc<HotShotEvent<TYPES>>) -> bool {
627        matches!(event.as_ref(), HotShotEvent::ViewSyncFinalizeVoteRecv(_))
628    }
629}
630
631/// A map for extended quorum vote collectors
632pub type EpochRootVoteCollectorsMap<TYPES> =
633    BTreeMap<ViewNumber, EpochRootVoteCollectionTaskState<TYPES>>;
634
635pub struct EpochRootVoteCollectionTaskState<TYPES: NodeType> {
636    /// Public key for this node.
637    pub public_key: TYPES::SignatureKey,
638
639    /// Membership for voting
640    pub membership: EpochMembership<TYPES>,
641
642    /// accumulator for quorum votes
643    pub accumulator: Option<VoteAccumulator<TYPES, QuorumVote2<TYPES>, QuorumCertificate2<TYPES>>>,
644
645    /// accumulator for light client state update votes
646    pub state_vote_accumulator: Option<LightClientStateUpdateVoteAccumulator<TYPES>>,
647
648    /// The view which we are collecting votes for
649    pub view: ViewNumber,
650
651    /// The epoch which we are collecting votes for
652    pub epoch: Option<EpochNumber>,
653
654    /// Node id
655    pub id: u64,
656}
657
658// Handlers for extended quorum vote accumulators
659impl<TYPES: NodeType> EpochRootVoteCollectionTaskState<TYPES> {
660    /// Take one vote and accumulate it. Returns the certs once formed.
661    async fn handle_vote_event(
662        &mut self,
663        event: Arc<HotShotEvent<TYPES>>,
664        sender: &Sender<Arc<HotShotEvent<TYPES>>>,
665    ) -> Result<Option<EpochRootQuorumCertificateV2<TYPES>>> {
666        match event.as_ref() {
667            HotShotEvent::EpochRootQuorumVoteRecv(vote) => self.accumulate_vote(vote, sender).await,
668            _ => Ok(None),
669        }
670    }
671
672    /// Accumulate a vote and return the certificates if formed
673    async fn accumulate_vote(
674        &mut self,
675        vote: &EpochRootQuorumVote2<TYPES>,
676        event_stream: &Sender<Arc<HotShotEvent<TYPES>>>,
677    ) -> Result<Option<EpochRootQuorumCertificateV2<TYPES>>> {
678        let EpochRootQuorumVote2 { vote, state_vote } = vote;
679        ensure!(
680            vote.view_number() == self.view,
681            error!(
682                "Vote view does not match! vote view is {} current view is {}. This vote should \
683                 not have been passed to this accumulator.",
684                *vote.view_number(),
685                *self.view
686            )
687        );
688
689        let accumulator = self.accumulator.as_mut().context(warn!(
690            "No accumulator to handle extended quorum vote with. This shouldn't happen."
691        ))?;
692
693        let state_vote_accumulator = self.state_vote_accumulator.as_mut().context(warn!(
694            "No accumulator to handle light client state update vote with. This shouldn't happen."
695        ))?;
696
697        match (
698            accumulator.accumulate(vote, self.membership.clone()),
699            state_vote_accumulator.accumulate(&vote.signing_key(), state_vote, &self.membership),
700        ) {
701            (None, None) => Ok(None),
702            (Some(cert), Some(state_cert)) => {
703                let root_qc = EpochRootQuorumCertificateV2 {
704                    qc: cert,
705                    state_cert,
706                };
707
708                tracing::debug!("Certificate Formed! {root_qc:?}");
709
710                broadcast_event(
711                    Arc::new(HotShotEvent::EpochRootQcFormed(root_qc.clone())),
712                    event_stream,
713                )
714                .await;
715                self.accumulator = None;
716
717                Ok(Some(root_qc))
718            },
719            _ => Err(error!(
720                "Only one certificate formed for the epoch root, this should not happen."
721            )),
722        }
723    }
724}
725
726async fn create_epoch_root_vote_collection_task_state<TYPES: NodeType>(
727    info: &AccumulatorInfo<TYPES>,
728    event: Arc<HotShotEvent<TYPES>>,
729    sender: &Sender<Arc<HotShotEvent<TYPES>>>,
730    upgrade_lock: UpgradeLock<TYPES>,
731) -> Result<EpochRootVoteCollectionTaskState<TYPES>> {
732    let new_accumulator = VoteAccumulator::<TYPES, QuorumVote2<TYPES>, QuorumCertificate2<TYPES>> {
733        vote_outcomes: HashMap::new(),
734        signers: HashMap::new(),
735        phantom: PhantomData,
736        upgrade_lock: upgrade_lock.clone(),
737    };
738    let state_vote_accumulator = LightClientStateUpdateVoteAccumulator {
739        vote_outcomes: HashMap::new(),
740        upgrade_lock,
741    };
742
743    let mut state = EpochRootVoteCollectionTaskState::<TYPES> {
744        membership: info.membership.clone(),
745        public_key: info.public_key.clone(),
746        accumulator: Some(new_accumulator),
747        state_vote_accumulator: Some(state_vote_accumulator),
748        view: info.view,
749        epoch: info.membership.epoch(),
750        id: info.id,
751    };
752
753    state.handle_vote_event(Arc::clone(&event), sender).await?;
754
755    Ok(state)
756}
757
758/// A helper function that handles quorum vote collection for epoch root
759///
760/// # Errors
761/// If we fail to handle the vote
762#[allow(clippy::too_many_arguments)]
763pub async fn handle_epoch_root_vote<TYPES: NodeType>(
764    collectors: &mut EpochRootVoteCollectorsMap<TYPES>,
765    vote: &EpochRootQuorumVote2<TYPES>,
766    public_key: TYPES::SignatureKey,
767    membership: &EpochMembership<TYPES>,
768    id: u64,
769    event: &Arc<HotShotEvent<TYPES>>,
770    event_stream: &Sender<Arc<HotShotEvent<TYPES>>>,
771    upgrade_lock: &UpgradeLock<TYPES>,
772) -> Result<()> {
773    match collectors.entry(vote.view_number()) {
774        Entry::Vacant(entry) => {
775            tracing::debug!(
776                "Starting epoch root quorum vote handle for view {:?}",
777                vote.view_number()
778            );
779            let info = AccumulatorInfo {
780                public_key,
781                membership: membership.clone(),
782                view: vote.view_number(),
783                id,
784            };
785            let collector = create_epoch_root_vote_collection_task_state(
786                &info,
787                Arc::clone(event),
788                event_stream,
789                upgrade_lock.clone(),
790            )
791            .await?;
792
793            entry.insert(collector);
794
795            Ok(())
796        },
797        Entry::Occupied(mut entry) => {
798            // handle the vote, and garbage collect if the vote collector is finished
799            if entry
800                .get_mut()
801                .handle_vote_event(Arc::clone(event), event_stream)
802                .await?
803                .is_some()
804            {
805                entry.remove();
806                *collectors = collectors.split_off(&vote.view_number());
807            }
808
809            Ok(())
810        },
811    }
812}