1use std::{
8 collections::{HashMap, HashSet},
9 sync::Arc,
10 time::Instant,
11};
12
13use alloy::{
14 primitives::{FixedBytes, U256},
15 sol_types::SolValue,
16};
17use ark_ff::PrimeField;
18use async_broadcast::{Receiver, SendError, Sender};
19use async_lock::RwLock;
20use committable::{Commitment, Committable};
21use hotshot_contract_adapter::sol_types::{LightClientStateSol, StakeTableStateSol};
22use hotshot_task::dependency::{Dependency, EventDependency};
23use hotshot_types::{
24 consensus::OuterConsensus,
25 data::{
26 EpochNumber, Leaf2, QuorumProposalWrapper, VidDisperseShare, ViewChangeEvidence2,
27 ViewNumber,
28 },
29 drb::DrbResult,
30 epoch_membership::EpochMembershipCoordinator,
31 event::{Event, EventType, LeafInfo},
32 light_client::{CircuitField, LightClientState, StakeTableState},
33 message::{Proposal, UpgradeLock},
34 request_response::ProposalRequestPayload,
35 simple_certificate::{
36 CertificatePair, DaCertificate2, LightClientStateUpdateCertificateV2,
37 NextEpochQuorumCertificate2, QuorumCertificate2, UpgradeCertificate,
38 },
39 simple_vote::HasEpoch,
40 stake_table::StakeTableEntries,
41 traits::{
42 BlockPayload, ValidatedState,
43 block_contents::BlockHeader,
44 election::Membership,
45 node_implementation::{NodeImplementation, NodeType},
46 signature_key::{
47 LCV2StateSignatureKey, LCV3StateSignatureKey, SignatureKey, StakeTableEntryType,
48 },
49 storage::Storage,
50 },
51 utils::{
52 Terminator, View, ViewInner, epoch_from_block_number, is_epoch_root, is_epoch_transition,
53 is_transition_block, option_epoch_from_block_number,
54 },
55 vote::{Certificate, HasViewNumber},
56};
57use hotshot_utils::anytrace::*;
58use time::OffsetDateTime;
59use tokio::time::timeout;
60use tracing::instrument;
61use versions::EPOCH_VERSION;
62
63use crate::{events::HotShotEvent, quorum_proposal_recv::ValidationInfo, request::REQUEST_TIMEOUT};
64
65#[instrument(skip_all)]
67#[allow(clippy::too_many_arguments)]
68pub(crate) async fn fetch_proposal<TYPES: NodeType>(
69 qc: &QuorumCertificate2<TYPES>,
70 event_sender: Sender<Arc<HotShotEvent<TYPES>>>,
71 event_receiver: Receiver<Arc<HotShotEvent<TYPES>>>,
72 membership_coordinator: EpochMembershipCoordinator<TYPES>,
73 consensus: OuterConsensus<TYPES>,
74 sender_public_key: TYPES::SignatureKey,
75 sender_private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
76 upgrade_lock: &UpgradeLock<TYPES>,
77 epoch_height: u64,
78) -> Result<(Leaf2<TYPES>, View<TYPES>)> {
79 let view_number = qc.view_number();
80 let leaf_commit = qc.data.leaf_commit;
81 let signed_proposal_request = ProposalRequestPayload {
84 view_number,
85 key: sender_public_key,
86 };
87
88 let signature = TYPES::SignatureKey::sign(
90 &sender_private_key,
91 signed_proposal_request.commit().as_ref(),
92 )
93 .wrap()
94 .context(error!("Failed to sign proposal. This should never happen."))?;
95
96 tracing::info!("Sending proposal request for view {view_number}");
97
98 broadcast_event(
100 HotShotEvent::QuorumProposalRequestSend(signed_proposal_request, signature).into(),
101 &event_sender,
102 )
103 .await;
104
105 let mut rx = event_receiver.clone();
106 let Ok(Some(proposal)) =
108 timeout(REQUEST_TIMEOUT, async move {
110 while let Ok(event) = rx.recv_direct().await {
112 if let HotShotEvent::QuorumProposalResponseRecv(quorum_proposal) = event.as_ref() {
113 let leaf = Leaf2::from_quorum_proposal(&quorum_proposal.data);
114 if leaf.view_number() == view_number && leaf.commit() == leaf_commit {
115 return Some(quorum_proposal.clone());
116 }
117 }
118 }
119 None
120 })
121 .await
122 else {
123 bail!("Request for proposal failed");
124 };
125
126 let view_number = proposal.data.view_number();
127 let justify_qc = proposal.data.justify_qc().clone();
128
129 let justify_qc_epoch = justify_qc.data.epoch();
130
131 let epoch_membership = membership_coordinator
132 .stake_table_for_epoch(justify_qc_epoch)
133 .await?;
134 let membership_stake_table = epoch_membership.stake_table().await;
135 let membership_success_threshold = epoch_membership.success_threshold().await;
136
137 justify_qc
138 .is_valid_cert(
139 &StakeTableEntries::<TYPES>::from(membership_stake_table).0,
140 membership_success_threshold,
141 upgrade_lock,
142 )
143 .context(|e| warn!("Invalid justify_qc in proposal for view {view_number}: {e}"))?;
144
145 let mut consensus_writer = consensus.write().await;
146 let leaf = Leaf2::from_quorum_proposal(&proposal.data);
147 let state = Arc::new(
148 <TYPES::ValidatedState as ValidatedState<TYPES>>::from_header(proposal.data.block_header()),
149 );
150
151 if let Err(e) = consensus_writer.update_leaf(leaf.clone(), Arc::clone(&state), None) {
152 tracing::trace!("{e:?}");
153 }
154 let view = View {
155 view_inner: ViewInner::Leaf {
156 leaf: leaf.commit(),
157 state,
158 delta: None,
159 epoch: leaf.epoch(epoch_height),
160 },
161 };
162 Ok((leaf, view))
163}
164pub async fn handle_drb_result<TYPES: NodeType, I: NodeImplementation<TYPES>>(
165 membership: &Arc<RwLock<TYPES::Membership>>,
166 epoch: EpochNumber,
167 storage: &I::Storage,
168 drb_result: DrbResult,
169) {
170 tracing::debug!("Calling store_drb_result for epoch {epoch}");
171 if let Err(e) = storage.store_drb_result(epoch, drb_result).await {
172 tracing::error!("Failed to store drb result for epoch {epoch}: {e}");
173 }
174
175 membership.write().await.add_drb_result(epoch, drb_result)
176}
177
178pub(crate) async fn verify_drb_result<TYPES: NodeType, I: NodeImplementation<TYPES>>(
185 proposal: &QuorumProposalWrapper<TYPES>,
186 validation_info: &ValidationInfo<TYPES, I>,
187) -> Result<()> {
188 if validation_info.epoch_height == 0
190 || !is_epoch_transition(
191 proposal.block_header().block_number(),
192 validation_info.epoch_height,
193 )
194 {
195 tracing::debug!("Skipping DRB result verification");
196 return Ok(());
197 }
198
199 let epoch = option_epoch_from_block_number(
203 validation_info
204 .upgrade_lock
205 .epochs_enabled(proposal.view_number()),
206 proposal.block_header().block_number(),
207 validation_info.epoch_height,
208 );
209
210 let proposal_result = proposal
211 .next_drb_result()
212 .context(info!("Proposal is missing the next epoch's DRB result."))?;
213
214 if let Some(epoch_val) = epoch {
215 let current_epoch_membership = validation_info
216 .membership
217 .coordinator
218 .stake_table_for_epoch(epoch)
219 .await
220 .context(warn!("No stake table for epoch {}", epoch_val))?;
221
222 let has_stake_current_epoch = current_epoch_membership
223 .has_stake(&validation_info.public_key)
224 .await;
225
226 if has_stake_current_epoch {
227 let computed_result = current_epoch_membership
228 .next_epoch()
229 .await
230 .context(warn!("No stake table for epoch {}", epoch_val + 1))?
231 .get_epoch_drb()
232 .await
233 .clone()
234 .context(warn!("DRB result not found"))?;
235
236 ensure!(
237 proposal_result == computed_result,
238 warn!(
239 "Our calculated DRB result is {computed_result:?}, which does not match the \
240 proposed DRB result of {proposal_result:?}"
241 )
242 );
243 }
244
245 Ok(())
246 } else {
247 Err(error!("Epochs are not available"))
248 }
249}
250
251async fn decide_epoch_root<TYPES: NodeType, I: NodeImplementation<TYPES>>(
253 decided_leaf: &Leaf2<TYPES>,
254 epoch_height: u64,
255 membership: &EpochMembershipCoordinator<TYPES>,
256 storage: &I::Storage,
257 consensus: &OuterConsensus<TYPES>,
258) {
259 let decided_leaf = decided_leaf.clone();
260 let decided_block_number = decided_leaf.block_header().block_number();
261
262 if epoch_height != 0 && is_epoch_root(decided_block_number, epoch_height) {
264 let next_epoch_number =
265 EpochNumber::new(epoch_from_block_number(decided_block_number, epoch_height) + 2);
266
267 let start = Instant::now();
268 if let Err(e) = storage
269 .store_epoch_root(next_epoch_number, decided_leaf.block_header().clone())
270 .await
271 {
272 tracing::error!("Failed to store epoch root for epoch {next_epoch_number}: {e}");
273 }
274 tracing::info!("Time taken to store epoch root: {:?}", start.elapsed());
275
276 let membership = membership.clone();
277 let decided_block_header = decided_leaf.block_header().clone();
278 let storage = storage.clone();
279 let consensus = consensus.clone();
280
281 let consensus_reader = consensus.read().await;
282
283 drop(consensus_reader);
284
285 tokio::spawn(async move {
286 let membership_clone = membership.clone();
287
288 {
290 let start = Instant::now();
291 if let Err(e) = Membership::add_epoch_root(
292 Arc::clone(membership_clone.membership()),
293 decided_block_header,
294 )
295 .await
296 {
297 tracing::error!("Failed to add epoch root for epoch {next_epoch_number}: {e}");
298 }
299 tracing::info!("Time taken to add epoch root: {:?}", start.elapsed());
300 }
301
302 let membership_clone = membership.clone();
303
304 let drb_result = membership_clone
305 .compute_drb_result(next_epoch_number, decided_leaf.clone())
306 .await;
307
308 let drb_result = match drb_result {
309 Ok(result) => result,
310 Err(e) => {
311 tracing::error!("Failed to compute DRB result from decide: {e}");
312 return;
313 },
314 };
315
316 let start = Instant::now();
317 handle_drb_result::<TYPES, I>(
318 membership.membership(),
319 next_epoch_number,
320 &storage,
321 drb_result,
322 )
323 .await;
324 tracing::info!("Time taken to handle drb result: {:?}", start.elapsed());
325 });
326 }
327}
328
329#[derive(Debug)]
331pub struct LeafChainTraversalOutcome<TYPES: NodeType> {
332 pub new_locked_view_number: Option<ViewNumber>,
334
335 pub new_decided_view_number: Option<ViewNumber>,
337
338 pub committing_qc: Option<CertificatePair<TYPES>>,
342
343 pub deciding_qc: Option<CertificatePair<TYPES>>,
348
349 pub leaf_views: Vec<LeafInfo<TYPES>>,
351
352 pub included_txns: Option<HashSet<Commitment<<TYPES as NodeType>::Transaction>>>,
354
355 pub decided_upgrade_cert: Option<UpgradeCertificate<TYPES>>,
357}
358
359impl<TYPES: NodeType + Default> Default for LeafChainTraversalOutcome<TYPES> {
363 fn default() -> Self {
365 Self {
366 new_locked_view_number: None,
367 new_decided_view_number: None,
368 committing_qc: None,
369 deciding_qc: None,
370 leaf_views: Vec::new(),
371 included_txns: None,
372 decided_upgrade_cert: None,
373 }
374 }
375}
376
377async fn update_metrics<TYPES: NodeType>(
378 consensus: &OuterConsensus<TYPES>,
379 leaf_views: &[LeafInfo<TYPES>],
380) {
381 let consensus_reader = consensus.read().await;
382 let now = OffsetDateTime::now_utc().unix_timestamp() as u64;
383
384 for leaf_view in leaf_views {
385 let proposal_timestamp = leaf_view.leaf.block_header().timestamp();
386
387 let Some(proposal_to_decide_time) = now.checked_sub(proposal_timestamp) else {
388 tracing::error!("Failed to calculate proposal to decide time: {proposal_timestamp}");
389 continue;
390 };
391 consensus_reader
392 .metrics
393 .proposal_to_decide_time
394 .add_point(proposal_to_decide_time as f64);
395 if let Some(txn_bytes) = leaf_view.leaf.block_payload().map(|p| p.txn_bytes()) {
396 consensus_reader
397 .metrics
398 .finalized_bytes
399 .add_point(txn_bytes as f64);
400 }
401 }
402}
403
404#[allow(clippy::too_many_arguments)]
410pub async fn decide_from_proposal_2<TYPES: NodeType, I: NodeImplementation<TYPES>>(
411 proposal: &QuorumProposalWrapper<TYPES>,
412 consensus: OuterConsensus<TYPES>,
413 upgrade_lock: &UpgradeLock<TYPES>,
414 public_key: &TYPES::SignatureKey,
415 with_epochs: bool,
416 membership: &EpochMembershipCoordinator<TYPES>,
417 storage: &I::Storage,
418) -> LeafChainTraversalOutcome<TYPES> {
419 let mut res = LeafChainTraversalOutcome::default();
420 let consensus_reader = consensus.read().await;
421 let proposed_leaf = Leaf2::from_quorum_proposal(proposal);
422 res.new_locked_view_number = Some(proposed_leaf.justify_qc().view_number());
423
424 let Some(parent_info) = consensus_reader
426 .parent_leaf_info(&proposed_leaf, public_key)
427 .await
428 else {
429 return res;
430 };
431 let Some(grand_parent_info) = consensus_reader
434 .parent_leaf_info(&parent_info.leaf, public_key)
435 .await
436 else {
437 return res;
438 };
439 if grand_parent_info.leaf.view_number() + 1 != parent_info.leaf.view_number() {
440 return res;
441 }
442 res.committing_qc = Some(CertificatePair::for_parent(&parent_info.leaf));
443 res.deciding_qc = Some(CertificatePair::for_parent(&proposed_leaf));
444 let decided_view_number = grand_parent_info.leaf.view_number();
445 res.new_decided_view_number = Some(decided_view_number);
446 let old_anchor_view = consensus_reader.last_decided_view();
448 let mut current_leaf_info = Some(grand_parent_info);
449 let existing_upgrade_cert_reader = upgrade_lock.decided_upgrade_cert();
450 let mut txns = HashSet::new();
451 while current_leaf_info
452 .as_ref()
453 .is_some_and(|info| info.leaf.view_number() > old_anchor_view)
454 {
455 let info = &mut current_leaf_info.unwrap();
457 if let Some(cert) = info.leaf.upgrade_certificate()
459 && info.leaf.upgrade_certificate() != existing_upgrade_cert_reader
460 {
461 if cert.data.decide_by < decided_view_number {
462 tracing::warn!("Failed to decide an upgrade certificate in time. Ignoring.");
463 } else {
464 tracing::info!("Reached decide on upgrade certificate: {cert:?}");
465 res.decided_upgrade_cert = Some(cert.clone());
466 }
467 }
468
469 if let Some(payload) = consensus_reader
472 .saved_payloads()
473 .get(&info.leaf.view_number())
474 {
475 info.leaf
476 .fill_block_payload_unchecked(payload.as_ref().payload.clone());
477 }
478
479 if let Some(ref payload) = info.leaf.block_payload() {
480 for txn in payload.transaction_commitments(info.leaf.block_header().metadata()) {
481 txns.insert(txn);
482 }
483 }
484
485 current_leaf_info = consensus_reader
486 .parent_leaf_info(&info.leaf, public_key)
487 .await;
488 res.leaf_views.push(info.clone());
489 }
490
491 if !txns.is_empty() {
492 res.included_txns = Some(txns);
493 }
494
495 if with_epochs && res.new_decided_view_number.is_some() {
496 let Some(first_leaf) = res.leaf_views.first() else {
497 return res;
498 };
499 let epoch_height = consensus_reader.epoch_height;
500 consensus_reader
501 .metrics
502 .last_synced_block_height
503 .set(usize::try_from(first_leaf.leaf.height()).unwrap_or(0));
504 drop(consensus_reader);
505
506 for decided_leaf_info in &res.leaf_views {
507 decide_epoch_root::<TYPES, I>(
508 &decided_leaf_info.leaf,
509 epoch_height,
510 membership,
511 storage,
512 &consensus,
513 )
514 .await;
515 }
516 update_metrics(&consensus, &res.leaf_views).await;
517 }
518
519 res
520}
521
522#[allow(clippy::too_many_arguments)]
554pub async fn decide_from_proposal<TYPES: NodeType, I: NodeImplementation<TYPES>>(
555 proposal: &QuorumProposalWrapper<TYPES>,
556 consensus: OuterConsensus<TYPES>,
557 upgrade_lock: &UpgradeLock<TYPES>,
558 public_key: &TYPES::SignatureKey,
559 with_epochs: bool,
560 membership: &EpochMembershipCoordinator<TYPES>,
561 storage: &I::Storage,
562 epoch_height: u64,
563) -> LeafChainTraversalOutcome<TYPES> {
564 let consensus_reader = consensus.read().await;
565 let existing_upgrade_cert_reader = upgrade_lock.decided_upgrade_cert();
566 let view_number = proposal.view_number();
567 let parent_view_number = proposal.justify_qc().view_number();
568 let old_anchor_view = consensus_reader.last_decided_view();
569
570 let mut last_view_number_visited = view_number;
571 let mut current_chain_length = 0usize;
572 let mut res = LeafChainTraversalOutcome::default();
573
574 if let Err(e) = consensus_reader.visit_leaf_ancestors(
575 parent_view_number,
576 Terminator::Exclusive(old_anchor_view),
577 true,
578 |leaf, state, delta| {
579 if res.new_decided_view_number.is_none() {
581 if last_view_number_visited == leaf.view_number() + 1 {
583 last_view_number_visited = leaf.view_number();
584
585 current_chain_length += 1;
587
588 if current_chain_length == 2 {
590 res.new_locked_view_number = Some(leaf.view_number());
591 res.committing_qc = Some(CertificatePair::for_parent(leaf));
594 } else if current_chain_length == 3 {
595 res.new_decided_view_number = Some(leaf.view_number());
597 }
598 } else {
599 return false;
602 }
603 }
604
605 if let Some(new_decided_view) = res.new_decided_view_number {
607 let mut leaf = leaf.clone();
609
610 if leaf.view_number() == new_decided_view {
612 consensus_reader
613 .metrics
614 .last_synced_block_height
615 .set(usize::try_from(leaf.height()).unwrap_or(0));
616 }
617
618 if let Some(cert) = leaf.upgrade_certificate()
620 && leaf.upgrade_certificate() != existing_upgrade_cert_reader
621 {
622 if cert.data.decide_by < view_number {
623 tracing::warn!(
624 "Failed to decide an upgrade certificate in time. Ignoring."
625 );
626 } else {
627 tracing::info!("Reached decide on upgrade certificate: {cert:?}");
628 res.decided_upgrade_cert = Some(cert.clone());
629 }
630 }
631 if let Some(payload) = consensus_reader.saved_payloads().get(&leaf.view_number()) {
634 leaf.fill_block_payload_unchecked(payload.as_ref().payload.clone());
635 }
636
637 let vid_share = consensus_reader
640 .vid_shares()
641 .get(&leaf.view_number())
642 .and_then(|key_map| key_map.get(public_key))
643 .and_then(|epoch_map| epoch_map.get(&leaf.epoch(epoch_height)))
644 .map(|prop| prop.data.clone());
645
646 let state_cert = if leaf.with_epoch
647 && is_epoch_root(
648 leaf.block_header().block_number(),
649 consensus_reader.epoch_height,
650 ) {
651 match consensus_reader.state_cert() {
652 Some(state_cert)
654 if state_cert.light_client_state.view_number
655 == leaf.view_number().u64() =>
656 {
657 Some(state_cert.clone())
658 },
659 _ => None,
660 }
661 } else {
662 None
663 };
664
665 res.leaf_views.push(LeafInfo::new(
667 leaf.clone(),
668 Arc::clone(&state),
669 delta.clone(),
670 vid_share,
671 state_cert,
672 ));
673 if let Some(ref payload) = leaf.block_payload() {
674 res.included_txns = Some(
675 payload
676 .transaction_commitments(leaf.block_header().metadata())
677 .into_iter()
678 .collect::<HashSet<_>>(),
679 );
680 }
681 }
682 true
683 },
684 ) {
685 tracing::debug!("Leaf ascension failed; error={e}");
686 }
687
688 let epoch_height = consensus_reader.epoch_height;
689 drop(consensus_reader);
690
691 if with_epochs && res.new_decided_view_number.is_some() {
692 for decided_leaf_info in &res.leaf_views {
693 decide_epoch_root::<TYPES, I>(
694 &decided_leaf_info.leaf,
695 epoch_height,
696 membership,
697 storage,
698 &consensus,
699 )
700 .await;
701 }
702 }
703
704 res
705}
706
707#[instrument(skip_all)]
709#[allow(clippy::too_many_arguments)]
710pub(crate) async fn parent_leaf_and_state<TYPES: NodeType>(
711 event_sender: &Sender<Arc<HotShotEvent<TYPES>>>,
712 event_receiver: &Receiver<Arc<HotShotEvent<TYPES>>>,
713 membership: EpochMembershipCoordinator<TYPES>,
714 public_key: TYPES::SignatureKey,
715 private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
716 consensus: OuterConsensus<TYPES>,
717 upgrade_lock: &UpgradeLock<TYPES>,
718 parent_qc: &QuorumCertificate2<TYPES>,
719 epoch_height: u64,
720) -> Result<(Leaf2<TYPES>, Arc<<TYPES as NodeType>::ValidatedState>)> {
721 let consensus_reader = consensus.read().await;
722 let vsm_contains_parent_view = consensus_reader
723 .validated_state_map()
724 .contains_key(&parent_qc.view_number());
725 drop(consensus_reader);
726
727 if !vsm_contains_parent_view {
728 let _ = fetch_proposal(
729 parent_qc,
730 event_sender.clone(),
731 event_receiver.clone(),
732 membership,
733 consensus.clone(),
734 public_key.clone(),
735 private_key.clone(),
736 upgrade_lock,
737 epoch_height,
738 )
739 .await
740 .context(info!("Failed to fetch proposal"))?;
741 }
742
743 let consensus_reader = consensus.read().await;
744 let parent_view = consensus_reader
745 .validated_state_map()
746 .get(&parent_qc.view_number())
747 .context(debug!(
748 "Couldn't find parent view in state map, waiting for replica to see proposal; \
749 parent_view_number: {}",
750 *parent_qc.view_number()
751 ))?;
752
753 let (leaf_commitment, state) = parent_view.leaf_and_state().context(info!(
754 "Parent of high QC points to a view without a proposal; parent_view_number: {}, \
755 parent_view {:?}",
756 *parent_qc.view_number(),
757 parent_view
758 ))?;
759
760 if leaf_commitment != consensus_reader.high_qc().data().leaf_commit {
761 tracing::debug!(
763 "They don't equal: {:?} {:?}",
764 leaf_commitment,
765 consensus_reader.high_qc().data().leaf_commit
766 );
767 }
768
769 let leaf = consensus_reader
770 .saved_leaves()
771 .get(&leaf_commitment)
772 .context(info!("Failed to find high QC of parent"))?;
773
774 Ok((leaf.clone(), Arc::clone(state)))
775}
776
777pub(crate) async fn update_high_qc<TYPES: NodeType, I: NodeImplementation<TYPES>>(
778 proposal: &Proposal<TYPES, QuorumProposalWrapper<TYPES>>,
779 validation_info: &ValidationInfo<TYPES, I>,
780) -> Result<()> {
781 let in_transition_epoch = proposal
782 .data
783 .justify_qc()
784 .data
785 .block_number
786 .is_some_and(|bn| {
787 !is_transition_block(bn, validation_info.epoch_height)
788 && is_epoch_transition(bn, validation_info.epoch_height)
789 && bn % validation_info.epoch_height != 0
790 });
791 let justify_qc = proposal.data.justify_qc();
792 let maybe_next_epoch_justify_qc = proposal.data.next_epoch_justify_qc();
793 if !in_transition_epoch {
794 tracing::debug!(
795 "Storing high QC for view {:?} and height {:?}",
796 justify_qc.view_number(),
797 justify_qc.data.block_number
798 );
799 if let Err(e) = validation_info
800 .storage
801 .update_high_qc2(justify_qc.clone())
802 .await
803 {
804 bail!("Failed to store High QC, not voting; error = {e:?}");
805 }
806 if justify_qc
807 .data
808 .block_number
809 .is_some_and(|bn| is_epoch_root(bn, validation_info.epoch_height))
810 {
811 let Some(state_cert) = proposal.data.state_cert() else {
812 bail!("Epoch root QC has no state cert, not voting!");
813 };
814 if let Err(e) = validation_info
815 .storage
816 .update_state_cert(state_cert.clone())
817 .await
818 {
819 bail!(
820 "Failed to store the light client state update certificate, not voting; error \
821 = {:?}",
822 e
823 );
824 }
825 validation_info
826 .consensus
827 .write()
828 .await
829 .update_state_cert(state_cert.clone())?;
830 }
831 if let Some(next_epoch_justify_qc) = maybe_next_epoch_justify_qc
832 && let Err(e) = validation_info
833 .storage
834 .update_next_epoch_high_qc2(next_epoch_justify_qc.clone())
835 .await
836 {
837 bail!("Failed to store next epoch High QC, not voting; error = {e:?}");
838 }
839 }
840 let mut consensus_writer = validation_info.consensus.write().await;
841 if let Some(next_epoch_justify_qc) = maybe_next_epoch_justify_qc {
842 if justify_qc
843 .data
844 .block_number
845 .is_some_and(|bn| is_transition_block(bn, validation_info.epoch_height))
846 {
847 consensus_writer.reset_high_qc(justify_qc.clone(), next_epoch_justify_qc.clone())?;
848 consensus_writer
849 .update_transition_qc(justify_qc.clone(), next_epoch_justify_qc.clone());
850 return Ok(());
851 }
852 consensus_writer.update_next_epoch_high_qc(next_epoch_justify_qc.clone())?;
853 }
854 consensus_writer.update_high_qc(justify_qc.clone())?;
855
856 Ok(())
857}
858
859async fn transition_qc<TYPES: NodeType, I: NodeImplementation<TYPES>>(
860 validation_info: &ValidationInfo<TYPES, I>,
861) -> Option<(
862 QuorumCertificate2<TYPES>,
863 NextEpochQuorumCertificate2<TYPES>,
864)> {
865 validation_info
866 .consensus
867 .read()
868 .await
869 .transition_qc()
870 .cloned()
871}
872
873pub(crate) async fn validate_epoch_transition_qc<TYPES: NodeType, I: NodeImplementation<TYPES>>(
874 proposal: &Proposal<TYPES, QuorumProposalWrapper<TYPES>>,
875 validation_info: &ValidationInfo<TYPES, I>,
876) -> Result<()> {
877 let proposed_qc = proposal.data.justify_qc();
878 let Some(qc_block_number) = proposed_qc.data().block_number else {
879 bail!("Justify QC has no block number");
880 };
881 if !is_epoch_transition(qc_block_number, validation_info.epoch_height)
882 || qc_block_number % validation_info.epoch_height == 0
883 {
884 return Ok(());
885 }
886 let Some(next_epoch_qc) = proposal.data.next_epoch_justify_qc() else {
887 bail!("Next epoch justify QC is not present");
888 };
889 ensure!(
890 next_epoch_qc.data.leaf_commit == proposed_qc.data().leaf_commit,
891 "Next epoch QC has different leaf commit to justify QC"
892 );
893
894 if is_transition_block(qc_block_number, validation_info.epoch_height) {
895 ensure!(
897 transition_qc(validation_info)
898 .await
899 .is_none_or(|(qc, _)| qc.view_number() <= proposed_qc.view_number()),
900 "Proposed transition qc must have view number greater than or equal to previous \
901 transition QC"
902 );
903
904 validation_info
905 .consensus
906 .write()
907 .await
908 .update_transition_qc(proposed_qc.clone(), next_epoch_qc.clone());
909 update_high_qc(proposal, validation_info).await?;
911 } else {
912 ensure!(
914 transition_qc(validation_info)
915 .await
916 .is_none_or(|(qc, _)| qc.view_number() < proposed_qc.view_number()),
917 "Transition block must have view number greater than previous transition QC"
918 );
919 ensure!(
920 proposal.data.view_change_evidence().is_none(),
921 "Second to last block and last block of epoch must directly extend previous block, Qc \
922 Block number: {qc_block_number}, Proposal Block number: {}",
923 proposal.data.block_header().block_number()
924 );
925 ensure!(
926 proposed_qc.view_number() + 1 == proposal.data.view_number()
927 || transition_qc(validation_info)
928 .await
929 .is_some_and(|(qc, _)| &qc == proposed_qc),
930 "Transition proposals must extend the previous view directly, or extend the previous \
931 transition block"
932 );
933 }
934 Ok(())
935}
936
937#[allow(clippy::too_many_lines)]
944#[instrument(skip_all, fields(id = validation_info.id, view = *proposal.data.view_number()))]
945pub(crate) async fn validate_proposal_safety_and_liveness<
946 TYPES: NodeType,
947 I: NodeImplementation<TYPES>,
948>(
949 proposal: Proposal<TYPES, QuorumProposalWrapper<TYPES>>,
950 parent_leaf: Leaf2<TYPES>,
951 validation_info: &ValidationInfo<TYPES, I>,
952 event_stream: Sender<Arc<HotShotEvent<TYPES>>>,
953 sender: TYPES::SignatureKey,
954) -> Result<()> {
955 let view_number = proposal.data.view_number();
956
957 let mut valid_epoch_transition = false;
958 if validation_info
959 .upgrade_lock
960 .version(proposal.data.justify_qc().view_number())
961 .is_ok_and(|v| v >= EPOCH_VERSION)
962 {
963 let Some(block_number) = proposal.data.justify_qc().data.block_number else {
964 bail!("Quorum Proposal has no block number but it's after the epoch upgrade");
965 };
966 if is_epoch_transition(block_number, validation_info.epoch_height) {
967 validate_epoch_transition_qc(&proposal, validation_info).await?;
968 valid_epoch_transition = true;
969 }
970 }
971
972 let proposed_leaf = Leaf2::from_quorum_proposal(&proposal.data);
973 ensure!(
974 proposed_leaf.parent_commitment() == parent_leaf.commit(),
975 "Proposed leaf does not extend the parent leaf."
976 );
977 let proposal_epoch = option_epoch_from_block_number(
978 validation_info.upgrade_lock.epochs_enabled(view_number),
979 proposed_leaf.height(),
980 validation_info.epoch_height,
981 );
982
983 let state = Arc::new(
984 <TYPES::ValidatedState as ValidatedState<TYPES>>::from_header(proposal.data.block_header()),
985 );
986
987 {
988 let mut consensus_writer = validation_info.consensus.write().await;
989 if let Err(e) = consensus_writer.update_leaf(proposed_leaf.clone(), state, None) {
990 tracing::trace!("{e:?}");
991 }
992
993 if let Err(e) = consensus_writer.update_proposed_view(proposal.clone()) {
996 tracing::debug!("Internal proposal update failed; error = {e:#}");
997 };
998 }
999
1000 UpgradeCertificate::validate(
1001 proposal.data.upgrade_certificate(),
1002 &validation_info.membership,
1003 proposal_epoch,
1004 &validation_info.upgrade_lock,
1005 )
1006 .await?;
1007
1008 proposed_leaf.extends_upgrade(&parent_leaf, &validation_info.upgrade_lock)?;
1010
1011 let justify_qc = proposal.data.justify_qc().clone();
1012 {
1016 let consensus_reader = validation_info.consensus.read().await;
1017 let justify_qc_epoch = option_epoch_from_block_number(
1022 validation_info.upgrade_lock.epochs_enabled(view_number),
1023 parent_leaf.height(),
1024 validation_info.epoch_height,
1025 );
1026 ensure!(
1027 proposal_epoch == justify_qc_epoch
1028 || consensus_reader.check_eqc(&proposed_leaf, &parent_leaf),
1029 {
1030 error!(
1031 "Failed epoch safety check \n Proposed leaf is {proposed_leaf:?} \n justify \
1032 QC leaf is {parent_leaf:?}"
1033 )
1034 }
1035 );
1036
1037 if is_epoch_transition(parent_leaf.height(), validation_info.epoch_height)
1039 && validation_info.upgrade_lock.epochs_enabled(view_number)
1040 {
1041 ensure!(
1042 proposal.data.next_epoch_justify_qc().is_some(),
1043 "Epoch transition proposal does not include the next epoch justify QC. Do not \
1044 vote!"
1045 );
1046 }
1047
1048 let liveness_check =
1050 justify_qc.view_number() > consensus_reader.locked_view() || valid_epoch_transition;
1051
1052 let outcome = consensus_reader.visit_leaf_ancestors(
1055 justify_qc.view_number(),
1056 Terminator::Inclusive(consensus_reader.locked_view()),
1057 false,
1058 |leaf, _, _| {
1059 leaf.view_number() != consensus_reader.locked_view()
1062 },
1063 );
1064 let safety_check = outcome.is_ok();
1065
1066 ensure!(safety_check || liveness_check, {
1067 if let Err(e) = outcome {
1068 broadcast_event(
1069 Event {
1070 view_number,
1071 event: EventType::Error { error: Arc::new(e) },
1072 },
1073 &validation_info.output_event_stream,
1074 )
1075 .await;
1076 }
1077
1078 error!(
1079 "Failed safety and liveness check \n High QC is {:?} Proposal QC is {:?} Locked \
1080 view is {:?}",
1081 consensus_reader.high_qc(),
1082 proposal.data,
1083 consensus_reader.locked_view()
1084 )
1085 });
1086 }
1087
1088 broadcast_event(
1090 Event {
1091 view_number,
1092 event: EventType::QuorumProposal {
1093 proposal: proposal.clone(),
1094 sender,
1095 },
1096 },
1097 &validation_info.output_event_stream,
1098 )
1099 .await;
1100
1101 broadcast_event(
1103 Arc::new(HotShotEvent::QuorumProposalValidated(
1104 proposal.clone(),
1105 parent_leaf,
1106 )),
1107 &event_stream,
1108 )
1109 .await;
1110
1111 Ok(())
1112}
1113
1114pub(crate) async fn validate_proposal_view_and_certs<
1121 TYPES: NodeType,
1122 I: NodeImplementation<TYPES>,
1123>(
1124 proposal: &Proposal<TYPES, QuorumProposalWrapper<TYPES>>,
1125 validation_info: &ValidationInfo<TYPES, I>,
1126) -> Result<()> {
1127 let view_number = proposal.data.view_number();
1128 ensure!(
1129 view_number + 1 >= validation_info.consensus.read().await.cur_view(),
1130 "Proposal is from an older view {:?}",
1131 proposal.data
1132 );
1133
1134 let mut membership = validation_info.membership.clone();
1136 proposal.validate_signature(&membership).await?;
1137
1138 if proposal.data.justify_qc().view_number() != view_number - 1 {
1140 let received_proposal_cert =
1141 proposal
1142 .data
1143 .view_change_evidence()
1144 .clone()
1145 .context(debug!(
1146 "Quorum proposal for view {view_number} needed a timeout or view sync \
1147 certificate, but did not have one",
1148 ))?;
1149
1150 match received_proposal_cert {
1151 ViewChangeEvidence2::Timeout(timeout_cert) => {
1152 ensure!(
1153 timeout_cert.data().view == view_number - 1,
1154 "Timeout certificate for view {view_number} was not for the immediately \
1155 preceding view"
1156 );
1157 let timeout_cert_epoch = timeout_cert.data().epoch();
1158 membership = membership.get_new_epoch(timeout_cert_epoch).await?;
1159
1160 let membership_stake_table = membership.stake_table().await;
1161 let membership_success_threshold = membership.success_threshold().await;
1162
1163 timeout_cert
1164 .is_valid_cert(
1165 &StakeTableEntries::<TYPES>::from(membership_stake_table).0,
1166 membership_success_threshold,
1167 &validation_info.upgrade_lock,
1168 )
1169 .context(|e| {
1170 warn!("Timeout certificate for view {view_number} was invalid: {e}")
1171 })?;
1172 },
1173 ViewChangeEvidence2::ViewSync(view_sync_cert) => {
1174 ensure!(
1175 view_sync_cert.view_number == view_number,
1176 "View sync cert view number {:?} does not match proposal view number {:?}",
1177 view_sync_cert.view_number,
1178 view_number
1179 );
1180
1181 let view_sync_cert_epoch = view_sync_cert.data().epoch();
1182 membership = membership.get_new_epoch(view_sync_cert_epoch).await?;
1183
1184 let membership_stake_table = membership.stake_table().await;
1185 let membership_success_threshold = membership.success_threshold().await;
1186
1187 view_sync_cert
1189 .is_valid_cert(
1190 &StakeTableEntries::<TYPES>::from(membership_stake_table).0,
1191 membership_success_threshold,
1192 &validation_info.upgrade_lock,
1193 )
1194 .context(|e| warn!("Invalid view sync finalize cert provided: {e}"))?;
1195 },
1196 }
1197 }
1198
1199 {
1202 let epoch = option_epoch_from_block_number(
1203 proposal.data.epoch().is_some(),
1204 proposal.data.block_header().block_number(),
1205 validation_info.epoch_height,
1206 );
1207 UpgradeCertificate::validate(
1208 proposal.data.upgrade_certificate(),
1209 &validation_info.membership,
1210 epoch,
1211 &validation_info.upgrade_lock,
1212 )
1213 .await?;
1214 }
1215
1216 Ok(())
1217}
1218
1219pub async fn broadcast_event<E: Clone + std::fmt::Debug>(event: E, sender: &Sender<E>) {
1221 match sender.broadcast_direct(event).await {
1222 Ok(None) => (),
1223 Ok(Some(overflowed)) => {
1224 tracing::error!(
1225 "Event sender queue overflow, Oldest event removed form queue: {overflowed:?}"
1226 );
1227 },
1228 Err(SendError(e)) => {
1229 tracing::warn!("Event: {e:?}\n Sending failed, event stream probably shutdown");
1230 },
1231 }
1232}
1233
1234pub async fn validate_qc_and_next_epoch_qc<TYPES: NodeType>(
1237 qc: &QuorumCertificate2<TYPES>,
1238 maybe_next_epoch_qc: Option<&NextEpochQuorumCertificate2<TYPES>>,
1239 consensus: &OuterConsensus<TYPES>,
1240 membership_coordinator: &EpochMembershipCoordinator<TYPES>,
1241 upgrade_lock: &UpgradeLock<TYPES>,
1242 epoch_height: u64,
1243) -> Result<()> {
1244 let cert = CertificatePair::new(qc.clone(), maybe_next_epoch_qc.cloned());
1245
1246 let mut epoch_membership = membership_coordinator
1247 .stake_table_for_epoch(cert.epoch())
1248 .await?;
1249
1250 let membership_stake_table = epoch_membership.stake_table().await;
1251 let membership_success_threshold = epoch_membership.success_threshold().await;
1252
1253 if let Err(e) = cert.qc().is_valid_cert(
1254 &StakeTableEntries::<TYPES>::from(membership_stake_table).0,
1255 membership_success_threshold,
1256 upgrade_lock,
1257 ) {
1258 consensus.read().await.metrics.invalid_qc.update(1);
1259 return Err(warn!("Invalid certificate: {e}"));
1260 }
1261
1262 if upgrade_lock.epochs_enabled(cert.view_number())
1264 && let Some(next_epoch_qc) = cert.verify_next_epoch_qc(epoch_height)?
1265 {
1266 epoch_membership = epoch_membership.next_epoch_stake_table().await?;
1267 let membership_next_stake_table = epoch_membership.stake_table().await;
1268 let membership_next_success_threshold = epoch_membership.success_threshold().await;
1269 next_epoch_qc
1270 .is_valid_cert(
1271 &StakeTableEntries::<TYPES>::from(membership_next_stake_table).0,
1272 membership_next_success_threshold,
1273 upgrade_lock,
1274 )
1275 .context(|e| warn!("Invalid next epoch certificate: {e}"))?;
1276 }
1277
1278 Ok(())
1279}
1280
1281pub async fn validate_light_client_state_update_certificate<TYPES: NodeType>(
1283 state_cert: &LightClientStateUpdateCertificateV2<TYPES>,
1284 membership_coordinator: &EpochMembershipCoordinator<TYPES>,
1285 upgrade_lock: &UpgradeLock<TYPES>,
1286) -> Result<()> {
1287 tracing::debug!("Validating light client state update certificate");
1288
1289 let epoch_membership = membership_coordinator
1290 .membership_for_epoch(state_cert.epoch())
1291 .await?;
1292
1293 let membership_stake_table = epoch_membership.stake_table().await;
1294 let membership_success_threshold = epoch_membership.success_threshold().await;
1295
1296 let mut state_key_map = HashMap::new();
1297 membership_stake_table.into_iter().for_each(|config| {
1298 state_key_map.insert(
1299 config.state_ver_key.clone(),
1300 config.stake_table_entry.stake(),
1301 );
1302 });
1303
1304 let mut accumulated_stake = U256::from(0);
1305 let signed_state_digest = derive_signed_state_digest(
1306 &state_cert.light_client_state,
1307 &state_cert.next_stake_table_state,
1308 &state_cert.auth_root,
1309 );
1310 for (key, sig, sig_v2) in state_cert.signatures.iter() {
1311 if let Some(stake) = state_key_map.get(key) {
1312 accumulated_stake += *stake;
1313 #[allow(clippy::collapsible_else_if)]
1314 if !upgrade_lock
1316 .proposal2_version(ViewNumber::new(state_cert.light_client_state.view_number))
1317 {
1318 if !<TYPES::StateSignatureKey as LCV2StateSignatureKey>::verify_state_sig(
1319 key,
1320 sig_v2,
1321 &state_cert.light_client_state,
1322 &state_cert.next_stake_table_state,
1323 ) {
1324 bail!("Invalid light client state update certificate signature");
1325 }
1326 } else {
1327 if !<TYPES::StateSignatureKey as LCV3StateSignatureKey>::verify_state_sig(
1328 key,
1329 sig,
1330 signed_state_digest,
1331 ) || !<TYPES::StateSignatureKey as LCV2StateSignatureKey>::verify_state_sig(
1332 key,
1333 sig_v2,
1334 &state_cert.light_client_state,
1335 &state_cert.next_stake_table_state,
1336 ) {
1337 bail!("Invalid light client state update certificate signature");
1338 }
1339 }
1340 } else {
1341 bail!("Invalid light client state update certificate signature");
1342 }
1343 }
1344 if accumulated_stake < membership_success_threshold {
1345 bail!("Light client state update certificate does not meet the success threshold");
1346 }
1347
1348 Ok(())
1349}
1350
1351pub(crate) fn check_qc_state_cert_correspondence<TYPES: NodeType>(
1352 qc: &QuorumCertificate2<TYPES>,
1353 state_cert: &LightClientStateUpdateCertificateV2<TYPES>,
1354 epoch_height: u64,
1355) -> bool {
1356 qc.data
1357 .block_number
1358 .is_some_and(|bn| is_epoch_root(bn, epoch_height))
1359 && Some(state_cert.epoch) == qc.data.epoch()
1360 && qc.view_number().u64() == state_cert.light_client_state.view_number
1361}
1362
1363pub async fn wait_for_second_vid_share<TYPES: NodeType>(
1367 target_epoch: Option<EpochNumber>,
1368 vid_share: &Proposal<TYPES, VidDisperseShare<TYPES>>,
1369 da_cert: &DaCertificate2<TYPES>,
1370 consensus: &OuterConsensus<TYPES>,
1371 receiver: &Receiver<Arc<HotShotEvent<TYPES>>>,
1372 cancel_receiver: Receiver<()>,
1373 id: u64,
1374) -> Result<Proposal<TYPES, VidDisperseShare<TYPES>>> {
1375 tracing::debug!("getting the second VID share for epoch {:?}", target_epoch);
1376 let maybe_second_vid_share = consensus
1377 .read()
1378 .await
1379 .vid_shares()
1380 .get(&vid_share.data.view_number())
1381 .and_then(|key_map| key_map.get(vid_share.data.recipient_key()))
1382 .and_then(|epoch_map| epoch_map.get(&target_epoch))
1383 .cloned();
1384 if let Some(second_vid_share) = maybe_second_vid_share
1385 && ((target_epoch == da_cert.epoch()
1386 && second_vid_share.data.payload_commitment() == da_cert.data().payload_commit)
1387 || (target_epoch != da_cert.epoch()
1388 && Some(second_vid_share.data.payload_commitment())
1389 == da_cert.data().next_epoch_payload_commit))
1390 {
1391 return Ok(second_vid_share);
1392 }
1393
1394 let receiver = receiver.clone();
1395 let da_cert_clone = da_cert.clone();
1396 let Some(event) = EventDependency::new(
1397 receiver,
1398 cancel_receiver,
1399 format!(
1400 "VoteDependency Second VID share for view {:?}, my id {:?}",
1401 vid_share.data.view_number(),
1402 id
1403 ),
1404 Box::new(move |event| {
1405 let event = event.as_ref();
1406 if let HotShotEvent::VidShareValidated(second_vid_share) = event {
1407 if target_epoch == da_cert_clone.epoch() {
1408 second_vid_share.data.payload_commitment()
1409 == da_cert_clone.data().payload_commit
1410 } else {
1411 Some(second_vid_share.data.payload_commitment())
1412 == da_cert_clone.data().next_epoch_payload_commit
1413 }
1414 } else {
1415 false
1416 }
1417 }),
1418 )
1419 .completed()
1420 .await
1421 else {
1422 return Err(warn!("Error while waiting for the second VID share."));
1423 };
1424 let HotShotEvent::VidShareValidated(second_vid_share) = event.as_ref() else {
1425 return Err(warn!(
1427 "Received event is not VidShareValidated but we checked it earlier. Shouldn't be \
1428 possible."
1429 ));
1430 };
1431 Ok(second_vid_share.clone())
1432}
1433
1434pub async fn broadcast_view_change<TYPES: NodeType>(
1435 sender: &Sender<Arc<HotShotEvent<TYPES>>>,
1436 new_view_number: ViewNumber,
1437 epoch: Option<EpochNumber>,
1438 first_epoch: Option<(ViewNumber, EpochNumber)>,
1439) {
1440 let mut broadcast_epoch = epoch;
1441 if let Some((first_epoch_view, first_epoch)) = first_epoch
1442 && new_view_number == first_epoch_view
1443 && broadcast_epoch != Some(first_epoch)
1444 {
1445 broadcast_epoch = Some(first_epoch);
1446 }
1447 tracing::trace!("Sending ViewChange for view {new_view_number} and epoch {broadcast_epoch:?}");
1448 broadcast_event(
1449 Arc::new(HotShotEvent::ViewChange(new_view_number, broadcast_epoch)),
1450 sender,
1451 )
1452 .await
1453}
1454
1455pub fn derive_signed_state_digest(
1456 lc_state: &LightClientState,
1457 next_stake_state: &StakeTableState,
1458 auth_root: &FixedBytes<32>,
1459) -> CircuitField {
1460 let lc_state_sol: LightClientStateSol = (*lc_state).into();
1461 let stake_st_sol: StakeTableStateSol = (*next_stake_state).into();
1462
1463 let res = alloy::primitives::keccak256(
1464 (
1465 lc_state_sol.abi_encode(),
1466 stake_st_sol.abi_encode(),
1467 auth_root.abi_encode(),
1468 )
1469 .abi_encode_packed(),
1470 );
1471 CircuitField::from_be_bytes_mod_order(res.as_ref())
1472}