1use std::{collections::BTreeMap, sync::Arc, time::Instant};
8
9use async_broadcast::{Receiver, Sender, broadcast};
10use async_trait::async_trait;
11use either::Either;
12use hotshot_task::{
13 dependency::{AndDependency, EventDependency, OrDependency},
14 dependency_task::DependencyTask,
15 task::TaskState,
16};
17use hotshot_types::{
18 consensus::OuterConsensus,
19 data::{EpochNumber, ViewNumber},
20 epoch_membership::EpochMembershipCoordinator,
21 message::UpgradeLock,
22 simple_certificate::{
23 EpochRootQuorumCertificateV2, LightClientStateUpdateCertificateV2,
24 NextEpochQuorumCertificate2, QuorumCertificate2, UpgradeCertificate,
25 },
26 stake_table::StakeTableEntries,
27 traits::{
28 node_implementation::{NodeImplementation, NodeType},
29 signature_key::SignatureKey,
30 storage::Storage,
31 },
32 utils::{EpochTransitionIndicator, is_epoch_transition, is_last_block},
33 vote::{Certificate, HasViewNumber},
34};
35use hotshot_utils::anytrace::*;
36use tracing::instrument;
37
38use self::handlers::{ProposalDependency, ProposalDependencyHandle};
39use crate::{
40 events::HotShotEvent, helpers::broadcast_view_change,
41 quorum_proposal::handlers::handle_eqc_formed,
42};
43
44mod handlers;
45
46pub struct QuorumProposalTaskState<TYPES: NodeType, I: NodeImplementation<TYPES>> {
48 pub latest_proposed_view: ViewNumber,
50
51 pub cur_epoch: Option<EpochNumber>,
53
54 pub proposal_dependencies: BTreeMap<ViewNumber, Sender<()>>,
56
57 pub formed_quorum_certificates: BTreeMap<ViewNumber, QuorumCertificate2<TYPES>>,
59
60 pub formed_next_epoch_quorum_certificates:
62 BTreeMap<ViewNumber, NextEpochQuorumCertificate2<TYPES>>,
63
64 pub instance_state: Arc<TYPES::InstanceState>,
66
67 pub membership_coordinator: EpochMembershipCoordinator<TYPES>,
69
70 pub public_key: TYPES::SignatureKey,
72
73 pub private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
75
76 pub timeout: u64,
78
79 pub storage: I::Storage,
81
82 pub consensus: OuterConsensus<TYPES>,
84
85 pub id: u64,
87
88 pub formed_upgrade_certificate: Option<UpgradeCertificate<TYPES>>,
95
96 pub upgrade_lock: UpgradeLock<TYPES>,
98
99 pub epoch_height: u64,
101
102 pub formed_state_cert: BTreeMap<EpochNumber, LightClientStateUpdateCertificateV2<TYPES>>,
104
105 pub first_epoch: Option<(ViewNumber, EpochNumber)>,
107}
108
109impl<TYPES: NodeType, I: NodeImplementation<TYPES>> QuorumProposalTaskState<TYPES, I> {
110 #[instrument(skip_all, fields(id = self.id, latest_proposed_view = *self.latest_proposed_view), name = "Create event dependency", level = "info")]
112 fn create_event_dependency(
113 &self,
114 dependency_type: ProposalDependency,
115 view_number: ViewNumber,
116 event_receiver: Receiver<Arc<HotShotEvent<TYPES>>>,
117 cancel_receiver: Receiver<()>,
118 ) -> EventDependency<Arc<HotShotEvent<TYPES>>> {
119 let id = self.id;
120 EventDependency::new(
121 event_receiver,
122 cancel_receiver,
123 format!(
124 "ProposalDependency::{:?} for view {:?}, my id {:?}",
125 dependency_type, view_number, self.id
126 ),
127 Box::new(move |event| {
128 let event = event.as_ref();
129 let event_view = match dependency_type {
130 ProposalDependency::Qc => {
131 if let HotShotEvent::Qc2Formed(either::Left(qc)) = event {
132 qc.view_number() + 1
133 } else if let HotShotEvent::EpochRootQcFormed(root_qc) = event {
134 root_qc.view_number() + 1
135 } else {
136 return false;
137 }
138 },
139 ProposalDependency::TimeoutCert => {
140 if let HotShotEvent::Qc2Formed(either::Right(timeout)) = event {
141 timeout.view_number() + 1
142 } else {
143 return false;
144 }
145 },
146 ProposalDependency::ViewSyncCert => {
147 if let HotShotEvent::ViewSyncFinalizeCertificateRecv(view_sync_cert) = event
148 {
149 view_sync_cert.view_number()
150 } else {
151 return false;
152 }
153 },
154 ProposalDependency::Proposal => {
155 if let HotShotEvent::QuorumProposalPreliminarilyValidated(proposal) = event
156 {
157 proposal.data.view_number() + 1
158 } else {
159 return false;
160 }
161 },
162 ProposalDependency::PayloadAndMetadata => {
163 if let HotShotEvent::SendPayloadCommitmentAndMetadata(
164 _payload_commitment,
165 _builder_commitment,
166 _metadata,
167 view_number,
168 _fee,
169 ) = event
170 {
171 *view_number
172 } else {
173 return false;
174 }
175 },
176 ProposalDependency::VidShare => {
177 if let HotShotEvent::VidDisperseSend(vid_disperse, _) = event {
178 vid_disperse.data.view_number()
179 } else {
180 return false;
181 }
182 },
183 };
184 let valid = event_view == view_number;
185 if valid {
186 tracing::debug!(
187 "Dependency {dependency_type:?} is complete for view {event_view:?}, my \
188 id is {id:?}!",
189 );
190 }
191 valid
192 }),
193 )
194 }
195
196 fn create_and_complete_dependencies(
198 &self,
199 view_number: ViewNumber,
200 event_receiver: &Receiver<Arc<HotShotEvent<TYPES>>>,
201 event: Arc<HotShotEvent<TYPES>>,
202 cancel_receiver: &Receiver<()>,
203 ) -> AndDependency<Vec<Vec<Arc<HotShotEvent<TYPES>>>>> {
204 let mut proposal_dependency = self.create_event_dependency(
205 ProposalDependency::Proposal,
206 view_number,
207 event_receiver.clone(),
208 cancel_receiver.clone(),
209 );
210
211 let mut qc_dependency = self.create_event_dependency(
212 ProposalDependency::Qc,
213 view_number,
214 event_receiver.clone(),
215 cancel_receiver.clone(),
216 );
217
218 let mut view_sync_dependency = self.create_event_dependency(
219 ProposalDependency::ViewSyncCert,
220 view_number,
221 event_receiver.clone(),
222 cancel_receiver.clone(),
223 );
224
225 let mut timeout_dependency = self.create_event_dependency(
226 ProposalDependency::TimeoutCert,
227 view_number,
228 event_receiver.clone(),
229 cancel_receiver.clone(),
230 );
231
232 let mut payload_commitment_dependency = self.create_event_dependency(
233 ProposalDependency::PayloadAndMetadata,
234 view_number,
235 event_receiver.clone(),
236 cancel_receiver.clone(),
237 );
238
239 let mut vid_share_dependency = self.create_event_dependency(
240 ProposalDependency::VidShare,
241 view_number,
242 event_receiver.clone(),
243 cancel_receiver.clone(),
244 );
245
246 let epoch_height = self.epoch_height;
247
248 let mut next_epoch_qc_dependency = EventDependency::new(
251 event_receiver.clone(),
252 cancel_receiver.clone(),
253 format!(
254 "ProposalDependency Next epoch QC for view {:?}, my id {:?}",
255 view_number, self.id
256 ),
257 Box::new(move |event| {
258 if let HotShotEvent::NextEpochQc2Formed(Either::Left(next_epoch_qc)) =
259 event.as_ref()
260 {
261 return next_epoch_qc.view_number() + 1 == view_number;
262 }
263 if let HotShotEvent::EpochRootQcFormed(..) = event.as_ref() {
264 return true;
266 }
267 if let HotShotEvent::Qc2Formed(Either::Left(qc)) = event.as_ref()
268 && qc.view_number() + 1 == view_number
269 {
270 return qc
271 .data
272 .block_number
273 .is_none_or(|bn| !is_epoch_transition(bn, epoch_height));
274 }
275 false
276 }),
277 );
278
279 match event.as_ref() {
280 HotShotEvent::SendPayloadCommitmentAndMetadata(..) => {
281 payload_commitment_dependency.mark_as_completed(Arc::clone(&event));
282 },
283 HotShotEvent::QuorumProposalPreliminarilyValidated(..) => {
284 proposal_dependency.mark_as_completed(event);
285 },
286 HotShotEvent::Qc2Formed(quorum_certificate) => match quorum_certificate {
287 Either::Right(_) => timeout_dependency.mark_as_completed(event),
288 Either::Left(qc) => {
289 if qc
290 .data
291 .block_number
292 .is_none_or(|bn| !is_epoch_transition(bn, epoch_height))
293 {
294 next_epoch_qc_dependency.mark_as_completed(event.clone());
295 }
296 qc_dependency.mark_as_completed(event);
297 },
298 },
299 HotShotEvent::EpochRootQcFormed(..) => {
300 next_epoch_qc_dependency.mark_as_completed(event.clone());
302 qc_dependency.mark_as_completed(event);
303 },
304 HotShotEvent::ViewSyncFinalizeCertificateRecv(_) => {
305 view_sync_dependency.mark_as_completed(event);
306 },
307 HotShotEvent::VidDisperseSend(..) => {
308 vid_share_dependency.mark_as_completed(event);
309 },
310 HotShotEvent::NextEpochQc2Formed(Either::Left(_)) => {
311 next_epoch_qc_dependency.mark_as_completed(event);
312 },
313 _ => {},
314 };
315
316 let mut secondary_deps = vec![
318 AndDependency::from_deps(vec![timeout_dependency]),
320 AndDependency::from_deps(vec![view_sync_dependency]),
322 ];
323 if *view_number > 1 {
325 secondary_deps.push(AndDependency::from_deps(vec![
326 qc_dependency,
327 proposal_dependency,
328 next_epoch_qc_dependency,
329 ]));
330 } else {
331 secondary_deps.push(AndDependency::from_deps(vec![qc_dependency]));
332 }
333
334 let primary_deps = vec![payload_commitment_dependency, vid_share_dependency];
335
336 AndDependency::from_deps(vec![OrDependency::from_deps(vec![
337 AndDependency::from_deps(vec![
338 OrDependency::from_deps(vec![AndDependency::from_deps(primary_deps)]),
339 OrDependency::from_deps(secondary_deps),
340 ]),
341 ])])
342 }
343
344 #[instrument(skip_all, fields(id = self.id, latest_proposed_view = *self.latest_proposed_view), name = "Create dependency task", level = "error")]
350 async fn create_dependency_task_if_new(
351 &mut self,
352 view_number: ViewNumber,
353 epoch_number: Option<EpochNumber>,
354 event_receiver: Receiver<Arc<HotShotEvent<TYPES>>>,
355 event_sender: Sender<Arc<HotShotEvent<TYPES>>>,
356 event: Arc<HotShotEvent<TYPES>>,
357 epoch_transition_indicator: EpochTransitionIndicator,
358 ) -> Result<()> {
359 let epoch_membership = self
360 .membership_coordinator
361 .membership_for_epoch(epoch_number)?;
362 let leader_in_current_epoch = epoch_membership.leader(view_number)? == self.public_key;
363 let leader_in_next_epoch = !leader_in_current_epoch
367 && epoch_number.is_some()
368 && matches!(
369 epoch_transition_indicator,
370 EpochTransitionIndicator::InTransition
371 )
372 && epoch_membership
373 .next_epoch()
374 .context(warn!(
375 "Missing the randomized stake table for epoch {}",
376 epoch_number.unwrap() + 1
377 ))?
378 .leader(view_number)?
379 == self.public_key;
380
381 ensure!(
383 leader_in_current_epoch || leader_in_next_epoch,
384 debug!("We are not the leader of the next view")
385 );
386
387 ensure!(
389 view_number > self.latest_proposed_view,
390 "We have already proposed for this view"
391 );
392
393 tracing::debug!(
394 "Attempting to make dependency task for view {view_number} and event {event:?}"
395 );
396
397 ensure!(
398 !self.proposal_dependencies.contains_key(&view_number),
399 "Task already exists"
400 );
401
402 let (cancel_sender, cancel_receiver) = broadcast(1);
403
404 let dependency_chain = self.create_and_complete_dependencies(
405 view_number,
406 &event_receiver,
407 event,
408 &cancel_receiver,
409 );
410
411 let dependency_task = DependencyTask::new(
412 dependency_chain,
413 ProposalDependencyHandle {
414 latest_proposed_view: self.latest_proposed_view,
415 view_number,
416 sender: event_sender,
417 receiver: event_receiver,
418 membership: epoch_membership,
419 public_key: self.public_key.clone(),
420 private_key: self.private_key.clone(),
421 instance_state: Arc::clone(&self.instance_state),
422 consensus: OuterConsensus::new(Arc::clone(&self.consensus.inner_consensus)),
423 timeout: self.timeout,
424 formed_upgrade_certificate: self.formed_upgrade_certificate.clone(),
425 upgrade_lock: self.upgrade_lock.clone(),
426 id: self.id,
427 view_start_time: Instant::now(),
428 epoch_height: self.epoch_height,
429 cancel_receiver,
430 },
431 );
432 self.proposal_dependencies
433 .insert(view_number, cancel_sender);
434
435 dependency_task.run();
436
437 Ok(())
438 }
439
440 #[instrument(skip_all, fields(id = self.id, latest_proposed_view = *self.latest_proposed_view), name = "Update latest proposed view", level = "error")]
442 fn update_latest_proposed_view(&mut self, new_view: ViewNumber) -> bool {
443 if *self.latest_proposed_view < *new_view {
444 tracing::debug!(
445 "Updating latest proposed view from {} to {}",
446 *self.latest_proposed_view,
447 *new_view
448 );
449
450 for view in (*self.latest_proposed_view + 1)..=(*new_view) {
452 let maybe_cancel_sender = self.proposal_dependencies.remove(&ViewNumber::new(view));
453 if maybe_cancel_sender.as_ref().is_some_and(|s| !s.is_closed()) {
454 tracing::debug!("Aborting proposal dependency task for view {view}");
455 let _ = maybe_cancel_sender.unwrap().try_broadcast(());
456 }
457 }
458
459 self.latest_proposed_view = new_view;
460
461 return true;
462 }
463 false
464 }
465
466 #[instrument(skip_all, fields(id = self.id, latest_proposed_view = *self.latest_proposed_view, epoch = self.cur_epoch.map(|x| *x)), name = "handle method", level = "error", target = "QuorumProposalTaskState")]
468 pub async fn handle(
469 &mut self,
470 event: Arc<HotShotEvent<TYPES>>,
471 event_receiver: Receiver<Arc<HotShotEvent<TYPES>>>,
472 event_sender: Sender<Arc<HotShotEvent<TYPES>>>,
473 ) -> Result<()> {
474 let epoch_number = self.cur_epoch;
475 let maybe_high_qc_block_number = self.consensus.read().await.high_qc().data.block_number;
476 let epoch_transition_indicator = if maybe_high_qc_block_number.is_some_and(|bn| {
477 is_epoch_transition(bn, self.epoch_height) && !is_last_block(bn, self.epoch_height)
478 }) {
479 EpochTransitionIndicator::InTransition
480 } else {
481 EpochTransitionIndicator::NotInTransition
482 };
483 match event.as_ref() {
484 HotShotEvent::UpgradeCertificateFormed(cert) => {
485 tracing::debug!(
486 "Upgrade certificate received for view {}!",
487 *cert.view_number
488 );
489 if cert.data.decide_by >= self.latest_proposed_view + 3 {
491 tracing::debug!("Updating current formed_upgrade_certificate");
492
493 self.formed_upgrade_certificate = Some(cert.clone());
494 }
495 },
496 HotShotEvent::Qc2Formed(cert) => match cert.clone() {
497 either::Right(timeout_cert) => {
498 let view_number = timeout_cert.view_number + 1;
499 self.create_dependency_task_if_new(
500 view_number,
501 epoch_number,
502 event_receiver,
503 event_sender,
504 Arc::clone(&event),
505 epoch_transition_indicator,
506 )
507 .await?;
508 },
509 either::Left(qc) => {
510 if qc.view_number <= self.consensus.read().await.high_qc().view_number {
512 tracing::trace!(
513 "Received a QC for a view that was not > than our current high QC"
514 );
515 }
516
517 self.formed_quorum_certificates
518 .insert(qc.view_number(), qc.clone());
519
520 handle_eqc_formed(
521 qc.view_number(),
522 qc.data.leaf_commit,
523 qc.data.block_number,
524 self,
525 &event_sender,
526 )
527 .await;
528
529 let view_number = qc.view_number() + 1;
530 if !qc
531 .data
532 .block_number
533 .is_some_and(|bn| is_last_block(bn, self.epoch_height))
534 {
535 broadcast_view_change(
536 &event_sender,
537 view_number,
538 qc.data.epoch,
539 self.first_epoch,
540 )
541 .await;
542 }
543 self.create_dependency_task_if_new(
544 view_number,
545 epoch_number,
546 event_receiver,
547 event_sender,
548 Arc::clone(&event),
549 epoch_transition_indicator,
550 )
551 .await?;
552 },
553 },
554
555 HotShotEvent::EpochRootQcFormed(EpochRootQuorumCertificateV2 { qc, state_cert }) => {
556 if qc.view_number() <= self.consensus.read().await.high_qc().view_number {
558 tracing::trace!(
559 "Received a QC for a view that was not > than our current high QC"
560 );
561 }
562
563 self.formed_quorum_certificates
564 .insert(qc.view_number(), qc.clone());
565 self.formed_state_cert
566 .insert(state_cert.epoch, state_cert.clone());
567
568 self.storage
569 .update_high_qc2_and_state_cert(qc.clone(), state_cert.clone())
570 .await
571 .wrap()
572 .context(error!(
573 "Failed to update the epoch root QC and state cert in storage!"
574 ))?;
575
576 let view_number = qc.view_number() + 1;
577 broadcast_view_change(&event_sender, view_number, qc.data.epoch, self.first_epoch)
578 .await;
579 self.create_dependency_task_if_new(
580 view_number,
581 epoch_number,
582 event_receiver,
583 event_sender,
584 Arc::clone(&event),
585 epoch_transition_indicator,
586 )
587 .await?;
588 },
589 HotShotEvent::SendPayloadCommitmentAndMetadata(
590 _payload_commitment,
591 _builder_commitment,
592 _metadata,
593 view_number,
594 _fee,
595 ) => {
596 let view_number = *view_number;
597
598 self.create_dependency_task_if_new(
599 view_number,
600 epoch_number,
601 event_receiver,
602 event_sender,
603 Arc::clone(&event),
604 epoch_transition_indicator,
605 )
606 .await?;
607 },
608 HotShotEvent::ViewSyncFinalizeCertificateRecv(certificate) => {
609 let epoch_number = certificate.data.epoch;
610 let epoch_membership = self
611 .membership_coordinator
612 .stake_table_for_epoch(epoch_number)
613 .context(warn!("No Stake Table for Epoch = {epoch_number:?}"))?;
614
615 let membership_stake_table =
616 StakeTableEntries::from_iter(epoch_membership.stake_table()).0;
617 let membership_success_threshold = epoch_membership.success_threshold();
618
619 certificate
620 .is_valid_cert(
621 &membership_stake_table,
622 membership_success_threshold,
623 &self.upgrade_lock,
624 )
625 .context(|e| {
626 warn!(
627 "View Sync Finalize certificate {:?} was invalid: {}",
628 certificate.data(),
629 e
630 )
631 })?;
632
633 let view_number = certificate.view_number;
634
635 self.create_dependency_task_if_new(
636 view_number,
637 epoch_number,
638 event_receiver,
639 event_sender,
640 event,
641 epoch_transition_indicator,
642 )
643 .await?;
644 },
645 HotShotEvent::QuorumProposalPreliminarilyValidated(proposal) => {
646 let view_number = proposal.data.view_number();
647 if !self.update_latest_proposed_view(view_number) {
649 tracing::trace!("Failed to update latest proposed view");
650 }
651
652 self.create_dependency_task_if_new(
653 view_number + 1,
654 epoch_number,
655 event_receiver,
656 event_sender,
657 Arc::clone(&event),
658 epoch_transition_indicator,
659 )
660 .await?;
661 },
662 HotShotEvent::QuorumProposalSend(proposal, _) => {
663 let view = proposal.data.view_number();
664
665 ensure!(
666 self.update_latest_proposed_view(view),
667 "Failed to update latest proposed view"
668 );
669 },
670 HotShotEvent::VidDisperseSend(vid_disperse, _) => {
671 let view_number = vid_disperse.data.view_number();
672 self.create_dependency_task_if_new(
673 view_number,
674 epoch_number,
675 event_receiver,
676 event_sender,
677 Arc::clone(&event),
678 epoch_transition_indicator,
679 )
680 .await?;
681 },
682 HotShotEvent::ViewChange(view, epoch) => {
683 if epoch > &self.cur_epoch {
684 self.cur_epoch = *epoch;
685 }
686 let keep_view = ViewNumber::new(view.saturating_sub(1));
687 self.cancel_tasks(keep_view);
688 },
689 HotShotEvent::Timeout(view, ..) => {
690 let keep_view = ViewNumber::new(view.saturating_sub(1));
691 self.cancel_tasks(keep_view);
692 },
693 HotShotEvent::NextEpochQc2Formed(Either::Left(next_epoch_qc)) => {
694 let current_next_epoch_qc =
696 self.consensus.read().await.next_epoch_high_qc().cloned();
697 ensure!(
698 current_next_epoch_qc.is_none()
699 || next_epoch_qc.view_number > current_next_epoch_qc.unwrap().view_number,
700 debug!(
701 "Received a next epoch QC for a view that was not > than our current next \
702 epoch high QC"
703 )
704 );
705
706 self.formed_next_epoch_quorum_certificates
707 .insert(next_epoch_qc.view_number(), next_epoch_qc.clone());
708
709 handle_eqc_formed(
710 next_epoch_qc.view_number(),
711 next_epoch_qc.data.leaf_commit,
712 next_epoch_qc.data.block_number,
713 self,
714 &event_sender,
715 )
716 .await;
717
718 let view_number = next_epoch_qc.view_number() + 1;
719 self.create_dependency_task_if_new(
720 view_number,
721 epoch_number,
722 event_receiver,
723 event_sender,
724 Arc::clone(&event),
725 epoch_transition_indicator,
726 )
727 .await?;
728 },
729 HotShotEvent::SetFirstEpoch(view, epoch) => {
730 self.first_epoch = Some((*view, *epoch));
731 },
732 _ => {},
733 }
734 Ok(())
735 }
736
737 pub fn cancel_tasks(&mut self, view: ViewNumber) {
739 let keep = self.proposal_dependencies.split_off(&view);
740 while let Some((view, cancel_sender)) = self.proposal_dependencies.pop_first() {
741 if !cancel_sender.is_closed() {
742 tracing::debug!("Aborting proposal dependency task for view {view}");
743 let _ = cancel_sender.try_broadcast(());
744 }
745 }
746 self.proposal_dependencies = keep;
747 }
748}
749
750#[async_trait]
751impl<TYPES: NodeType, I: NodeImplementation<TYPES>> TaskState
752 for QuorumProposalTaskState<TYPES, I>
753{
754 type Event = HotShotEvent<TYPES>;
755
756 async fn handle_event(
757 &mut self,
758 event: Arc<Self::Event>,
759 sender: &Sender<Arc<Self::Event>>,
760 receiver: &Receiver<Arc<Self::Event>>,
761 ) -> Result<()> {
762 if self
763 .upgrade_lock
764 .new_protocol_active(self.latest_proposed_view)
765 {
766 return Ok(());
767 }
768 self.handle(event, receiver.clone(), sender.clone()).await
769 }
770
771 fn cancel_subtasks(&mut self) {
772 while let Some((view, cancel_sender)) = self.proposal_dependencies.pop_first() {
773 if !cancel_sender.is_closed() {
774 tracing::debug!("Aborting proposal dependency task for view {view}");
775 let _ = cancel_sender.try_broadcast(());
776 }
777 }
778 }
779}