Skip to main content

hotshot_task_impls/
view_sync.rs

1// Copyright (c) 2021-2024 Espresso Systems (espressosys.com)
2// This file is part of the HotShot repository.
3
4// You should have received a copy of the MIT License
5// along with the HotShot repository. If not, see <https://mit-license.org/>.
6
7use std::{collections::BTreeMap, fmt::Debug, sync::Arc, time::Duration};
8
9use async_broadcast::{Receiver, Sender};
10use async_lock::RwLock;
11use async_trait::async_trait;
12use hotshot_task::task::TaskState;
13use hotshot_types::{
14    data::{EpochNumber, ViewNumber},
15    epoch_membership::{EpochMembership, EpochMembershipCoordinator},
16    message::UpgradeLock,
17    simple_certificate::{
18        ViewSyncCommitCertificate2, ViewSyncFinalizeCertificate2, ViewSyncPreCommitCertificate2,
19    },
20    simple_vote::{
21        HasEpoch, ViewSyncCommitData2, ViewSyncCommitVote2, ViewSyncFinalizeData2,
22        ViewSyncFinalizeVote2, ViewSyncPreCommitData2, ViewSyncPreCommitVote2,
23    },
24    stake_table::StakeTableEntries,
25    traits::{node_implementation::NodeType, signature_key::SignatureKey},
26    utils::EpochTransitionIndicator,
27    vote::{Certificate, HasViewNumber, Vote},
28};
29use hotshot_utils::anytrace::*;
30use tokio::{spawn, task::JoinHandle, time::sleep};
31use tracing::instrument;
32
33use crate::{
34    events::{HotShotEvent, HotShotTaskCompleted},
35    helpers::{broadcast_event, broadcast_view_change},
36    vote_collection::{
37        AccumulatorInfo, HandleVoteEvent, VoteCollectionTaskState, create_vote_accumulator,
38    },
39};
40
/// The stages a view sync round moves through.
///
/// The derived ordering follows declaration order, so
/// `None < PreCommit < Commit < Finalize`.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Hash)]
pub enum ViewSyncPhase {
    /// The protocol has not started yet
    None,
    /// First stage: pre-commit
    PreCommit,
    /// Second stage: commit
    Commit,
    /// Last stage: finalize
    Finalize,
}
53
/// Two-level ordered map: the outer key is the (optional) epoch, the inner key is the view
/// number, and `VAL` is whatever is tracked per `(epoch, view)` pair.
type TaskMap<VAL> = BTreeMap<Option<EpochNumber>, BTreeMap<ViewNumber, VAL>>;

/// Type alias for a map from epoch to view number to relay index (`u64`) to vote
/// collection task
type RelayMap<TYPES, VOTE, CERT> =
    TaskMap<BTreeMap<u64, VoteCollectionTaskState<TYPES, VOTE, CERT>>>;

/// Type alias for a map from epoch to view number to the replica task state for that view
type ReplicaTaskMap<TYPES> = TaskMap<ViewSyncReplicaTaskState<TYPES>>;
61
/// Main view sync task state
///
/// Owns the per-`(epoch, view)` replica tasks and the per-phase relay vote collectors, and
/// routes incoming view sync events to them.
pub struct ViewSyncTaskState<TYPES: NodeType> {
    /// View HotShot is currently in
    pub cur_view: ViewNumber,

    /// View HotShot wishes to be in
    pub next_view: ViewNumber,

    /// Epoch HotShot is currently in
    pub cur_epoch: Option<EpochNumber>,

    /// Membership for the quorum
    pub membership_coordinator: EpochMembershipCoordinator<TYPES>,

    /// This node's public key
    pub public_key: TYPES::SignatureKey,

    /// Our private key
    pub private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,

    /// Our node id; for logging
    pub id: u64,

    /// How many timeouts we've seen in a row; is reset upon a successful view change
    pub num_timeouts_tracked: u64,

    /// Map of running replica tasks
    pub replica_task_map: RwLock<ReplicaTaskMap<TYPES>>,

    /// Map of pre-commit vote accumulators for the relay
    pub pre_commit_relay_map: RwLock<
        RelayMap<TYPES, ViewSyncPreCommitVote2<TYPES>, ViewSyncPreCommitCertificate2<TYPES>>,
    >,

    /// Map of commit vote accumulators for the relay
    pub commit_relay_map:
        RwLock<RelayMap<TYPES, ViewSyncCommitVote2<TYPES>, ViewSyncCommitCertificate2<TYPES>>>,

    /// Map of finalize vote accumulators for the relay
    pub finalize_relay_map:
        RwLock<RelayMap<TYPES, ViewSyncFinalizeVote2<TYPES>, ViewSyncFinalizeCertificate2<TYPES>>>,

    /// Timeout duration for view sync rounds
    pub view_sync_timeout: Duration,

    /// Last view we garbage collected old tasks
    pub last_garbage_collected_view: ViewNumber,

    /// Lock for a decided upgrade
    pub upgrade_lock: UpgradeLock<TYPES>,

    /// First view in which epoch version takes effect
    pub first_epoch: Option<(ViewNumber, EpochNumber)>,

    /// Keeps track of the highest finalized view and epoch, used for garbage collection
    pub highest_finalized_epoch_view: (Option<EpochNumber>, ViewNumber),

    /// Epoch height; passed to `leaf.epoch(..)` when handling `LeavesDecided` to determine
    /// the epoch of each decided leaf.
    /// NOTE(review): presumably the number of blocks per epoch -- confirm.
    pub epoch_height: u64,
}
121
122#[async_trait]
123impl<TYPES: NodeType> TaskState for ViewSyncTaskState<TYPES> {
124    type Event = HotShotEvent<TYPES>;
125
126    async fn handle_event(
127        &mut self,
128        event: Arc<Self::Event>,
129        sender: &Sender<Arc<Self::Event>>,
130        _receiver: &Receiver<Arc<Self::Event>>,
131    ) -> Result<()> {
132        if self.upgrade_lock.new_protocol_active(self.cur_view) {
133            return Ok(());
134        }
135        self.handle(event, sender.clone()).await
136    }
137
138    fn cancel_subtasks(&mut self) {}
139}
140
/// State of a view sync replica task
///
/// One instance is created by the main view sync task per `(epoch, view)` it is asked to
/// sync to.
pub struct ViewSyncReplicaTaskState<TYPES: NodeType> {
    /// Timeout for view sync rounds
    pub view_sync_timeout: Duration,

    /// Current round HotShot is in
    pub cur_view: ViewNumber,

    /// Round HotShot wishes to be in
    pub next_view: ViewNumber,

    /// The relay index we are currently on
    pub relay: u64,

    /// Whether we have seen a finalized certificate
    pub finalized: bool,

    /// Whether we have already sent a view change event for `next_view`
    pub sent_view_change_event: bool,

    /// Timeout task handle, when it expires we try the next relay
    pub timeout_task: Option<JoinHandle<()>>,

    /// Our node id; for logging
    pub id: u64,

    /// Membership for the quorum
    pub membership_coordinator: EpochMembershipCoordinator<TYPES>,

    /// This node's public key
    pub public_key: TYPES::SignatureKey,

    /// Our private key
    pub private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,

    /// Lock for a decided upgrade
    pub upgrade_lock: UpgradeLock<TYPES>,

    /// Epoch HotShot was in when this task was created
    pub cur_epoch: Option<EpochNumber>,

    /// First view in which epoch version takes effect
    pub first_epoch: Option<(ViewNumber, EpochNumber)>,
}
185
186#[async_trait]
187impl<TYPES: NodeType> TaskState for ViewSyncReplicaTaskState<TYPES> {
188    type Event = HotShotEvent<TYPES>;
189
190    async fn handle_event(
191        &mut self,
192        event: Arc<Self::Event>,
193        sender: &Sender<Arc<Self::Event>>,
194        _receiver: &Receiver<Arc<Self::Event>>,
195    ) -> Result<()> {
196        if self.upgrade_lock.new_protocol_active(self.cur_view) {
197            return Ok(());
198        }
199        self.handle(event, sender.clone()).await;
200
201        Ok(())
202    }
203
204    fn cancel_subtasks(&mut self) {}
205}
206
207impl<TYPES: NodeType> ViewSyncTaskState<TYPES> {
208    #[instrument(skip_all, fields(id = self.id, view = *self.cur_view), name = "View Sync Main Task", level = "error")]
209    #[allow(clippy::type_complexity)]
210    /// Handles incoming events for the main view sync task
211    pub async fn send_to_or_create_replica(
212        &mut self,
213        event: Arc<HotShotEvent<TYPES>>,
214        view: ViewNumber,
215        epoch: Option<EpochNumber>,
216        sender: &Sender<Arc<HotShotEvent<TYPES>>>,
217    ) {
218        let mut task_map = self.replica_task_map.write().await;
219
220        if let Some(replica_task) = task_map.get_mut(&epoch).and_then(|x| x.get_mut(&view)) {
221            // Forward event then return
222            tracing::debug!("Forwarding message");
223            let result = replica_task
224                .handle(Arc::clone(&event), sender.clone())
225                .await;
226
227            if result == Some(HotShotTaskCompleted) {
228                // The protocol has finished
229                if epoch >= self.highest_finalized_epoch_view.0
230                    && view > self.highest_finalized_epoch_view.1
231                {
232                    self.highest_finalized_epoch_view = (epoch, view);
233                } else if view > self.highest_finalized_epoch_view.1 {
234                    tracing::error!(
235                        "We finalized a higher view but the epoch is lower. This should never \
236                         happen. Current highest finalized epoch view: {:?}, new highest \
237                         finalized epoch view: {:?}",
238                        self.highest_finalized_epoch_view,
239                        (epoch, view)
240                    );
241                }
242                task_map.get_mut(&epoch).and_then(|x| x.remove(&view));
243                task_map.retain(|_, x| !x.is_empty());
244                drop(task_map);
245
246                // Garbage collect old tasks
247                self.garbage_collect_tasks().await;
248                return;
249            }
250
251            return;
252        }
253
254        // We do not have a replica task already running, so start one
255        let mut replica_state: ViewSyncReplicaTaskState<TYPES> = ViewSyncReplicaTaskState {
256            cur_view: view,
257            next_view: view,
258            relay: 0,
259            finalized: false,
260            sent_view_change_event: false,
261            timeout_task: None,
262            membership_coordinator: self.membership_coordinator.clone(),
263            public_key: self.public_key.clone(),
264            private_key: self.private_key.clone(),
265            view_sync_timeout: self.view_sync_timeout,
266            id: self.id,
267            upgrade_lock: self.upgrade_lock.clone(),
268            cur_epoch: self.cur_epoch,
269            first_epoch: self.first_epoch,
270        };
271
272        let result = replica_state
273            .handle(Arc::clone(&event), sender.clone())
274            .await;
275
276        if result == Some(HotShotTaskCompleted) {
277            // The protocol has finished
278            return;
279        }
280
281        task_map
282            .entry(epoch)
283            .or_default()
284            .insert(view, replica_state);
285    }
286
287    #[instrument(skip_all, fields(id = self.id, view = *self.cur_view, epoch = self.cur_epoch.map(|x| *x)), name = "View Sync Main Task", level = "error")]
288    #[allow(clippy::type_complexity)]
289    /// Handles incoming events for the main view sync task
290    pub async fn handle(
291        &mut self,
292        event: Arc<HotShotEvent<TYPES>>,
293        event_stream: Sender<Arc<HotShotEvent<TYPES>>>,
294    ) -> Result<()> {
295        match event.as_ref() {
296            HotShotEvent::ViewSyncPreCommitCertificateRecv(certificate) => {
297                tracing::debug!("Received view sync cert for phase {certificate:?}");
298                let view = certificate.view_number();
299                self.send_to_or_create_replica(
300                    Arc::clone(&event),
301                    view,
302                    certificate.epoch(),
303                    &event_stream,
304                )
305                .await;
306            },
307            HotShotEvent::ViewSyncCommitCertificateRecv(certificate) => {
308                tracing::debug!("Received view sync cert for phase {certificate:?}");
309                let view = certificate.view_number();
310                self.send_to_or_create_replica(
311                    Arc::clone(&event),
312                    view,
313                    certificate.epoch(),
314                    &event_stream,
315                )
316                .await;
317            },
318            HotShotEvent::ViewSyncFinalizeCertificateRecv(certificate) => {
319                tracing::debug!("Received view sync cert for phase {certificate:?}");
320                let view = certificate.view_number();
321                self.send_to_or_create_replica(
322                    Arc::clone(&event),
323                    view,
324                    certificate.epoch(),
325                    &event_stream,
326                )
327                .await;
328            },
329            HotShotEvent::ViewSyncTimeout(view, ..) => {
330                tracing::debug!("view sync timeout in main task {view:?}");
331                let view = *view;
332                self.send_to_or_create_replica(
333                    Arc::clone(&event),
334                    view,
335                    self.cur_epoch,
336                    &event_stream,
337                )
338                .await;
339            },
340
341            HotShotEvent::ViewSyncPreCommitVoteRecv(vote) => {
342                let mut map = self.pre_commit_relay_map.write().await;
343                let vote_view = vote.view_number();
344                let relay = vote.date().relay;
345                let relay_map = map
346                    .entry(vote.date().epoch)
347                    .or_insert(BTreeMap::new())
348                    .entry(vote_view)
349                    .or_insert(BTreeMap::new());
350                if let Some(relay_task) = relay_map.get_mut(&relay) {
351                    tracing::debug!("Forwarding message");
352
353                    // Handle the vote and check if the accumulator has returned successfully
354                    if relay_task
355                        .handle_vote_event(Arc::clone(&event), &event_stream)
356                        .await?
357                        .is_some()
358                    {
359                        map.get_mut(&vote.date().epoch)
360                            .and_then(|x| x.remove(&vote_view));
361                        map.retain(|_, x| !x.is_empty());
362                    }
363
364                    return Ok(());
365                }
366
367                let epoch_mem = self
368                    .membership_coordinator
369                    .membership_for_epoch(vote.date().epoch)?;
370                // We do not have a relay task already running, so start one
371                ensure!(
372                    epoch_mem.leader(vote_view + relay)? == self.public_key,
373                    "View sync vote sent to wrong leader"
374                );
375
376                let info = AccumulatorInfo {
377                    public_key: self.public_key.clone(),
378                    membership: epoch_mem,
379                    view: vote_view,
380                    id: self.id,
381                };
382                let vote_collector = create_vote_accumulator(
383                    &info,
384                    event,
385                    &event_stream,
386                    self.upgrade_lock.clone(),
387                    EpochTransitionIndicator::NotInTransition,
388                )
389                .await?;
390
391                relay_map.insert(relay, vote_collector);
392            },
393
394            HotShotEvent::ViewSyncCommitVoteRecv(vote) => {
395                let mut map = self.commit_relay_map.write().await;
396                let vote_view = vote.view_number();
397                let relay = vote.date().relay;
398                let relay_map = map
399                    .entry(vote.date().epoch)
400                    .or_insert(BTreeMap::new())
401                    .entry(vote_view)
402                    .or_insert(BTreeMap::new());
403                if let Some(relay_task) = relay_map.get_mut(&relay) {
404                    tracing::debug!("Forwarding message");
405
406                    // Handle the vote and check if the accumulator has returned successfully
407                    if relay_task
408                        .handle_vote_event(Arc::clone(&event), &event_stream)
409                        .await?
410                        .is_some()
411                    {
412                        map.get_mut(&vote.date().epoch)
413                            .and_then(|x| x.remove(&vote_view));
414                        map.retain(|_, x| !x.is_empty());
415                    }
416
417                    return Ok(());
418                }
419
420                // We do not have a relay task already running, so start one
421                let epoch_mem = self
422                    .membership_coordinator
423                    .membership_for_epoch(vote.date().epoch)?;
424                ensure!(
425                    epoch_mem.leader(vote_view + relay)? == self.public_key,
426                    debug!("View sync vote sent to wrong leader")
427                );
428
429                let info = AccumulatorInfo {
430                    public_key: self.public_key.clone(),
431                    membership: epoch_mem,
432                    view: vote_view,
433                    id: self.id,
434                };
435
436                let vote_collector = create_vote_accumulator(
437                    &info,
438                    event,
439                    &event_stream,
440                    self.upgrade_lock.clone(),
441                    EpochTransitionIndicator::NotInTransition,
442                )
443                .await?;
444                relay_map.insert(relay, vote_collector);
445            },
446
447            HotShotEvent::ViewSyncFinalizeVoteRecv(vote) => {
448                let mut map = self.finalize_relay_map.write().await;
449                let vote_view = vote.view_number();
450                let relay = vote.date().relay;
451                let relay_map = map
452                    .entry(vote.date().epoch)
453                    .or_insert(BTreeMap::new())
454                    .entry(vote_view)
455                    .or_insert(BTreeMap::new());
456                if let Some(relay_task) = relay_map.get_mut(&relay) {
457                    tracing::debug!("Forwarding message");
458
459                    // Handle the vote and check if the accumulator has returned successfully
460                    if relay_task
461                        .handle_vote_event(Arc::clone(&event), &event_stream)
462                        .await?
463                        .is_some()
464                    {
465                        map.get_mut(&vote.date().epoch)
466                            .and_then(|x| x.remove(&vote_view));
467                        map.retain(|_, x| !x.is_empty());
468                    }
469
470                    return Ok(());
471                }
472
473                let epoch_mem = self
474                    .membership_coordinator
475                    .membership_for_epoch(vote.date().epoch)?;
476                // We do not have a relay task already running, so start one
477                ensure!(
478                    epoch_mem.leader(vote_view + relay)? == self.public_key,
479                    debug!("View sync vote sent to wrong leader")
480                );
481
482                let info = AccumulatorInfo {
483                    public_key: self.public_key.clone(),
484                    membership: epoch_mem,
485                    view: vote_view,
486                    id: self.id,
487                };
488                let vote_collector = create_vote_accumulator(
489                    &info,
490                    event,
491                    &event_stream,
492                    self.upgrade_lock.clone(),
493                    EpochTransitionIndicator::NotInTransition,
494                )
495                .await;
496                if let Ok(vote_task) = vote_collector {
497                    relay_map.insert(relay, vote_task);
498                }
499            },
500
501            &HotShotEvent::ViewChange(new_view, epoch) => {
502                if epoch > self.cur_epoch {
503                    self.cur_epoch = epoch;
504                }
505                let new_view = ViewNumber::new(*new_view);
506                if self.cur_view < new_view {
507                    tracing::debug!(
508                        "Change from view {} to view {} in view sync task",
509                        *self.cur_view,
510                        *new_view
511                    );
512
513                    self.cur_view = new_view;
514                    self.next_view = self.cur_view;
515                    self.num_timeouts_tracked = 0;
516                }
517
518                self.garbage_collect_tasks().await;
519            },
520            HotShotEvent::LeavesDecided(leaves) => {
521                let finalized_epoch = self.highest_finalized_epoch_view.0.max(
522                    leaves
523                        .iter()
524                        .map(|leaf| leaf.epoch(self.epoch_height))
525                        .max()
526                        .unwrap_or(None),
527                );
528                let finalized_view = self.highest_finalized_epoch_view.1.max(
529                    leaves
530                        .iter()
531                        .map(|leaf| leaf.view_number())
532                        .max()
533                        .unwrap_or(ViewNumber::new(0)),
534                );
535
536                self.highest_finalized_epoch_view = (finalized_epoch, finalized_view);
537
538                self.garbage_collect_tasks().await;
539            },
540            &HotShotEvent::Timeout(view_number, ..) => {
541                // This is an old timeout and we can ignore it
542                ensure!(
543                    view_number >= self.cur_view,
544                    debug!("Discarding old timeout vote.")
545                );
546
547                self.num_timeouts_tracked += 1;
548
549                if self.num_timeouts_tracked >= 3 {
550                    tracing::error!("Too many consecutive timeouts!  This shouldn't happen");
551                }
552
553                if self.num_timeouts_tracked >= 2 {
554                    tracing::error!("Starting view sync protocol for view {}", *view_number + 1);
555
556                    self.send_to_or_create_replica(
557                        Arc::new(HotShotEvent::ViewSyncTrigger(view_number + 1)),
558                        view_number + 1,
559                        self.cur_epoch,
560                        &event_stream,
561                    )
562                    .await;
563                } else {
564                    // If this is the first timeout we've seen advance to the next view
565                    self.cur_view = view_number + 1;
566                    broadcast_view_change(
567                        &event_stream,
568                        self.cur_view,
569                        self.cur_epoch,
570                        self.first_epoch,
571                    )
572                    .await;
573                }
574                let leader = self
575                    .membership_coordinator
576                    .membership_for_epoch(self.cur_epoch)?
577                    .leader(view_number)?;
578                tracing::warn!(
579                    %leader,
580                    leader_mnemonic = hotshot_types::utils::mnemonic(&leader),
581                    view_number = *view_number,
582                    num_timeouts_tracked = self.num_timeouts_tracked,
583                    "view timed out",
584                );
585            },
586            HotShotEvent::SetFirstEpoch(view, epoch) => {
587                self.first_epoch = Some((*view, *epoch));
588            },
589
590            _ => {},
591        }
592        Ok(())
593    }
594
595    /// Garbage collect tasks for epochs older than the highest finalized epoch
596    /// or older than the previous epoch, whichever is greater.
597    /// Garbage collect views older than the highest finalized view including the highest finalized view.
598    async fn garbage_collect_tasks(&self) {
599        let previous_epoch = self
600            .cur_epoch
601            .map(|e| e.saturating_sub(1))
602            .map(EpochNumber::new);
603        let gc_epoch = self.highest_finalized_epoch_view.0.max(previous_epoch);
604        Self::garbage_collect_tasks_helper(
605            &self.replica_task_map,
606            &gc_epoch,
607            &self.highest_finalized_epoch_view.1,
608        )
609        .await;
610        Self::garbage_collect_tasks_helper(
611            &self.pre_commit_relay_map,
612            &gc_epoch,
613            &self.highest_finalized_epoch_view.1,
614        )
615        .await;
616        Self::garbage_collect_tasks_helper(
617            &self.commit_relay_map,
618            &gc_epoch,
619            &self.highest_finalized_epoch_view.1,
620        )
621        .await;
622        Self::garbage_collect_tasks_helper(
623            &self.finalize_relay_map,
624            &gc_epoch,
625            &self.highest_finalized_epoch_view.1,
626        )
627        .await;
628    }
629
630    async fn garbage_collect_tasks_helper<VAL>(
631        map: &RwLock<TaskMap<VAL>>,
632        gc_epoch: &Option<EpochNumber>,
633        gc_view: &ViewNumber,
634    ) {
635        let mut task_map = map.write().await;
636        task_map.retain(|e, _| e >= gc_epoch);
637        if let Some(view_map) = task_map.get_mut(gc_epoch) {
638            view_map.retain(|v, _| v > gc_view)
639        };
640        task_map.retain(|_, view_map| !view_map.is_empty());
641    }
642}
643
644impl<TYPES: NodeType> ViewSyncReplicaTaskState<TYPES> {
645    #[instrument(skip_all, fields(id = self.id, view = *self.cur_view, epoch = self.cur_epoch.map(|x| *x)), name = "View Sync Replica Task", level = "error")]
646    /// Handle incoming events for the view sync replica task
647    pub async fn handle(
648        &mut self,
649        event: Arc<HotShotEvent<TYPES>>,
650        event_stream: Sender<Arc<HotShotEvent<TYPES>>>,
651    ) -> Option<HotShotTaskCompleted> {
652        match event.as_ref() {
653            HotShotEvent::ViewSyncPreCommitCertificateRecv(certificate) => {
654                let last_seen_certificate = ViewSyncPhase::PreCommit;
655
656                // Ignore certificate if it is for an older round
657                if certificate.view_number() < self.next_view {
658                    tracing::warn!("We're already in a higher round");
659
660                    return None;
661                }
662
663                let membership = self.membership_for_epoch(certificate.epoch()).await?;
664                let membership_stake_table =
665                    StakeTableEntries::from_iter(membership.stake_table()).0;
666                let membership_failure_threshold = membership.failure_threshold();
667
668                // If certificate is not valid, return current state
669                if let Err(e) = certificate.is_valid_cert(
670                    &membership_stake_table,
671                    membership_failure_threshold,
672                    &self.upgrade_lock,
673                ) {
674                    tracing::error!(
675                        "Not valid view sync cert! data: {:?}, error: {}",
676                        certificate.data(),
677                        e
678                    );
679
680                    return None;
681                }
682
683                // If certificate is for a higher round shutdown this task
684                // since another task should have been started for the higher round
685                if certificate.view_number() > self.next_view {
686                    return Some(HotShotTaskCompleted);
687                }
688
689                if certificate.data().relay > self.relay {
690                    self.relay = certificate.data().relay;
691                }
692
693                let Ok(vote) = ViewSyncCommitVote2::<TYPES>::create_signed_vote(
694                    ViewSyncCommitData2 {
695                        relay: certificate.data().relay,
696                        round: self.next_view,
697                        epoch: certificate.data().epoch,
698                    },
699                    self.next_view,
700                    &self.public_key,
701                    &self.private_key,
702                    &self.upgrade_lock,
703                ) else {
704                    tracing::error!("Failed to sign ViewSyncCommitData!");
705                    return None;
706                };
707
708                broadcast_event(
709                    Arc::new(HotShotEvent::ViewSyncCommitVoteSend(vote)),
710                    &event_stream,
711                )
712                .await;
713
714                if let Some(timeout_task) = self.timeout_task.take() {
715                    timeout_task.abort();
716                }
717
718                self.timeout_task = Some(spawn({
719                    let stream = event_stream.clone();
720                    let phase = last_seen_certificate;
721                    let relay = self.relay;
722                    let next_view = self.next_view;
723                    let timeout = self.view_sync_timeout;
724                    async move {
725                        sleep(timeout).await;
726                        tracing::warn!(
727                            "Vote sending timed out in ViewSyncPreCommitCertificateRecv, Relay = \
728                             {relay}"
729                        );
730
731                        broadcast_event(
732                            Arc::new(HotShotEvent::ViewSyncTimeout(
733                                ViewNumber::new(*next_view),
734                                relay,
735                                phase,
736                            )),
737                            &stream,
738                        )
739                        .await;
740                    }
741                }));
742            },
743
744            HotShotEvent::ViewSyncCommitCertificateRecv(certificate) => {
745                let last_seen_certificate = ViewSyncPhase::Commit;
746
747                // Ignore certificate if it is for an older round
748                if certificate.view_number() < self.next_view {
749                    tracing::warn!("We're already in a higher round");
750
751                    return None;
752                }
753
754                let membership = self.membership_for_epoch(certificate.epoch()).await?;
755                let membership_stake_table =
756                    StakeTableEntries::from_iter(membership.stake_table()).0;
757                let membership_success_threshold = membership.success_threshold();
758
759                // If certificate is not valid, return current state
760                if let Err(e) = certificate.is_valid_cert(
761                    &membership_stake_table,
762                    membership_success_threshold,
763                    &self.upgrade_lock,
764                ) {
765                    tracing::error!(
766                        "Not valid view sync cert! data: {:?}, error: {}",
767                        certificate.data(),
768                        e
769                    );
770
771                    return None;
772                }
773
774                // If certificate is for a higher round shutdown this task
775                // since another task should have been started for the higher round
776                if certificate.view_number() > self.next_view {
777                    return Some(HotShotTaskCompleted);
778                }
779
780                if certificate.data().relay > self.relay {
781                    self.relay = certificate.data().relay;
782                }
783
784                let Ok(vote) = ViewSyncFinalizeVote2::<TYPES>::create_signed_vote(
785                    ViewSyncFinalizeData2 {
786                        relay: certificate.data().relay,
787                        round: self.next_view,
788                        epoch: certificate.data().epoch,
789                    },
790                    self.next_view,
791                    &self.public_key,
792                    &self.private_key,
793                    &self.upgrade_lock,
794                ) else {
795                    tracing::error!("Failed to sign view sync finalized vote!");
796                    return None;
797                };
798
799                broadcast_event(
800                    Arc::new(HotShotEvent::ViewSyncFinalizeVoteSend(vote)),
801                    &event_stream,
802                )
803                .await;
804
805                tracing::info!(
806                    "View sync protocol has received view sync evidence to update the view to {}",
807                    *self.next_view
808                );
809
810                broadcast_view_change(
811                    &event_stream,
812                    self.next_view,
813                    certificate.epoch(),
814                    self.first_epoch,
815                )
816                .await;
817
818                if let Some(timeout_task) = self.timeout_task.take() {
819                    timeout_task.abort();
820                }
821                self.timeout_task = Some(spawn({
822                    let stream = event_stream.clone();
823                    let phase = last_seen_certificate;
824                    let relay = self.relay;
825                    let next_view = self.next_view;
826                    let timeout = self.view_sync_timeout;
827                    async move {
828                        sleep(timeout).await;
829                        tracing::warn!(
830                            "Vote sending timed out in ViewSyncCommitCertificateRecv, relay = \
831                             {relay}"
832                        );
833                        broadcast_event(
834                            Arc::new(HotShotEvent::ViewSyncTimeout(
835                                ViewNumber::new(*next_view),
836                                relay,
837                                phase,
838                            )),
839                            &stream,
840                        )
841                        .await;
842                    }
843                }));
844            },
845
846            HotShotEvent::ViewSyncFinalizeCertificateRecv(certificate) => {
847                // Ignore certificate if it is for an older round
848                if certificate.view_number() < self.next_view {
849                    tracing::warn!("We're already in a higher round");
850
851                    return None;
852                }
853
854                let membership = self.membership_for_epoch(certificate.epoch()).await?;
855                let membership_stake_table =
856                    StakeTableEntries::from_iter(membership.stake_table()).0;
857                let membership_success_threshold = membership.success_threshold();
858
859                // If certificate is not valid, return current state
860                if let Err(e) = certificate.is_valid_cert(
861                    &membership_stake_table,
862                    membership_success_threshold,
863                    &self.upgrade_lock,
864                ) {
865                    tracing::error!(
866                        "Not valid view sync cert! data: {:?}, error: {}",
867                        certificate.data(),
868                        e
869                    );
870
871                    return None;
872                }
873
874                // If certificate is for a higher round shutdown this task
875                // since another task should have been started for the higher round
876                if certificate.view_number() > self.next_view {
877                    return Some(HotShotTaskCompleted);
878                }
879
880                if certificate.data().relay > self.relay {
881                    self.relay = certificate.data().relay;
882                }
883
884                if let Some(timeout_task) = self.timeout_task.take() {
885                    timeout_task.abort();
886                }
887
888                broadcast_view_change(
889                    &event_stream,
890                    self.next_view,
891                    certificate.epoch(),
892                    self.first_epoch,
893                )
894                .await;
895                return Some(HotShotTaskCompleted);
896            },
897
898            HotShotEvent::ViewSyncTrigger(view_number) => {
899                let view_number = *view_number;
900                if self.next_view != ViewNumber::new(*view_number) {
901                    tracing::error!("Unexpected view number to trigger view sync");
902                    return None;
903                }
904
905                let Ok(vote) = ViewSyncPreCommitVote2::<TYPES>::create_signed_vote(
906                    ViewSyncPreCommitData2 {
907                        relay: 0,
908                        round: view_number,
909                        epoch: self.cur_epoch,
910                    },
911                    view_number,
912                    &self.public_key,
913                    &self.private_key,
914                    &self.upgrade_lock,
915                ) else {
916                    tracing::error!("Failed to sign pre commit vote!");
917                    return None;
918                };
919
920                broadcast_event(
921                    Arc::new(HotShotEvent::ViewSyncPreCommitVoteSend(vote)),
922                    &event_stream,
923                )
924                .await;
925
926                self.timeout_task = Some(spawn({
927                    let stream = event_stream.clone();
928                    let relay = self.relay;
929                    let next_view = self.next_view;
930                    let timeout = self.view_sync_timeout;
931                    async move {
932                        sleep(timeout).await;
933                        tracing::warn!("Vote sending timed out in ViewSyncTrigger");
934                        broadcast_event(
935                            Arc::new(HotShotEvent::ViewSyncTimeout(
936                                ViewNumber::new(*next_view),
937                                relay,
938                                ViewSyncPhase::None,
939                            )),
940                            &stream,
941                        )
942                        .await;
943                    }
944                }));
945
946                return None;
947            },
948
949            HotShotEvent::ViewSyncTimeout(round, relay, last_seen_certificate) => {
950                let round = *round;
951                // Shouldn't ever receive a timeout for a relay higher than ours
952                if ViewNumber::new(*round) == self.next_view && *relay == self.relay {
953                    if let Some(timeout_task) = self.timeout_task.take() {
954                        timeout_task.abort();
955                    }
956                    self.relay += 1;
957                    match last_seen_certificate {
958                        ViewSyncPhase::None | ViewSyncPhase::PreCommit | ViewSyncPhase::Commit => {
959                            let Ok(vote) = ViewSyncPreCommitVote2::<TYPES>::create_signed_vote(
960                                ViewSyncPreCommitData2 {
961                                    relay: self.relay,
962                                    round: self.next_view,
963                                    epoch: self.cur_epoch,
964                                },
965                                self.next_view,
966                                &self.public_key,
967                                &self.private_key,
968                                &self.upgrade_lock,
969                            ) else {
970                                tracing::error!("Failed to sign ViewSyncPreCommitData!");
971                                return None;
972                            };
973
974                            broadcast_event(
975                                Arc::new(HotShotEvent::ViewSyncPreCommitVoteSend(vote)),
976                                &event_stream,
977                            )
978                            .await;
979                        },
980                        ViewSyncPhase::Finalize => {
981                            // This should never occur
982                            unimplemented!()
983                        },
984                    }
985
986                    self.timeout_task = Some(spawn({
987                        let stream = event_stream.clone();
988                        let relay = self.relay;
989                        let next_view = self.next_view;
990                        let timeout = self.view_sync_timeout;
991                        let last_cert = last_seen_certificate.clone();
992                        async move {
993                            sleep(timeout).await;
994                            tracing::warn!(
995                                "Vote sending timed out in ViewSyncTimeout relay = {relay}"
996                            );
997                            broadcast_event(
998                                Arc::new(HotShotEvent::ViewSyncTimeout(
999                                    ViewNumber::new(*next_view),
1000                                    relay,
1001                                    last_cert,
1002                                )),
1003                                &stream,
1004                            )
1005                            .await;
1006                        }
1007                    }));
1008
1009                    return None;
1010                }
1011            },
1012            _ => return None,
1013        }
1014        None
1015    }
1016
1017    pub async fn membership_for_epoch(
1018        &self,
1019        epoch: Option<EpochNumber>,
1020    ) -> Option<EpochMembership<TYPES>> {
1021        match self.membership_coordinator.membership_for_epoch(epoch) {
1022            Ok(m) => Some(m),
1023            Err(e) => {
1024                tracing::warn!(e.message);
1025                None
1026            },
1027        }
1028    }
1029}