1use std::{collections::BTreeMap, fmt::Debug, sync::Arc, time::Duration};
8
9use async_broadcast::{Receiver, Sender};
10use async_lock::RwLock;
11use async_trait::async_trait;
12use hotshot_task::task::TaskState;
13use hotshot_types::{
14 data::{EpochNumber, ViewNumber},
15 epoch_membership::{EpochMembership, EpochMembershipCoordinator},
16 message::UpgradeLock,
17 simple_certificate::{
18 ViewSyncCommitCertificate2, ViewSyncFinalizeCertificate2, ViewSyncPreCommitCertificate2,
19 },
20 simple_vote::{
21 HasEpoch, ViewSyncCommitData2, ViewSyncCommitVote2, ViewSyncFinalizeData2,
22 ViewSyncFinalizeVote2, ViewSyncPreCommitData2, ViewSyncPreCommitVote2,
23 },
24 stake_table::StakeTableEntries,
25 traits::{node_implementation::NodeType, signature_key::SignatureKey},
26 utils::EpochTransitionIndicator,
27 vote::{Certificate, HasViewNumber, Vote},
28};
29use hotshot_utils::anytrace::*;
30use tokio::{spawn, task::JoinHandle, time::sleep};
31use tracing::instrument;
32
33use crate::{
34 events::{HotShotEvent, HotShotTaskCompleted},
35 helpers::{broadcast_event, broadcast_view_change},
36 vote_collection::{
37 AccumulatorInfo, HandleVoteEvent, VoteCollectionTaskState, create_vote_accumulator,
38 },
39};
40
/// Phase of the view sync protocol a replica has most recently observed.
///
/// Variants are declared in protocol order, so the derived `PartialOrd` yields
/// `None < PreCommit < Commit < Finalize`.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Hash)]
pub enum ViewSyncPhase {
    /// No certificate has been seen yet (used when view sync was only triggered locally).
    None,
    /// A pre-commit certificate was the last certificate seen.
    PreCommit,
    /// A commit certificate was the last certificate seen.
    Commit,
    /// A finalize certificate was the last certificate seen.
    Finalize,
}
53
/// Two-level ordered map of per-view values: the outer key is the (optional) epoch, the inner
/// key is the view number.
type TaskMap<VAL> = BTreeMap<Option<EpochNumber>, BTreeMap<ViewNumber, VAL>>;

/// A [`TaskMap`] whose per-view value maps a relay index (`u64`) to the vote collection state
/// running for that relay.
type RelayMap<TYPES, VOTE, CERT> =
    TaskMap<BTreeMap<u64, VoteCollectionTaskState<TYPES, VOTE, CERT>>>;

/// A [`TaskMap`] holding one replica task state per (epoch, view).
type ReplicaTaskMap<TYPES> = TaskMap<ViewSyncReplicaTaskState<TYPES>>;
61
/// State of the main view sync task.
///
/// It owns the per-view replica tasks and the per-relay vote collectors, and decides when to
/// start the view sync protocol based on consecutive timeouts.
pub struct ViewSyncTaskState<TYPES: NodeType> {
    /// View this task currently considers current. Advanced on a `ViewChange` to a higher view
    /// and after the first tracked timeout.
    pub cur_view: ViewNumber,

    /// Set equal to `cur_view` whenever `cur_view` advances on a `ViewChange`.
    // NOTE(review): presumably "the view we want to reach"; not otherwise read in this file.
    pub next_view: ViewNumber,

    /// Highest epoch seen in a `ViewChange` event; used as the epoch for locally triggered
    /// view sync and for timeout routing.
    pub cur_epoch: Option<EpochNumber>,

    /// Source of per-epoch membership, used for leader lookups and handed to replica tasks.
    pub membership_coordinator: EpochMembershipCoordinator<TYPES>,

    /// This node's public key; compared against the relay leader before collecting votes.
    pub public_key: TYPES::SignatureKey,

    /// This node's private key; handed to replica tasks, which sign votes with it.
    pub private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,

    /// Node id, recorded in tracing spans and passed to vote accumulators.
    pub id: u64,

    /// Number of `Timeout` events accepted since the last `ViewChange` to a higher view.
    /// View sync is triggered once this reaches 2.
    pub num_timeouts_tracked: u64,

    /// Running replica tasks, keyed by epoch and then view.
    pub replica_task_map: RwLock<ReplicaTaskMap<TYPES>>,

    /// Pre-commit vote collectors, keyed by epoch, view, and relay index.
    pub pre_commit_relay_map: RwLock<
        RelayMap<TYPES, ViewSyncPreCommitVote2<TYPES>, ViewSyncPreCommitCertificate2<TYPES>>,
    >,

    /// Commit vote collectors, keyed by epoch, view, and relay index.
    pub commit_relay_map:
        RwLock<RelayMap<TYPES, ViewSyncCommitVote2<TYPES>, ViewSyncCommitCertificate2<TYPES>>>,

    /// Finalize vote collectors, keyed by epoch, view, and relay index.
    pub finalize_relay_map:
        RwLock<RelayMap<TYPES, ViewSyncFinalizeVote2<TYPES>, ViewSyncFinalizeCertificate2<TYPES>>>,

    /// Timeout duration copied into each replica task; a replica sleeps this long before
    /// emitting `ViewSyncTimeout`.
    pub view_sync_timeout: Duration,

    /// NOTE(review): neither read nor written by the code in this file; presumably the last
    /// view for which old tasks were garbage collected — confirm against other users.
    pub last_garbage_collected_view: ViewNumber,

    /// Upgrade lock passed to vote accumulators and replica tasks.
    pub upgrade_lock: UpgradeLock<TYPES>,

    /// Value recorded from `SetFirstEpoch` and forwarded to `broadcast_view_change`.
    pub first_epoch: Option<(ViewNumber, EpochNumber)>,

    /// Highest finalized (epoch, view) observed, updated when a replica task completes and on
    /// `LeavesDecided`. Serves as the garbage collection threshold.
    pub highest_finalized_epoch_view: (Option<EpochNumber>, ViewNumber),

    /// Epoch height passed to `leaf.epoch(..)` when deriving the epoch of decided leaves.
    pub epoch_height: u64,
}
121
122#[async_trait]
123impl<TYPES: NodeType> TaskState for ViewSyncTaskState<TYPES> {
124 type Event = HotShotEvent<TYPES>;
125
126 async fn handle_event(
127 &mut self,
128 event: Arc<Self::Event>,
129 sender: &Sender<Arc<Self::Event>>,
130 _receiver: &Receiver<Arc<Self::Event>>,
131 ) -> Result<()> {
132 self.handle(event, sender.clone()).await
133 }
134
135 fn cancel_subtasks(&mut self) {}
136}
137
/// State of a single replica task, which runs the view sync protocol for one target view.
pub struct ViewSyncReplicaTaskState<TYPES: NodeType> {
    /// How long to wait after sending a vote before emitting `ViewSyncTimeout`.
    pub view_sync_timeout: Duration,

    /// View recorded when the task was created; only used in the tracing span in this file.
    pub cur_view: ViewNumber,

    /// View this task is trying to sync to. Certificates for lower views are ignored.
    pub next_view: ViewNumber,

    /// Current relay index. Raised when a certificate carries a higher relay and incremented
    /// on each matching `ViewSyncTimeout`.
    pub relay: u64,

    /// NOTE(review): initialised to `false` and never touched by the code in this file.
    pub finalized: bool,

    /// NOTE(review): initialised to `false` and never touched by the code in this file.
    pub sent_view_change_event: bool,

    /// Handle of the currently pending timeout timer, if any.
    pub timeout_task: Option<JoinHandle<()>>,

    /// Node id, recorded in the tracing span.
    pub id: u64,

    /// Source of per-epoch membership, used to validate certificates.
    pub membership_coordinator: EpochMembershipCoordinator<TYPES>,

    /// This node's public key, used when creating signed votes.
    pub public_key: TYPES::SignatureKey,

    /// This node's private key, used when creating signed votes.
    pub private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,

    /// Upgrade lock passed to certificate validation and vote creation.
    pub upgrade_lock: UpgradeLock<TYPES>,

    /// Epoch placed into pre-commit votes created by this task.
    pub cur_epoch: Option<EpochNumber>,

    /// Value forwarded to `broadcast_view_change`.
    pub first_epoch: Option<(ViewNumber, EpochNumber)>,
}
182
183#[async_trait]
184impl<TYPES: NodeType> TaskState for ViewSyncReplicaTaskState<TYPES> {
185 type Event = HotShotEvent<TYPES>;
186
187 async fn handle_event(
188 &mut self,
189 event: Arc<Self::Event>,
190 sender: &Sender<Arc<Self::Event>>,
191 _receiver: &Receiver<Arc<Self::Event>>,
192 ) -> Result<()> {
193 self.handle(event, sender.clone()).await;
194
195 Ok(())
196 }
197
198 fn cancel_subtasks(&mut self) {}
199}
200
201impl<TYPES: NodeType> ViewSyncTaskState<TYPES> {
202 #[instrument(skip_all, fields(id = self.id, view = *self.cur_view), name = "View Sync Main Task", level = "error")]
203 #[allow(clippy::type_complexity)]
204 pub async fn send_to_or_create_replica(
206 &mut self,
207 event: Arc<HotShotEvent<TYPES>>,
208 view: ViewNumber,
209 epoch: Option<EpochNumber>,
210 sender: &Sender<Arc<HotShotEvent<TYPES>>>,
211 ) {
212 let mut task_map = self.replica_task_map.write().await;
213
214 if let Some(replica_task) = task_map.get_mut(&epoch).and_then(|x| x.get_mut(&view)) {
215 tracing::debug!("Forwarding message");
217 let result = replica_task
218 .handle(Arc::clone(&event), sender.clone())
219 .await;
220
221 if result == Some(HotShotTaskCompleted) {
222 if epoch >= self.highest_finalized_epoch_view.0
224 && view > self.highest_finalized_epoch_view.1
225 {
226 self.highest_finalized_epoch_view = (epoch, view);
227 } else if view > self.highest_finalized_epoch_view.1 {
228 tracing::error!(
229 "We finalized a higher view but the epoch is lower. This should never \
230 happen. Current highest finalized epoch view: {:?}, new highest \
231 finalized epoch view: {:?}",
232 self.highest_finalized_epoch_view,
233 (epoch, view)
234 );
235 }
236 task_map.get_mut(&epoch).and_then(|x| x.remove(&view));
237 task_map.retain(|_, x| !x.is_empty());
238 drop(task_map);
239
240 self.garbage_collect_tasks().await;
242 return;
243 }
244
245 return;
246 }
247
248 let mut replica_state: ViewSyncReplicaTaskState<TYPES> = ViewSyncReplicaTaskState {
250 cur_view: view,
251 next_view: view,
252 relay: 0,
253 finalized: false,
254 sent_view_change_event: false,
255 timeout_task: None,
256 membership_coordinator: self.membership_coordinator.clone(),
257 public_key: self.public_key.clone(),
258 private_key: self.private_key.clone(),
259 view_sync_timeout: self.view_sync_timeout,
260 id: self.id,
261 upgrade_lock: self.upgrade_lock.clone(),
262 cur_epoch: self.cur_epoch,
263 first_epoch: self.first_epoch,
264 };
265
266 let result = replica_state
267 .handle(Arc::clone(&event), sender.clone())
268 .await;
269
270 if result == Some(HotShotTaskCompleted) {
271 return;
273 }
274
275 task_map
276 .entry(epoch)
277 .or_default()
278 .insert(view, replica_state);
279 }
280
281 #[instrument(skip_all, fields(id = self.id, view = *self.cur_view, epoch = self.cur_epoch.map(|x| *x)), name = "View Sync Main Task", level = "error")]
282 #[allow(clippy::type_complexity)]
283 pub async fn handle(
285 &mut self,
286 event: Arc<HotShotEvent<TYPES>>,
287 event_stream: Sender<Arc<HotShotEvent<TYPES>>>,
288 ) -> Result<()> {
289 match event.as_ref() {
290 HotShotEvent::ViewSyncPreCommitCertificateRecv(certificate) => {
291 tracing::debug!("Received view sync cert for phase {certificate:?}");
292 let view = certificate.view_number();
293 self.send_to_or_create_replica(
294 Arc::clone(&event),
295 view,
296 certificate.epoch(),
297 &event_stream,
298 )
299 .await;
300 },
301 HotShotEvent::ViewSyncCommitCertificateRecv(certificate) => {
302 tracing::debug!("Received view sync cert for phase {certificate:?}");
303 let view = certificate.view_number();
304 self.send_to_or_create_replica(
305 Arc::clone(&event),
306 view,
307 certificate.epoch(),
308 &event_stream,
309 )
310 .await;
311 },
312 HotShotEvent::ViewSyncFinalizeCertificateRecv(certificate) => {
313 tracing::debug!("Received view sync cert for phase {certificate:?}");
314 let view = certificate.view_number();
315 self.send_to_or_create_replica(
316 Arc::clone(&event),
317 view,
318 certificate.epoch(),
319 &event_stream,
320 )
321 .await;
322 },
323 HotShotEvent::ViewSyncTimeout(view, ..) => {
324 tracing::debug!("view sync timeout in main task {view:?}");
325 let view = *view;
326 self.send_to_or_create_replica(
327 Arc::clone(&event),
328 view,
329 self.cur_epoch,
330 &event_stream,
331 )
332 .await;
333 },
334
335 HotShotEvent::ViewSyncPreCommitVoteRecv(vote) => {
336 let mut map = self.pre_commit_relay_map.write().await;
337 let vote_view = vote.view_number();
338 let relay = vote.date().relay;
339 let relay_map = map
340 .entry(vote.date().epoch)
341 .or_insert(BTreeMap::new())
342 .entry(vote_view)
343 .or_insert(BTreeMap::new());
344 if let Some(relay_task) = relay_map.get_mut(&relay) {
345 tracing::debug!("Forwarding message");
346
347 if relay_task
349 .handle_vote_event(Arc::clone(&event), &event_stream)
350 .await?
351 .is_some()
352 {
353 map.get_mut(&vote.date().epoch)
354 .and_then(|x| x.remove(&vote_view));
355 map.retain(|_, x| !x.is_empty());
356 }
357
358 return Ok(());
359 }
360
361 let epoch_mem = self
362 .membership_coordinator
363 .membership_for_epoch(vote.date().epoch)
364 .await?;
365 ensure!(
367 epoch_mem.leader(vote_view + relay).await? == self.public_key,
368 "View sync vote sent to wrong leader"
369 );
370
371 let info = AccumulatorInfo {
372 public_key: self.public_key.clone(),
373 membership: epoch_mem,
374 view: vote_view,
375 id: self.id,
376 };
377 let vote_collector = create_vote_accumulator(
378 &info,
379 event,
380 &event_stream,
381 self.upgrade_lock.clone(),
382 EpochTransitionIndicator::NotInTransition,
383 )
384 .await?;
385
386 relay_map.insert(relay, vote_collector);
387 },
388
389 HotShotEvent::ViewSyncCommitVoteRecv(vote) => {
390 let mut map = self.commit_relay_map.write().await;
391 let vote_view = vote.view_number();
392 let relay = vote.date().relay;
393 let relay_map = map
394 .entry(vote.date().epoch)
395 .or_insert(BTreeMap::new())
396 .entry(vote_view)
397 .or_insert(BTreeMap::new());
398 if let Some(relay_task) = relay_map.get_mut(&relay) {
399 tracing::debug!("Forwarding message");
400
401 if relay_task
403 .handle_vote_event(Arc::clone(&event), &event_stream)
404 .await?
405 .is_some()
406 {
407 map.get_mut(&vote.date().epoch)
408 .and_then(|x| x.remove(&vote_view));
409 map.retain(|_, x| !x.is_empty());
410 }
411
412 return Ok(());
413 }
414
415 let epoch_mem = self
417 .membership_coordinator
418 .membership_for_epoch(vote.date().epoch)
419 .await?;
420 ensure!(
421 epoch_mem.leader(vote_view + relay).await? == self.public_key,
422 debug!("View sync vote sent to wrong leader")
423 );
424
425 let info = AccumulatorInfo {
426 public_key: self.public_key.clone(),
427 membership: epoch_mem,
428 view: vote_view,
429 id: self.id,
430 };
431
432 let vote_collector = create_vote_accumulator(
433 &info,
434 event,
435 &event_stream,
436 self.upgrade_lock.clone(),
437 EpochTransitionIndicator::NotInTransition,
438 )
439 .await?;
440 relay_map.insert(relay, vote_collector);
441 },
442
443 HotShotEvent::ViewSyncFinalizeVoteRecv(vote) => {
444 let mut map = self.finalize_relay_map.write().await;
445 let vote_view = vote.view_number();
446 let relay = vote.date().relay;
447 let relay_map = map
448 .entry(vote.date().epoch)
449 .or_insert(BTreeMap::new())
450 .entry(vote_view)
451 .or_insert(BTreeMap::new());
452 if let Some(relay_task) = relay_map.get_mut(&relay) {
453 tracing::debug!("Forwarding message");
454
455 if relay_task
457 .handle_vote_event(Arc::clone(&event), &event_stream)
458 .await?
459 .is_some()
460 {
461 map.get_mut(&vote.date().epoch)
462 .and_then(|x| x.remove(&vote_view));
463 map.retain(|_, x| !x.is_empty());
464 }
465
466 return Ok(());
467 }
468
469 let epoch_mem = self
470 .membership_coordinator
471 .membership_for_epoch(vote.date().epoch)
472 .await?;
473 ensure!(
475 epoch_mem.leader(vote_view + relay).await? == self.public_key,
476 debug!("View sync vote sent to wrong leader")
477 );
478
479 let info = AccumulatorInfo {
480 public_key: self.public_key.clone(),
481 membership: epoch_mem,
482 view: vote_view,
483 id: self.id,
484 };
485 let vote_collector = create_vote_accumulator(
486 &info,
487 event,
488 &event_stream,
489 self.upgrade_lock.clone(),
490 EpochTransitionIndicator::NotInTransition,
491 )
492 .await;
493 if let Ok(vote_task) = vote_collector {
494 relay_map.insert(relay, vote_task);
495 }
496 },
497
498 &HotShotEvent::ViewChange(new_view, epoch) => {
499 if epoch > self.cur_epoch {
500 self.cur_epoch = epoch;
501 }
502 let new_view = ViewNumber::new(*new_view);
503 if self.cur_view < new_view {
504 tracing::debug!(
505 "Change from view {} to view {} in view sync task",
506 *self.cur_view,
507 *new_view
508 );
509
510 self.cur_view = new_view;
511 self.next_view = self.cur_view;
512 self.num_timeouts_tracked = 0;
513 }
514
515 self.garbage_collect_tasks().await;
516 },
517 HotShotEvent::LeavesDecided(leaves) => {
518 let finalized_epoch = self.highest_finalized_epoch_view.0.max(
519 leaves
520 .iter()
521 .map(|leaf| leaf.epoch(self.epoch_height))
522 .max()
523 .unwrap_or(None),
524 );
525 let finalized_view = self.highest_finalized_epoch_view.1.max(
526 leaves
527 .iter()
528 .map(|leaf| leaf.view_number())
529 .max()
530 .unwrap_or(ViewNumber::new(0)),
531 );
532
533 self.highest_finalized_epoch_view = (finalized_epoch, finalized_view);
534
535 self.garbage_collect_tasks().await;
536 },
537 &HotShotEvent::Timeout(view_number, ..) => {
538 ensure!(
540 view_number >= self.cur_view,
541 debug!("Discarding old timeout vote.")
542 );
543
544 self.num_timeouts_tracked += 1;
545
546 if self.num_timeouts_tracked >= 3 {
547 tracing::error!("Too many consecutive timeouts! This shouldn't happen");
548 }
549
550 if self.num_timeouts_tracked >= 2 {
551 tracing::error!("Starting view sync protocol for view {}", *view_number + 1);
552
553 self.send_to_or_create_replica(
554 Arc::new(HotShotEvent::ViewSyncTrigger(view_number + 1)),
555 view_number + 1,
556 self.cur_epoch,
557 &event_stream,
558 )
559 .await;
560 } else {
561 self.cur_view = view_number + 1;
563 broadcast_view_change(
564 &event_stream,
565 self.cur_view,
566 self.cur_epoch,
567 self.first_epoch,
568 )
569 .await;
570 }
571 let leader = self
572 .membership_coordinator
573 .membership_for_epoch(self.cur_epoch)
574 .await?
575 .leader(view_number)
576 .await?;
577 tracing::warn!(
578 %leader,
579 leader_mnemonic = hotshot_types::utils::mnemonic(&leader),
580 view_number = *view_number,
581 num_timeouts_tracked = self.num_timeouts_tracked,
582 "view timed out",
583 );
584 },
585 HotShotEvent::SetFirstEpoch(view, epoch) => {
586 self.first_epoch = Some((*view, *epoch));
587 },
588
589 _ => {},
590 }
591 Ok(())
592 }
593
594 async fn garbage_collect_tasks(&self) {
598 let previous_epoch = self
599 .cur_epoch
600 .map(|e| e.saturating_sub(1))
601 .map(EpochNumber::new);
602 let gc_epoch = self.highest_finalized_epoch_view.0.max(previous_epoch);
603 Self::garbage_collect_tasks_helper(
604 &self.replica_task_map,
605 &gc_epoch,
606 &self.highest_finalized_epoch_view.1,
607 )
608 .await;
609 Self::garbage_collect_tasks_helper(
610 &self.pre_commit_relay_map,
611 &gc_epoch,
612 &self.highest_finalized_epoch_view.1,
613 )
614 .await;
615 Self::garbage_collect_tasks_helper(
616 &self.commit_relay_map,
617 &gc_epoch,
618 &self.highest_finalized_epoch_view.1,
619 )
620 .await;
621 Self::garbage_collect_tasks_helper(
622 &self.finalize_relay_map,
623 &gc_epoch,
624 &self.highest_finalized_epoch_view.1,
625 )
626 .await;
627 }
628
629 async fn garbage_collect_tasks_helper<VAL>(
630 map: &RwLock<TaskMap<VAL>>,
631 gc_epoch: &Option<EpochNumber>,
632 gc_view: &ViewNumber,
633 ) {
634 let mut task_map = map.write().await;
635 task_map.retain(|e, _| e >= gc_epoch);
636 if let Some(view_map) = task_map.get_mut(gc_epoch) {
637 view_map.retain(|v, _| v > gc_view)
638 };
639 task_map.retain(|_, view_map| !view_map.is_empty());
640 }
641}
642
643impl<TYPES: NodeType> ViewSyncReplicaTaskState<TYPES> {
644 #[instrument(skip_all, fields(id = self.id, view = *self.cur_view, epoch = self.cur_epoch.map(|x| *x)), name = "View Sync Replica Task", level = "error")]
645 pub async fn handle(
647 &mut self,
648 event: Arc<HotShotEvent<TYPES>>,
649 event_stream: Sender<Arc<HotShotEvent<TYPES>>>,
650 ) -> Option<HotShotTaskCompleted> {
651 match event.as_ref() {
652 HotShotEvent::ViewSyncPreCommitCertificateRecv(certificate) => {
653 let last_seen_certificate = ViewSyncPhase::PreCommit;
654
655 if certificate.view_number() < self.next_view {
657 tracing::warn!("We're already in a higher round");
658
659 return None;
660 }
661
662 let membership = self.membership_for_epoch(certificate.epoch()).await?;
663 let membership_stake_table = membership.stake_table().await;
664 let membership_failure_threshold = membership.failure_threshold().await;
665
666 if let Err(e) = certificate.is_valid_cert(
668 &StakeTableEntries::<TYPES>::from(membership_stake_table).0,
669 membership_failure_threshold,
670 &self.upgrade_lock,
671 ) {
672 tracing::error!(
673 "Not valid view sync cert! data: {:?}, error: {}",
674 certificate.data(),
675 e
676 );
677
678 return None;
679 }
680
681 if certificate.view_number() > self.next_view {
684 return Some(HotShotTaskCompleted);
685 }
686
687 if certificate.data().relay > self.relay {
688 self.relay = certificate.data().relay;
689 }
690
691 let Ok(vote) = ViewSyncCommitVote2::<TYPES>::create_signed_vote(
692 ViewSyncCommitData2 {
693 relay: certificate.data().relay,
694 round: self.next_view,
695 epoch: certificate.data().epoch,
696 },
697 self.next_view,
698 &self.public_key,
699 &self.private_key,
700 &self.upgrade_lock,
701 ) else {
702 tracing::error!("Failed to sign ViewSyncCommitData!");
703 return None;
704 };
705
706 broadcast_event(
707 Arc::new(HotShotEvent::ViewSyncCommitVoteSend(vote)),
708 &event_stream,
709 )
710 .await;
711
712 if let Some(timeout_task) = self.timeout_task.take() {
713 timeout_task.abort();
714 }
715
716 self.timeout_task = Some(spawn({
717 let stream = event_stream.clone();
718 let phase = last_seen_certificate;
719 let relay = self.relay;
720 let next_view = self.next_view;
721 let timeout = self.view_sync_timeout;
722 async move {
723 sleep(timeout).await;
724 tracing::warn!(
725 "Vote sending timed out in ViewSyncPreCommitCertificateRecv, Relay = \
726 {relay}"
727 );
728
729 broadcast_event(
730 Arc::new(HotShotEvent::ViewSyncTimeout(
731 ViewNumber::new(*next_view),
732 relay,
733 phase,
734 )),
735 &stream,
736 )
737 .await;
738 }
739 }));
740 },
741
742 HotShotEvent::ViewSyncCommitCertificateRecv(certificate) => {
743 let last_seen_certificate = ViewSyncPhase::Commit;
744
745 if certificate.view_number() < self.next_view {
747 tracing::warn!("We're already in a higher round");
748
749 return None;
750 }
751
752 let membership = self.membership_for_epoch(certificate.epoch()).await?;
753 let membership_stake_table = membership.stake_table().await;
754 let membership_success_threshold = membership.success_threshold().await;
755
756 if let Err(e) = certificate.is_valid_cert(
758 &StakeTableEntries::<TYPES>::from(membership_stake_table).0,
759 membership_success_threshold,
760 &self.upgrade_lock,
761 ) {
762 tracing::error!(
763 "Not valid view sync cert! data: {:?}, error: {}",
764 certificate.data(),
765 e
766 );
767
768 return None;
769 }
770
771 if certificate.view_number() > self.next_view {
774 return Some(HotShotTaskCompleted);
775 }
776
777 if certificate.data().relay > self.relay {
778 self.relay = certificate.data().relay;
779 }
780
781 let Ok(vote) = ViewSyncFinalizeVote2::<TYPES>::create_signed_vote(
782 ViewSyncFinalizeData2 {
783 relay: certificate.data().relay,
784 round: self.next_view,
785 epoch: certificate.data().epoch,
786 },
787 self.next_view,
788 &self.public_key,
789 &self.private_key,
790 &self.upgrade_lock,
791 ) else {
792 tracing::error!("Failed to sign view sync finalized vote!");
793 return None;
794 };
795
796 broadcast_event(
797 Arc::new(HotShotEvent::ViewSyncFinalizeVoteSend(vote)),
798 &event_stream,
799 )
800 .await;
801
802 tracing::info!(
803 "View sync protocol has received view sync evidence to update the view to {}",
804 *self.next_view
805 );
806
807 broadcast_view_change(
808 &event_stream,
809 self.next_view,
810 certificate.epoch(),
811 self.first_epoch,
812 )
813 .await;
814
815 if let Some(timeout_task) = self.timeout_task.take() {
816 timeout_task.abort();
817 }
818 self.timeout_task = Some(spawn({
819 let stream = event_stream.clone();
820 let phase = last_seen_certificate;
821 let relay = self.relay;
822 let next_view = self.next_view;
823 let timeout = self.view_sync_timeout;
824 async move {
825 sleep(timeout).await;
826 tracing::warn!(
827 "Vote sending timed out in ViewSyncCommitCertificateRecv, relay = \
828 {relay}"
829 );
830 broadcast_event(
831 Arc::new(HotShotEvent::ViewSyncTimeout(
832 ViewNumber::new(*next_view),
833 relay,
834 phase,
835 )),
836 &stream,
837 )
838 .await;
839 }
840 }));
841 },
842
843 HotShotEvent::ViewSyncFinalizeCertificateRecv(certificate) => {
844 if certificate.view_number() < self.next_view {
846 tracing::warn!("We're already in a higher round");
847
848 return None;
849 }
850
851 let membership = self.membership_for_epoch(certificate.epoch()).await?;
852 let membership_stake_table = membership.stake_table().await;
853 let membership_success_threshold = membership.success_threshold().await;
854
855 if let Err(e) = certificate.is_valid_cert(
857 &StakeTableEntries::<TYPES>::from(membership_stake_table).0,
858 membership_success_threshold,
859 &self.upgrade_lock,
860 ) {
861 tracing::error!(
862 "Not valid view sync cert! data: {:?}, error: {}",
863 certificate.data(),
864 e
865 );
866
867 return None;
868 }
869
870 if certificate.view_number() > self.next_view {
873 return Some(HotShotTaskCompleted);
874 }
875
876 if certificate.data().relay > self.relay {
877 self.relay = certificate.data().relay;
878 }
879
880 if let Some(timeout_task) = self.timeout_task.take() {
881 timeout_task.abort();
882 }
883
884 broadcast_view_change(
885 &event_stream,
886 self.next_view,
887 certificate.epoch(),
888 self.first_epoch,
889 )
890 .await;
891 return Some(HotShotTaskCompleted);
892 },
893
894 HotShotEvent::ViewSyncTrigger(view_number) => {
895 let view_number = *view_number;
896 if self.next_view != ViewNumber::new(*view_number) {
897 tracing::error!("Unexpected view number to trigger view sync");
898 return None;
899 }
900
901 let Ok(vote) = ViewSyncPreCommitVote2::<TYPES>::create_signed_vote(
902 ViewSyncPreCommitData2 {
903 relay: 0,
904 round: view_number,
905 epoch: self.cur_epoch,
906 },
907 view_number,
908 &self.public_key,
909 &self.private_key,
910 &self.upgrade_lock,
911 ) else {
912 tracing::error!("Failed to sign pre commit vote!");
913 return None;
914 };
915
916 broadcast_event(
917 Arc::new(HotShotEvent::ViewSyncPreCommitVoteSend(vote)),
918 &event_stream,
919 )
920 .await;
921
922 self.timeout_task = Some(spawn({
923 let stream = event_stream.clone();
924 let relay = self.relay;
925 let next_view = self.next_view;
926 let timeout = self.view_sync_timeout;
927 async move {
928 sleep(timeout).await;
929 tracing::warn!("Vote sending timed out in ViewSyncTrigger");
930 broadcast_event(
931 Arc::new(HotShotEvent::ViewSyncTimeout(
932 ViewNumber::new(*next_view),
933 relay,
934 ViewSyncPhase::None,
935 )),
936 &stream,
937 )
938 .await;
939 }
940 }));
941
942 return None;
943 },
944
945 HotShotEvent::ViewSyncTimeout(round, relay, last_seen_certificate) => {
946 let round = *round;
947 if ViewNumber::new(*round) == self.next_view && *relay == self.relay {
949 if let Some(timeout_task) = self.timeout_task.take() {
950 timeout_task.abort();
951 }
952 self.relay += 1;
953 match last_seen_certificate {
954 ViewSyncPhase::None | ViewSyncPhase::PreCommit | ViewSyncPhase::Commit => {
955 let Ok(vote) = ViewSyncPreCommitVote2::<TYPES>::create_signed_vote(
956 ViewSyncPreCommitData2 {
957 relay: self.relay,
958 round: self.next_view,
959 epoch: self.cur_epoch,
960 },
961 self.next_view,
962 &self.public_key,
963 &self.private_key,
964 &self.upgrade_lock,
965 ) else {
966 tracing::error!("Failed to sign ViewSyncPreCommitData!");
967 return None;
968 };
969
970 broadcast_event(
971 Arc::new(HotShotEvent::ViewSyncPreCommitVoteSend(vote)),
972 &event_stream,
973 )
974 .await;
975 },
976 ViewSyncPhase::Finalize => {
977 unimplemented!()
979 },
980 }
981
982 self.timeout_task = Some(spawn({
983 let stream = event_stream.clone();
984 let relay = self.relay;
985 let next_view = self.next_view;
986 let timeout = self.view_sync_timeout;
987 let last_cert = last_seen_certificate.clone();
988 async move {
989 sleep(timeout).await;
990 tracing::warn!(
991 "Vote sending timed out in ViewSyncTimeout relay = {relay}"
992 );
993 broadcast_event(
994 Arc::new(HotShotEvent::ViewSyncTimeout(
995 ViewNumber::new(*next_view),
996 relay,
997 last_cert,
998 )),
999 &stream,
1000 )
1001 .await;
1002 }
1003 }));
1004
1005 return None;
1006 }
1007 },
1008 _ => return None,
1009 }
1010 None
1011 }
1012
1013 pub async fn membership_for_epoch(
1014 &self,
1015 epoch: Option<EpochNumber>,
1016 ) -> Option<EpochMembership<TYPES>> {
1017 match self
1018 .membership_coordinator
1019 .membership_for_epoch(epoch)
1020 .await
1021 {
1022 Ok(m) => Some(m),
1023 Err(e) => {
1024 tracing::warn!(e.message);
1025 None
1026 },
1027 }
1028 }
1029}