hotshot_task_impls/consensus/
mod.rs1use std::{sync::Arc, time::Instant};
8
9use async_broadcast::{Receiver, Sender};
10use async_trait::async_trait;
11use handlers::handle_epoch_root_quorum_vote_recv;
12use hotshot_task::task::TaskState;
13use hotshot_types::{
14 consensus::OuterConsensus,
15 data::{EpochNumber, ViewNumber},
16 epoch_membership::EpochMembershipCoordinator,
17 event::Event,
18 message::UpgradeLock,
19 simple_certificate::{NextEpochQuorumCertificate2, QuorumCertificate2, TimeoutCertificate2},
20 simple_vote::{HasEpoch, NextEpochQuorumVote2, QuorumVote2, TimeoutVote2},
21 traits::{
22 node_implementation::{NodeImplementation, NodeType},
23 signature_key::SignatureKey,
24 storage::Storage,
25 },
26 utils::{epoch_from_block_number, is_last_block},
27 vote::HasViewNumber,
28};
29use hotshot_utils::anytrace::*;
30use tokio::task::JoinHandle;
31use tracing::instrument;
32
33use self::handlers::{
34 handle_quorum_vote_recv, handle_timeout, handle_timeout_vote_recv, handle_view_change,
35};
36use crate::{
37 events::HotShotEvent,
38 helpers::{broadcast_view_change, validate_qc_and_next_epoch_qc},
39 vote_collection::{EpochRootVoteCollectorsMap, VoteCollectorsMap},
40};
41
42mod handlers;
44
45pub struct ConsensusTaskState<TYPES: NodeType, I: NodeImplementation<TYPES>> {
47 pub public_key: TYPES::SignatureKey,
49
50 pub private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
52
53 pub instance_state: Arc<TYPES::InstanceState>,
55
56 pub network: Arc<I::Network>,
58
59 pub membership_coordinator: EpochMembershipCoordinator<TYPES>,
61
62 pub vote_collectors: VoteCollectorsMap<TYPES, QuorumVote2<TYPES>, QuorumCertificate2<TYPES>>,
64
65 pub epoch_root_vote_collectors: EpochRootVoteCollectorsMap<TYPES>,
67
68 pub next_epoch_vote_collectors:
70 VoteCollectorsMap<TYPES, NextEpochQuorumVote2<TYPES>, NextEpochQuorumCertificate2<TYPES>>,
71
72 pub timeout_vote_collectors:
74 VoteCollectorsMap<TYPES, TimeoutVote2<TYPES>, TimeoutCertificate2<TYPES>>,
75
76 pub cur_view: ViewNumber,
78
79 pub cur_view_time: i64,
81
82 pub cur_epoch: Option<EpochNumber>,
84
85 pub output_event_stream: async_broadcast::Sender<Event<TYPES>>,
87
88 pub timeout_task: JoinHandle<()>,
90
91 pub timeout: u64,
93
94 pub consensus: OuterConsensus<TYPES>,
96
97 pub storage: I::Storage,
99
100 pub id: u64,
102
103 pub upgrade_lock: UpgradeLock<TYPES>,
105
106 pub epoch_height: u64,
108
109 pub view_start_time: Instant,
111
112 pub first_epoch: Option<(ViewNumber, EpochNumber)>,
114}
115
116impl<TYPES: NodeType, I: NodeImplementation<TYPES>> ConsensusTaskState<TYPES, I> {
117 #[instrument(skip_all, fields(id = self.id, cur_view = *self.cur_view, cur_epoch = self.cur_epoch.map(|x| *x)), name = "Consensus replica task", level = "error", target = "ConsensusTaskState")]
119 pub async fn handle(
120 &mut self,
121 event: Arc<HotShotEvent<TYPES>>,
122 sender: Sender<Arc<HotShotEvent<TYPES>>>,
123 ) -> Result<()> {
124 match event.as_ref() {
125 HotShotEvent::QuorumVoteRecv(vote) => {
126 if let Err(e) =
127 handle_quorum_vote_recv(vote, Arc::clone(&event), &sender, self).await
128 {
129 tracing::debug!("Failed to handle QuorumVoteRecv event; error = {e}");
130 }
131 },
132 HotShotEvent::EpochRootQuorumVoteRecv(vote) => {
133 if let Err(e) =
134 handle_epoch_root_quorum_vote_recv(vote, Arc::clone(&event), &sender, self)
135 .await
136 {
137 tracing::debug!("Failed to handle EpochRootQuorumVoteRecv event; error = {e}");
138 }
139 },
140 HotShotEvent::TimeoutVoteRecv(vote) => {
141 if let Err(e) =
142 handle_timeout_vote_recv(vote, Arc::clone(&event), &sender, self).await
143 {
144 tracing::debug!("Failed to handle TimeoutVoteRecv event; error = {e}");
145 }
146 },
147 HotShotEvent::SetFirstEpoch(view, epoch) => {
148 self.first_epoch = Some((*view, *epoch));
149 },
150 HotShotEvent::ViewChange(new_view_number, epoch_number) => {
151 let frequency = (self.epoch_height / 30).clamp(1, 100);
157 if **new_view_number % frequency == 0 {
158 let _ = self
159 .membership_coordinator
160 .membership_for_epoch(epoch_number.map(|e| e + 1));
161 }
162
163 if let Err(e) =
164 handle_view_change(*new_view_number, *epoch_number, &sender, self).await
165 {
166 tracing::trace!("Failed to handle ViewChange event; error = {e}");
167 }
168 self.view_start_time = Instant::now();
169 },
170 HotShotEvent::Timeout(view_number, epoch) => {
171 if let Err(e) = handle_timeout(*view_number, *epoch, &sender, self).await {
172 tracing::debug!("Failed to handle Timeout event; error = {e}");
173 }
174 },
175 HotShotEvent::ExtendedQc2Formed(eqc) => {
176 let cert_view = eqc.view_number();
177 let Some(cert_block_number) = eqc.data.block_number else {
178 tracing::error!("Received extended QC but no block number");
179 return Ok(());
180 };
181 let cert_epoch = epoch_from_block_number(cert_block_number, self.epoch_height);
182 tracing::error!("Formed Extended QC for view {cert_view} and epoch {cert_epoch}.");
183 let next_epoch = EpochNumber::new(cert_epoch + 1);
185 broadcast_view_change(&sender, cert_view + 1, Some(next_epoch), self.first_epoch)
186 .await;
187 tracing::info!("Entering new epoch: {next_epoch}");
188 tracing::info!(
189 "Stake table for epoch {}:\n\n{:?}",
190 next_epoch,
191 self.membership_coordinator
192 .stake_table_for_epoch(Some(next_epoch))?
193 .stake_table()
194 .cloned()
195 .collect::<Vec<_>>()
196 );
197 },
198 HotShotEvent::ExtendedQcRecv(high_qc, next_epoch_high_qc, _) => {
199 if !high_qc
200 .data
201 .block_number
202 .is_some_and(|bn| is_last_block(bn, self.epoch_height))
203 {
204 tracing::warn!("Received extended QC but we can't verify the leaf is extended");
205 return Ok(());
206 }
207 if let Err(e) = validate_qc_and_next_epoch_qc(
208 high_qc,
209 Some(next_epoch_high_qc),
210 &self.consensus,
211 &self.membership_coordinator,
212 &self.upgrade_lock,
213 self.epoch_height,
214 )
215 .await
216 {
217 tracing::error!("Received invalid extended QC: {e}");
218 return Ok(());
219 }
220
221 let next_epoch = high_qc.data.epoch().map(|x| x + 1);
222
223 let mut consensus_writer = self.consensus.write().await;
224 let high_qc_updated = consensus_writer.update_high_qc(high_qc.clone()).is_ok();
225 let next_high_qc_updated = consensus_writer
226 .update_next_epoch_high_qc(next_epoch_high_qc.clone())
227 .is_ok();
228 if let Some(next_epoch) = next_epoch {
229 consensus_writer.update_validator_participation_epoch(next_epoch);
230 }
231 drop(consensus_writer);
232
233 self.storage
234 .update_high_qc2(high_qc.clone())
235 .await
236 .map_err(|_| warn!("Failed to update high QC"))?;
237 self.storage
238 .update_next_epoch_high_qc2(next_epoch_high_qc.clone())
239 .await
240 .map_err(|_| warn!("Failed to update next epoch high QC"))?;
241 self.storage
242 .update_eqc(high_qc.clone(), next_epoch_high_qc.clone())
243 .await
244 .map_err(|_| warn!("Failed to store eQC"))?;
245
246 tracing::debug!(
247 "Received Extended QC for view {} and epoch {:?}.",
248 high_qc.view_number(),
249 high_qc.epoch()
250 );
251 if high_qc_updated || next_high_qc_updated {
252 let next_epoch = high_qc.data.epoch().map(|x| x + 1);
254 tracing::info!("Entering new epoch: {next_epoch:?}");
255 broadcast_view_change(
256 &sender,
257 high_qc.view_number() + 1,
258 next_epoch,
259 self.first_epoch,
260 )
261 .await;
262 }
263 },
264 _ => {},
265 }
266
267 Ok(())
268 }
269}
270
271#[async_trait]
272impl<TYPES: NodeType, I: NodeImplementation<TYPES>> TaskState for ConsensusTaskState<TYPES, I> {
273 type Event = HotShotEvent<TYPES>;
274
275 async fn handle_event(
276 &mut self,
277 event: Arc<Self::Event>,
278 sender: &Sender<Arc<Self::Event>>,
279 _receiver: &Receiver<Arc<Self::Event>>,
280 ) -> Result<()> {
281 if self.upgrade_lock.new_protocol_active(self.cur_view) {
282 return Ok(());
283 }
284 self.handle(event, sender.clone()).await
285 }
286
287 fn cancel_subtasks(&mut self) {
289 std::mem::replace(&mut self.timeout_task, tokio::spawn(async {})).abort();
291 }
292}