1use std::{collections::BTreeMap, fmt::Debug, sync::Arc, time::Duration};
8
9use async_broadcast::{Receiver, Sender};
10use async_lock::RwLock;
11use async_trait::async_trait;
12use hotshot_task::task::TaskState;
13use hotshot_types::{
14 data::{EpochNumber, ViewNumber},
15 epoch_membership::{EpochMembership, EpochMembershipCoordinator},
16 message::UpgradeLock,
17 simple_certificate::{
18 ViewSyncCommitCertificate2, ViewSyncFinalizeCertificate2, ViewSyncPreCommitCertificate2,
19 },
20 simple_vote::{
21 HasEpoch, ViewSyncCommitData2, ViewSyncCommitVote2, ViewSyncFinalizeData2,
22 ViewSyncFinalizeVote2, ViewSyncPreCommitData2, ViewSyncPreCommitVote2,
23 },
24 stake_table::StakeTableEntries,
25 traits::{node_implementation::NodeType, signature_key::SignatureKey},
26 utils::EpochTransitionIndicator,
27 vote::{Certificate, HasViewNumber, Vote},
28};
29use hotshot_utils::anytrace::*;
30use tokio::{spawn, task::JoinHandle, time::sleep};
31use tracing::instrument;
32
33use crate::{
34 events::{HotShotEvent, HotShotTaskCompleted},
35 helpers::{broadcast_event, broadcast_view_change},
36 vote_collection::{
37 AccumulatorInfo, HandleVoteEvent, VoteCollectionTaskState, create_vote_accumulator,
38 },
39};
40
41#[derive(PartialEq, PartialOrd, Clone, Debug, Eq, Hash)]
42pub enum ViewSyncPhase {
44 None,
46 PreCommit,
48 Commit,
50 Finalize,
52}
53
54type TaskMap<VAL> = BTreeMap<Option<EpochNumber>, BTreeMap<ViewNumber, VAL>>;
55
56type RelayMap<TYPES, VOTE, CERT> =
58 TaskMap<BTreeMap<u64, VoteCollectionTaskState<TYPES, VOTE, CERT>>>;
59
60type ReplicaTaskMap<TYPES> = TaskMap<ViewSyncReplicaTaskState<TYPES>>;
61
62pub struct ViewSyncTaskState<TYPES: NodeType> {
64 pub cur_view: ViewNumber,
66
67 pub next_view: ViewNumber,
69
70 pub cur_epoch: Option<EpochNumber>,
72
73 pub membership_coordinator: EpochMembershipCoordinator<TYPES>,
75
76 pub public_key: TYPES::SignatureKey,
78
79 pub private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
81
82 pub id: u64,
84
85 pub num_timeouts_tracked: u64,
87
88 pub replica_task_map: RwLock<ReplicaTaskMap<TYPES>>,
90
91 pub pre_commit_relay_map: RwLock<
93 RelayMap<TYPES, ViewSyncPreCommitVote2<TYPES>, ViewSyncPreCommitCertificate2<TYPES>>,
94 >,
95
96 pub commit_relay_map:
98 RwLock<RelayMap<TYPES, ViewSyncCommitVote2<TYPES>, ViewSyncCommitCertificate2<TYPES>>>,
99
100 pub finalize_relay_map:
102 RwLock<RelayMap<TYPES, ViewSyncFinalizeVote2<TYPES>, ViewSyncFinalizeCertificate2<TYPES>>>,
103
104 pub view_sync_timeout: Duration,
106
107 pub last_garbage_collected_view: ViewNumber,
109
110 pub upgrade_lock: UpgradeLock<TYPES>,
112
113 pub first_epoch: Option<(ViewNumber, EpochNumber)>,
115
116 pub highest_finalized_epoch_view: (Option<EpochNumber>, ViewNumber),
118
119 pub epoch_height: u64,
120}
121
122#[async_trait]
123impl<TYPES: NodeType> TaskState for ViewSyncTaskState<TYPES> {
124 type Event = HotShotEvent<TYPES>;
125
126 async fn handle_event(
127 &mut self,
128 event: Arc<Self::Event>,
129 sender: &Sender<Arc<Self::Event>>,
130 _receiver: &Receiver<Arc<Self::Event>>,
131 ) -> Result<()> {
132 if self.upgrade_lock.new_protocol_active(self.cur_view) {
133 return Ok(());
134 }
135 self.handle(event, sender.clone()).await
136 }
137
138 fn cancel_subtasks(&mut self) {}
139}
140
141pub struct ViewSyncReplicaTaskState<TYPES: NodeType> {
143 pub view_sync_timeout: Duration,
145
146 pub cur_view: ViewNumber,
148
149 pub next_view: ViewNumber,
151
152 pub relay: u64,
154
155 pub finalized: bool,
157
158 pub sent_view_change_event: bool,
160
161 pub timeout_task: Option<JoinHandle<()>>,
163
164 pub id: u64,
166
167 pub membership_coordinator: EpochMembershipCoordinator<TYPES>,
169
170 pub public_key: TYPES::SignatureKey,
172
173 pub private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
175
176 pub upgrade_lock: UpgradeLock<TYPES>,
178
179 pub cur_epoch: Option<EpochNumber>,
181
182 pub first_epoch: Option<(ViewNumber, EpochNumber)>,
184}
185
186#[async_trait]
187impl<TYPES: NodeType> TaskState for ViewSyncReplicaTaskState<TYPES> {
188 type Event = HotShotEvent<TYPES>;
189
190 async fn handle_event(
191 &mut self,
192 event: Arc<Self::Event>,
193 sender: &Sender<Arc<Self::Event>>,
194 _receiver: &Receiver<Arc<Self::Event>>,
195 ) -> Result<()> {
196 if self.upgrade_lock.new_protocol_active(self.cur_view) {
197 return Ok(());
198 }
199 self.handle(event, sender.clone()).await;
200
201 Ok(())
202 }
203
204 fn cancel_subtasks(&mut self) {}
205}
206
207impl<TYPES: NodeType> ViewSyncTaskState<TYPES> {
208 #[instrument(skip_all, fields(id = self.id, view = *self.cur_view), name = "View Sync Main Task", level = "error")]
209 #[allow(clippy::type_complexity)]
210 pub async fn send_to_or_create_replica(
212 &mut self,
213 event: Arc<HotShotEvent<TYPES>>,
214 view: ViewNumber,
215 epoch: Option<EpochNumber>,
216 sender: &Sender<Arc<HotShotEvent<TYPES>>>,
217 ) {
218 let mut task_map = self.replica_task_map.write().await;
219
220 if let Some(replica_task) = task_map.get_mut(&epoch).and_then(|x| x.get_mut(&view)) {
221 tracing::debug!("Forwarding message");
223 let result = replica_task
224 .handle(Arc::clone(&event), sender.clone())
225 .await;
226
227 if result == Some(HotShotTaskCompleted) {
228 if epoch >= self.highest_finalized_epoch_view.0
230 && view > self.highest_finalized_epoch_view.1
231 {
232 self.highest_finalized_epoch_view = (epoch, view);
233 } else if view > self.highest_finalized_epoch_view.1 {
234 tracing::error!(
235 "We finalized a higher view but the epoch is lower. This should never \
236 happen. Current highest finalized epoch view: {:?}, new highest \
237 finalized epoch view: {:?}",
238 self.highest_finalized_epoch_view,
239 (epoch, view)
240 );
241 }
242 task_map.get_mut(&epoch).and_then(|x| x.remove(&view));
243 task_map.retain(|_, x| !x.is_empty());
244 drop(task_map);
245
246 self.garbage_collect_tasks().await;
248 return;
249 }
250
251 return;
252 }
253
254 let mut replica_state: ViewSyncReplicaTaskState<TYPES> = ViewSyncReplicaTaskState {
256 cur_view: view,
257 next_view: view,
258 relay: 0,
259 finalized: false,
260 sent_view_change_event: false,
261 timeout_task: None,
262 membership_coordinator: self.membership_coordinator.clone(),
263 public_key: self.public_key.clone(),
264 private_key: self.private_key.clone(),
265 view_sync_timeout: self.view_sync_timeout,
266 id: self.id,
267 upgrade_lock: self.upgrade_lock.clone(),
268 cur_epoch: self.cur_epoch,
269 first_epoch: self.first_epoch,
270 };
271
272 let result = replica_state
273 .handle(Arc::clone(&event), sender.clone())
274 .await;
275
276 if result == Some(HotShotTaskCompleted) {
277 return;
279 }
280
281 task_map
282 .entry(epoch)
283 .or_default()
284 .insert(view, replica_state);
285 }
286
287 #[instrument(skip_all, fields(id = self.id, view = *self.cur_view, epoch = self.cur_epoch.map(|x| *x)), name = "View Sync Main Task", level = "error")]
288 #[allow(clippy::type_complexity)]
289 pub async fn handle(
291 &mut self,
292 event: Arc<HotShotEvent<TYPES>>,
293 event_stream: Sender<Arc<HotShotEvent<TYPES>>>,
294 ) -> Result<()> {
295 match event.as_ref() {
296 HotShotEvent::ViewSyncPreCommitCertificateRecv(certificate) => {
297 tracing::debug!("Received view sync cert for phase {certificate:?}");
298 let view = certificate.view_number();
299 self.send_to_or_create_replica(
300 Arc::clone(&event),
301 view,
302 certificate.epoch(),
303 &event_stream,
304 )
305 .await;
306 },
307 HotShotEvent::ViewSyncCommitCertificateRecv(certificate) => {
308 tracing::debug!("Received view sync cert for phase {certificate:?}");
309 let view = certificate.view_number();
310 self.send_to_or_create_replica(
311 Arc::clone(&event),
312 view,
313 certificate.epoch(),
314 &event_stream,
315 )
316 .await;
317 },
318 HotShotEvent::ViewSyncFinalizeCertificateRecv(certificate) => {
319 tracing::debug!("Received view sync cert for phase {certificate:?}");
320 let view = certificate.view_number();
321 self.send_to_or_create_replica(
322 Arc::clone(&event),
323 view,
324 certificate.epoch(),
325 &event_stream,
326 )
327 .await;
328 },
329 HotShotEvent::ViewSyncTimeout(view, ..) => {
330 tracing::debug!("view sync timeout in main task {view:?}");
331 let view = *view;
332 self.send_to_or_create_replica(
333 Arc::clone(&event),
334 view,
335 self.cur_epoch,
336 &event_stream,
337 )
338 .await;
339 },
340
341 HotShotEvent::ViewSyncPreCommitVoteRecv(vote) => {
342 let mut map = self.pre_commit_relay_map.write().await;
343 let vote_view = vote.view_number();
344 let relay = vote.date().relay;
345 let relay_map = map
346 .entry(vote.date().epoch)
347 .or_insert(BTreeMap::new())
348 .entry(vote_view)
349 .or_insert(BTreeMap::new());
350 if let Some(relay_task) = relay_map.get_mut(&relay) {
351 tracing::debug!("Forwarding message");
352
353 if relay_task
355 .handle_vote_event(Arc::clone(&event), &event_stream)
356 .await?
357 .is_some()
358 {
359 map.get_mut(&vote.date().epoch)
360 .and_then(|x| x.remove(&vote_view));
361 map.retain(|_, x| !x.is_empty());
362 }
363
364 return Ok(());
365 }
366
367 let epoch_mem = self
368 .membership_coordinator
369 .membership_for_epoch(vote.date().epoch)?;
370 ensure!(
372 epoch_mem.leader(vote_view + relay)? == self.public_key,
373 "View sync vote sent to wrong leader"
374 );
375
376 let info = AccumulatorInfo {
377 public_key: self.public_key.clone(),
378 membership: epoch_mem,
379 view: vote_view,
380 id: self.id,
381 };
382 let vote_collector = create_vote_accumulator(
383 &info,
384 event,
385 &event_stream,
386 self.upgrade_lock.clone(),
387 EpochTransitionIndicator::NotInTransition,
388 )
389 .await?;
390
391 relay_map.insert(relay, vote_collector);
392 },
393
394 HotShotEvent::ViewSyncCommitVoteRecv(vote) => {
395 let mut map = self.commit_relay_map.write().await;
396 let vote_view = vote.view_number();
397 let relay = vote.date().relay;
398 let relay_map = map
399 .entry(vote.date().epoch)
400 .or_insert(BTreeMap::new())
401 .entry(vote_view)
402 .or_insert(BTreeMap::new());
403 if let Some(relay_task) = relay_map.get_mut(&relay) {
404 tracing::debug!("Forwarding message");
405
406 if relay_task
408 .handle_vote_event(Arc::clone(&event), &event_stream)
409 .await?
410 .is_some()
411 {
412 map.get_mut(&vote.date().epoch)
413 .and_then(|x| x.remove(&vote_view));
414 map.retain(|_, x| !x.is_empty());
415 }
416
417 return Ok(());
418 }
419
420 let epoch_mem = self
422 .membership_coordinator
423 .membership_for_epoch(vote.date().epoch)?;
424 ensure!(
425 epoch_mem.leader(vote_view + relay)? == self.public_key,
426 debug!("View sync vote sent to wrong leader")
427 );
428
429 let info = AccumulatorInfo {
430 public_key: self.public_key.clone(),
431 membership: epoch_mem,
432 view: vote_view,
433 id: self.id,
434 };
435
436 let vote_collector = create_vote_accumulator(
437 &info,
438 event,
439 &event_stream,
440 self.upgrade_lock.clone(),
441 EpochTransitionIndicator::NotInTransition,
442 )
443 .await?;
444 relay_map.insert(relay, vote_collector);
445 },
446
447 HotShotEvent::ViewSyncFinalizeVoteRecv(vote) => {
448 let mut map = self.finalize_relay_map.write().await;
449 let vote_view = vote.view_number();
450 let relay = vote.date().relay;
451 let relay_map = map
452 .entry(vote.date().epoch)
453 .or_insert(BTreeMap::new())
454 .entry(vote_view)
455 .or_insert(BTreeMap::new());
456 if let Some(relay_task) = relay_map.get_mut(&relay) {
457 tracing::debug!("Forwarding message");
458
459 if relay_task
461 .handle_vote_event(Arc::clone(&event), &event_stream)
462 .await?
463 .is_some()
464 {
465 map.get_mut(&vote.date().epoch)
466 .and_then(|x| x.remove(&vote_view));
467 map.retain(|_, x| !x.is_empty());
468 }
469
470 return Ok(());
471 }
472
473 let epoch_mem = self
474 .membership_coordinator
475 .membership_for_epoch(vote.date().epoch)?;
476 ensure!(
478 epoch_mem.leader(vote_view + relay)? == self.public_key,
479 debug!("View sync vote sent to wrong leader")
480 );
481
482 let info = AccumulatorInfo {
483 public_key: self.public_key.clone(),
484 membership: epoch_mem,
485 view: vote_view,
486 id: self.id,
487 };
488 let vote_collector = create_vote_accumulator(
489 &info,
490 event,
491 &event_stream,
492 self.upgrade_lock.clone(),
493 EpochTransitionIndicator::NotInTransition,
494 )
495 .await;
496 if let Ok(vote_task) = vote_collector {
497 relay_map.insert(relay, vote_task);
498 }
499 },
500
501 &HotShotEvent::ViewChange(new_view, epoch) => {
502 if epoch > self.cur_epoch {
503 self.cur_epoch = epoch;
504 }
505 let new_view = ViewNumber::new(*new_view);
506 if self.cur_view < new_view {
507 tracing::debug!(
508 "Change from view {} to view {} in view sync task",
509 *self.cur_view,
510 *new_view
511 );
512
513 self.cur_view = new_view;
514 self.next_view = self.cur_view;
515 self.num_timeouts_tracked = 0;
516 }
517
518 self.garbage_collect_tasks().await;
519 },
520 HotShotEvent::LeavesDecided(leaves) => {
521 let finalized_epoch = self.highest_finalized_epoch_view.0.max(
522 leaves
523 .iter()
524 .map(|leaf| leaf.epoch(self.epoch_height))
525 .max()
526 .unwrap_or(None),
527 );
528 let finalized_view = self.highest_finalized_epoch_view.1.max(
529 leaves
530 .iter()
531 .map(|leaf| leaf.view_number())
532 .max()
533 .unwrap_or(ViewNumber::new(0)),
534 );
535
536 self.highest_finalized_epoch_view = (finalized_epoch, finalized_view);
537
538 self.garbage_collect_tasks().await;
539 },
540 &HotShotEvent::Timeout(view_number, ..) => {
541 ensure!(
543 view_number >= self.cur_view,
544 debug!("Discarding old timeout vote.")
545 );
546
547 self.num_timeouts_tracked += 1;
548
549 if self.num_timeouts_tracked >= 3 {
550 tracing::error!("Too many consecutive timeouts! This shouldn't happen");
551 }
552
553 if self.num_timeouts_tracked >= 2 {
554 tracing::error!("Starting view sync protocol for view {}", *view_number + 1);
555
556 self.send_to_or_create_replica(
557 Arc::new(HotShotEvent::ViewSyncTrigger(view_number + 1)),
558 view_number + 1,
559 self.cur_epoch,
560 &event_stream,
561 )
562 .await;
563 } else {
564 self.cur_view = view_number + 1;
566 broadcast_view_change(
567 &event_stream,
568 self.cur_view,
569 self.cur_epoch,
570 self.first_epoch,
571 )
572 .await;
573 }
574 let leader = self
575 .membership_coordinator
576 .membership_for_epoch(self.cur_epoch)?
577 .leader(view_number)?;
578 tracing::warn!(
579 %leader,
580 leader_mnemonic = hotshot_types::utils::mnemonic(&leader),
581 view_number = *view_number,
582 num_timeouts_tracked = self.num_timeouts_tracked,
583 "view timed out",
584 );
585 },
586 HotShotEvent::SetFirstEpoch(view, epoch) => {
587 self.first_epoch = Some((*view, *epoch));
588 },
589
590 _ => {},
591 }
592 Ok(())
593 }
594
595 async fn garbage_collect_tasks(&self) {
599 let previous_epoch = self
600 .cur_epoch
601 .map(|e| e.saturating_sub(1))
602 .map(EpochNumber::new);
603 let gc_epoch = self.highest_finalized_epoch_view.0.max(previous_epoch);
604 Self::garbage_collect_tasks_helper(
605 &self.replica_task_map,
606 &gc_epoch,
607 &self.highest_finalized_epoch_view.1,
608 )
609 .await;
610 Self::garbage_collect_tasks_helper(
611 &self.pre_commit_relay_map,
612 &gc_epoch,
613 &self.highest_finalized_epoch_view.1,
614 )
615 .await;
616 Self::garbage_collect_tasks_helper(
617 &self.commit_relay_map,
618 &gc_epoch,
619 &self.highest_finalized_epoch_view.1,
620 )
621 .await;
622 Self::garbage_collect_tasks_helper(
623 &self.finalize_relay_map,
624 &gc_epoch,
625 &self.highest_finalized_epoch_view.1,
626 )
627 .await;
628 }
629
630 async fn garbage_collect_tasks_helper<VAL>(
631 map: &RwLock<TaskMap<VAL>>,
632 gc_epoch: &Option<EpochNumber>,
633 gc_view: &ViewNumber,
634 ) {
635 let mut task_map = map.write().await;
636 task_map.retain(|e, _| e >= gc_epoch);
637 if let Some(view_map) = task_map.get_mut(gc_epoch) {
638 view_map.retain(|v, _| v > gc_view)
639 };
640 task_map.retain(|_, view_map| !view_map.is_empty());
641 }
642}
643
644impl<TYPES: NodeType> ViewSyncReplicaTaskState<TYPES> {
645 #[instrument(skip_all, fields(id = self.id, view = *self.cur_view, epoch = self.cur_epoch.map(|x| *x)), name = "View Sync Replica Task", level = "error")]
646 pub async fn handle(
648 &mut self,
649 event: Arc<HotShotEvent<TYPES>>,
650 event_stream: Sender<Arc<HotShotEvent<TYPES>>>,
651 ) -> Option<HotShotTaskCompleted> {
652 match event.as_ref() {
653 HotShotEvent::ViewSyncPreCommitCertificateRecv(certificate) => {
654 let last_seen_certificate = ViewSyncPhase::PreCommit;
655
656 if certificate.view_number() < self.next_view {
658 tracing::warn!("We're already in a higher round");
659
660 return None;
661 }
662
663 let membership = self.membership_for_epoch(certificate.epoch()).await?;
664 let membership_stake_table =
665 StakeTableEntries::from_iter(membership.stake_table()).0;
666 let membership_failure_threshold = membership.failure_threshold();
667
668 if let Err(e) = certificate.is_valid_cert(
670 &membership_stake_table,
671 membership_failure_threshold,
672 &self.upgrade_lock,
673 ) {
674 tracing::error!(
675 "Not valid view sync cert! data: {:?}, error: {}",
676 certificate.data(),
677 e
678 );
679
680 return None;
681 }
682
683 if certificate.view_number() > self.next_view {
686 return Some(HotShotTaskCompleted);
687 }
688
689 if certificate.data().relay > self.relay {
690 self.relay = certificate.data().relay;
691 }
692
693 let Ok(vote) = ViewSyncCommitVote2::<TYPES>::create_signed_vote(
694 ViewSyncCommitData2 {
695 relay: certificate.data().relay,
696 round: self.next_view,
697 epoch: certificate.data().epoch,
698 },
699 self.next_view,
700 &self.public_key,
701 &self.private_key,
702 &self.upgrade_lock,
703 ) else {
704 tracing::error!("Failed to sign ViewSyncCommitData!");
705 return None;
706 };
707
708 broadcast_event(
709 Arc::new(HotShotEvent::ViewSyncCommitVoteSend(vote)),
710 &event_stream,
711 )
712 .await;
713
714 if let Some(timeout_task) = self.timeout_task.take() {
715 timeout_task.abort();
716 }
717
718 self.timeout_task = Some(spawn({
719 let stream = event_stream.clone();
720 let phase = last_seen_certificate;
721 let relay = self.relay;
722 let next_view = self.next_view;
723 let timeout = self.view_sync_timeout;
724 async move {
725 sleep(timeout).await;
726 tracing::warn!(
727 "Vote sending timed out in ViewSyncPreCommitCertificateRecv, Relay = \
728 {relay}"
729 );
730
731 broadcast_event(
732 Arc::new(HotShotEvent::ViewSyncTimeout(
733 ViewNumber::new(*next_view),
734 relay,
735 phase,
736 )),
737 &stream,
738 )
739 .await;
740 }
741 }));
742 },
743
744 HotShotEvent::ViewSyncCommitCertificateRecv(certificate) => {
745 let last_seen_certificate = ViewSyncPhase::Commit;
746
747 if certificate.view_number() < self.next_view {
749 tracing::warn!("We're already in a higher round");
750
751 return None;
752 }
753
754 let membership = self.membership_for_epoch(certificate.epoch()).await?;
755 let membership_stake_table =
756 StakeTableEntries::from_iter(membership.stake_table()).0;
757 let membership_success_threshold = membership.success_threshold();
758
759 if let Err(e) = certificate.is_valid_cert(
761 &membership_stake_table,
762 membership_success_threshold,
763 &self.upgrade_lock,
764 ) {
765 tracing::error!(
766 "Not valid view sync cert! data: {:?}, error: {}",
767 certificate.data(),
768 e
769 );
770
771 return None;
772 }
773
774 if certificate.view_number() > self.next_view {
777 return Some(HotShotTaskCompleted);
778 }
779
780 if certificate.data().relay > self.relay {
781 self.relay = certificate.data().relay;
782 }
783
784 let Ok(vote) = ViewSyncFinalizeVote2::<TYPES>::create_signed_vote(
785 ViewSyncFinalizeData2 {
786 relay: certificate.data().relay,
787 round: self.next_view,
788 epoch: certificate.data().epoch,
789 },
790 self.next_view,
791 &self.public_key,
792 &self.private_key,
793 &self.upgrade_lock,
794 ) else {
795 tracing::error!("Failed to sign view sync finalized vote!");
796 return None;
797 };
798
799 broadcast_event(
800 Arc::new(HotShotEvent::ViewSyncFinalizeVoteSend(vote)),
801 &event_stream,
802 )
803 .await;
804
805 tracing::info!(
806 "View sync protocol has received view sync evidence to update the view to {}",
807 *self.next_view
808 );
809
810 broadcast_view_change(
811 &event_stream,
812 self.next_view,
813 certificate.epoch(),
814 self.first_epoch,
815 )
816 .await;
817
818 if let Some(timeout_task) = self.timeout_task.take() {
819 timeout_task.abort();
820 }
821 self.timeout_task = Some(spawn({
822 let stream = event_stream.clone();
823 let phase = last_seen_certificate;
824 let relay = self.relay;
825 let next_view = self.next_view;
826 let timeout = self.view_sync_timeout;
827 async move {
828 sleep(timeout).await;
829 tracing::warn!(
830 "Vote sending timed out in ViewSyncCommitCertificateRecv, relay = \
831 {relay}"
832 );
833 broadcast_event(
834 Arc::new(HotShotEvent::ViewSyncTimeout(
835 ViewNumber::new(*next_view),
836 relay,
837 phase,
838 )),
839 &stream,
840 )
841 .await;
842 }
843 }));
844 },
845
846 HotShotEvent::ViewSyncFinalizeCertificateRecv(certificate) => {
847 if certificate.view_number() < self.next_view {
849 tracing::warn!("We're already in a higher round");
850
851 return None;
852 }
853
854 let membership = self.membership_for_epoch(certificate.epoch()).await?;
855 let membership_stake_table =
856 StakeTableEntries::from_iter(membership.stake_table()).0;
857 let membership_success_threshold = membership.success_threshold();
858
859 if let Err(e) = certificate.is_valid_cert(
861 &membership_stake_table,
862 membership_success_threshold,
863 &self.upgrade_lock,
864 ) {
865 tracing::error!(
866 "Not valid view sync cert! data: {:?}, error: {}",
867 certificate.data(),
868 e
869 );
870
871 return None;
872 }
873
874 if certificate.view_number() > self.next_view {
877 return Some(HotShotTaskCompleted);
878 }
879
880 if certificate.data().relay > self.relay {
881 self.relay = certificate.data().relay;
882 }
883
884 if let Some(timeout_task) = self.timeout_task.take() {
885 timeout_task.abort();
886 }
887
888 broadcast_view_change(
889 &event_stream,
890 self.next_view,
891 certificate.epoch(),
892 self.first_epoch,
893 )
894 .await;
895 return Some(HotShotTaskCompleted);
896 },
897
898 HotShotEvent::ViewSyncTrigger(view_number) => {
899 let view_number = *view_number;
900 if self.next_view != ViewNumber::new(*view_number) {
901 tracing::error!("Unexpected view number to trigger view sync");
902 return None;
903 }
904
905 let Ok(vote) = ViewSyncPreCommitVote2::<TYPES>::create_signed_vote(
906 ViewSyncPreCommitData2 {
907 relay: 0,
908 round: view_number,
909 epoch: self.cur_epoch,
910 },
911 view_number,
912 &self.public_key,
913 &self.private_key,
914 &self.upgrade_lock,
915 ) else {
916 tracing::error!("Failed to sign pre commit vote!");
917 return None;
918 };
919
920 broadcast_event(
921 Arc::new(HotShotEvent::ViewSyncPreCommitVoteSend(vote)),
922 &event_stream,
923 )
924 .await;
925
926 self.timeout_task = Some(spawn({
927 let stream = event_stream.clone();
928 let relay = self.relay;
929 let next_view = self.next_view;
930 let timeout = self.view_sync_timeout;
931 async move {
932 sleep(timeout).await;
933 tracing::warn!("Vote sending timed out in ViewSyncTrigger");
934 broadcast_event(
935 Arc::new(HotShotEvent::ViewSyncTimeout(
936 ViewNumber::new(*next_view),
937 relay,
938 ViewSyncPhase::None,
939 )),
940 &stream,
941 )
942 .await;
943 }
944 }));
945
946 return None;
947 },
948
949 HotShotEvent::ViewSyncTimeout(round, relay, last_seen_certificate) => {
950 let round = *round;
951 if ViewNumber::new(*round) == self.next_view && *relay == self.relay {
953 if let Some(timeout_task) = self.timeout_task.take() {
954 timeout_task.abort();
955 }
956 self.relay += 1;
957 match last_seen_certificate {
958 ViewSyncPhase::None | ViewSyncPhase::PreCommit | ViewSyncPhase::Commit => {
959 let Ok(vote) = ViewSyncPreCommitVote2::<TYPES>::create_signed_vote(
960 ViewSyncPreCommitData2 {
961 relay: self.relay,
962 round: self.next_view,
963 epoch: self.cur_epoch,
964 },
965 self.next_view,
966 &self.public_key,
967 &self.private_key,
968 &self.upgrade_lock,
969 ) else {
970 tracing::error!("Failed to sign ViewSyncPreCommitData!");
971 return None;
972 };
973
974 broadcast_event(
975 Arc::new(HotShotEvent::ViewSyncPreCommitVoteSend(vote)),
976 &event_stream,
977 )
978 .await;
979 },
980 ViewSyncPhase::Finalize => {
981 unimplemented!()
983 },
984 }
985
986 self.timeout_task = Some(spawn({
987 let stream = event_stream.clone();
988 let relay = self.relay;
989 let next_view = self.next_view;
990 let timeout = self.view_sync_timeout;
991 let last_cert = last_seen_certificate.clone();
992 async move {
993 sleep(timeout).await;
994 tracing::warn!(
995 "Vote sending timed out in ViewSyncTimeout relay = {relay}"
996 );
997 broadcast_event(
998 Arc::new(HotShotEvent::ViewSyncTimeout(
999 ViewNumber::new(*next_view),
1000 relay,
1001 last_cert,
1002 )),
1003 &stream,
1004 )
1005 .await;
1006 }
1007 }));
1008
1009 return None;
1010 }
1011 },
1012 _ => return None,
1013 }
1014 None
1015 }
1016
1017 pub async fn membership_for_epoch(
1018 &self,
1019 epoch: Option<EpochNumber>,
1020 ) -> Option<EpochMembership<TYPES>> {
1021 match self.membership_coordinator.membership_for_epoch(epoch) {
1022 Ok(m) => Some(m),
1023 Err(e) => {
1024 tracing::warn!(e.message);
1025 None
1026 },
1027 }
1028 }
1029}