Skip to main content

hotshot_task_impls/consensus/
handlers.rs

1// Copyright (c) 2021-2024 Espresso Systems (espressosys.com)
2// This file is part of the HotShot repository.
3
4// You should have received a copy of the MIT License
5// along with the HotShot repository. If not, see <https://mit-license.org/>.
6
7use std::{sync::Arc, time::Duration};
8
9use async_broadcast::Sender;
10use chrono::Utc;
11use hotshot_types::{
12    data::{EpochNumber, ViewNumber},
13    event::{Event, EventType},
14    simple_certificate::{EpochRootQuorumCertificateV2, check_qc_state_cert_correspondence},
15    simple_vote::{EpochRootQuorumVote2, HasEpoch, QuorumVote2, TimeoutData2, TimeoutVote2},
16    traits::node_implementation::{NodeImplementation, NodeType},
17    utils::{EpochTransitionIndicator, is_epoch_root, is_epoch_transition, is_last_block},
18    vote::{HasViewNumber, Vote},
19};
20use hotshot_utils::anytrace::*;
21use tokio::{spawn, time::sleep};
22use tracing::instrument;
23use versions::EPOCH_VERSION;
24
25use super::ConsensusTaskState;
26use crate::{
27    events::HotShotEvent,
28    helpers::broadcast_event,
29    vote_collection::{handle_epoch_root_vote, handle_vote},
30};
31
32/// Handle a `QuorumVoteRecv` event.
33pub(crate) async fn handle_quorum_vote_recv<TYPES: NodeType, I: NodeImplementation<TYPES>>(
34    vote: &QuorumVote2<TYPES>,
35    event: Arc<HotShotEvent<TYPES>>,
36    sender: &Sender<Arc<HotShotEvent<TYPES>>>,
37    task_state: &mut ConsensusTaskState<TYPES, I>,
38) -> Result<()> {
39    let in_transition = task_state
40        .consensus
41        .read()
42        .await
43        .is_high_qc_for_epoch_transition();
44    let epoch_membership = task_state
45        .membership_coordinator
46        .membership_for_epoch(vote.data.epoch)
47        .context(warn!("No stake table for epoch"))?;
48
49    let we_are_leader = epoch_membership.leader(vote.view_number() + 1)? == task_state.public_key;
50    ensure!(
51        in_transition || we_are_leader,
52        info!(
53            "We are not the leader for view {} and we are not in the epoch transition",
54            vote.view_number() + 1
55        )
56    );
57
58    let transition_indicator = if in_transition {
59        EpochTransitionIndicator::InTransition
60    } else {
61        EpochTransitionIndicator::NotInTransition
62    };
63    handle_vote(
64        &mut task_state.vote_collectors,
65        vote,
66        task_state.public_key.clone(),
67        &epoch_membership,
68        task_state.id,
69        &event,
70        sender,
71        &task_state.upgrade_lock,
72        transition_indicator.clone(),
73    )
74    .await?;
75
76    if vote.epoch().is_some()
77        && vote
78            .data
79            .block_number
80            .is_some_and(|b| is_epoch_transition(b, task_state.epoch_height))
81    {
82        // If the vote sender belongs to the next epoch, collect it separately to form the second QC
83        let has_stake = epoch_membership
84            .next_epoch_stake_table()?
85            .has_stake(&vote.signing_key());
86        if has_stake {
87            handle_vote(
88                &mut task_state.next_epoch_vote_collectors,
89                &vote.clone().into(),
90                task_state.public_key.clone(),
91                // We eventually verify in `handle_vote` that we are the leader before assembling the certificate here,
92                // so we must request the full randomized stake table.
93                //
94                // I'm not sure this is really necessary, but I've opted not to modify the logic.
95                &epoch_membership.next_epoch()?.clone(),
96                task_state.id,
97                &event,
98                sender,
99                &task_state.upgrade_lock,
100                transition_indicator,
101            )
102            .await?;
103        }
104    }
105
106    Ok(())
107}
108
109/// Handle a `QuorumVoteRecv` event.
110pub(crate) async fn handle_epoch_root_quorum_vote_recv<
111    TYPES: NodeType,
112    I: NodeImplementation<TYPES>,
113>(
114    vote: &EpochRootQuorumVote2<TYPES>,
115    event: Arc<HotShotEvent<TYPES>>,
116    sender: &Sender<Arc<HotShotEvent<TYPES>>>,
117    task_state: &mut ConsensusTaskState<TYPES, I>,
118) -> Result<()> {
119    ensure!(
120        vote.vote
121            .data
122            .block_number
123            .is_some_and(|bn| is_epoch_root(bn, task_state.epoch_height)),
124        error!("Received epoch root quorum vote for non epoch root block.")
125    );
126
127    let epoch_membership = task_state
128        .membership_coordinator
129        .membership_for_epoch(vote.vote.data.epoch)
130        .context(warn!("No stake table for epoch"))?;
131
132    let we_are_leader = epoch_membership.leader(vote.view_number() + 1)? == task_state.public_key;
133    ensure!(
134        we_are_leader,
135        info!("We are not the leader for view {}", vote.view_number() + 1)
136    );
137
138    handle_epoch_root_vote(
139        &mut task_state.epoch_root_vote_collectors,
140        vote,
141        task_state.public_key.clone(),
142        &epoch_membership,
143        task_state.id,
144        &event,
145        sender,
146        &task_state.upgrade_lock,
147    )
148    .await?;
149
150    Ok(())
151}
152
153/// Handle a `TimeoutVoteRecv` event.
154pub(crate) async fn handle_timeout_vote_recv<TYPES: NodeType, I: NodeImplementation<TYPES>>(
155    vote: &TimeoutVote2<TYPES>,
156    event: Arc<HotShotEvent<TYPES>>,
157    sender: &Sender<Arc<HotShotEvent<TYPES>>>,
158    task_state: &mut ConsensusTaskState<TYPES, I>,
159) -> Result<()> {
160    let epoch_membership = task_state
161        .membership_coordinator
162        .membership_for_epoch(task_state.cur_epoch)
163        .context(warn!("No stake table for epoch"))?;
164    // Are we the leader for this view?
165    ensure!(
166        epoch_membership.leader(vote.view_number() + 1)? == task_state.public_key,
167        info!("We are not the leader for view {}", vote.view_number() + 1)
168    );
169
170    handle_vote(
171        &mut task_state.timeout_vote_collectors,
172        vote,
173        task_state.public_key.clone(),
174        &task_state
175            .membership_coordinator
176            .membership_for_epoch(vote.data.epoch)?,
177        task_state.id,
178        &event,
179        sender,
180        &task_state.upgrade_lock,
181        EpochTransitionIndicator::NotInTransition,
182    )
183    .await?;
184
185    Ok(())
186}
187
188/// Send an event to the next leader containing the highest QC we have
189/// This is a necessary part of HotStuff 2 but not the original HotStuff
190///
191/// #Errors
192/// Returns and error if we can't get the version or the version doesn't
193/// yet support HS 2
194pub async fn send_high_qc<TYPES: NodeType, I: NodeImplementation<TYPES>>(
195    new_view_number: ViewNumber,
196    sender: &Sender<Arc<HotShotEvent<TYPES>>>,
197    task_state: &mut ConsensusTaskState<TYPES, I>,
198) -> Result<()> {
199    let version = task_state.upgrade_lock.version(new_view_number)?;
200    ensure!(
201        version >= EPOCH_VERSION,
202        debug!("HotStuff 2 upgrade not yet in effect")
203    );
204
205    let consensus_reader = task_state.consensus.read().await;
206    let high_qc = consensus_reader.high_qc().clone();
207    let is_eqc = high_qc
208        .data
209        .block_number
210        .is_some_and(|b| is_last_block(b, task_state.epoch_height));
211    let is_epoch_root = high_qc
212        .data
213        .block_number
214        .is_some_and(|b| is_epoch_root(b, task_state.epoch_height));
215    let state_cert = if is_epoch_root {
216        consensus_reader.state_cert().cloned()
217    } else {
218        None
219    };
220    drop(consensus_reader);
221
222    if is_eqc {
223        let maybe_next_epoch_high_qc = task_state
224            .consensus
225            .read()
226            .await
227            .next_epoch_high_qc()
228            .cloned();
229        ensure!(
230            maybe_next_epoch_high_qc
231                .as_ref()
232                .is_some_and(|neqc| neqc.data.leaf_commit == high_qc.data.leaf_commit),
233            "We've seen an extended QC but we don't have a corresponding next epoch extended QC"
234        );
235
236        tracing::debug!(
237            "Broadcasting Extended QC for view {} and epoch {:?}, my id {}.",
238            high_qc.view_number(),
239            high_qc.epoch(),
240            task_state.id
241        );
242        broadcast_event(
243            Arc::new(HotShotEvent::ExtendedQcSend(
244                high_qc,
245                maybe_next_epoch_high_qc.unwrap(),
246                task_state.public_key.clone(),
247            )),
248            sender,
249        )
250        .await;
251    } else {
252        let leader = task_state
253            .membership_coordinator
254            .membership_for_epoch(task_state.cur_epoch)?
255            .leader(new_view_number)?;
256
257        let (high_qc, maybe_next_epoch_qc) = if high_qc
258            .data
259            .block_number
260            .is_some_and(|b| is_epoch_transition(b, task_state.epoch_height))
261        {
262            let Some((qc, next_epoch_qc)) =
263                task_state.consensus.read().await.transition_qc().cloned()
264            else {
265                bail!("We don't have a transition QC");
266            };
267            ensure!(
268                next_epoch_qc.data.leaf_commit == qc.data.leaf_commit,
269                "Transition QC is invalid because leaf commits are not equal."
270            );
271            (qc, Some(next_epoch_qc))
272        } else {
273            (high_qc, None)
274        };
275
276        if is_epoch_root {
277            // For epoch root QC, we are sending high QC and state cert
278            let Some(state_cert) = state_cert else {
279                bail!(
280                    "We are sending an epoch root QC but we don't have the corresponding state \
281                     cert."
282                );
283            };
284            ensure!(
285                check_qc_state_cert_correspondence(&high_qc, &state_cert, task_state.epoch_height),
286                "We are sending an epoch root QC but we don't have the corresponding state cert."
287            );
288
289            tracing::trace!(
290                "Sending epoch root QC for view {}, height {:?}",
291                high_qc.view_number(),
292                high_qc.data.block_number
293            );
294            broadcast_event(
295                Arc::new(HotShotEvent::EpochRootQcSend(
296                    EpochRootQuorumCertificateV2 {
297                        qc: high_qc,
298                        state_cert,
299                    },
300                    leader,
301                    task_state.public_key.clone(),
302                )),
303                sender,
304            )
305            .await;
306        } else {
307            tracing::trace!(
308                "Sending high QC for view {}, height {:?}",
309                high_qc.view_number(),
310                high_qc.data.block_number
311            );
312            broadcast_event(
313                Arc::new(HotShotEvent::HighQcSend(
314                    high_qc,
315                    maybe_next_epoch_qc,
316                    leader,
317                    task_state.public_key.clone(),
318                )),
319                sender,
320            )
321            .await;
322        }
323    }
324    Ok(())
325}
326
327/// Handle a `ViewChange` event.
328#[instrument(skip_all)]
329pub(crate) async fn handle_view_change<TYPES: NodeType, I: NodeImplementation<TYPES>>(
330    new_view_number: ViewNumber,
331    epoch_number: Option<EpochNumber>,
332    sender: &Sender<Arc<HotShotEvent<TYPES>>>,
333    task_state: &mut ConsensusTaskState<TYPES, I>,
334) -> Result<()> {
335    if epoch_number > task_state.cur_epoch {
336        task_state.cur_epoch = epoch_number;
337        if let Some(new_epoch) = epoch_number {
338            let _ = task_state.consensus.write().await.update_epoch(new_epoch);
339            tracing::info!("Progress: entered epoch {:>6}", *new_epoch);
340        }
341    }
342
343    ensure!(
344        new_view_number > task_state.cur_view,
345        "New view is not larger than the current view"
346    );
347
348    let old_view_number = task_state.cur_view;
349    tracing::debug!("Updating view from {old_view_number} to {new_view_number}");
350
351    if *old_view_number / 100 != *new_view_number / 100 {
352        tracing::info!("Progress: entered view {:>6}", *new_view_number);
353    }
354
355    // Send our high qc to the next leader immediately upon finishing a view.
356    // Part of HotStuff 2
357    let _ = send_high_qc(new_view_number, sender, task_state)
358        .await
359        .inspect_err(|e| {
360            tracing::debug!("High QC sending failed with error: {e:?}");
361        });
362
363    // Move this node to the next view
364    task_state.cur_view = new_view_number;
365    task_state
366        .consensus
367        .write()
368        .await
369        .update_view(new_view_number)?;
370
371    // If we have a decided upgrade certificate, the protocol version may also have been upgraded.
372    if let Some(cert) = task_state.upgrade_lock.decided_upgrade_cert()
373        && new_view_number == cert.data.new_version_first_view
374    {
375        tracing::error!("Version upgraded based on a decided upgrade cert: {cert:?}");
376    }
377
378    // Spawn a timeout task if we did actually update view
379    let timeout = task_state.timeout;
380    let new_timeout_task = spawn({
381        let stream = sender.clone();
382        let view_number = new_view_number;
383        async move {
384            sleep(Duration::from_millis(timeout)).await;
385            broadcast_event(
386                Arc::new(HotShotEvent::Timeout(
387                    ViewNumber::new(*view_number),
388                    epoch_number,
389                )),
390                &stream,
391            )
392            .await;
393        }
394    });
395
396    // Cancel the old timeout task
397    std::mem::replace(&mut task_state.timeout_task, new_timeout_task).abort();
398
399    let old_view_leader_key = task_state
400        .membership_coordinator
401        .membership_for_epoch(task_state.cur_epoch)
402        .context(warn!("No stake table for epoch"))?
403        .leader(old_view_number)?;
404
405    let consensus_reader = task_state.consensus.read().await;
406    consensus_reader
407        .metrics
408        .current_view
409        .set(usize::try_from(task_state.cur_view.u64()).unwrap());
410    let cur_view_time = Utc::now().timestamp();
411    if old_view_leader_key == task_state.public_key {
412        #[allow(clippy::cast_precision_loss)]
413        consensus_reader
414            .metrics
415            .view_duration_as_leader
416            .add_point((cur_view_time - task_state.cur_view_time) as f64);
417    }
418    task_state.cur_view_time = cur_view_time;
419
420    // Do the comparison before the subtraction to avoid potential overflow, since
421    // `last_decided_view` may be greater than `cur_view` if the node is catching up.
422    if usize::try_from(task_state.cur_view.u64()).unwrap()
423        > usize::try_from(consensus_reader.last_decided_view().u64()).unwrap()
424    {
425        consensus_reader
426            .metrics
427            .number_of_views_since_last_decide
428            .set(
429                usize::try_from(task_state.cur_view.u64()).unwrap()
430                    - usize::try_from(consensus_reader.last_decided_view().u64()).unwrap(),
431            );
432    }
433
434    broadcast_event(
435        Event {
436            view_number: old_view_number,
437            event: EventType::ViewFinished {
438                view_number: old_view_number,
439            },
440        },
441        &task_state.output_event_stream,
442    )
443    .await;
444    Ok(())
445}
446
447/// Handle a `Timeout` event.
448#[instrument(skip_all)]
449pub(crate) async fn handle_timeout<TYPES: NodeType, I: NodeImplementation<TYPES>>(
450    view_number: ViewNumber,
451    epoch: Option<EpochNumber>,
452    sender: &Sender<Arc<HotShotEvent<TYPES>>>,
453    task_state: &mut ConsensusTaskState<TYPES, I>,
454) -> Result<()> {
455    ensure!(
456        task_state.cur_view <= view_number,
457        "Timeout event is for an old view"
458    );
459
460    ensure!(
461        task_state
462            .membership_coordinator
463            .stake_table_for_epoch(epoch)
464            .context(warn!("No stake table for epoch"))?
465            .has_stake(&task_state.public_key),
466        debug!("We were not chosen for the consensus committee for view {view_number}",)
467    );
468
469    let vote = TimeoutVote2::create_signed_vote(
470        TimeoutData2 {
471            view: view_number,
472            epoch,
473        },
474        view_number,
475        &task_state.public_key,
476        &task_state.private_key,
477        &task_state.upgrade_lock,
478    )
479    .wrap()
480    .context(error!("Failed to sign TimeoutData"))?;
481
482    broadcast_event(
483        Arc::new(HotShotEvent::TimeoutVoteSend(vote.clone())),
484        sender,
485    )
486    .await;
487
488    // Forward the same vote to any external listener so the espresso bridge
489    // can submit it to the new-protocol coordinator at the legacy → 0.8
490    // upgrade boundary. Only emit when a target upgrade is decided (i.e.
491    // we know a cutover is coming) to avoid spurious events in normal
492    // operation. The check is cheap; the event payload is small.
493    if task_state.upgrade_lock.decided_upgrade_cert().is_some() {
494        broadcast_event(
495            Event {
496                view_number,
497                event: EventType::LegacyTimeoutVoteEmitted { vote: vote.clone() },
498            },
499            &task_state.output_event_stream,
500        )
501        .await;
502    }
503
504    broadcast_event(
505        Event {
506            view_number,
507            event: EventType::ViewTimeout { view_number },
508        },
509        &task_state.output_event_stream,
510    )
511    .await;
512
513    tracing::error!(
514        "We did not receive evidence for view {view_number} in time, sending timeout vote for \
515         that view!"
516    );
517
518    broadcast_event(
519        Event {
520            view_number,
521            event: EventType::ReplicaViewTimeout { view_number },
522        },
523        &task_state.output_event_stream,
524    )
525    .await;
526
527    let leader = task_state
528        .membership_coordinator
529        .membership_for_epoch(task_state.cur_epoch)
530        .context(warn!("No stake table for epoch"))?
531        .leader(view_number);
532
533    let consensus_reader = task_state.consensus.read().await;
534    consensus_reader.metrics.number_of_timeouts.add(1);
535    if leader.as_ref().is_ok_and(|l| *l == task_state.public_key) {
536        consensus_reader.metrics.number_of_timeouts_as_leader.add(1);
537    }
538    drop(consensus_reader);
539    task_state
540        .consensus
541        .write()
542        .await
543        .update_validator_participation(
544            leader?,
545            task_state.cur_epoch.ok_or(debug!("No epoch"))?,
546            false,
547        );
548
549    Ok(())
550}