1use std::{collections::HashSet, sync::Arc, time::Instant};
8
9use async_broadcast::{Receiver, SendError, Sender};
10use committable::{Commitment, Committable};
11use hotshot_task::dependency::{Dependency, EventDependency};
12use hotshot_types::{
13 consensus::OuterConsensus,
14 data::{
15 EpochNumber, Leaf2, QuorumProposalWrapper, VidDisperseShare, ViewChangeEvidence2,
16 ViewNumber,
17 },
18 drb::DrbResult,
19 epoch_membership::EpochMembershipCoordinator,
20 event::{Event, EventType, LeafInfo},
21 message::{Proposal, UpgradeLock},
22 request_response::ProposalRequestPayload,
23 simple_certificate::{
24 CertificatePair, DaCertificate2, NextEpochQuorumCertificate2, QuorumCertificate2,
25 UpgradeCertificate,
26 },
27 simple_vote::HasEpoch,
28 stake_table::StakeTableEntries,
29 traits::{
30 BlockPayload, ValidatedState,
31 block_contents::BlockHeader,
32 election::Membership,
33 node_implementation::{NodeImplementation, NodeType},
34 signature_key::SignatureKey,
35 storage::Storage,
36 },
37 utils::{
38 Terminator, View, ViewInner, epoch_from_block_number, is_epoch_root, is_epoch_transition,
39 is_transition_block, option_epoch_from_block_number,
40 },
41 vote::{Certificate, HasViewNumber},
42};
43use hotshot_utils::anytrace::*;
44use time::OffsetDateTime;
45use tokio::time::timeout;
46use tracing::instrument;
47use versions::EPOCH_VERSION;
48
49use crate::{events::HotShotEvent, quorum_proposal_recv::ValidationInfo, request::REQUEST_TIMEOUT};
50
51#[instrument(skip_all)]
53#[allow(clippy::too_many_arguments)]
54pub(crate) async fn fetch_proposal<TYPES: NodeType>(
55 qc: &QuorumCertificate2<TYPES>,
56 event_sender: Sender<Arc<HotShotEvent<TYPES>>>,
57 event_receiver: Receiver<Arc<HotShotEvent<TYPES>>>,
58 membership_coordinator: EpochMembershipCoordinator<TYPES>,
59 consensus: OuterConsensus<TYPES>,
60 sender_public_key: TYPES::SignatureKey,
61 sender_private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
62 upgrade_lock: &UpgradeLock<TYPES>,
63 epoch_height: u64,
64) -> Result<(Leaf2<TYPES>, View<TYPES>)> {
65 let view_number = qc.view_number();
66 let leaf_commit = qc.data.leaf_commit;
67 let signed_proposal_request = ProposalRequestPayload {
70 view_number,
71 key: sender_public_key,
72 };
73
74 let signature = TYPES::SignatureKey::sign(
76 &sender_private_key,
77 signed_proposal_request.commit().as_ref(),
78 )
79 .wrap()
80 .context(error!("Failed to sign proposal. This should never happen."))?;
81
82 tracing::info!("Sending proposal request for view {view_number}");
83
84 broadcast_event(
86 HotShotEvent::QuorumProposalRequestSend(signed_proposal_request, signature).into(),
87 &event_sender,
88 )
89 .await;
90
91 let mut rx = event_receiver.clone();
92 let Ok(Some(proposal)) =
94 timeout(REQUEST_TIMEOUT, async move {
96 while let Ok(event) = rx.recv_direct().await {
98 if let HotShotEvent::QuorumProposalResponseRecv(quorum_proposal) = event.as_ref() {
99 let leaf = Leaf2::from_quorum_proposal(&quorum_proposal.data);
100 if leaf.view_number() == view_number && leaf.commit() == leaf_commit {
101 return Some(quorum_proposal.clone());
102 }
103 }
104 }
105 None
106 })
107 .await
108 else {
109 bail!("Request for proposal failed");
110 };
111
112 let view_number = proposal.data.view_number();
113 let justify_qc = proposal.data.justify_qc().clone();
114
115 let justify_qc_epoch = justify_qc.data.epoch();
116
117 let epoch_membership = membership_coordinator.stake_table_for_epoch(justify_qc_epoch)?;
118 let membership_stake_table = StakeTableEntries::from_iter(epoch_membership.stake_table()).0;
119 let membership_success_threshold = epoch_membership.success_threshold();
120
121 justify_qc
122 .is_valid_cert(
123 &membership_stake_table,
124 membership_success_threshold,
125 upgrade_lock,
126 )
127 .context(|e| warn!("Invalid justify_qc in proposal for view {view_number}: {e}"))?;
128
129 let mut consensus_writer = consensus.write().await;
130 let leaf = Leaf2::from_quorum_proposal(&proposal.data);
131 let state = Arc::new(
132 <TYPES::ValidatedState as ValidatedState<TYPES>>::from_header(proposal.data.block_header()),
133 );
134
135 if let Err(e) = consensus_writer.update_leaf(leaf.clone(), Arc::clone(&state), None) {
136 tracing::trace!("{e:?}");
137 }
138 let view = View {
139 view_inner: ViewInner::Leaf {
140 leaf: leaf.commit(),
141 state,
142 delta: None,
143 epoch: leaf.epoch(epoch_height),
144 },
145 };
146 Ok((leaf, view))
147}
148pub async fn handle_drb_result<TYPES: NodeType, I: NodeImplementation<TYPES>>(
149 membership: &TYPES::Membership,
150 epoch: EpochNumber,
151 storage: &I::Storage,
152 drb_result: DrbResult,
153) {
154 tracing::debug!("Calling store_drb_result for epoch {epoch}");
155 if let Err(e) = storage.store_drb_result(epoch, drb_result).await {
156 tracing::error!("Failed to store drb result for epoch {epoch}: {e}");
157 }
158
159 membership.add_drb_result(epoch, drb_result)
160}
161
162pub(crate) async fn verify_drb_result<TYPES: NodeType, I: NodeImplementation<TYPES>>(
169 proposal: &QuorumProposalWrapper<TYPES>,
170 validation_info: &ValidationInfo<TYPES, I>,
171) -> Result<()> {
172 if validation_info.epoch_height == 0
174 || !is_epoch_transition(
175 proposal.block_header().block_number(),
176 validation_info.epoch_height,
177 )
178 {
179 tracing::debug!("Skipping DRB result verification");
180 return Ok(());
181 }
182
183 let epoch = option_epoch_from_block_number(
187 validation_info
188 .upgrade_lock
189 .epochs_enabled(proposal.view_number()),
190 proposal.block_header().block_number(),
191 validation_info.epoch_height,
192 );
193
194 let proposal_result = proposal
195 .next_drb_result()
196 .context(info!("Proposal is missing the next epoch's DRB result."))?;
197
198 if let Some(epoch_val) = epoch {
199 let current_epoch_membership = validation_info
200 .membership
201 .coordinator
202 .stake_table_for_epoch(epoch)
203 .context(warn!("No stake table for epoch {}", epoch_val))?;
204
205 let has_stake_current_epoch =
206 current_epoch_membership.has_stake(&validation_info.public_key);
207
208 if has_stake_current_epoch {
209 let computed_result = current_epoch_membership
210 .next_epoch()
211 .context(warn!("No stake table for epoch {}", epoch_val + 1))?
212 .get_epoch_drb()
213 .await
214 .clone()
215 .context(warn!("DRB result not found"))?;
216
217 ensure!(
218 proposal_result == computed_result,
219 warn!(
220 "Our calculated DRB result is {computed_result:?}, which does not match the \
221 proposed DRB result of {proposal_result:?}"
222 )
223 );
224 }
225
226 Ok(())
227 } else {
228 Err(error!("Epochs are not available"))
229 }
230}
231
/// Handles a decided leaf that is an epoch root block.
///
/// Nothing happens unless `epoch_height` is non-zero and the decided block is an epoch root.
/// In that case the block header is stored as the epoch root for the epoch two after the one
/// containing the decided block, and a background task is spawned that adds the root to the
/// membership, computes the DRB result for that epoch and hands it to [`handle_drb_result`].
///
/// Failures are logged; none are returned to the caller.
async fn decide_epoch_root<TYPES: NodeType, I: NodeImplementation<TYPES>>(
    decided_leaf: &Leaf2<TYPES>,
    epoch_height: u64,
    membership: &EpochMembershipCoordinator<TYPES>,
    storage: &I::Storage,
    consensus: &OuterConsensus<TYPES>,
) {
    // Owned copy so the leaf can be moved into the spawned task below.
    let decided_leaf = decided_leaf.clone();
    let decided_block_number = decided_leaf.block_header().block_number();

    if epoch_height != 0 && is_epoch_root(decided_block_number, epoch_height) {
        // The root decided in epoch `e` is recorded for epoch `e + 2`.
        let next_epoch_number =
            EpochNumber::new(epoch_from_block_number(decided_block_number, epoch_height) + 2);

        // Persisting the root is awaited inline (and timed) before any background work starts.
        let start = Instant::now();
        if let Err(e) = storage
            .store_epoch_root(next_epoch_number, decided_leaf.block_header().clone())
            .await
        {
            tracing::error!("Failed to store epoch root for epoch {next_epoch_number}: {e}");
        }
        tracing::info!("Time taken to store epoch root: {:?}", start.elapsed());

        // Owned handles for the spawned task.
        let membership = membership.clone();
        let decided_block_header = decided_leaf.block_header().clone();
        let storage = storage.clone();
        let consensus = consensus.clone();

        // NOTE(review): the read lock is acquired and released without reading anything, and
        // the cloned `consensus` is not used by the task below. Presumably left over from an
        // earlier version; its only visible effect is waiting for the lock — confirm before
        // removing.
        let consensus_reader = consensus.read().await;

        drop(consensus_reader);

        tokio::spawn(async move {
            let membership_clone = membership.clone();

            {
                // Register the root header with the membership; a failure is logged only.
                let start = Instant::now();
                if let Err(e) = membership_clone
                    .membership()
                    .add_epoch_root(decided_block_header)
                    .await
                {
                    tracing::error!("Failed to add epoch root for epoch {next_epoch_number}: {e}");
                }
                tracing::info!("Time taken to add epoch root: {:?}", start.elapsed());
            }

            let membership_clone = membership.clone();

            let drb_result = membership_clone
                .compute_drb_result(next_epoch_number, decided_leaf.clone())
                .await;

            // Without a DRB result there is nothing to store, so the task ends here.
            let drb_result = match drb_result {
                Ok(result) => result,
                Err(e) => {
                    tracing::error!("Failed to compute DRB result from decide: {e}");
                    return;
                },
            };

            let start = Instant::now();
            handle_drb_result::<TYPES, I>(
                membership.membership(),
                next_epoch_number,
                &storage,
                drb_result,
            )
            .await;
            tracing::info!("Time taken to handle drb result: {:?}", start.elapsed());
        });
    }
}
308
/// Everything the decide routines (`decide_from_proposal` and `decide_from_proposal_2`) learn
/// while walking back from a proposal through its ancestors.
///
/// Fields stay at their default (`None` / empty) when the traversal ends before the
/// corresponding condition is met.
#[derive(Debug)]
pub struct LeafChainTraversalOutcome<TYPES: NodeType> {
    /// View number to lock on, if the traversal produced one.
    pub new_locked_view_number: Option<ViewNumber>,

    /// View number that became decided, if the traversal reached a decide.
    pub new_decided_view_number: Option<ViewNumber>,

    /// Certificate pair obtained via `CertificatePair::for_parent` from the leaf at the locked
    /// position of the chain.
    pub committing_qc: Option<CertificatePair<TYPES>>,

    /// Certificate pair obtained via `CertificatePair::for_parent` from the proposed leaf.
    /// Only `decide_from_proposal_2` sets this.
    pub deciding_qc: Option<CertificatePair<TYPES>>,

    /// Newly decided leaves, in the order they were visited (highest view first).
    pub leaf_views: Vec<LeafInfo<TYPES>>,

    /// Transaction commitments collected from the payloads of decided leaves.
    pub included_txns: Option<HashSet<Commitment<<TYPES as NodeType>::Transaction>>>,

    /// Upgrade certificate that reached decide during the traversal, if any.
    pub decided_upgrade_cert: Option<UpgradeCertificate<TYPES>>,
}
338
339impl<TYPES: NodeType + Default> Default for LeafChainTraversalOutcome<TYPES> {
343 fn default() -> Self {
345 Self {
346 new_locked_view_number: None,
347 new_decided_view_number: None,
348 committing_qc: None,
349 deciding_qc: None,
350 leaf_views: Vec::new(),
351 included_txns: None,
352 decided_upgrade_cert: None,
353 }
354 }
355}
356
357async fn update_metrics<TYPES: NodeType>(
358 consensus: &OuterConsensus<TYPES>,
359 leaf_views: &[LeafInfo<TYPES>],
360) {
361 let consensus_reader = consensus.read().await;
362 let now = OffsetDateTime::now_utc().unix_timestamp() as u64;
363
364 for leaf_view in leaf_views {
365 let proposal_timestamp = leaf_view.leaf.block_header().timestamp();
366
367 let Some(proposal_to_decide_time) = now.checked_sub(proposal_timestamp) else {
368 tracing::error!("Failed to calculate proposal to decide time: {proposal_timestamp}");
369 continue;
370 };
371 consensus_reader
372 .metrics
373 .proposal_to_decide_time
374 .add_point(proposal_to_decide_time as f64);
375 if let Some(txn_bytes) = leaf_view.leaf.block_payload().map(|p| p.txn_bytes()) {
376 consensus_reader
377 .metrics
378 .finalized_bytes
379 .add_point(txn_bytes as f64);
380 }
381 }
382}
383
384#[allow(clippy::too_many_arguments)]
390pub async fn decide_from_proposal_2<TYPES: NodeType, I: NodeImplementation<TYPES>>(
391 proposal: &QuorumProposalWrapper<TYPES>,
392 consensus: OuterConsensus<TYPES>,
393 upgrade_lock: &UpgradeLock<TYPES>,
394 public_key: &TYPES::SignatureKey,
395 with_epochs: bool,
396 membership: &EpochMembershipCoordinator<TYPES>,
397 storage: &I::Storage,
398) -> LeafChainTraversalOutcome<TYPES> {
399 let mut res = LeafChainTraversalOutcome::default();
400 let consensus_reader = consensus.read().await;
401 let proposed_leaf = Leaf2::from_quorum_proposal(proposal);
402 res.new_locked_view_number = Some(proposed_leaf.justify_qc().view_number());
403
404 let Some(parent_info) = consensus_reader
406 .parent_leaf_info(&proposed_leaf, public_key)
407 .await
408 else {
409 return res;
410 };
411 let Some(grand_parent_info) = consensus_reader
414 .parent_leaf_info(&parent_info.leaf, public_key)
415 .await
416 else {
417 return res;
418 };
419 if grand_parent_info.leaf.view_number() + 1 != parent_info.leaf.view_number() {
420 return res;
421 }
422 res.committing_qc = Some(CertificatePair::for_parent(&parent_info.leaf));
423 res.deciding_qc = Some(CertificatePair::for_parent(&proposed_leaf));
424 let decided_view_number = grand_parent_info.leaf.view_number();
425 res.new_decided_view_number = Some(decided_view_number);
426 let old_anchor_view = consensus_reader.last_decided_view();
428 let mut current_leaf_info = Some(grand_parent_info);
429 let existing_upgrade_cert_reader = upgrade_lock.decided_upgrade_cert();
430 let mut txns = HashSet::new();
431 while current_leaf_info
432 .as_ref()
433 .is_some_and(|info| info.leaf.view_number() > old_anchor_view)
434 {
435 let info = &mut current_leaf_info.unwrap();
437 if let Some(cert) = info.leaf.upgrade_certificate()
439 && info.leaf.upgrade_certificate() != existing_upgrade_cert_reader
440 {
441 if cert.data.decide_by < decided_view_number {
442 tracing::warn!("Failed to decide an upgrade certificate in time. Ignoring.");
443 } else {
444 tracing::info!("Reached decide on upgrade certificate: {cert:?}");
445 res.decided_upgrade_cert = Some(cert.clone());
446 }
447 }
448
449 if let Some(payload) = consensus_reader
452 .saved_payloads()
453 .get(&info.leaf.view_number())
454 {
455 info.leaf
456 .fill_block_payload_unchecked(payload.as_ref().payload.clone());
457 }
458
459 if let Some(ref payload) = info.leaf.block_payload() {
460 for txn in payload.transaction_commitments(info.leaf.block_header().metadata()) {
461 txns.insert(txn);
462 }
463 }
464
465 current_leaf_info = consensus_reader
466 .parent_leaf_info(&info.leaf, public_key)
467 .await;
468 res.leaf_views.push(info.clone());
469 }
470
471 if !txns.is_empty() {
472 res.included_txns = Some(txns);
473 }
474
475 if with_epochs && res.new_decided_view_number.is_some() {
476 let Some(first_leaf) = res.leaf_views.first() else {
477 return res;
478 };
479 let epoch_height = consensus_reader.epoch_height;
480 consensus_reader
481 .metrics
482 .last_synced_block_height
483 .set(usize::try_from(first_leaf.leaf.height()).unwrap_or(0));
484 drop(consensus_reader);
485
486 for decided_leaf_info in &res.leaf_views {
487 decide_epoch_root::<TYPES, I>(
488 &decided_leaf_info.leaf,
489 epoch_height,
490 membership,
491 storage,
492 &consensus,
493 )
494 .await;
495 }
496 update_metrics(&consensus, &res.leaf_views).await;
497 }
498
499 res
500}
501
/// Decide rule based on a three-leaf chain of consecutive views ending at `proposal`.
///
/// Ancestors are visited starting at the view of the proposal's justify QC and stopping before
/// the last decided view. While no decide has been found, each visited leaf must be exactly one
/// view below the previously visited one; the second such leaf sets the locked view and
/// `committing_qc`, the third sets the decided view. From the decided leaf onwards every
/// visited leaf is recorded in `leaf_views`.
///
/// When `with_epochs` is set and a decide was reached, each recorded leaf is passed to
/// `decide_epoch_root`. A traversal error is only logged; whatever was gathered is returned.
#[allow(clippy::too_many_arguments)]
pub async fn decide_from_proposal<TYPES: NodeType, I: NodeImplementation<TYPES>>(
    proposal: &QuorumProposalWrapper<TYPES>,
    consensus: OuterConsensus<TYPES>,
    upgrade_lock: &UpgradeLock<TYPES>,
    public_key: &TYPES::SignatureKey,
    with_epochs: bool,
    membership: &EpochMembershipCoordinator<TYPES>,
    storage: &I::Storage,
    epoch_height: u64,
) -> LeafChainTraversalOutcome<TYPES> {
    let consensus_reader = consensus.read().await;
    let existing_upgrade_cert_reader = upgrade_lock.decided_upgrade_cert();
    let view_number = proposal.view_number();
    let parent_view_number = proposal.justify_qc().view_number();
    let old_anchor_view = consensus_reader.last_decided_view();

    // The walk starts from the proposal itself, so the first ancestor must be at `view - 1`.
    let mut last_view_number_visited = view_number;
    let mut current_chain_length = 0usize;
    let mut res = LeafChainTraversalOutcome::default();

    // The closure returns `false` to stop the traversal and `true` to continue.
    if let Err(e) = consensus_reader.visit_leaf_ancestors(
        parent_view_number,
        Terminator::Exclusive(old_anchor_view),
        true,
        |leaf, state, delta| {
            // Phase 1: build the chain of consecutive views until a decide is found.
            if res.new_decided_view_number.is_none() {
                if last_view_number_visited == leaf.view_number() + 1 {
                    last_view_number_visited = leaf.view_number();

                    current_chain_length += 1;

                    // Length 2 yields the lock, length 3 yields the decide.
                    if current_chain_length == 2 {
                        res.new_locked_view_number = Some(leaf.view_number());
                        res.committing_qc = Some(CertificatePair::for_parent(leaf));
                    } else if current_chain_length == 3 {
                        res.new_decided_view_number = Some(leaf.view_number());
                    }
                } else {
                    // Gap in the views: the chain cannot be extended, stop visiting.
                    return false;
                }
            }

            // Phase 2: once a decide exists, record this leaf and every older one visited.
            if let Some(new_decided_view) = res.new_decided_view_number {
                let mut leaf = leaf.clone();

                // The metric tracks the height of the decided leaf itself.
                if leaf.view_number() == new_decided_view {
                    consensus_reader
                        .metrics
                        .last_synced_block_height
                        .set(usize::try_from(leaf.height()).unwrap_or(0));
                }

                // An upgrade certificate different from the already decided one is adopted,
                // unless its `decide_by` view lies before the proposal's view.
                if let Some(cert) = leaf.upgrade_certificate()
                    && leaf.upgrade_certificate() != existing_upgrade_cert_reader
                {
                    if cert.data.decide_by < view_number {
                        tracing::warn!(
                            "Failed to decide an upgrade certificate in time. Ignoring."
                        );
                    } else {
                        tracing::info!("Reached decide on upgrade certificate: {cert:?}");
                        res.decided_upgrade_cert = Some(cert.clone());
                    }
                }
                // Attach the saved payload for this view, if one is held.
                if let Some(payload) = consensus_reader.saved_payloads().get(&leaf.view_number()) {
                    leaf.fill_block_payload_unchecked(payload.as_ref().payload.clone());
                }

                // Our own VID share for this view and the leaf's epoch, when available.
                let vid_share = consensus_reader
                    .vid_shares()
                    .get(&leaf.view_number())
                    .and_then(|key_map| key_map.get(public_key))
                    .and_then(|epoch_map| epoch_map.get(&leaf.epoch(epoch_height)))
                    .map(|prop| prop.data.clone());

                // A state certificate is attached only to epoch-root leaves, and only when the
                // stored certificate refers to this leaf's view.
                let state_cert = if leaf.with_epoch
                    && is_epoch_root(
                        leaf.block_header().block_number(),
                        consensus_reader.epoch_height,
                    ) {
                    match consensus_reader.state_cert() {
                        Some(state_cert)
                            if state_cert.light_client_state.view_number
                                == leaf.view_number().u64() =>
                        {
                            Some(state_cert.clone())
                        },
                        _ => None,
                    }
                } else {
                    None
                };

                res.leaf_views.push(LeafInfo::new(
                    leaf.clone(),
                    Arc::clone(&state),
                    delta.clone(),
                    vid_share,
                    state_cert,
                ));
                // NOTE(review): this assignment replaces `included_txns` on every leaf that has
                // a payload, so the final set holds only the last such leaf's transactions,
                // whereas `decide_from_proposal_2` accumulates across leaves — confirm intent.
                if let Some(ref payload) = leaf.block_payload() {
                    res.included_txns = Some(
                        payload
                            .transaction_commitments(leaf.block_header().metadata())
                            .into_iter()
                            .collect::<HashSet<_>>(),
                    );
                }
            }
            true
        },
    ) {
        tracing::debug!("Leaf ascension failed; error={e}");
    }

    // From here on the epoch height stored in consensus is used, shadowing the parameter.
    let epoch_height = consensus_reader.epoch_height;
    // Release the read lock before the awaits below.
    drop(consensus_reader);

    if with_epochs && res.new_decided_view_number.is_some() {
        for decided_leaf_info in &res.leaf_views {
            decide_epoch_root::<TYPES, I>(
                &decided_leaf_info.leaf,
                epoch_height,
                membership,
                storage,
                &consensus,
            )
            .await;
        }
    }

    res
}
686
687#[instrument(skip_all)]
689#[allow(clippy::too_many_arguments)]
690pub(crate) async fn parent_leaf_and_state<TYPES: NodeType>(
691 event_sender: &Sender<Arc<HotShotEvent<TYPES>>>,
692 event_receiver: &Receiver<Arc<HotShotEvent<TYPES>>>,
693 membership: EpochMembershipCoordinator<TYPES>,
694 public_key: TYPES::SignatureKey,
695 private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
696 consensus: OuterConsensus<TYPES>,
697 upgrade_lock: &UpgradeLock<TYPES>,
698 parent_qc: &QuorumCertificate2<TYPES>,
699 epoch_height: u64,
700) -> Result<(Leaf2<TYPES>, Arc<<TYPES as NodeType>::ValidatedState>)> {
701 let consensus_reader = consensus.read().await;
702 let vsm_contains_parent_view = consensus_reader
703 .validated_state_map()
704 .contains_key(&parent_qc.view_number());
705 drop(consensus_reader);
706
707 if !vsm_contains_parent_view {
708 let _ = fetch_proposal(
709 parent_qc,
710 event_sender.clone(),
711 event_receiver.clone(),
712 membership,
713 consensus.clone(),
714 public_key.clone(),
715 private_key.clone(),
716 upgrade_lock,
717 epoch_height,
718 )
719 .await
720 .context(info!("Failed to fetch proposal"))?;
721 }
722
723 let consensus_reader = consensus.read().await;
724 let parent_view = consensus_reader
725 .validated_state_map()
726 .get(&parent_qc.view_number())
727 .context(debug!(
728 "Couldn't find parent view in state map, waiting for replica to see proposal; \
729 parent_view_number: {}",
730 *parent_qc.view_number()
731 ))?;
732
733 let (leaf_commitment, state) = parent_view.leaf_and_state().context(info!(
734 "Parent of high QC points to a view without a proposal; parent_view_number: {}, \
735 parent_view {:?}",
736 *parent_qc.view_number(),
737 parent_view
738 ))?;
739
740 if leaf_commitment != consensus_reader.high_qc().data().leaf_commit {
741 tracing::debug!(
743 "They don't equal: {:?} {:?}",
744 leaf_commitment,
745 consensus_reader.high_qc().data().leaf_commit
746 );
747 }
748
749 let leaf = consensus_reader
750 .saved_leaves()
751 .get(&leaf_commitment)
752 .context(info!("Failed to find high QC of parent"))?;
753
754 Ok((leaf.clone(), Arc::clone(state)))
755}
756
/// Persists and applies the justify QC (and optional next-epoch justify QC) of `proposal`.
///
/// Storage is written first and consensus state second. Storage is skipped entirely when the
/// justify QC's block lies in an epoch transition but is neither the transition block nor the
/// last block of the epoch.
///
/// # Errors
/// Fails when any storage write fails, when an epoch-root QC comes without a state
/// certificate, or when one of the consensus updates rejects the certificate.
pub(crate) async fn update_high_qc<TYPES: NodeType, I: NodeImplementation<TYPES>>(
    proposal: &Proposal<TYPES, QuorumProposalWrapper<TYPES>>,
    validation_info: &ValidationInfo<TYPES, I>,
) -> Result<()> {
    // True for QC blocks inside the transition window, excluding the transition block itself
    // and blocks whose number is a multiple of the epoch height.
    // NOTE(review): `bn % epoch_height` relies on `epoch_height` being non-zero whenever the
    // first two conditions hold — confirm against `is_epoch_transition`.
    let in_transition_epoch = proposal
        .data
        .justify_qc()
        .data
        .block_number
        .is_some_and(|bn| {
            !is_transition_block(bn, validation_info.epoch_height)
                && is_epoch_transition(bn, validation_info.epoch_height)
                && bn % validation_info.epoch_height != 0
        });
    let justify_qc = proposal.data.justify_qc();
    let maybe_next_epoch_justify_qc = proposal.data.next_epoch_justify_qc();
    if !in_transition_epoch {
        tracing::debug!(
            "Storing high QC for view {:?} and height {:?}",
            justify_qc.view_number(),
            justify_qc.data.block_number
        );
        if let Err(e) = validation_info
            .storage
            .update_high_qc2(justify_qc.clone())
            .await
        {
            bail!("Failed to store High QC, not voting; error = {e:?}");
        }
        // A QC for an epoch root must come with a state certificate, which is stored both on
        // disk and in consensus.
        if justify_qc
            .data
            .block_number
            .is_some_and(|bn| is_epoch_root(bn, validation_info.epoch_height))
        {
            let Some(state_cert) = proposal.data.state_cert() else {
                bail!("Epoch root QC has no state cert, not voting!");
            };
            if let Err(e) = validation_info
                .storage
                .update_state_cert(state_cert.clone())
                .await
            {
                bail!(
                    "Failed to store the light client state update certificate, not voting; error \
                     = {:?}",
                    e
                );
            }
            validation_info
                .consensus
                .write()
                .await
                .update_state_cert(state_cert.clone())?;
        }
        if let Some(next_epoch_justify_qc) = maybe_next_epoch_justify_qc
            && let Err(e) = validation_info
                .storage
                .update_next_epoch_high_qc2(next_epoch_justify_qc.clone())
                .await
        {
            bail!("Failed to store next epoch High QC, not voting; error = {e:?}");
        }
    }
    // In-memory consensus state is updated only after storage has succeeded (or been skipped).
    let mut consensus_writer = validation_info.consensus.write().await;
    if let Some(next_epoch_justify_qc) = maybe_next_epoch_justify_qc {
        // A QC on the transition block resets the high QC and becomes the transition QC; the
        // regular high-QC update at the end of the function is bypassed in that case.
        if justify_qc
            .data
            .block_number
            .is_some_and(|bn| is_transition_block(bn, validation_info.epoch_height))
        {
            consensus_writer.reset_high_qc(justify_qc.clone(), next_epoch_justify_qc.clone())?;
            consensus_writer
                .update_transition_qc(justify_qc.clone(), next_epoch_justify_qc.clone());
            return Ok(());
        }
        consensus_writer.update_next_epoch_high_qc(next_epoch_justify_qc.clone())?;
    }
    consensus_writer.update_high_qc(justify_qc.clone())?;

    Ok(())
}
838
839async fn transition_qc<TYPES: NodeType, I: NodeImplementation<TYPES>>(
840 validation_info: &ValidationInfo<TYPES, I>,
841) -> Option<(
842 QuorumCertificate2<TYPES>,
843 NextEpochQuorumCertificate2<TYPES>,
844)> {
845 validation_info
846 .consensus
847 .read()
848 .await
849 .transition_qc()
850 .cloned()
851}
852
/// Validates the justify QC of a proposal whose QC block lies in an epoch transition.
///
/// Returns `Ok` without further checks when the QC block is not in an epoch transition or its
/// number is a multiple of the epoch height. Otherwise the proposal must carry a next-epoch
/// justify QC for the same leaf, and the view-number rules below are enforced relative to the
/// transition QC currently held in consensus.
///
/// # Errors
/// Fails when the justify QC has no block number, when the next-epoch QC is missing or refers
/// to a different leaf, or when one of the transition rules is violated.
pub(crate) async fn validate_epoch_transition_qc<TYPES: NodeType, I: NodeImplementation<TYPES>>(
    proposal: &Proposal<TYPES, QuorumProposalWrapper<TYPES>>,
    validation_info: &ValidationInfo<TYPES, I>,
) -> Result<()> {
    let proposed_qc = proposal.data.justify_qc();
    let Some(qc_block_number) = proposed_qc.data().block_number else {
        bail!("Justify QC has no block number");
    };
    // Nothing to validate outside the transition window or on an epoch-height multiple.
    if !is_epoch_transition(qc_block_number, validation_info.epoch_height)
        || qc_block_number % validation_info.epoch_height == 0
    {
        return Ok(());
    }
    let Some(next_epoch_qc) = proposal.data.next_epoch_justify_qc() else {
        bail!("Next epoch justify QC is not present");
    };
    ensure!(
        next_epoch_qc.data.leaf_commit == proposed_qc.data().leaf_commit,
        "Next epoch QC has different leaf commit to justify QC"
    );

    if is_transition_block(qc_block_number, validation_info.epoch_height) {
        // QC on the transition block: it may replace the stored transition QC as long as its
        // view is not older than the stored one.
        ensure!(
            transition_qc(validation_info)
                .await
                .is_none_or(|(qc, _)| qc.view_number() <= proposed_qc.view_number()),
            "Proposed transition qc must have view number greater than or equal to previous \
             transition QC"
        );

        validation_info
            .consensus
            .write()
            .await
            .update_transition_qc(proposed_qc.clone(), next_epoch_qc.clone());
        update_high_qc(proposal, validation_info).await?;
    } else {
        // QC on a later block of the transition window: its view must be strictly newer than
        // the stored transition QC.
        ensure!(
            transition_qc(validation_info)
                .await
                .is_none_or(|(qc, _)| qc.view_number() < proposed_qc.view_number()),
            "Transition block must have view number greater than previous transition QC"
        );
        // View-change evidence is not accepted here.
        ensure!(
            proposal.data.view_change_evidence().is_none(),
            "Second to last block and last block of epoch must directly extend previous block, Qc \
             Block number: {qc_block_number}, Proposal Block number: {}",
            proposal.data.block_header().block_number()
        );
        // The proposal must either be in the view right after its QC, or build on the stored
        // transition QC.
        ensure!(
            proposed_qc.view_number() + 1 == proposal.data.view_number()
                || transition_qc(validation_info)
                    .await
                    .is_some_and(|(qc, _)| &qc == proposed_qc),
            "Transition proposals must extend the previous view directly, or extend the previous \
             transition block"
        );
    }
    Ok(())
}
916
/// Runs the safety and liveness checks on `proposal` against its `parent_leaf`.
///
/// The proposed leaf is stored in consensus before the upgrade-certificate, epoch and
/// safety/liveness checks run. On success a `QuorumProposal` event is sent on the output event
/// stream and `QuorumProposalValidated` is sent on `event_stream`.
///
/// # Errors
/// Fails when the epoch-transition QC is invalid, when the proposed leaf does not extend
/// `parent_leaf`, when the upgrade certificate or upgrade extension is invalid, when the epoch
/// check fails, when an epoch-transition proposal lacks the next-epoch justify QC, or when both
/// the safety and the liveness check fail.
#[allow(clippy::too_many_lines)]
#[instrument(skip_all, fields(id = validation_info.id, view = *proposal.data.view_number()))]
pub(crate) async fn validate_proposal_safety_and_liveness<
    TYPES: NodeType,
    I: NodeImplementation<TYPES>,
>(
    proposal: Proposal<TYPES, QuorumProposalWrapper<TYPES>>,
    parent_leaf: Leaf2<TYPES>,
    validation_info: &ValidationInfo<TYPES, I>,
    event_stream: Sender<Arc<HotShotEvent<TYPES>>>,
    sender: TYPES::SignatureKey,
) -> Result<()> {
    let view_number = proposal.data.view_number();

    // Once the justify QC's view runs at or above the epoch version, a QC inside an epoch
    // transition gets its dedicated validation; passing it also satisfies liveness below.
    let mut valid_epoch_transition = false;
    if validation_info
        .upgrade_lock
        .version(proposal.data.justify_qc().view_number())
        .is_ok_and(|v| v >= EPOCH_VERSION)
    {
        let Some(block_number) = proposal.data.justify_qc().data.block_number else {
            bail!("Quorum Proposal has no block number but it's after the epoch upgrade");
        };
        if is_epoch_transition(block_number, validation_info.epoch_height) {
            validate_epoch_transition_qc(&proposal, validation_info).await?;
            valid_epoch_transition = true;
        }
    }

    let proposed_leaf = Leaf2::from_quorum_proposal(&proposal.data);
    ensure!(
        proposed_leaf.parent_commitment() == parent_leaf.commit(),
        "Proposed leaf does not extend the parent leaf."
    );
    let proposal_epoch = option_epoch_from_block_number(
        validation_info.upgrade_lock.epochs_enabled(view_number),
        proposed_leaf.height(),
        validation_info.epoch_height,
    );

    let state = Arc::new(
        <TYPES::ValidatedState as ValidatedState<TYPES>>::from_header(proposal.data.block_header()),
    );

    {
        // Record the leaf and the proposed view; failures here are logged and not fatal.
        let mut consensus_writer = validation_info.consensus.write().await;
        if let Err(e) = consensus_writer.update_leaf(proposed_leaf.clone(), state, None) {
            tracing::trace!("{e:?}");
        }

        if let Err(e) = consensus_writer.update_proposed_view(proposal.clone()) {
            tracing::debug!("Internal proposal update failed; error = {e:#}");
        };
    }

    UpgradeCertificate::validate(
        proposal.data.upgrade_certificate(),
        &validation_info.membership,
        proposal_epoch,
        &validation_info.upgrade_lock,
    )
    .await?;

    proposed_leaf.extends_upgrade(&parent_leaf, &validation_info.upgrade_lock)?;

    let justify_qc = proposal.data.justify_qc().clone();
    {
        let consensus_reader = validation_info.consensus.read().await;
        // The justify QC's epoch is derived from the parent leaf's height.
        let justify_qc_epoch = option_epoch_from_block_number(
            validation_info.upgrade_lock.epochs_enabled(view_number),
            parent_leaf.height(),
            validation_info.epoch_height,
        );
        // The proposal must stay in the parent's epoch unless `check_eqc` accepts the pair.
        ensure!(
            proposal_epoch == justify_qc_epoch
                || consensus_reader.check_eqc(&proposed_leaf, &parent_leaf),
            {
                error!(
                    "Failed epoch safety check \n Proposed leaf is {proposed_leaf:?} \n justify \
                     QC leaf is {parent_leaf:?}"
                )
            }
        );

        if is_epoch_transition(parent_leaf.height(), validation_info.epoch_height)
            && validation_info.upgrade_lock.epochs_enabled(view_number)
        {
            ensure!(
                proposal.data.next_epoch_justify_qc().is_some(),
                "Epoch transition proposal does not include the next epoch justify QC. Do not \
                 vote!"
            );
        }

        // Liveness: the justify QC is newer than the locked view, or the epoch-transition QC
        // validation above succeeded.
        let liveness_check =
            justify_qc.view_number() > consensus_reader.locked_view() || valid_epoch_transition;

        // Safety: walk the ancestors from the justify QC's view down to the locked view; the
        // closure stops the walk once the locked view is reached, and the check passes when
        // the traversal returns `Ok`.
        let outcome = consensus_reader.visit_leaf_ancestors(
            justify_qc.view_number(),
            Terminator::Inclusive(consensus_reader.locked_view()),
            false,
            |leaf, _, _| {
                leaf.view_number() != consensus_reader.locked_view()
            },
        );
        let safety_check = outcome.is_ok();

        // Either check is sufficient. When both fail, the traversal error is also published
        // on the output event stream before the error is returned.
        ensure!(safety_check || liveness_check, {
            if let Err(e) = outcome {
                broadcast_event(
                    Event {
                        view_number,
                        event: EventType::Error { error: Arc::new(e) },
                    },
                    &validation_info.output_event_stream,
                )
                .await;
            }

            error!(
                "Failed safety and liveness check \n High QC is {:?} Proposal QC is {:?} Locked \
                 view is {:?}",
                consensus_reader.high_qc(),
                proposal.data,
                consensus_reader.locked_view()
            )
        });
    }

    // Publish the accepted proposal on the output event stream.
    broadcast_event(
        Event {
            view_number,
            event: EventType::QuorumProposal {
                proposal: proposal.clone(),
                sender,
            },
        },
        &validation_info.output_event_stream,
    )
    .await;

    // Announce the validated proposal on the internal event stream.
    broadcast_event(
        Arc::new(HotShotEvent::QuorumProposalValidated(
            proposal.clone(),
            parent_leaf,
        )),
        &event_stream,
    )
    .await;

    Ok(())
}
1093
/// Validates the view number, signature and attached certificates of `proposal`.
///
/// When the justify QC is not for the immediately preceding view, the proposal must carry view
/// change evidence (a timeout or view sync certificate), which is validated against the
/// membership of the certificate's epoch. The upgrade certificate, if any, is validated last.
///
/// # Errors
/// Fails when the proposal's view is too old, the signature is invalid, required view change
/// evidence is missing or invalid, or the upgrade certificate does not validate.
pub(crate) async fn validate_proposal_view_and_certs<
    TYPES: NodeType,
    I: NodeImplementation<TYPES>,
>(
    proposal: &Proposal<TYPES, QuorumProposalWrapper<TYPES>>,
    validation_info: &ValidationInfo<TYPES, I>,
) -> Result<()> {
    let view_number = proposal.data.view_number();
    // NOTE(review): as written this accepts proposals for the current view or later and also
    // for the view immediately before the current one, while the message speaks of "an older
    // view" — confirm that tolerating one view of lag is intended.
    ensure!(
        view_number + 1 >= validation_info.consensus.read().await.cur_view(),
        "Proposal is from an older view {:?}",
        proposal.data
    );

    let mut membership = validation_info.membership.clone();
    proposal.validate_signature(&membership)?;

    // A justify QC that skips views must be backed by view change evidence.
    if proposal.data.justify_qc().view_number() != view_number - 1 {
        let received_proposal_cert =
            proposal
                .data
                .view_change_evidence()
                .clone()
                .context(debug!(
                    "Quorum proposal for view {view_number} needed a timeout or view sync \
                     certificate, but did not have one",
                ))?;

        match received_proposal_cert {
            ViewChangeEvidence2::Timeout(timeout_cert) => {
                // The timeout must be for the view directly before the proposal.
                ensure!(
                    timeout_cert.data().view == view_number - 1,
                    "Timeout certificate for view {view_number} was not for the immediately \
                     preceding view"
                );
                // Validate against the membership of the certificate's own epoch.
                let timeout_cert_epoch = timeout_cert.data().epoch();
                membership = membership.get_new_epoch(timeout_cert_epoch)?;

                let membership_stake_table =
                    StakeTableEntries::from_iter(membership.stake_table()).0;
                let membership_success_threshold = membership.success_threshold();

                timeout_cert
                    .is_valid_cert(
                        &membership_stake_table,
                        membership_success_threshold,
                        &validation_info.upgrade_lock,
                    )
                    .context(|e| {
                        warn!("Timeout certificate for view {view_number} was invalid: {e}")
                    })?;
            },
            ViewChangeEvidence2::ViewSync(view_sync_cert) => {
                // A view sync certificate must be for the proposal's own view.
                ensure!(
                    view_sync_cert.view_number == view_number,
                    "View sync cert view number {:?} does not match proposal view number {:?}",
                    view_sync_cert.view_number,
                    view_number
                );

                // Validate against the membership of the certificate's own epoch.
                let view_sync_cert_epoch = view_sync_cert.data().epoch();
                membership = membership.get_new_epoch(view_sync_cert_epoch)?;

                let membership_stake_table =
                    StakeTableEntries::from_iter(membership.stake_table()).0;
                let membership_success_threshold = membership.success_threshold();

                view_sync_cert
                    .is_valid_cert(
                        &membership_stake_table,
                        membership_success_threshold,
                        &validation_info.upgrade_lock,
                    )
                    .context(|e| warn!("Invalid view sync finalize cert provided: {e}"))?;
            },
        }
    }

    {
        // The upgrade certificate is validated for the epoch derived from the proposal's block
        // number; epochs count as enabled when the proposal itself carries an epoch.
        let epoch = option_epoch_from_block_number(
            proposal.data.epoch().is_some(),
            proposal.data.block_header().block_number(),
            validation_info.epoch_height,
        );
        UpgradeCertificate::validate(
            proposal.data.upgrade_certificate(),
            &validation_info.membership,
            epoch,
            &validation_info.upgrade_lock,
        )
        .await?;
    }

    Ok(())
}
1200
1201pub async fn broadcast_event<E: Clone + std::fmt::Debug>(event: E, sender: &Sender<E>) {
1203 match sender.broadcast_direct(event).await {
1204 Ok(None) => (),
1205 Ok(Some(overflowed)) => {
1206 tracing::error!(
1207 "Event sender queue overflow, Oldest event removed form queue: {overflowed:?}"
1208 );
1209 },
1210 Err(SendError(e)) => {
1211 tracing::warn!("Event: {e:?}\n Sending failed, event stream probably shutdown");
1212 },
1213 }
1214}
1215
1216pub async fn validate_qc_and_next_epoch_qc<TYPES: NodeType>(
1219 qc: &QuorumCertificate2<TYPES>,
1220 maybe_next_epoch_qc: Option<&NextEpochQuorumCertificate2<TYPES>>,
1221 consensus: &OuterConsensus<TYPES>,
1222 membership_coordinator: &EpochMembershipCoordinator<TYPES>,
1223 upgrade_lock: &UpgradeLock<TYPES>,
1224 epoch_height: u64,
1225) -> Result<()> {
1226 let cert = CertificatePair::new(qc.clone(), maybe_next_epoch_qc.cloned());
1227
1228 let mut epoch_membership = membership_coordinator.stake_table_for_epoch(cert.epoch())?;
1229
1230 let membership_stake_table = StakeTableEntries::from_iter(epoch_membership.stake_table()).0;
1231 let membership_success_threshold = epoch_membership.success_threshold();
1232
1233 if let Err(e) = cert.qc().is_valid_cert(
1234 &membership_stake_table,
1235 membership_success_threshold,
1236 upgrade_lock,
1237 ) {
1238 consensus.read().await.metrics.invalid_qc.update(1);
1239 return Err(warn!("Invalid certificate: {e}"));
1240 }
1241
1242 if upgrade_lock.epochs_enabled(cert.view_number())
1244 && let Some(next_epoch_qc) = cert.verify_next_epoch_qc(epoch_height)?
1245 {
1246 epoch_membership = epoch_membership.next_epoch_stake_table()?;
1247 let membership_next_stake_table =
1248 StakeTableEntries::from_iter(epoch_membership.stake_table()).0;
1249 let membership_next_success_threshold = epoch_membership.success_threshold();
1250 next_epoch_qc
1251 .is_valid_cert(
1252 &membership_next_stake_table,
1253 membership_next_success_threshold,
1254 upgrade_lock,
1255 )
1256 .context(|e| warn!("Invalid next epoch certificate: {e}"))?;
1257 }
1258
1259 Ok(())
1260}
1261
1262pub async fn wait_for_second_vid_share<TYPES: NodeType>(
1266 target_epoch: Option<EpochNumber>,
1267 vid_share: &Proposal<TYPES, VidDisperseShare<TYPES>>,
1268 da_cert: &DaCertificate2<TYPES>,
1269 consensus: &OuterConsensus<TYPES>,
1270 receiver: &Receiver<Arc<HotShotEvent<TYPES>>>,
1271 cancel_receiver: Receiver<()>,
1272 id: u64,
1273) -> Result<Proposal<TYPES, VidDisperseShare<TYPES>>> {
1274 tracing::debug!("getting the second VID share for epoch {:?}", target_epoch);
1275 let maybe_second_vid_share = consensus
1276 .read()
1277 .await
1278 .vid_shares()
1279 .get(&vid_share.data.view_number())
1280 .and_then(|key_map| key_map.get(vid_share.data.recipient_key()))
1281 .and_then(|epoch_map| epoch_map.get(&target_epoch))
1282 .cloned();
1283 if let Some(second_vid_share) = maybe_second_vid_share
1284 && ((target_epoch == da_cert.epoch()
1285 && second_vid_share.data.payload_commitment() == da_cert.data().payload_commit)
1286 || (target_epoch != da_cert.epoch()
1287 && Some(second_vid_share.data.payload_commitment())
1288 == da_cert.data().next_epoch_payload_commit))
1289 {
1290 return Ok(second_vid_share);
1291 }
1292
1293 let receiver = receiver.clone();
1294 let da_cert_clone = da_cert.clone();
1295 let Some(event) = EventDependency::new(
1296 receiver,
1297 cancel_receiver,
1298 format!(
1299 "VoteDependency Second VID share for view {:?}, my id {:?}",
1300 vid_share.data.view_number(),
1301 id
1302 ),
1303 Box::new(move |event| {
1304 let event = event.as_ref();
1305 if let HotShotEvent::VidShareValidated(second_vid_share) = event {
1306 if target_epoch == da_cert_clone.epoch() {
1307 second_vid_share.data.payload_commitment()
1308 == da_cert_clone.data().payload_commit
1309 } else {
1310 Some(second_vid_share.data.payload_commitment())
1311 == da_cert_clone.data().next_epoch_payload_commit
1312 }
1313 } else {
1314 false
1315 }
1316 }),
1317 )
1318 .completed()
1319 .await
1320 else {
1321 return Err(warn!("Error while waiting for the second VID share."));
1322 };
1323 let HotShotEvent::VidShareValidated(second_vid_share) = event.as_ref() else {
1324 return Err(warn!(
1326 "Received event is not VidShareValidated but we checked it earlier. Shouldn't be \
1327 possible."
1328 ));
1329 };
1330 Ok(second_vid_share.clone())
1331}
1332
1333pub async fn broadcast_view_change<TYPES: NodeType>(
1334 sender: &Sender<Arc<HotShotEvent<TYPES>>>,
1335 new_view_number: ViewNumber,
1336 epoch: Option<EpochNumber>,
1337 first_epoch: Option<(ViewNumber, EpochNumber)>,
1338) {
1339 let mut broadcast_epoch = epoch;
1340 if let Some((first_epoch_view, first_epoch)) = first_epoch
1341 && new_view_number == first_epoch_view
1342 && broadcast_epoch != Some(first_epoch)
1343 {
1344 broadcast_epoch = Some(first_epoch);
1345 }
1346 tracing::trace!("Sending ViewChange for view {new_view_number} and epoch {broadcast_epoch:?}");
1347 broadcast_event(
1348 Arc::new(HotShotEvent::ViewChange(new_view_number, broadcast_epoch)),
1349 sender,
1350 )
1351 .await
1352}