1use std::{sync::Arc, time::Instant};
8
9use alloy::primitives::U256;
10use async_broadcast::{InactiveReceiver, Sender};
11use chrono::Utc;
12use committable::Committable;
13use hotshot_contract_adapter::light_client::derive_signed_state_digest;
14use hotshot_types::{
15 consensus::OuterConsensus,
16 data::{EpochNumber, Leaf2, QuorumProposalWrapper, VidDisperseShare, ViewNumber},
17 drb::INITIAL_DRB_RESULT,
18 epoch_membership::{EpochMembership, EpochMembershipCoordinator},
19 event::{Event, EventType},
20 message::{Proposal, UpgradeLock},
21 simple_vote::{
22 EpochRootQuorumVote2, HasEpoch, LightClientStateUpdateVote2, QuorumData2, QuorumVote2,
23 },
24 stake_table::HSStakeTable,
25 storage_metrics::StorageMetricsValue,
26 traits::{
27 ValidatedState,
28 block_contents::BlockHeader,
29 election::Membership,
30 node_implementation::{NodeImplementation, NodeType},
31 signature_key::{
32 LCV2StateSignatureKey, LCV3StateSignatureKey, SignatureKey, StateSignatureKey,
33 },
34 storage::Storage,
35 },
36 utils::{epoch_from_block_number, is_epoch_transition, is_last_block, is_transition_block},
37 vote::HasViewNumber,
38};
39use hotshot_utils::anytrace::*;
40use tracing::instrument;
41use versions::EPOCH_VERSION;
42
43use super::QuorumVoteTaskState;
44use crate::{
45 events::HotShotEvent,
46 helpers::{
47 LeafChainTraversalOutcome, broadcast_event, decide_from_proposal, decide_from_proposal_2,
48 fetch_proposal, handle_drb_result,
49 },
50};
51
52async fn store_drb_result<TYPES: NodeType, I: NodeImplementation<TYPES>>(
54 task_state: &mut QuorumVoteTaskState<TYPES, I>,
55 decided_leaf: &Leaf2<TYPES>,
56) -> Result<()> {
57 if task_state.epoch_height == 0 {
58 tracing::info!("Epoch height is 0, skipping DRB storage.");
59 return Ok(());
60 }
61
62 let decided_block_number = decided_leaf.block_header().block_number();
63 let current_epoch_number = EpochNumber::new(epoch_from_block_number(
64 decided_block_number,
65 task_state.epoch_height,
66 ));
67 if is_transition_block(decided_block_number, task_state.epoch_height) {
69 if let Some(result) = decided_leaf.next_drb_result {
70 handle_drb_result::<TYPES, I>(
73 task_state.membership.membership(),
74 current_epoch_number + 1,
75 &task_state.storage,
76 result,
77 )
78 .await;
79 } else {
80 bail!("The last block of the epoch is decided but doesn't contain a DRB result.");
81 }
82 }
83 Ok(())
84}
85
86#[instrument(skip_all, fields(id = task_state.id, view = *proposal.view_number()))]
88pub(crate) async fn handle_quorum_proposal_validated<
89 TYPES: NodeType,
90 I: NodeImplementation<TYPES>,
91>(
92 proposal: &QuorumProposalWrapper<TYPES>,
93 task_state: &mut QuorumVoteTaskState<TYPES, I>,
94 event_sender: &Sender<Arc<HotShotEvent<TYPES>>>,
95) -> Result<()> {
96 let version = task_state.upgrade_lock.version(proposal.view_number())?;
97
98 let LeafChainTraversalOutcome {
99 new_locked_view_number,
100 new_decided_view_number,
101 committing_qc,
102 deciding_qc,
103 leaf_views,
104 included_txns,
105 decided_upgrade_cert,
106 } = if version >= EPOCH_VERSION {
107 if !is_last_block(
110 proposal.block_header().block_number(),
111 task_state.epoch_height,
112 ) {
113 decide_from_proposal_2::<TYPES, I>(
114 proposal,
115 OuterConsensus::new(Arc::clone(&task_state.consensus.inner_consensus)),
116 &task_state.upgrade_lock,
117 &task_state.public_key,
118 version >= EPOCH_VERSION,
119 &task_state.membership,
120 &task_state.storage,
121 )
122 .await
123 } else {
124 LeafChainTraversalOutcome::default()
125 }
126 } else {
127 decide_from_proposal::<TYPES, I>(
128 proposal,
129 OuterConsensus::new(Arc::clone(&task_state.consensus.inner_consensus)),
130 &task_state.upgrade_lock,
131 &task_state.public_key,
132 version >= EPOCH_VERSION,
133 &task_state.membership,
134 &task_state.storage,
135 task_state.epoch_height,
136 )
137 .await
138 };
139
140 if let (Some(cert), Some(_)) = (decided_upgrade_cert.clone(), new_decided_view_number) {
141 task_state
142 .upgrade_lock
143 .set_decided_upgrade_cert(cert.clone());
144 if cert.data.new_version >= EPOCH_VERSION
145 && task_state.upgrade_lock.upgrade().base < EPOCH_VERSION
146 {
147 let epoch_height = task_state.consensus.read().await.epoch_height;
148 let first_epoch_number = EpochNumber::new(epoch_from_block_number(
149 proposal.block_header().block_number(),
150 epoch_height,
151 ));
152
153 tracing::debug!("Calling set_first_epoch for epoch {first_epoch_number:?}");
154 task_state
155 .membership
156 .membership()
157 .set_first_epoch(first_epoch_number, INITIAL_DRB_RESULT);
158
159 broadcast_event(
160 Arc::new(HotShotEvent::SetFirstEpoch(
161 cert.data.new_version_first_view,
162 first_epoch_number,
163 )),
164 event_sender,
165 )
166 .await;
167 }
168
169 for da_committee in &task_state.da_committees {
170 if cert.data.new_version >= da_committee.start_version {
171 task_state.membership.membership().add_da_committee(
172 da_committee.start_epoch.into(),
173 da_committee.committee.clone(),
174 );
175 }
176 }
177
178 let _ = task_state
179 .storage
180 .update_decided_upgrade_certificate(Some(cert.clone()))
181 .await;
182 }
183
184 let mut consensus_writer = task_state.consensus.write().await;
185 if let Some(locked_view_number) = new_locked_view_number {
186 let _ = consensus_writer.update_locked_view(locked_view_number);
187 }
188
189 #[allow(clippy::cast_precision_loss)]
190 if let Some(decided_view_number) = new_decided_view_number {
191 let old_decided_view = consensus_writer.last_decided_view();
194 consensus_writer.collect_garbage(old_decided_view, decided_view_number);
195
196 consensus_writer
198 .update_last_decided_view(decided_view_number)
199 .context(|e| {
200 warn!("`update_last_decided_view` failed; this should never happen. Error: {e}")
201 })?;
202
203 consensus_writer
204 .metrics
205 .last_decided_time
206 .set(Utc::now().timestamp().try_into().unwrap());
207 consensus_writer.metrics.invalid_qc.set(0);
208 consensus_writer
209 .metrics
210 .last_decided_view
211 .set(usize::try_from(consensus_writer.last_decided_view().u64()).unwrap());
212 let cur_number_of_views_per_decide_event =
213 *proposal.view_number() - consensus_writer.last_decided_view().u64();
214 consensus_writer
215 .metrics
216 .number_of_views_per_decide_event
217 .add_point(cur_number_of_views_per_decide_event as f64);
218 for leaf in leaf_views.iter().rev() {
219 let qc_epoch = leaf.leaf.justify_qc().epoch();
220 if qc_epoch > Some(consensus_writer.current_proposal_participation_epoch())
221 && let Some(e) = qc_epoch
222 {
223 consensus_writer.update_validator_participation_epoch(e);
224 }
225 if qc_epoch > consensus_writer.current_vote_participation_epoch() {
226 let (stake_table, success_threshold) = if let Ok(epoch_membership) =
227 task_state.membership.stake_table_for_epoch(qc_epoch)
228 {
229 (
230 HSStakeTable::from_iter(epoch_membership.stake_table()),
231 epoch_membership.success_threshold(),
232 )
233 } else {
234 tracing::warn!(
235 "Failed to get stake table for epoch {:?} while updating vote \
236 participation",
237 qc_epoch
238 );
239 (HSStakeTable::default(), U256::MAX)
240 };
241 consensus_writer
242 .update_vote_participation_epoch(stake_table, success_threshold, qc_epoch)
243 .context(warn!("Updating vote participation"))?;
244 }
245 if let Err(e) = consensus_writer.update_vote_participation(leaf.leaf.justify_qc()) {
246 tracing::warn!("Failed to update vote participation: {e}");
247 }
248 }
249
250 drop(consensus_writer);
252
253 for leaf_info in &leaf_views {
254 tracing::info!(
255 "Sending decide for view {:?} at height {:?}",
256 leaf_info.leaf.view_number(),
257 leaf_info.leaf.block_header().block_number(),
258 );
259 }
260
261 broadcast_event(
262 Arc::new(HotShotEvent::LeavesDecided(
263 leaf_views
264 .iter()
265 .map(|leaf_info| leaf_info.leaf.clone())
266 .collect(),
267 )),
268 event_sender,
269 )
270 .await;
271
272 let committing_qc = Arc::new(committing_qc.unwrap());
275 broadcast_event(
276 Event {
277 view_number: decided_view_number,
278 event: EventType::Decide {
279 leaf_chain: Arc::new(leaf_views.clone()),
280 committing_qc: committing_qc.clone(),
281 deciding_qc: deciding_qc.map(Arc::new),
282 block_size: included_txns.map(|txns| txns.len().try_into().unwrap()),
283 },
284 },
285 &task_state.output_event_stream,
286 )
287 .await;
288
289 tracing::debug!(
290 "Successfully sent decide event, leaf views: {:?}, leaf views len: {:?}, qc view: {:?}",
291 decided_view_number,
292 leaf_views.len(),
293 committing_qc.view_number()
294 );
295
296 if version >= EPOCH_VERSION {
297 for leaf_view in leaf_views {
298 store_drb_result(task_state, &leaf_view.leaf).await?;
299 }
300 }
301 }
302
303 Ok(())
304}
305
306#[instrument(skip_all, target = "VoteDependencyHandle", fields(view = *view_number))]
308#[allow(clippy::too_many_arguments)]
309pub(crate) async fn update_shared_state<TYPES: NodeType>(
310 consensus: OuterConsensus<TYPES>,
311 sender: Sender<Arc<HotShotEvent<TYPES>>>,
312 receiver: InactiveReceiver<Arc<HotShotEvent<TYPES>>>,
313 membership: EpochMembershipCoordinator<TYPES>,
314 public_key: TYPES::SignatureKey,
315 private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
316 upgrade_lock: UpgradeLock<TYPES>,
317 view_number: ViewNumber,
318 instance_state: Arc<TYPES::InstanceState>,
319 proposed_leaf: &Leaf2<TYPES>,
320 vid_share: &Proposal<TYPES, VidDisperseShare<TYPES>>,
321 parent_view_number: Option<ViewNumber>,
322 epoch_height: u64,
323) -> Result<()> {
324 let justify_qc = &proposed_leaf.justify_qc();
325
326 let consensus_reader = consensus.read().await;
327 let mut maybe_validated_view = parent_view_number.and_then(|view_number| {
330 consensus_reader
331 .validated_state_map()
332 .get(&view_number)
333 .cloned()
334 });
335
336 let mut maybe_parent = consensus_reader
338 .saved_leaves()
339 .get(&justify_qc.data.leaf_commit)
340 .cloned();
341
342 drop(consensus_reader);
343
344 maybe_parent = match maybe_parent {
345 Some(p) => Some(p),
346 None => {
347 match fetch_proposal(
348 justify_qc,
349 sender.clone(),
350 receiver.activate_cloned(),
351 membership.clone(),
352 OuterConsensus::new(Arc::clone(&consensus.inner_consensus)),
353 public_key.clone(),
354 private_key.clone(),
355 &upgrade_lock,
356 epoch_height,
357 )
358 .await
359 .ok()
360 {
361 Some((leaf, view)) => {
362 maybe_validated_view = Some(view);
363 Some(leaf)
364 },
365 None => None,
366 }
367 },
368 };
369
370 let parent = maybe_parent.context(info!(
371 "Proposal's parent missing from storage with commitment: {:?}, proposal view {}",
372 justify_qc.data.leaf_commit,
373 proposed_leaf.view_number(),
374 ))?;
375
376 let Some(validated_view) = maybe_validated_view else {
377 bail!("Failed to fetch view for parent, parent view {parent_view_number:?}");
378 };
379
380 let (Some(parent_state), _) = validated_view.state_and_delta() else {
381 bail!("Parent state not found! Consensus internally inconsistent");
382 };
383
384 let version = upgrade_lock.version(view_number)?;
385
386 let now = Instant::now();
387 let (validated_state, state_delta) = parent_state
388 .validate_and_apply_header(
389 &instance_state,
390 &parent,
391 &proposed_leaf.block_header().clone(),
392 vid_share.data.payload_byte_len(),
393 version,
394 *view_number,
395 )
396 .await
397 .wrap()
398 .context(warn!("Block header doesn't extend the proposal!"))?;
399 let validation_duration = now.elapsed();
400 tracing::debug!("Validation time: {validation_duration:?}");
401
402 let now = Instant::now();
403 let mut consensus_writer = consensus.write().await;
405
406 if let Err(e) = consensus_writer.update_leaf(
407 proposed_leaf.clone(),
408 Arc::new(validated_state),
409 Some(Arc::new(state_delta)),
410 ) {
411 tracing::trace!("{e:?}");
412 }
413 let update_leaf_duration = now.elapsed();
414
415 consensus_writer
416 .metrics
417 .validate_and_apply_header_duration
418 .add_point(validation_duration.as_secs_f64());
419 consensus_writer
420 .metrics
421 .update_leaf_duration
422 .add_point(update_leaf_duration.as_secs_f64());
423 drop(consensus_writer);
424 tracing::debug!("update_leaf time: {update_leaf_duration:?}");
425
426 Ok(())
427}
428
429#[instrument(skip_all, fields(name = "Submit quorum vote", level = "error"))]
431#[allow(clippy::too_many_arguments)]
432pub(crate) async fn submit_vote<TYPES: NodeType, I: NodeImplementation<TYPES>>(
433 sender: Sender<Arc<HotShotEvent<TYPES>>>,
434 membership: EpochMembership<TYPES>,
435 public_key: TYPES::SignatureKey,
436 private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
437 upgrade_lock: UpgradeLock<TYPES>,
438 view_number: ViewNumber,
439 storage: I::Storage,
440 storage_metrics: Arc<StorageMetricsValue>,
441 leaf: Leaf2<TYPES>,
442 vid_share: Proposal<TYPES, VidDisperseShare<TYPES>>,
443 extended_vote: bool,
444 epoch_root_vote: bool,
445 epoch_height: u64,
446 state_private_key: &<TYPES::StateSignatureKey as StateSignatureKey>::StatePrivateKey,
447 stake_table_capacity: usize,
448) -> Result<()> {
449 let committee_member_in_current_epoch = membership.has_stake(&public_key);
450 let committee_member_in_next_epoch = leaf.with_epoch
453 && is_epoch_transition(leaf.height(), epoch_height)
454 && membership.next_epoch_stake_table()?.has_stake(&public_key);
455
456 ensure!(
457 committee_member_in_current_epoch || committee_member_in_next_epoch,
458 info!("We were not chosen for quorum committee on {view_number}")
459 );
460
461 let height = if membership.epoch().is_some() {
462 Some(leaf.height())
463 } else {
464 None
465 };
466
467 let vote = QuorumVote2::<TYPES>::create_signed_vote(
469 QuorumData2 {
470 leaf_commit: leaf.commit(),
471 epoch: membership.epoch(),
472 block_number: height,
473 },
474 view_number,
475 &public_key,
476 &private_key,
477 &upgrade_lock,
478 )
479 .wrap()
480 .context(error!("Failed to sign vote. This should never happen."))?;
481 let now = Instant::now();
482 storage
484 .append_vid(&vid_share)
485 .await
486 .wrap()
487 .context(error!("Failed to store VID share"))?;
488 let append_vid_duration = now.elapsed();
489 storage_metrics
490 .append_vid_duration
491 .add_point(append_vid_duration.as_secs_f64());
492 tracing::debug!("append_vid_general time: {append_vid_duration:?}");
493
494 let epoch_enabled = upgrade_lock.epochs_enabled(view_number);
497 if extended_vote && epoch_enabled {
498 tracing::debug!("sending extended vote to everybody",);
499 broadcast_event(
500 Arc::new(HotShotEvent::ExtendedQuorumVoteSend(vote)),
501 &sender,
502 )
503 .await;
504 } else if epoch_root_vote && epoch_enabled {
505 tracing::debug!(
506 "sending epoch root vote to next quorum leader {:?}",
507 vote.view_number() + 1
508 );
509 let light_client_state = leaf
510 .block_header()
511 .get_light_client_state(view_number)
512 .wrap()
513 .context(error!("Failed to generate light client state"))?;
514 let next_stake_table =
515 HSStakeTable::from_iter(membership.next_epoch_stake_table()?.stake_table());
516 let next_stake_table_state = next_stake_table
517 .commitment(stake_table_capacity)
518 .wrap()
519 .context(error!("Failed to compute stake table commitment"))?;
520 let v2_signature = <TYPES::StateSignatureKey as LCV2StateSignatureKey>::sign_state(
522 state_private_key,
523 &light_client_state,
524 &next_stake_table_state,
525 )
526 .wrap()
527 .context(error!("Failed to sign the light client state"))?;
528 let auth_root = leaf
529 .block_header()
530 .auth_root()
531 .wrap()
532 .context(error!(format!(
533 "Failed to get auth root for light client state certificate. view={view_number}"
534 )))?;
535 let signed_state_digest =
536 derive_signed_state_digest(&light_client_state, &next_stake_table_state, &auth_root);
537 let signature = <TYPES::StateSignatureKey as LCV3StateSignatureKey>::sign_state(
538 state_private_key,
539 signed_state_digest,
540 )
541 .wrap()
542 .context(error!("Failed to sign the light client state"))?;
543 let state_vote = LightClientStateUpdateVote2 {
544 epoch: EpochNumber::new(epoch_from_block_number(leaf.height(), epoch_height)),
545 light_client_state,
546 next_stake_table_state,
547 signature,
548 v2_signature,
549 auth_root,
550 signed_state_digest,
551 };
552 broadcast_event(
553 Arc::new(HotShotEvent::EpochRootQuorumVoteSend(
554 EpochRootQuorumVote2 { vote, state_vote },
555 )),
556 &sender,
557 )
558 .await;
559 } else {
560 tracing::debug!(
561 "sending vote to next quorum leader {:?}",
562 vote.view_number() + 1
563 );
564 broadcast_event(Arc::new(HotShotEvent::QuorumVoteSend(vote)), &sender).await;
565 }
566
567 Ok(())
568}