Skip to main content

hotshot_task_impls/consensus/
mod.rs

// Copyright (c) 2021-2024 Espresso Systems (espressosys.com)
// This file is part of the HotShot repository.

// You should have received a copy of the MIT License
// along with the HotShot repository. If not, see <https://mit-license.org/>.
6
7use std::{sync::Arc, time::Instant};
8
9use async_broadcast::{Receiver, Sender};
10use async_trait::async_trait;
11use handlers::handle_epoch_root_quorum_vote_recv;
12use hotshot_task::task::TaskState;
13use hotshot_types::{
14    consensus::OuterConsensus,
15    data::{EpochNumber, ViewNumber},
16    epoch_membership::EpochMembershipCoordinator,
17    event::Event,
18    message::UpgradeLock,
19    simple_certificate::{NextEpochQuorumCertificate2, QuorumCertificate2, TimeoutCertificate2},
20    simple_vote::{HasEpoch, NextEpochQuorumVote2, QuorumVote2, TimeoutVote2},
21    traits::{
22        node_implementation::{NodeImplementation, NodeType},
23        signature_key::SignatureKey,
24        storage::Storage,
25    },
26    utils::{epoch_from_block_number, is_last_block},
27    vote::HasViewNumber,
28};
29use hotshot_utils::anytrace::*;
30use tokio::task::JoinHandle;
31use tracing::instrument;
32
33use self::handlers::{
34    handle_quorum_vote_recv, handle_timeout, handle_timeout_vote_recv, handle_view_change,
35};
36use crate::{
37    events::HotShotEvent,
38    helpers::{broadcast_view_change, validate_qc_and_next_epoch_qc},
39    vote_collection::{EpochRootVoteCollectorsMap, VoteCollectorsMap},
40};
41
42/// Event handlers for use in the `handle` method.
43mod handlers;
44
45/// Task state for the Consensus task.
46pub struct ConsensusTaskState<TYPES: NodeType, I: NodeImplementation<TYPES>> {
47    /// Our public key
48    pub public_key: TYPES::SignatureKey,
49
50    /// Our Private Key
51    pub private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
52
53    /// Immutable instance state
54    pub instance_state: Arc<TYPES::InstanceState>,
55
56    /// The underlying network
57    pub network: Arc<I::Network>,
58
59    /// Membership for Quorum Certs/votes
60    pub membership_coordinator: EpochMembershipCoordinator<TYPES>,
61
62    /// A map of `QuorumVote` collector tasks.
63    pub vote_collectors: VoteCollectorsMap<TYPES, QuorumVote2<TYPES>, QuorumCertificate2<TYPES>>,
64
65    /// A map of `EpochRootQuorumVote` collector tasks.
66    pub epoch_root_vote_collectors: EpochRootVoteCollectorsMap<TYPES>,
67
68    /// A map of `QuorumVote` collector tasks. They collect votes from the nodes in the next epoch.
69    pub next_epoch_vote_collectors:
70        VoteCollectorsMap<TYPES, NextEpochQuorumVote2<TYPES>, NextEpochQuorumCertificate2<TYPES>>,
71
72    /// A map of `TimeoutVote` collector tasks.
73    pub timeout_vote_collectors:
74        VoteCollectorsMap<TYPES, TimeoutVote2<TYPES>, TimeoutCertificate2<TYPES>>,
75
76    /// The view number that this node is currently executing in.
77    pub cur_view: ViewNumber,
78
79    /// Timestamp this view starts at.
80    pub cur_view_time: i64,
81
82    /// The epoch number that this node is currently executing in.
83    pub cur_epoch: Option<EpochNumber>,
84
85    /// Output events to application
86    pub output_event_stream: async_broadcast::Sender<Event<TYPES>>,
87
88    /// Timeout task handle
89    pub timeout_task: JoinHandle<()>,
90
91    /// View timeout from config.
92    pub timeout: u64,
93
94    /// A reference to the metrics trait.
95    pub consensus: OuterConsensus<TYPES>,
96
97    /// A reference to the storage trait.
98    pub storage: I::Storage,
99
100    /// The node's id
101    pub id: u64,
102
103    /// Lock for a decided upgrade
104    pub upgrade_lock: UpgradeLock<TYPES>,
105
106    /// Number of blocks in an epoch, zero means there are no epochs
107    pub epoch_height: u64,
108
109    /// The time this view started
110    pub view_start_time: Instant,
111
112    /// First view in which epoch version takes effect
113    pub first_epoch: Option<(ViewNumber, EpochNumber)>,
114}
115
116impl<TYPES: NodeType, I: NodeImplementation<TYPES>> ConsensusTaskState<TYPES, I> {
117    /// Handles a consensus event received on the event stream
118    #[instrument(skip_all, fields(id = self.id, cur_view = *self.cur_view, cur_epoch = self.cur_epoch.map(|x| *x)), name = "Consensus replica task", level = "error", target = "ConsensusTaskState")]
119    pub async fn handle(
120        &mut self,
121        event: Arc<HotShotEvent<TYPES>>,
122        sender: Sender<Arc<HotShotEvent<TYPES>>>,
123    ) -> Result<()> {
124        match event.as_ref() {
125            HotShotEvent::QuorumVoteRecv(vote) => {
126                if let Err(e) =
127                    handle_quorum_vote_recv(vote, Arc::clone(&event), &sender, self).await
128                {
129                    tracing::debug!("Failed to handle QuorumVoteRecv event; error = {e}");
130                }
131            },
132            HotShotEvent::EpochRootQuorumVoteRecv(vote) => {
133                if let Err(e) =
134                    handle_epoch_root_quorum_vote_recv(vote, Arc::clone(&event), &sender, self)
135                        .await
136                {
137                    tracing::debug!("Failed to handle EpochRootQuorumVoteRecv event; error = {e}");
138                }
139            },
140            HotShotEvent::TimeoutVoteRecv(vote) => {
141                if let Err(e) =
142                    handle_timeout_vote_recv(vote, Arc::clone(&event), &sender, self).await
143                {
144                    tracing::debug!("Failed to handle TimeoutVoteRecv event; error = {e}");
145                }
146            },
147            HotShotEvent::SetFirstEpoch(view, epoch) => {
148                self.first_epoch = Some((*view, *epoch));
149            },
150            HotShotEvent::ViewChange(new_view_number, epoch_number) => {
151                // Request the randomized stake table for the subsequent epoch,
152                // to trigger catchup and the DRB calculation if it happens to be missing.
153                //
154                // the frequency is dynamic, depending on the epoch height. if the epoch height is low
155                // (e.g. like it is in tests), we do this every view
156                let frequency = (self.epoch_height / 30).clamp(1, 100);
157                if **new_view_number % frequency == 0 {
158                    let _ = self
159                        .membership_coordinator
160                        .membership_for_epoch(epoch_number.map(|e| e + 1));
161                }
162
163                if let Err(e) =
164                    handle_view_change(*new_view_number, *epoch_number, &sender, self).await
165                {
166                    tracing::trace!("Failed to handle ViewChange event; error = {e}");
167                }
168                self.view_start_time = Instant::now();
169            },
170            HotShotEvent::Timeout(view_number, epoch) => {
171                if let Err(e) = handle_timeout(*view_number, *epoch, &sender, self).await {
172                    tracing::debug!("Failed to handle Timeout event; error = {e}");
173                }
174            },
175            HotShotEvent::ExtendedQc2Formed(eqc) => {
176                let cert_view = eqc.view_number();
177                let Some(cert_block_number) = eqc.data.block_number else {
178                    tracing::error!("Received extended QC but no block number");
179                    return Ok(());
180                };
181                let cert_epoch = epoch_from_block_number(cert_block_number, self.epoch_height);
182                tracing::error!("Formed Extended QC for view {cert_view} and epoch {cert_epoch}.");
183                // Transition to the new epoch by sending ViewChange
184                let next_epoch = EpochNumber::new(cert_epoch + 1);
185                broadcast_view_change(&sender, cert_view + 1, Some(next_epoch), self.first_epoch)
186                    .await;
187                tracing::info!("Entering new epoch: {next_epoch}");
188                tracing::info!(
189                    "Stake table for epoch {}:\n\n{:?}",
190                    next_epoch,
191                    self.membership_coordinator
192                        .stake_table_for_epoch(Some(next_epoch))?
193                        .stake_table()
194                        .cloned()
195                        .collect::<Vec<_>>()
196                );
197            },
198            HotShotEvent::ExtendedQcRecv(high_qc, next_epoch_high_qc, _) => {
199                if !high_qc
200                    .data
201                    .block_number
202                    .is_some_and(|bn| is_last_block(bn, self.epoch_height))
203                {
204                    tracing::warn!("Received extended QC but we can't verify the leaf is extended");
205                    return Ok(());
206                }
207                if let Err(e) = validate_qc_and_next_epoch_qc(
208                    high_qc,
209                    Some(next_epoch_high_qc),
210                    &self.consensus,
211                    &self.membership_coordinator,
212                    &self.upgrade_lock,
213                    self.epoch_height,
214                )
215                .await
216                {
217                    tracing::error!("Received invalid extended QC: {e}");
218                    return Ok(());
219                }
220
221                let next_epoch = high_qc.data.epoch().map(|x| x + 1);
222
223                let mut consensus_writer = self.consensus.write().await;
224                let high_qc_updated = consensus_writer.update_high_qc(high_qc.clone()).is_ok();
225                let next_high_qc_updated = consensus_writer
226                    .update_next_epoch_high_qc(next_epoch_high_qc.clone())
227                    .is_ok();
228                if let Some(next_epoch) = next_epoch {
229                    consensus_writer.update_validator_participation_epoch(next_epoch);
230                }
231                drop(consensus_writer);
232
233                self.storage
234                    .update_high_qc2(high_qc.clone())
235                    .await
236                    .map_err(|_| warn!("Failed to update high QC"))?;
237                self.storage
238                    .update_next_epoch_high_qc2(next_epoch_high_qc.clone())
239                    .await
240                    .map_err(|_| warn!("Failed to update next epoch high QC"))?;
241                self.storage
242                    .update_eqc(high_qc.clone(), next_epoch_high_qc.clone())
243                    .await
244                    .map_err(|_| warn!("Failed to store eQC"))?;
245
246                tracing::debug!(
247                    "Received Extended QC for view {} and epoch {:?}.",
248                    high_qc.view_number(),
249                    high_qc.epoch()
250                );
251                if high_qc_updated || next_high_qc_updated {
252                    // Send ViewChange indicating new view and new epoch.
253                    let next_epoch = high_qc.data.epoch().map(|x| x + 1);
254                    tracing::info!("Entering new epoch: {next_epoch:?}");
255                    broadcast_view_change(
256                        &sender,
257                        high_qc.view_number() + 1,
258                        next_epoch,
259                        self.first_epoch,
260                    )
261                    .await;
262                }
263            },
264            _ => {},
265        }
266
267        Ok(())
268    }
269}
270
271#[async_trait]
272impl<TYPES: NodeType, I: NodeImplementation<TYPES>> TaskState for ConsensusTaskState<TYPES, I> {
273    type Event = HotShotEvent<TYPES>;
274
275    async fn handle_event(
276        &mut self,
277        event: Arc<Self::Event>,
278        sender: &Sender<Arc<Self::Event>>,
279        _receiver: &Receiver<Arc<Self::Event>>,
280    ) -> Result<()> {
281        if self.upgrade_lock.new_protocol_active(self.cur_view) {
282            return Ok(());
283        }
284        self.handle(event, sender.clone()).await
285    }
286
287    /// Joins all subtasks.
288    fn cancel_subtasks(&mut self) {
289        // Cancel the old timeout task
290        std::mem::replace(&mut self.timeout_task, tokio::spawn(async {})).abort();
291    }
292}