1use std::{
11 marker::PhantomData,
12 sync::Arc,
13 time::{Duration, Instant},
14};
15
16use async_broadcast::{Receiver, Sender};
17use committable::{Commitment, Committable};
18use hotshot_contract_adapter::light_client::validate_light_client_state_update_certificate;
19use hotshot_task::dependency_task::HandleDepOutput;
20use hotshot_types::{
21 consensus::{CommitmentAndMetadata, OuterConsensus},
22 data::{
23 Leaf2, QuorumProposal2, QuorumProposalWrapper, VidDisperse, ViewChangeEvidence2, ViewNumber,
24 },
25 epoch_membership::EpochMembership,
26 message::Proposal,
27 simple_certificate::{
28 LightClientStateUpdateCertificateV2, NextEpochQuorumCertificate2, QuorumCertificate2,
29 UpgradeCertificate, check_qc_state_cert_correspondence,
30 },
31 traits::{
32 BlockPayload,
33 block_contents::BlockHeader,
34 node_implementation::{NodeImplementation, NodeType},
35 signature_key::SignatureKey,
36 storage::Storage,
37 },
38 utils::{
39 epoch_from_block_number, is_epoch_root, is_epoch_transition, is_last_block,
40 is_transition_block, option_epoch_from_block_number,
41 },
42 vote::HasViewNumber,
43};
44use hotshot_utils::anytrace::*;
45use tracing::instrument;
46use versions::EPOCH_VERSION;
47
48use crate::{
49 events::HotShotEvent,
50 helpers::{broadcast_event, parent_leaf_and_state, validate_qc_and_next_epoch_qc},
51 quorum_proposal::{QuorumProposalTaskState, UpgradeLock},
52};
53
54#[derive(PartialEq, Debug)]
56pub(crate) enum ProposalDependency {
57 PayloadAndMetadata,
59
60 Qc,
62
63 ViewSyncCert,
65
66 TimeoutCert,
68
69 Proposal,
71
72 VidShare,
74}
75
76pub struct ProposalDependencyHandle<TYPES: NodeType> {
78 pub latest_proposed_view: ViewNumber,
80
81 pub view_number: ViewNumber,
83
84 pub sender: Sender<Arc<HotShotEvent<TYPES>>>,
86
87 pub receiver: Receiver<Arc<HotShotEvent<TYPES>>>,
89
90 pub instance_state: Arc<TYPES::InstanceState>,
92
93 pub membership: EpochMembership<TYPES>,
95
96 pub public_key: TYPES::SignatureKey,
98
99 pub private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
101
102 pub consensus: OuterConsensus<TYPES>,
104
105 pub timeout: u64,
107
108 pub formed_upgrade_certificate: Option<UpgradeCertificate<TYPES>>,
115
116 pub upgrade_lock: UpgradeLock<TYPES>,
118
119 pub id: u64,
121
122 pub view_start_time: Instant,
124
125 pub epoch_height: u64,
127
128 pub cancel_receiver: Receiver<()>,
129}
130
131impl<TYPES: NodeType> ProposalDependencyHandle<TYPES> {
132 async fn wait_for_qc_event(
134 &self,
135 mut rx: Receiver<Arc<HotShotEvent<TYPES>>>,
136 ) -> Option<(
137 QuorumCertificate2<TYPES>,
138 Option<NextEpochQuorumCertificate2<TYPES>>,
139 Option<LightClientStateUpdateCertificateV2<TYPES>>,
140 )> {
141 while let Ok(event) = rx.recv_direct().await {
142 let (qc, maybe_next_epoch_qc, mut maybe_state_cert) = match event.as_ref() {
143 HotShotEvent::HighQcRecv(qc, maybe_next_epoch_qc, _sender) => {
144 (qc, maybe_next_epoch_qc, None)
145 },
146 HotShotEvent::EpochRootQcRecv(root_qc, _sender) => {
147 (&root_qc.qc, &None, Some(root_qc.state_cert.clone()))
148 },
149 _ => continue,
150 };
151 if validate_qc_and_next_epoch_qc(
152 qc,
153 maybe_next_epoch_qc.as_ref(),
154 &self.consensus,
155 &self.membership.coordinator,
156 &self.upgrade_lock,
157 self.epoch_height,
158 )
159 .await
160 .is_ok()
161 {
162 if qc
163 .data
164 .block_number
165 .is_some_and(|bn| is_epoch_root(bn, self.epoch_height))
166 {
167 if let Some(state_cert) = &maybe_state_cert {
169 if validate_light_client_state_update_certificate(
170 state_cert,
171 &self.membership.coordinator,
172 &self.upgrade_lock,
173 )
174 .await
175 .is_err()
176 || !check_qc_state_cert_correspondence(
177 qc,
178 state_cert,
179 self.epoch_height,
180 )
181 {
182 tracing::error!("Failed to validate state cert");
183 return None;
184 }
185 } else {
186 tracing::error!(
187 "Received an epoch root QC but we don't have the corresponding state \
188 cert."
189 );
190 return None;
191 }
192 } else {
193 maybe_state_cert = None;
194 }
195 return Some((qc.clone(), maybe_next_epoch_qc.clone(), maybe_state_cert));
196 }
197 }
198 None
199 }
200
201 async fn wait_for_transition_qc(
202 &self,
203 ) -> Result<
204 Option<(
205 QuorumCertificate2<TYPES>,
206 NextEpochQuorumCertificate2<TYPES>,
207 )>,
208 > {
209 ensure!(
210 self.upgrade_lock.epochs_enabled(self.view_number),
211 error!("Epochs are not enabled yet we tried to wait for Highest QC.")
212 );
213
214 let mut transition_qc = self.consensus.read().await.transition_qc().cloned();
215
216 let wait_duration = Duration::from_millis(self.timeout / 2);
217
218 let mut rx = self.receiver.clone();
219
220 while let Ok(event) = rx.try_recv() {
223 if let HotShotEvent::HighQcRecv(qc, maybe_next_epoch_qc, _sender) = event.as_ref() {
224 if let Some(block_number) = qc.data.block_number {
225 if !is_transition_block(block_number, self.epoch_height) {
226 continue;
227 }
228 } else {
229 continue;
230 }
231 let Some(next_epoch_qc) = maybe_next_epoch_qc else {
232 continue;
233 };
234 if validate_qc_and_next_epoch_qc(
235 qc,
236 Some(next_epoch_qc),
237 &self.consensus,
238 &self.membership.coordinator,
239 &self.upgrade_lock,
240 self.epoch_height,
241 )
242 .await
243 .is_ok()
244 && transition_qc
245 .as_ref()
246 .is_none_or(|tqc| qc.view_number() > tqc.0.view_number())
247 {
248 transition_qc = Some((qc.clone(), next_epoch_qc.clone()));
249 }
250 }
251 }
252 while self.view_start_time.elapsed() < wait_duration {
254 let time_spent = Instant::now()
255 .checked_duration_since(self.view_start_time)
256 .ok_or(error!(
257 "Time elapsed since the start of the task is negative. This should never \
258 happen."
259 ))?;
260 let time_left = wait_duration
261 .checked_sub(time_spent)
262 .ok_or(info!("No time left"))?;
263 let Ok(Ok(event)) = tokio::time::timeout(time_left, rx.recv_direct()).await else {
264 return Ok(transition_qc);
265 };
266 if let HotShotEvent::HighQcRecv(qc, maybe_next_epoch_qc, _sender) = event.as_ref() {
267 if let Some(block_number) = qc.data.block_number {
268 if !is_transition_block(block_number, self.epoch_height) {
269 continue;
270 }
271 } else {
272 continue;
273 }
274 let Some(next_epoch_qc) = maybe_next_epoch_qc else {
275 continue;
276 };
277 if validate_qc_and_next_epoch_qc(
278 qc,
279 Some(next_epoch_qc),
280 &self.consensus,
281 &self.membership.coordinator,
282 &self.upgrade_lock,
283 self.epoch_height,
284 )
285 .await
286 .is_ok()
287 && transition_qc
288 .as_ref()
289 .is_none_or(|tqc| qc.view_number() > tqc.0.view_number())
290 {
291 transition_qc = Some((qc.clone(), next_epoch_qc.clone()));
292 }
293 }
294 }
295 Ok(transition_qc)
296 }
297 async fn wait_for_highest_qc(
301 &self,
302 ) -> Result<(
303 QuorumCertificate2<TYPES>,
304 Option<NextEpochQuorumCertificate2<TYPES>>,
305 Option<LightClientStateUpdateCertificateV2<TYPES>>,
306 )> {
307 tracing::debug!("waiting for QC");
308 ensure!(
310 self.upgrade_lock.epochs_enabled(self.view_number),
311 error!("Epochs are not enabled yet we tried to wait for Highest QC.")
312 );
313
314 let consensus_reader = self.consensus.read().await;
315 let mut highest_qc = consensus_reader.high_qc().clone();
316 let mut state_cert = if highest_qc
317 .data
318 .block_number
319 .is_some_and(|bn| is_epoch_root(bn, self.epoch_height))
320 {
321 consensus_reader.state_cert().cloned()
322 } else {
323 None
324 };
325 let mut next_epoch_qc = if highest_qc
326 .data
327 .block_number
328 .is_some_and(|bn| is_last_block(bn, self.epoch_height))
329 {
330 let maybe_neqc = consensus_reader.next_epoch_high_qc().cloned();
331 if maybe_neqc
332 .as_ref()
333 .is_some_and(|neqc| neqc.data.leaf_commit == highest_qc.data.leaf_commit)
334 {
335 maybe_neqc
336 } else {
337 None
338 }
339 } else {
340 None
341 };
342 drop(consensus_reader);
343
344 let wait_duration = Duration::from_millis(self.timeout / 2);
345
346 let mut rx = self.receiver.clone();
347
348 while let Ok(event) = rx.try_recv() {
350 let (qc, maybe_next_epoch_qc, mut maybe_state_cert) = match event.as_ref() {
351 HotShotEvent::HighQcRecv(qc, maybe_next_epoch_qc, _sender) => {
352 (qc, maybe_next_epoch_qc, None)
353 },
354 HotShotEvent::EpochRootQcRecv(root_qc, _sender) => {
355 (&root_qc.qc, &None, Some(root_qc.state_cert.clone()))
356 },
357 _ => continue,
358 };
359 if validate_qc_and_next_epoch_qc(
360 qc,
361 maybe_next_epoch_qc.as_ref(),
362 &self.consensus,
363 &self.membership.coordinator,
364 &self.upgrade_lock,
365 self.epoch_height,
366 )
367 .await
368 .is_ok()
369 {
370 if qc
371 .data
372 .block_number
373 .is_some_and(|bn| is_epoch_root(bn, self.epoch_height))
374 {
375 if let Some(state_cert) = &maybe_state_cert {
377 if validate_light_client_state_update_certificate(
378 state_cert,
379 &self.membership.coordinator,
380 &self.upgrade_lock,
381 )
382 .await
383 .is_err()
384 || !check_qc_state_cert_correspondence(
385 qc,
386 state_cert,
387 self.epoch_height,
388 )
389 {
390 tracing::error!("Failed to validate state cert");
391 continue;
392 }
393 } else {
394 tracing::error!(
395 "Received an epoch root QC but we don't have the corresponding state \
396 cert."
397 );
398 continue;
399 }
400 } else {
401 maybe_state_cert = None;
402 }
403 if qc.view_number() > highest_qc.view_number() {
404 highest_qc = qc.clone();
405 next_epoch_qc = maybe_next_epoch_qc.clone();
406 state_cert = maybe_state_cert;
407 }
408 }
409 }
410
411 while self.view_start_time.elapsed() < wait_duration {
413 let time_spent = Instant::now()
414 .checked_duration_since(self.view_start_time)
415 .ok_or(error!(
416 "Time elapsed since the start of the task is negative. This should never \
417 happen."
418 ))?;
419 let time_left = wait_duration
420 .checked_sub(time_spent)
421 .ok_or(info!("No time left"))?;
422 let Ok(maybe_qc_state_cert) =
423 tokio::time::timeout(time_left, self.wait_for_qc_event(rx.clone())).await
424 else {
425 tracing::info!(
426 "Some nodes did not respond with their HighQc in time. Continuing with the \
427 highest QC that we received: {highest_qc:?}"
428 );
429 return Ok((highest_qc, next_epoch_qc, state_cert));
430 };
431 let Some((qc, maybe_next_epoch_qc, maybe_state_cert)) = maybe_qc_state_cert else {
432 continue;
433 };
434 if qc.view_number() > highest_qc.view_number() {
435 highest_qc = qc;
436 next_epoch_qc = maybe_next_epoch_qc;
437 state_cert = maybe_state_cert;
438 }
439 }
440 Ok((highest_qc, next_epoch_qc, state_cert))
441 }
442 #[allow(clippy::too_many_arguments)]
446 #[instrument(skip_all, fields(id = self.id, view_number = *self.view_number, latest_proposed_view = *self.latest_proposed_view))]
447 async fn publish_proposal(
448 &self,
449 commitment_and_metadata: CommitmentAndMetadata<TYPES>,
450 _vid_share: Proposal<TYPES, VidDisperse<TYPES>>,
451 view_change_evidence: Option<ViewChangeEvidence2<TYPES>>,
452 formed_upgrade_certificate: Option<UpgradeCertificate<TYPES>>,
453 parent_qc: QuorumCertificate2<TYPES>,
454 maybe_next_epoch_qc: Option<NextEpochQuorumCertificate2<TYPES>>,
455 maybe_state_cert: Option<LightClientStateUpdateCertificateV2<TYPES>>,
456 ) -> Result<()> {
457 let (parent_leaf, state) = parent_leaf_and_state(
458 &self.sender,
459 &self.receiver,
460 self.membership.coordinator.clone(),
461 self.public_key.clone(),
462 self.private_key.clone(),
463 OuterConsensus::new(Arc::clone(&self.consensus.inner_consensus)),
464 &self.upgrade_lock,
465 &parent_qc,
466 self.epoch_height,
467 )
468 .await?;
469
470 let mut upgrade_certificate = parent_leaf
485 .upgrade_certificate()
486 .or(formed_upgrade_certificate);
487
488 if let Some(cert) = upgrade_certificate.clone()
489 && cert.is_relevant(self.view_number).await.is_err()
490 {
491 upgrade_certificate = None;
492 }
493
494 let proposal_certificate = view_change_evidence
495 .as_ref()
496 .filter(|cert| cert.is_valid_for_view(&self.view_number))
497 .cloned();
498
499 ensure!(
500 commitment_and_metadata.block_view == self.view_number,
501 "Cannot propose because our VID payload commitment and metadata is for an older view."
502 );
503
504 let version = self.upgrade_lock.version(self.view_number)?;
505
506 let builder_commitment = commitment_and_metadata.builder_commitment.clone();
507 let metadata = commitment_and_metadata.metadata.clone();
508
509 if version >= EPOCH_VERSION
510 && parent_qc.view_number()
511 > self
512 .upgrade_lock
513 .upgrade_view()
514 .unwrap_or(ViewNumber::new(0))
515 {
516 let Some(parent_block_number) = parent_qc.data.block_number else {
517 tracing::error!("Parent QC does not have a block number. Do not propose.");
518 return Ok(());
519 };
520 if is_epoch_transition(parent_block_number, self.epoch_height)
521 && !is_last_block(parent_block_number, self.epoch_height)
522 {
523 let (empty_payload, empty_metadata) = <TYPES as NodeType>::BlockPayload::empty();
524 tracing::info!("Reached end of epoch.");
525 ensure!(
526 builder_commitment == empty_payload.builder_commitment(&metadata)
527 && metadata == empty_metadata,
528 "We're trying to propose non empty block in the epoch transition. Do not \
529 propose. View number: {}. Parent Block number: {}",
530 self.view_number,
531 parent_block_number,
532 );
533 }
534 if is_epoch_root(parent_block_number, self.epoch_height) {
535 ensure!(
536 maybe_state_cert.as_ref().is_some_and(|state_cert| {
537 check_qc_state_cert_correspondence(
538 &parent_qc,
539 state_cert,
540 self.epoch_height,
541 )
542 }),
543 "We are proposing with parent epoch root QC but we don't have the \
544 corresponding state cert."
545 );
546 }
547 }
548 let block_header = TYPES::BlockHeader::new(
549 state.as_ref(),
550 self.instance_state.as_ref(),
551 &parent_leaf,
552 commitment_and_metadata.commitment,
553 builder_commitment,
554 metadata,
555 commitment_and_metadata.fees.first().clone(),
556 version,
557 *self.view_number,
558 )
559 .await
560 .wrap()
561 .context(warn!("Failed to construct block header"))?;
562 let epoch = option_epoch_from_block_number(
563 version >= EPOCH_VERSION,
564 block_header.block_number(),
565 self.epoch_height,
566 );
567
568 let epoch_membership = self.membership.coordinator.membership_for_epoch(epoch)?;
569 if epoch_membership.leader(self.view_number)? != self.public_key {
572 tracing::warn!(
573 "We are not the leader in the epoch for which we are about to propose. Do not \
574 send the quorum proposal."
575 );
576 return Ok(());
577 }
578 let is_high_qc_for_transition_block = parent_qc
579 .data
580 .block_number
581 .is_some_and(|block_number| is_epoch_transition(block_number, self.epoch_height));
582 let next_epoch_qc = if self.upgrade_lock.epochs_enabled(self.view_number)
583 && is_high_qc_for_transition_block
584 {
585 ensure!(
586 maybe_next_epoch_qc
587 .as_ref()
588 .is_some_and(|neqc| neqc.data.leaf_commit == parent_qc.data.leaf_commit),
589 "Justify QC on our proposal is for an epoch transition block but we don't have \
590 the corresponding next epoch QC. Do not propose."
591 );
592 maybe_next_epoch_qc
593 } else {
594 None
595 };
596 let next_drb_result = if is_epoch_transition(block_header.block_number(), self.epoch_height)
597 {
598 if let Some(epoch_val) = &epoch {
599 let drb_result = epoch_membership
600 .next_epoch()
601 .context(warn!("No stake table for epoch {}", *epoch_val + 1))?
602 .get_epoch_drb()
603 .await
604 .clone()
605 .context(warn!("No DRB result for epoch {}", *epoch_val + 1))?;
606
607 Some(drb_result)
608 } else {
609 None
610 }
611 } else {
612 None
613 };
614
615 let proposal = QuorumProposalWrapper {
616 proposal: QuorumProposal2 {
617 block_header,
618 view_number: self.view_number,
619 epoch,
620 justify_qc: parent_qc,
621 next_epoch_justify_qc: next_epoch_qc,
622 upgrade_certificate,
623 view_change_evidence: proposal_certificate,
624 next_drb_result,
625 state_cert: maybe_state_cert,
626 },
627 };
628
629 let proposed_leaf = Leaf2::from_quorum_proposal(&proposal);
630 ensure!(
631 proposed_leaf.parent_commitment() == parent_leaf.commit(),
632 "Proposed leaf parent does not equal high qc"
633 );
634
635 let signature =
636 TYPES::SignatureKey::sign(&self.private_key, proposed_leaf.commit().as_ref())
637 .wrap()
638 .context(error!("Failed to compute proposed_leaf.commit()"))?;
639
640 let message = Proposal {
641 data: proposal,
642 signature,
643 _pd: PhantomData,
644 };
645 tracing::info!(
646 "Sending proposal for view {}, height {}, justify_qc view: {}",
647 proposed_leaf.view_number(),
648 proposed_leaf.height(),
649 proposed_leaf.justify_qc().view_number()
650 );
651
652 broadcast_event(
653 Arc::new(HotShotEvent::QuorumProposalSend(
654 message.clone(),
655 self.public_key.clone(),
656 )),
657 &self.sender,
658 )
659 .await;
660
661 Ok(())
662 }
663
664 fn print_proposal_events(&self, res: &[Vec<Vec<Arc<HotShotEvent<TYPES>>>>]) {
665 let events: Vec<_> = res.iter().flatten().flatten().map(Arc::as_ref).collect();
666 tracing::warn!("Failed to propose, events: {:#?}", events);
667 }
668
669 async fn handle_proposal_deps(&self, res: &[Vec<Vec<Arc<HotShotEvent<TYPES>>>>]) -> Result<()> {
670 let mut commit_and_metadata: Option<CommitmentAndMetadata<TYPES>> = None;
671 let mut timeout_certificate = None;
672 let mut view_sync_finalize_cert = None;
673 let mut vid_share = None;
674 let mut parent_qc = None;
675 let mut next_epoch_qc = None;
676 let mut state_cert = None;
677 for event in res.iter().flatten().flatten() {
678 match event.as_ref() {
679 HotShotEvent::SendPayloadCommitmentAndMetadata(
680 payload_commitment,
681 builder_commitment,
682 metadata,
683 view,
684 fees,
685 ) => {
686 commit_and_metadata = Some(CommitmentAndMetadata {
687 commitment: *payload_commitment,
688 builder_commitment: builder_commitment.clone(),
689 metadata: metadata.clone(),
690 fees: fees.clone(),
691 block_view: *view,
692 });
693 },
694 HotShotEvent::Qc2Formed(cert) => match cert {
695 either::Right(timeout) => {
696 timeout_certificate = Some(timeout.clone());
697 },
698 either::Left(qc) => {
699 parent_qc = Some(qc.clone());
700 },
701 },
702 HotShotEvent::EpochRootQcFormed(root_qc) => {
703 parent_qc = Some(root_qc.qc.clone());
704 state_cert = Some(root_qc.state_cert.clone());
705 },
706 HotShotEvent::ViewSyncFinalizeCertificateRecv(cert) => {
707 view_sync_finalize_cert = Some(cert.clone());
708 },
709 HotShotEvent::VidDisperseSend(share, _) => {
710 vid_share = Some(share.clone());
711 },
712 HotShotEvent::NextEpochQc2Formed(either::Left(qc)) => {
713 next_epoch_qc = Some(qc.clone());
714 },
715 _ => {},
716 }
717 }
718
719 let Ok(version) = self.upgrade_lock.version(self.view_number) else {
720 bail!(error!(
721 "Failed to get version for view {:?}, not proposing",
722 self.view_number
723 ));
724 };
725
726 let mut maybe_epoch = None;
727 let proposal_cert = if let Some(view_sync_cert) = view_sync_finalize_cert {
728 maybe_epoch = view_sync_cert.data.epoch;
729 Some(ViewChangeEvidence2::ViewSync(view_sync_cert))
730 } else {
731 match timeout_certificate {
732 Some(timeout_cert) => {
733 maybe_epoch = timeout_cert.data.epoch;
734 Some(ViewChangeEvidence2::Timeout(timeout_cert))
735 },
736 None => None,
737 }
738 };
739
740 let (parent_qc, maybe_next_epoch_qc, maybe_state_cert) = if let Some(qc) = parent_qc {
741 if qc
742 .data
743 .block_number
744 .is_some_and(|bn| is_transition_block(bn, self.epoch_height))
745 && next_epoch_qc
746 .as_ref()
747 .is_none_or(|neqc| neqc.data.leaf_commit != qc.data.leaf_commit)
748 {
749 bail!(error!(
750 "We've formed a transition QC but we haven't formed the corresponding next \
751 epoch QC. Do not propose."
752 ));
753 }
754 (qc, next_epoch_qc, state_cert)
755 } else if version < EPOCH_VERSION {
756 (self.consensus.read().await.high_qc().clone(), None, None)
757 } else if proposal_cert.is_some() {
758 if let Ok(Some((qc, next_epoch_qc))) = self.wait_for_transition_qc().await {
760 let Some(epoch) = maybe_epoch else {
761 bail!(error!(
762 "No epoch found on view change evidence, but we are in epoch mode"
763 ));
764 };
765 if qc
766 .data
767 .block_number
768 .is_some_and(|bn| epoch_from_block_number(bn, self.epoch_height) == *epoch)
769 {
770 (qc, Some(next_epoch_qc), None)
771 } else {
772 match self.wait_for_highest_qc().await {
773 Ok((qc, maybe_next_epoch_qc, maybe_state_cert)) => {
774 (qc, maybe_next_epoch_qc, maybe_state_cert)
775 },
776 Err(e) => {
777 bail!(error!("Error while waiting for highest QC: {e:?}"));
778 },
779 }
780 }
781 } else {
782 let Ok((qc, maybe_next_epoch_qc, maybe_state_cert)) =
783 self.wait_for_highest_qc().await
784 else {
785 bail!(error!("Error while waiting for highest QC"));
786 };
787 if qc.data.block_number.is_some_and(|bn| {
788 is_epoch_transition(bn, self.epoch_height)
789 && !is_last_block(bn, self.epoch_height)
790 }) {
791 bail!(error!(
792 "High is in transition but we need to propose with transition QC, do \
793 nothing"
794 ));
795 }
796 (qc, maybe_next_epoch_qc, maybe_state_cert)
797 }
798 } else {
799 match self.wait_for_highest_qc().await {
800 Ok((qc, maybe_next_epoch_qc, maybe_state_cert)) => {
801 (qc, maybe_next_epoch_qc, maybe_state_cert)
802 },
803 Err(e) => {
804 bail!(error!("Error while waiting for highest QC: {e:?}"));
805 },
806 }
807 };
808
809 ensure!(
810 commit_and_metadata.is_some(),
811 error!(
812 "Somehow completed the proposal dependency task without a commitment and metadata"
813 )
814 );
815
816 ensure!(
817 vid_share.is_some(),
818 error!("Somehow completed the proposal dependency task without a VID share")
819 );
820
821 self.publish_proposal(
822 commit_and_metadata.unwrap(),
823 vid_share.unwrap(),
824 proposal_cert,
825 self.formed_upgrade_certificate.clone(),
826 parent_qc,
827 maybe_next_epoch_qc,
828 maybe_state_cert,
829 )
830 .await
831 .inspect_err(|e| tracing::error!("Failed to publish proposal: {e:?}"))
832 }
833}
834
835impl<TYPES: NodeType> HandleDepOutput for ProposalDependencyHandle<TYPES> {
836 type Output = Vec<Vec<Vec<Arc<HotShotEvent<TYPES>>>>>;
837
838 #[allow(clippy::no_effect_underscore_binding, clippy::too_many_lines)]
839 #[instrument(skip_all, fields(id = self.id, view_number = *self.view_number, latest_proposed_view = *self.latest_proposed_view))]
840 async fn handle_dep_result(self, res: Self::Output) {
841 let mut cancel_receiver = self.cancel_receiver.clone();
842 let result = tokio::select! {
843 result = self.handle_proposal_deps(&res) => {
844 result
845 }
846 _ = cancel_receiver.recv() => {
847 tracing::warn!("Proposal dependency task cancelled");
848 return;
849 }
850 };
851 if result.is_err() {
852 log!(result);
853 self.print_proposal_events(&res)
854 }
855 }
856}
857
858pub(super) async fn handle_eqc_formed<TYPES: NodeType, I: NodeImplementation<TYPES>>(
859 cert_view: ViewNumber,
860 leaf_commit: Commitment<Leaf2<TYPES>>,
861 block_number: Option<u64>,
862 task_state: &mut QuorumProposalTaskState<TYPES, I>,
863 event_sender: &Sender<Arc<HotShotEvent<TYPES>>>,
864) {
865 if !task_state.upgrade_lock.epochs_enabled(cert_view) {
866 tracing::debug!("QC2 formed but epochs not enabled. Do nothing");
867 return;
868 }
869 if !block_number.is_some_and(|bn| is_last_block(bn, task_state.epoch_height)) {
870 tracing::debug!("We formed QC but not eQC. Do nothing");
871 return;
872 }
873
874 let Some(current_epoch_qc) = task_state.formed_quorum_certificates.get(&cert_view) else {
875 tracing::debug!("We formed the eQC but we don't have the current epoch QC at all.");
876 return;
877 };
878 if current_epoch_qc.view_number() != cert_view
879 || current_epoch_qc.data.leaf_commit != leaf_commit
880 {
881 tracing::debug!("We haven't yet formed the eQC. Do nothing");
882 return;
883 }
884 let Some(next_epoch_qc) = task_state
885 .formed_next_epoch_quorum_certificates
886 .get(&cert_view)
887 else {
888 tracing::debug!("We formed the eQC but we don't have the next epoch eQC at all.");
889 return;
890 };
891 if current_epoch_qc.view_number() != cert_view || current_epoch_qc.data != *next_epoch_qc.data {
892 tracing::debug!(
893 "We formed the eQC but the current and next epoch QCs do not correspond to each other."
894 );
895 return;
896 }
897 let current_epoch_qc_clone = current_epoch_qc.clone();
898
899 let mut consensus_writer = task_state.consensus.write().await;
900 let _ = consensus_writer.update_high_qc(current_epoch_qc_clone.clone());
901 let _ = consensus_writer.update_next_epoch_high_qc(next_epoch_qc.clone());
902 drop(consensus_writer);
903
904 if let Err(e) = task_state
905 .storage
906 .update_eqc(current_epoch_qc.clone(), next_epoch_qc.clone())
907 .await
908 {
909 tracing::error!("Failed to store EQC: {}", e);
910 }
911
912 task_state.formed_quorum_certificates =
913 task_state.formed_quorum_certificates.split_off(&cert_view);
914 task_state.formed_next_epoch_quorum_certificates = task_state
915 .formed_next_epoch_quorum_certificates
916 .split_off(&cert_view);
917
918 broadcast_event(
919 Arc::new(HotShotEvent::ExtendedQc2Formed(current_epoch_qc_clone)),
920 event_sender,
921 )
922 .await;
923}