Skip to main content

hotshot_task_impls/quorum_vote/
handlers.rs

1// Copyright (c) 2021-2024 Espresso Systems (espressosys.com)
2// This file is part of the HotShot repository.
3
4// You should have received a copy of the MIT License
5// along with the HotShot repository. If not, see <https://mit-license.org/>.
6
7use std::{sync::Arc, time::Instant};
8
9use alloy::primitives::U256;
10use async_broadcast::{InactiveReceiver, Sender};
11use chrono::Utc;
12use committable::Committable;
13use hotshot_contract_adapter::light_client::derive_signed_state_digest;
14use hotshot_types::{
15    consensus::OuterConsensus,
16    data::{EpochNumber, Leaf2, QuorumProposalWrapper, VidDisperseShare, ViewNumber},
17    drb::INITIAL_DRB_RESULT,
18    epoch_membership::{EpochMembership, EpochMembershipCoordinator},
19    event::{Event, EventType},
20    message::{Proposal, UpgradeLock},
21    simple_vote::{
22        EpochRootQuorumVote2, HasEpoch, LightClientStateUpdateVote2, QuorumData2, QuorumVote2,
23    },
24    stake_table::HSStakeTable,
25    storage_metrics::StorageMetricsValue,
26    traits::{
27        ValidatedState,
28        block_contents::BlockHeader,
29        election::Membership,
30        node_implementation::{NodeImplementation, NodeType},
31        signature_key::{
32            LCV2StateSignatureKey, LCV3StateSignatureKey, SignatureKey, StateSignatureKey,
33        },
34        storage::Storage,
35    },
36    utils::{epoch_from_block_number, is_epoch_transition, is_last_block, is_transition_block},
37    vote::HasViewNumber,
38};
39use hotshot_utils::anytrace::*;
40use tracing::instrument;
41use versions::EPOCH_VERSION;
42
43use super::QuorumVoteTaskState;
44use crate::{
45    events::HotShotEvent,
46    helpers::{
47        LeafChainTraversalOutcome, broadcast_event, decide_from_proposal, decide_from_proposal_2,
48        fetch_proposal, handle_drb_result,
49    },
50};
51
52/// Store the DRB result for the next epoch if we received it in a decided leaf.
53async fn store_drb_result<TYPES: NodeType, I: NodeImplementation<TYPES>>(
54    task_state: &mut QuorumVoteTaskState<TYPES, I>,
55    decided_leaf: &Leaf2<TYPES>,
56) -> Result<()> {
57    if task_state.epoch_height == 0 {
58        tracing::info!("Epoch height is 0, skipping DRB storage.");
59        return Ok(());
60    }
61
62    let decided_block_number = decided_leaf.block_header().block_number();
63    let current_epoch_number = EpochNumber::new(epoch_from_block_number(
64        decided_block_number,
65        task_state.epoch_height,
66    ));
67    // Skip storing the received result if this is not the transition block.
68    if is_transition_block(decided_block_number, task_state.epoch_height) {
69        if let Some(result) = decided_leaf.next_drb_result {
70            // We don't need to check value existence and consistency because it should be
71            // impossible to decide on a block with different DRB results.
72            handle_drb_result::<TYPES, I>(
73                task_state.membership.membership(),
74                current_epoch_number + 1,
75                &task_state.storage,
76                result,
77            )
78            .await;
79        } else {
80            bail!("The last block of the epoch is decided but doesn't contain a DRB result.");
81        }
82    }
83    Ok(())
84}
85
86/// Handles the `QuorumProposalValidated` event.
87#[instrument(skip_all, fields(id = task_state.id, view = *proposal.view_number()))]
88pub(crate) async fn handle_quorum_proposal_validated<
89    TYPES: NodeType,
90    I: NodeImplementation<TYPES>,
91>(
92    proposal: &QuorumProposalWrapper<TYPES>,
93    task_state: &mut QuorumVoteTaskState<TYPES, I>,
94    event_sender: &Sender<Arc<HotShotEvent<TYPES>>>,
95) -> Result<()> {
96    let version = task_state.upgrade_lock.version(proposal.view_number())?;
97
98    let LeafChainTraversalOutcome {
99        new_locked_view_number,
100        new_decided_view_number,
101        committing_qc,
102        deciding_qc,
103        leaf_views,
104        included_txns,
105        decided_upgrade_cert,
106    } = if version >= EPOCH_VERSION {
107        // Skip the decide rule for the last block of the epoch.  This is so
108        // that we do not decide the block with epoch_height -2 before we enter the new epoch
109        if !is_last_block(
110            proposal.block_header().block_number(),
111            task_state.epoch_height,
112        ) {
113            decide_from_proposal_2::<TYPES, I>(
114                proposal,
115                OuterConsensus::new(Arc::clone(&task_state.consensus.inner_consensus)),
116                &task_state.upgrade_lock,
117                &task_state.public_key,
118                version >= EPOCH_VERSION,
119                &task_state.membership,
120                &task_state.storage,
121            )
122            .await
123        } else {
124            LeafChainTraversalOutcome::default()
125        }
126    } else {
127        decide_from_proposal::<TYPES, I>(
128            proposal,
129            OuterConsensus::new(Arc::clone(&task_state.consensus.inner_consensus)),
130            &task_state.upgrade_lock,
131            &task_state.public_key,
132            version >= EPOCH_VERSION,
133            &task_state.membership,
134            &task_state.storage,
135            task_state.epoch_height,
136        )
137        .await
138    };
139
140    if let (Some(cert), Some(_)) = (decided_upgrade_cert.clone(), new_decided_view_number) {
141        task_state
142            .upgrade_lock
143            .set_decided_upgrade_cert(cert.clone());
144        if cert.data.new_version >= EPOCH_VERSION
145            && task_state.upgrade_lock.upgrade().base < EPOCH_VERSION
146        {
147            let epoch_height = task_state.consensus.read().await.epoch_height;
148            let first_epoch_number = EpochNumber::new(epoch_from_block_number(
149                proposal.block_header().block_number(),
150                epoch_height,
151            ));
152
153            tracing::debug!("Calling set_first_epoch for epoch {first_epoch_number:?}");
154            task_state
155                .membership
156                .membership()
157                .set_first_epoch(first_epoch_number, INITIAL_DRB_RESULT);
158
159            broadcast_event(
160                Arc::new(HotShotEvent::SetFirstEpoch(
161                    cert.data.new_version_first_view,
162                    first_epoch_number,
163                )),
164                event_sender,
165            )
166            .await;
167        }
168
169        for da_committee in &task_state.da_committees {
170            if cert.data.new_version >= da_committee.start_version {
171                task_state.membership.membership().add_da_committee(
172                    da_committee.start_epoch.into(),
173                    da_committee.committee.clone(),
174                );
175            }
176        }
177
178        let _ = task_state
179            .storage
180            .update_decided_upgrade_certificate(Some(cert.clone()))
181            .await;
182    }
183
184    let mut consensus_writer = task_state.consensus.write().await;
185    if let Some(locked_view_number) = new_locked_view_number {
186        let _ = consensus_writer.update_locked_view(locked_view_number);
187    }
188
189    #[allow(clippy::cast_precision_loss)]
190    if let Some(decided_view_number) = new_decided_view_number {
191        // Bring in the cleanup crew. When a new decide is indeed valid, we need to clear out old memory.
192
193        let old_decided_view = consensus_writer.last_decided_view();
194        consensus_writer.collect_garbage(old_decided_view, decided_view_number);
195
196        // Set the new decided view.
197        consensus_writer
198            .update_last_decided_view(decided_view_number)
199            .context(|e| {
200                warn!("`update_last_decided_view` failed; this should never happen. Error: {e}")
201            })?;
202
203        consensus_writer
204            .metrics
205            .last_decided_time
206            .set(Utc::now().timestamp().try_into().unwrap());
207        consensus_writer.metrics.invalid_qc.set(0);
208        consensus_writer
209            .metrics
210            .last_decided_view
211            .set(usize::try_from(consensus_writer.last_decided_view().u64()).unwrap());
212        let cur_number_of_views_per_decide_event =
213            *proposal.view_number() - consensus_writer.last_decided_view().u64();
214        consensus_writer
215            .metrics
216            .number_of_views_per_decide_event
217            .add_point(cur_number_of_views_per_decide_event as f64);
218        for leaf in leaf_views.iter().rev() {
219            let qc_epoch = leaf.leaf.justify_qc().epoch();
220            if qc_epoch > Some(consensus_writer.current_proposal_participation_epoch())
221                && let Some(e) = qc_epoch
222            {
223                consensus_writer.update_validator_participation_epoch(e);
224            }
225            if qc_epoch > consensus_writer.current_vote_participation_epoch() {
226                let (stake_table, success_threshold) = if let Ok(epoch_membership) =
227                    task_state.membership.stake_table_for_epoch(qc_epoch)
228                {
229                    (
230                        HSStakeTable::from_iter(epoch_membership.stake_table()),
231                        epoch_membership.success_threshold(),
232                    )
233                } else {
234                    tracing::warn!(
235                        "Failed to get stake table for epoch {:?} while updating vote \
236                         participation",
237                        qc_epoch
238                    );
239                    (HSStakeTable::default(), U256::MAX)
240                };
241                consensus_writer
242                    .update_vote_participation_epoch(stake_table, success_threshold, qc_epoch)
243                    .context(warn!("Updating vote participation"))?;
244            }
245            if let Err(e) = consensus_writer.update_vote_participation(leaf.leaf.justify_qc()) {
246                tracing::warn!("Failed to update vote participation: {e}");
247            }
248        }
249
250        // We don't need to hold this while we broadcast
251        drop(consensus_writer);
252
253        for leaf_info in &leaf_views {
254            tracing::info!(
255                "Sending decide for view {:?} at height {:?}",
256                leaf_info.leaf.view_number(),
257                leaf_info.leaf.block_header().block_number(),
258            );
259        }
260
261        broadcast_event(
262            Arc::new(HotShotEvent::LeavesDecided(
263                leaf_views
264                    .iter()
265                    .map(|leaf_info| leaf_info.leaf.clone())
266                    .collect(),
267            )),
268            event_sender,
269        )
270        .await;
271
272        // Send an update to everyone saying that we've reached a decide. The committing QC is never
273        // none if we've reached a new decide, so this is safe to unwrap.
274        let committing_qc = Arc::new(committing_qc.unwrap());
275        broadcast_event(
276            Event {
277                view_number: decided_view_number,
278                event: EventType::Decide {
279                    leaf_chain: Arc::new(leaf_views.clone()),
280                    committing_qc: committing_qc.clone(),
281                    deciding_qc: deciding_qc.map(Arc::new),
282                    block_size: included_txns.map(|txns| txns.len().try_into().unwrap()),
283                },
284            },
285            &task_state.output_event_stream,
286        )
287        .await;
288
289        tracing::debug!(
290            "Successfully sent decide event, leaf views: {:?}, leaf views len: {:?}, qc view: {:?}",
291            decided_view_number,
292            leaf_views.len(),
293            committing_qc.view_number()
294        );
295
296        if version >= EPOCH_VERSION {
297            for leaf_view in leaf_views {
298                store_drb_result(task_state, &leaf_view.leaf).await?;
299            }
300        }
301    }
302
303    Ok(())
304}
305
306/// Updates the shared consensus state with the new voting data.
307#[instrument(skip_all, target = "VoteDependencyHandle", fields(view = *view_number))]
308#[allow(clippy::too_many_arguments)]
309pub(crate) async fn update_shared_state<TYPES: NodeType>(
310    consensus: OuterConsensus<TYPES>,
311    sender: Sender<Arc<HotShotEvent<TYPES>>>,
312    receiver: InactiveReceiver<Arc<HotShotEvent<TYPES>>>,
313    membership: EpochMembershipCoordinator<TYPES>,
314    public_key: TYPES::SignatureKey,
315    private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
316    upgrade_lock: UpgradeLock<TYPES>,
317    view_number: ViewNumber,
318    instance_state: Arc<TYPES::InstanceState>,
319    proposed_leaf: &Leaf2<TYPES>,
320    vid_share: &Proposal<TYPES, VidDisperseShare<TYPES>>,
321    parent_view_number: Option<ViewNumber>,
322    epoch_height: u64,
323) -> Result<()> {
324    let justify_qc = &proposed_leaf.justify_qc();
325
326    let consensus_reader = consensus.read().await;
327    // Try to find the validated view within the validated state map. This will be present
328    // if we have the saved leaf, but if not we'll get it when we fetch_proposal.
329    let mut maybe_validated_view = parent_view_number.and_then(|view_number| {
330        consensus_reader
331            .validated_state_map()
332            .get(&view_number)
333            .cloned()
334    });
335
336    // Justify qc's leaf commitment should be the same as the parent's leaf commitment.
337    let mut maybe_parent = consensus_reader
338        .saved_leaves()
339        .get(&justify_qc.data.leaf_commit)
340        .cloned();
341
342    drop(consensus_reader);
343
344    maybe_parent = match maybe_parent {
345        Some(p) => Some(p),
346        None => {
347            match fetch_proposal(
348                justify_qc,
349                sender.clone(),
350                receiver.activate_cloned(),
351                membership.clone(),
352                OuterConsensus::new(Arc::clone(&consensus.inner_consensus)),
353                public_key.clone(),
354                private_key.clone(),
355                &upgrade_lock,
356                epoch_height,
357            )
358            .await
359            .ok()
360            {
361                Some((leaf, view)) => {
362                    maybe_validated_view = Some(view);
363                    Some(leaf)
364                },
365                None => None,
366            }
367        },
368    };
369
370    let parent = maybe_parent.context(info!(
371        "Proposal's parent missing from storage with commitment: {:?}, proposal view {}",
372        justify_qc.data.leaf_commit,
373        proposed_leaf.view_number(),
374    ))?;
375
376    let Some(validated_view) = maybe_validated_view else {
377        bail!("Failed to fetch view for parent, parent view {parent_view_number:?}");
378    };
379
380    let (Some(parent_state), _) = validated_view.state_and_delta() else {
381        bail!("Parent state not found! Consensus internally inconsistent");
382    };
383
384    let version = upgrade_lock.version(view_number)?;
385
386    let now = Instant::now();
387    let (validated_state, state_delta) = parent_state
388        .validate_and_apply_header(
389            &instance_state,
390            &parent,
391            &proposed_leaf.block_header().clone(),
392            vid_share.data.payload_byte_len(),
393            version,
394            *view_number,
395        )
396        .await
397        .wrap()
398        .context(warn!("Block header doesn't extend the proposal!"))?;
399    let validation_duration = now.elapsed();
400    tracing::debug!("Validation time: {validation_duration:?}");
401
402    let now = Instant::now();
403    // Now that we've rounded everyone up, we need to update the shared state
404    let mut consensus_writer = consensus.write().await;
405
406    if let Err(e) = consensus_writer.update_leaf(
407        proposed_leaf.clone(),
408        Arc::new(validated_state),
409        Some(Arc::new(state_delta)),
410    ) {
411        tracing::trace!("{e:?}");
412    }
413    let update_leaf_duration = now.elapsed();
414
415    consensus_writer
416        .metrics
417        .validate_and_apply_header_duration
418        .add_point(validation_duration.as_secs_f64());
419    consensus_writer
420        .metrics
421        .update_leaf_duration
422        .add_point(update_leaf_duration.as_secs_f64());
423    drop(consensus_writer);
424    tracing::debug!("update_leaf time: {update_leaf_duration:?}");
425
426    Ok(())
427}
428
429/// Submits the `QuorumVoteSend` event if all the dependencies are met.
430#[instrument(skip_all, fields(name = "Submit quorum vote", level = "error"))]
431#[allow(clippy::too_many_arguments)]
432pub(crate) async fn submit_vote<TYPES: NodeType, I: NodeImplementation<TYPES>>(
433    sender: Sender<Arc<HotShotEvent<TYPES>>>,
434    membership: EpochMembership<TYPES>,
435    public_key: TYPES::SignatureKey,
436    private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
437    upgrade_lock: UpgradeLock<TYPES>,
438    view_number: ViewNumber,
439    storage: I::Storage,
440    storage_metrics: Arc<StorageMetricsValue>,
441    leaf: Leaf2<TYPES>,
442    vid_share: Proposal<TYPES, VidDisperseShare<TYPES>>,
443    extended_vote: bool,
444    epoch_root_vote: bool,
445    epoch_height: u64,
446    state_private_key: &<TYPES::StateSignatureKey as StateSignatureKey>::StatePrivateKey,
447    stake_table_capacity: usize,
448) -> Result<()> {
449    let committee_member_in_current_epoch = membership.has_stake(&public_key);
450    // If the proposed leaf is for the last block in the epoch and the node is part of the quorum committee
451    // in the next epoch, the node should vote to achieve the double quorum.
452    let committee_member_in_next_epoch = leaf.with_epoch
453        && is_epoch_transition(leaf.height(), epoch_height)
454        && membership.next_epoch_stake_table()?.has_stake(&public_key);
455
456    ensure!(
457        committee_member_in_current_epoch || committee_member_in_next_epoch,
458        info!("We were not chosen for quorum committee on {view_number}")
459    );
460
461    let height = if membership.epoch().is_some() {
462        Some(leaf.height())
463    } else {
464        None
465    };
466
467    // Create and send the vote.
468    let vote = QuorumVote2::<TYPES>::create_signed_vote(
469        QuorumData2 {
470            leaf_commit: leaf.commit(),
471            epoch: membership.epoch(),
472            block_number: height,
473        },
474        view_number,
475        &public_key,
476        &private_key,
477        &upgrade_lock,
478    )
479    .wrap()
480    .context(error!("Failed to sign vote. This should never happen."))?;
481    let now = Instant::now();
482    // Add to the storage.
483    storage
484        .append_vid(&vid_share)
485        .await
486        .wrap()
487        .context(error!("Failed to store VID share"))?;
488    let append_vid_duration = now.elapsed();
489    storage_metrics
490        .append_vid_duration
491        .add_point(append_vid_duration.as_secs_f64());
492    tracing::debug!("append_vid_general time: {append_vid_duration:?}");
493
494    // Make epoch root vote
495
496    let epoch_enabled = upgrade_lock.epochs_enabled(view_number);
497    if extended_vote && epoch_enabled {
498        tracing::debug!("sending extended vote to everybody",);
499        broadcast_event(
500            Arc::new(HotShotEvent::ExtendedQuorumVoteSend(vote)),
501            &sender,
502        )
503        .await;
504    } else if epoch_root_vote && epoch_enabled {
505        tracing::debug!(
506            "sending epoch root vote to next quorum leader {:?}",
507            vote.view_number() + 1
508        );
509        let light_client_state = leaf
510            .block_header()
511            .get_light_client_state(view_number)
512            .wrap()
513            .context(error!("Failed to generate light client state"))?;
514        let next_stake_table =
515            HSStakeTable::from_iter(membership.next_epoch_stake_table()?.stake_table());
516        let next_stake_table_state = next_stake_table
517            .commitment(stake_table_capacity)
518            .wrap()
519            .context(error!("Failed to compute stake table commitment"))?;
520        // We are still providing LCV2 state signatures for backward compatibility
521        let v2_signature = <TYPES::StateSignatureKey as LCV2StateSignatureKey>::sign_state(
522            state_private_key,
523            &light_client_state,
524            &next_stake_table_state,
525        )
526        .wrap()
527        .context(error!("Failed to sign the light client state"))?;
528        let auth_root = leaf
529            .block_header()
530            .auth_root()
531            .wrap()
532            .context(error!(format!(
533                "Failed to get auth root for light client state certificate. view={view_number}"
534            )))?;
535        let signed_state_digest =
536            derive_signed_state_digest(&light_client_state, &next_stake_table_state, &auth_root);
537        let signature = <TYPES::StateSignatureKey as LCV3StateSignatureKey>::sign_state(
538            state_private_key,
539            signed_state_digest,
540        )
541        .wrap()
542        .context(error!("Failed to sign the light client state"))?;
543        let state_vote = LightClientStateUpdateVote2 {
544            epoch: EpochNumber::new(epoch_from_block_number(leaf.height(), epoch_height)),
545            light_client_state,
546            next_stake_table_state,
547            signature,
548            v2_signature,
549            auth_root,
550            signed_state_digest,
551        };
552        broadcast_event(
553            Arc::new(HotShotEvent::EpochRootQuorumVoteSend(
554                EpochRootQuorumVote2 { vote, state_vote },
555            )),
556            &sender,
557        )
558        .await;
559    } else {
560        tracing::debug!(
561            "sending vote to next quorum leader {:?}",
562            vote.view_number() + 1
563        );
564        broadcast_event(Arc::new(HotShotEvent::QuorumVoteSend(vote)), &sender).await;
565    }
566
567    Ok(())
568}