1use std::{sync::Arc, time::Duration};
8
9use async_broadcast::Sender;
10use chrono::Utc;
11use hotshot_types::{
12 data::{EpochNumber, ViewNumber},
13 event::{Event, EventType},
14 simple_certificate::{EpochRootQuorumCertificateV2, check_qc_state_cert_correspondence},
15 simple_vote::{EpochRootQuorumVote2, HasEpoch, QuorumVote2, TimeoutData2, TimeoutVote2},
16 traits::node_implementation::{NodeImplementation, NodeType},
17 utils::{EpochTransitionIndicator, is_epoch_root, is_epoch_transition, is_last_block},
18 vote::{HasViewNumber, Vote},
19};
20use hotshot_utils::anytrace::*;
21use tokio::{spawn, time::sleep};
22use tracing::instrument;
23use versions::EPOCH_VERSION;
24
25use super::ConsensusTaskState;
26use crate::{
27 events::HotShotEvent,
28 helpers::broadcast_event,
29 vote_collection::{handle_epoch_root_vote, handle_vote},
30};
31
32pub(crate) async fn handle_quorum_vote_recv<TYPES: NodeType, I: NodeImplementation<TYPES>>(
34 vote: &QuorumVote2<TYPES>,
35 event: Arc<HotShotEvent<TYPES>>,
36 sender: &Sender<Arc<HotShotEvent<TYPES>>>,
37 task_state: &mut ConsensusTaskState<TYPES, I>,
38) -> Result<()> {
39 let in_transition = task_state
40 .consensus
41 .read()
42 .await
43 .is_high_qc_for_epoch_transition();
44 let epoch_membership = task_state
45 .membership_coordinator
46 .membership_for_epoch(vote.data.epoch)
47 .context(warn!("No stake table for epoch"))?;
48
49 let we_are_leader = epoch_membership.leader(vote.view_number() + 1)? == task_state.public_key;
50 ensure!(
51 in_transition || we_are_leader,
52 info!(
53 "We are not the leader for view {} and we are not in the epoch transition",
54 vote.view_number() + 1
55 )
56 );
57
58 let transition_indicator = if in_transition {
59 EpochTransitionIndicator::InTransition
60 } else {
61 EpochTransitionIndicator::NotInTransition
62 };
63 handle_vote(
64 &mut task_state.vote_collectors,
65 vote,
66 task_state.public_key.clone(),
67 &epoch_membership,
68 task_state.id,
69 &event,
70 sender,
71 &task_state.upgrade_lock,
72 transition_indicator.clone(),
73 )
74 .await?;
75
76 if vote.epoch().is_some()
77 && vote
78 .data
79 .block_number
80 .is_some_and(|b| is_epoch_transition(b, task_state.epoch_height))
81 {
82 let has_stake = epoch_membership
84 .next_epoch_stake_table()?
85 .has_stake(&vote.signing_key());
86 if has_stake {
87 handle_vote(
88 &mut task_state.next_epoch_vote_collectors,
89 &vote.clone().into(),
90 task_state.public_key.clone(),
91 &epoch_membership.next_epoch()?.clone(),
96 task_state.id,
97 &event,
98 sender,
99 &task_state.upgrade_lock,
100 transition_indicator,
101 )
102 .await?;
103 }
104 }
105
106 Ok(())
107}
108
109pub(crate) async fn handle_epoch_root_quorum_vote_recv<
111 TYPES: NodeType,
112 I: NodeImplementation<TYPES>,
113>(
114 vote: &EpochRootQuorumVote2<TYPES>,
115 event: Arc<HotShotEvent<TYPES>>,
116 sender: &Sender<Arc<HotShotEvent<TYPES>>>,
117 task_state: &mut ConsensusTaskState<TYPES, I>,
118) -> Result<()> {
119 ensure!(
120 vote.vote
121 .data
122 .block_number
123 .is_some_and(|bn| is_epoch_root(bn, task_state.epoch_height)),
124 error!("Received epoch root quorum vote for non epoch root block.")
125 );
126
127 let epoch_membership = task_state
128 .membership_coordinator
129 .membership_for_epoch(vote.vote.data.epoch)
130 .context(warn!("No stake table for epoch"))?;
131
132 let we_are_leader = epoch_membership.leader(vote.view_number() + 1)? == task_state.public_key;
133 ensure!(
134 we_are_leader,
135 info!("We are not the leader for view {}", vote.view_number() + 1)
136 );
137
138 handle_epoch_root_vote(
139 &mut task_state.epoch_root_vote_collectors,
140 vote,
141 task_state.public_key.clone(),
142 &epoch_membership,
143 task_state.id,
144 &event,
145 sender,
146 &task_state.upgrade_lock,
147 )
148 .await?;
149
150 Ok(())
151}
152
153pub(crate) async fn handle_timeout_vote_recv<TYPES: NodeType, I: NodeImplementation<TYPES>>(
155 vote: &TimeoutVote2<TYPES>,
156 event: Arc<HotShotEvent<TYPES>>,
157 sender: &Sender<Arc<HotShotEvent<TYPES>>>,
158 task_state: &mut ConsensusTaskState<TYPES, I>,
159) -> Result<()> {
160 let epoch_membership = task_state
161 .membership_coordinator
162 .membership_for_epoch(task_state.cur_epoch)
163 .context(warn!("No stake table for epoch"))?;
164 ensure!(
166 epoch_membership.leader(vote.view_number() + 1)? == task_state.public_key,
167 info!("We are not the leader for view {}", vote.view_number() + 1)
168 );
169
170 handle_vote(
171 &mut task_state.timeout_vote_collectors,
172 vote,
173 task_state.public_key.clone(),
174 &task_state
175 .membership_coordinator
176 .membership_for_epoch(vote.data.epoch)?,
177 task_state.id,
178 &event,
179 sender,
180 &task_state.upgrade_lock,
181 EpochTransitionIndicator::NotInTransition,
182 )
183 .await?;
184
185 Ok(())
186}
187
188pub async fn send_high_qc<TYPES: NodeType, I: NodeImplementation<TYPES>>(
195 new_view_number: ViewNumber,
196 sender: &Sender<Arc<HotShotEvent<TYPES>>>,
197 task_state: &mut ConsensusTaskState<TYPES, I>,
198) -> Result<()> {
199 let version = task_state.upgrade_lock.version(new_view_number)?;
200 ensure!(
201 version >= EPOCH_VERSION,
202 debug!("HotStuff 2 upgrade not yet in effect")
203 );
204
205 let consensus_reader = task_state.consensus.read().await;
206 let high_qc = consensus_reader.high_qc().clone();
207 let is_eqc = high_qc
208 .data
209 .block_number
210 .is_some_and(|b| is_last_block(b, task_state.epoch_height));
211 let is_epoch_root = high_qc
212 .data
213 .block_number
214 .is_some_and(|b| is_epoch_root(b, task_state.epoch_height));
215 let state_cert = if is_epoch_root {
216 consensus_reader.state_cert().cloned()
217 } else {
218 None
219 };
220 drop(consensus_reader);
221
222 if is_eqc {
223 let maybe_next_epoch_high_qc = task_state
224 .consensus
225 .read()
226 .await
227 .next_epoch_high_qc()
228 .cloned();
229 ensure!(
230 maybe_next_epoch_high_qc
231 .as_ref()
232 .is_some_and(|neqc| neqc.data.leaf_commit == high_qc.data.leaf_commit),
233 "We've seen an extended QC but we don't have a corresponding next epoch extended QC"
234 );
235
236 tracing::debug!(
237 "Broadcasting Extended QC for view {} and epoch {:?}, my id {}.",
238 high_qc.view_number(),
239 high_qc.epoch(),
240 task_state.id
241 );
242 broadcast_event(
243 Arc::new(HotShotEvent::ExtendedQcSend(
244 high_qc,
245 maybe_next_epoch_high_qc.unwrap(),
246 task_state.public_key.clone(),
247 )),
248 sender,
249 )
250 .await;
251 } else {
252 let leader = task_state
253 .membership_coordinator
254 .membership_for_epoch(task_state.cur_epoch)?
255 .leader(new_view_number)?;
256
257 let (high_qc, maybe_next_epoch_qc) = if high_qc
258 .data
259 .block_number
260 .is_some_and(|b| is_epoch_transition(b, task_state.epoch_height))
261 {
262 let Some((qc, next_epoch_qc)) =
263 task_state.consensus.read().await.transition_qc().cloned()
264 else {
265 bail!("We don't have a transition QC");
266 };
267 ensure!(
268 next_epoch_qc.data.leaf_commit == qc.data.leaf_commit,
269 "Transition QC is invalid because leaf commits are not equal."
270 );
271 (qc, Some(next_epoch_qc))
272 } else {
273 (high_qc, None)
274 };
275
276 if is_epoch_root {
277 let Some(state_cert) = state_cert else {
279 bail!(
280 "We are sending an epoch root QC but we don't have the corresponding state \
281 cert."
282 );
283 };
284 ensure!(
285 check_qc_state_cert_correspondence(&high_qc, &state_cert, task_state.epoch_height),
286 "We are sending an epoch root QC but we don't have the corresponding state cert."
287 );
288
289 tracing::trace!(
290 "Sending epoch root QC for view {}, height {:?}",
291 high_qc.view_number(),
292 high_qc.data.block_number
293 );
294 broadcast_event(
295 Arc::new(HotShotEvent::EpochRootQcSend(
296 EpochRootQuorumCertificateV2 {
297 qc: high_qc,
298 state_cert,
299 },
300 leader,
301 task_state.public_key.clone(),
302 )),
303 sender,
304 )
305 .await;
306 } else {
307 tracing::trace!(
308 "Sending high QC for view {}, height {:?}",
309 high_qc.view_number(),
310 high_qc.data.block_number
311 );
312 broadcast_event(
313 Arc::new(HotShotEvent::HighQcSend(
314 high_qc,
315 maybe_next_epoch_qc,
316 leader,
317 task_state.public_key.clone(),
318 )),
319 sender,
320 )
321 .await;
322 }
323 }
324 Ok(())
325}
326
327#[instrument(skip_all)]
329pub(crate) async fn handle_view_change<TYPES: NodeType, I: NodeImplementation<TYPES>>(
330 new_view_number: ViewNumber,
331 epoch_number: Option<EpochNumber>,
332 sender: &Sender<Arc<HotShotEvent<TYPES>>>,
333 task_state: &mut ConsensusTaskState<TYPES, I>,
334) -> Result<()> {
335 if epoch_number > task_state.cur_epoch {
336 task_state.cur_epoch = epoch_number;
337 if let Some(new_epoch) = epoch_number {
338 let _ = task_state.consensus.write().await.update_epoch(new_epoch);
339 tracing::info!("Progress: entered epoch {:>6}", *new_epoch);
340 }
341 }
342
343 ensure!(
344 new_view_number > task_state.cur_view,
345 "New view is not larger than the current view"
346 );
347
348 let old_view_number = task_state.cur_view;
349 tracing::debug!("Updating view from {old_view_number} to {new_view_number}");
350
351 if *old_view_number / 100 != *new_view_number / 100 {
352 tracing::info!("Progress: entered view {:>6}", *new_view_number);
353 }
354
355 let _ = send_high_qc(new_view_number, sender, task_state)
358 .await
359 .inspect_err(|e| {
360 tracing::debug!("High QC sending failed with error: {e:?}");
361 });
362
363 task_state.cur_view = new_view_number;
365 task_state
366 .consensus
367 .write()
368 .await
369 .update_view(new_view_number)?;
370
371 if let Some(cert) = task_state.upgrade_lock.decided_upgrade_cert()
373 && new_view_number == cert.data.new_version_first_view
374 {
375 tracing::error!("Version upgraded based on a decided upgrade cert: {cert:?}");
376 }
377
378 let timeout = task_state.timeout;
380 let new_timeout_task = spawn({
381 let stream = sender.clone();
382 let view_number = new_view_number;
383 async move {
384 sleep(Duration::from_millis(timeout)).await;
385 broadcast_event(
386 Arc::new(HotShotEvent::Timeout(
387 ViewNumber::new(*view_number),
388 epoch_number,
389 )),
390 &stream,
391 )
392 .await;
393 }
394 });
395
396 std::mem::replace(&mut task_state.timeout_task, new_timeout_task).abort();
398
399 let old_view_leader_key = task_state
400 .membership_coordinator
401 .membership_for_epoch(task_state.cur_epoch)
402 .context(warn!("No stake table for epoch"))?
403 .leader(old_view_number)?;
404
405 let consensus_reader = task_state.consensus.read().await;
406 consensus_reader
407 .metrics
408 .current_view
409 .set(usize::try_from(task_state.cur_view.u64()).unwrap());
410 let cur_view_time = Utc::now().timestamp();
411 if old_view_leader_key == task_state.public_key {
412 #[allow(clippy::cast_precision_loss)]
413 consensus_reader
414 .metrics
415 .view_duration_as_leader
416 .add_point((cur_view_time - task_state.cur_view_time) as f64);
417 }
418 task_state.cur_view_time = cur_view_time;
419
420 if usize::try_from(task_state.cur_view.u64()).unwrap()
423 > usize::try_from(consensus_reader.last_decided_view().u64()).unwrap()
424 {
425 consensus_reader
426 .metrics
427 .number_of_views_since_last_decide
428 .set(
429 usize::try_from(task_state.cur_view.u64()).unwrap()
430 - usize::try_from(consensus_reader.last_decided_view().u64()).unwrap(),
431 );
432 }
433
434 broadcast_event(
435 Event {
436 view_number: old_view_number,
437 event: EventType::ViewFinished {
438 view_number: old_view_number,
439 },
440 },
441 &task_state.output_event_stream,
442 )
443 .await;
444 Ok(())
445}
446
447#[instrument(skip_all)]
449pub(crate) async fn handle_timeout<TYPES: NodeType, I: NodeImplementation<TYPES>>(
450 view_number: ViewNumber,
451 epoch: Option<EpochNumber>,
452 sender: &Sender<Arc<HotShotEvent<TYPES>>>,
453 task_state: &mut ConsensusTaskState<TYPES, I>,
454) -> Result<()> {
455 ensure!(
456 task_state.cur_view <= view_number,
457 "Timeout event is for an old view"
458 );
459
460 ensure!(
461 task_state
462 .membership_coordinator
463 .stake_table_for_epoch(epoch)
464 .context(warn!("No stake table for epoch"))?
465 .has_stake(&task_state.public_key),
466 debug!("We were not chosen for the consensus committee for view {view_number}",)
467 );
468
469 let vote = TimeoutVote2::create_signed_vote(
470 TimeoutData2 {
471 view: view_number,
472 epoch,
473 },
474 view_number,
475 &task_state.public_key,
476 &task_state.private_key,
477 &task_state.upgrade_lock,
478 )
479 .wrap()
480 .context(error!("Failed to sign TimeoutData"))?;
481
482 broadcast_event(
483 Arc::new(HotShotEvent::TimeoutVoteSend(vote.clone())),
484 sender,
485 )
486 .await;
487
488 if task_state.upgrade_lock.decided_upgrade_cert().is_some() {
494 broadcast_event(
495 Event {
496 view_number,
497 event: EventType::LegacyTimeoutVoteEmitted { vote: vote.clone() },
498 },
499 &task_state.output_event_stream,
500 )
501 .await;
502 }
503
504 broadcast_event(
505 Event {
506 view_number,
507 event: EventType::ViewTimeout { view_number },
508 },
509 &task_state.output_event_stream,
510 )
511 .await;
512
513 tracing::error!(
514 "We did not receive evidence for view {view_number} in time, sending timeout vote for \
515 that view!"
516 );
517
518 broadcast_event(
519 Event {
520 view_number,
521 event: EventType::ReplicaViewTimeout { view_number },
522 },
523 &task_state.output_event_stream,
524 )
525 .await;
526
527 let leader = task_state
528 .membership_coordinator
529 .membership_for_epoch(task_state.cur_epoch)
530 .context(warn!("No stake table for epoch"))?
531 .leader(view_number);
532
533 let consensus_reader = task_state.consensus.read().await;
534 consensus_reader.metrics.number_of_timeouts.add(1);
535 if leader.as_ref().is_ok_and(|l| *l == task_state.public_key) {
536 consensus_reader.metrics.number_of_timeouts_as_leader.add(1);
537 }
538 drop(consensus_reader);
539 task_state
540 .consensus
541 .write()
542 .await
543 .update_validator_participation(
544 leader?,
545 task_state.cur_epoch.ok_or(debug!("No epoch"))?,
546 false,
547 );
548
549 Ok(())
550}