1use std::{marker::PhantomData, sync::Arc, time::SystemTime};
8
9use async_broadcast::{Receiver, Sender};
10use async_trait::async_trait;
11use committable::Committable;
12use hotshot_task::task::TaskState;
13use hotshot_types::{
14 consensus::OuterConsensus,
15 data::{EpochNumber, UpgradeProposal, ViewNumber},
16 epoch_membership::EpochMembershipCoordinator,
17 event::{Event, EventType},
18 message::{Proposal, UpgradeLock},
19 simple_certificate::UpgradeCertificate,
20 simple_vote::{UpgradeProposalData, UpgradeVote},
21 traits::{
22 block_contents::BlockHeader, node_implementation::NodeType, signature_key::SignatureKey,
23 },
24 utils::{EpochTransitionIndicator, epoch_from_block_number},
25 vote::HasViewNumber,
26};
27use hotshot_utils::anytrace::*;
28use tracing::instrument;
29use versions::EPOCH_VERSION;
30
31use crate::{
32 events::HotShotEvent,
33 helpers::broadcast_event,
34 vote_collection::{VoteCollectorsMap, handle_vote},
35};
36
/// State for the task that proposes network upgrades, votes on upgrade proposals
/// received from other nodes, and collects upgrade votes when this node is the leader.
pub struct UpgradeTaskState<TYPES: NodeType> {
    /// Stream on which this task publishes `Event`s; an `EventType::UpgradeProposal`
    /// event is broadcast here for each upgrade proposal that passes validation.
    pub output_event_stream: async_broadcast::Sender<Event<TYPES>>,

    /// View number most recently recorded from a `ViewChange` event.
    pub cur_view: ViewNumber,

    /// Epoch recorded from `ViewChange` events; only replaced when the incoming
    /// epoch compares greater than the stored one.
    pub cur_epoch: Option<EpochNumber>,

    /// Used to obtain the membership for `cur_epoch`, from which view leaders are looked up.
    pub membership_coordinator: EpochMembershipCoordinator<TYPES>,

    /// Vote-collection state handed to `handle_vote` when an upgrade vote is received.
    pub vote_collectors: VoteCollectorsMap<TYPES, UpgradeVote<TYPES>, UpgradeCertificate<TYPES>>,

    /// This node's public key; compared against view leaders and attached to
    /// the upgrade votes and proposals this node sends.
    pub public_key: TYPES::SignatureKey,

    /// Private key used to sign upgrade votes and upgrade proposals.
    pub private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,

    /// Numeric id recorded on the task's tracing span and passed to `handle_vote`.
    pub id: u64,

    /// Block number whose epoch must equal the epoch of the latest quorum proposal
    /// for the epoch-upgrade safety check to pass.
    /// NOTE(review): presumably the block at which epochs start -- confirm against the config.
    pub epoch_start_block: u64,

    /// First view (inclusive) in which this node may send an upgrade proposal.
    pub start_proposing_view: u64,

    /// View (exclusive) from which this node no longer sends upgrade proposals.
    pub stop_proposing_view: u64,

    /// First proposal view (inclusive) for which this node will vote on an upgrade proposal.
    pub start_voting_view: u64,

    /// Proposal view (exclusive) from which this node no longer votes on upgrade proposals.
    pub stop_voting_view: u64,

    /// Unix timestamp in seconds (inclusive) from which this node may send an upgrade proposal.
    pub start_proposing_time: u64,

    /// Unix timestamp in seconds (exclusive) after which this node no longer sends
    /// upgrade proposals.
    pub stop_proposing_time: u64,

    /// Unix timestamp in seconds (inclusive) from which this node will vote on upgrade proposals.
    pub start_voting_time: u64,

    /// Unix timestamp in seconds (exclusive) after which this node no longer votes on
    /// upgrade proposals.
    pub stop_voting_time: u64,

    /// Source of the configured upgrade (`upgrade()`) and of any already-decided
    /// upgrade certificate (`decided_upgrade_cert()`); also passed along when
    /// signing votes and collecting them.
    pub upgrade_lock: UpgradeLock<TYPES>,

    /// Shared consensus state; read to find the most recent quorum proposal.
    pub consensus: OuterConsensus<TYPES>,

    /// Epoch height passed to `epoch_from_block_number`.
    /// NOTE(review): presumably the number of blocks per epoch -- confirm.
    pub epoch_height: u64,
}
99
100impl<TYPES: NodeType> UpgradeTaskState<TYPES> {
101 fn upgraded(&self) -> bool {
103 self.upgrade_lock.decided_upgrade_cert().is_some()
104 }
105
106 #[instrument(skip_all, fields(id = self.id, view = *self.cur_view, epoch = self.cur_epoch.map(|x| *x)), name = "Upgrade Task", level = "error")]
108 pub async fn handle(
109 &mut self,
110 event: Arc<HotShotEvent<TYPES>>,
111 tx: Sender<Arc<HotShotEvent<TYPES>>>,
112 ) -> Result<()> {
113 let upgrade = self.upgrade_lock.upgrade();
114 match event.as_ref() {
115 HotShotEvent::UpgradeProposalRecv(proposal, sender) => {
116 tracing::info!("Received upgrade proposal: {proposal:?}");
117
118 let view = *proposal.data.view_number();
119
120 ensure!(
122 !self.upgraded(),
123 info!("Already upgraded to {upgrade:?}; not voting.")
124 );
125
126 let time = SystemTime::now()
127 .duration_since(SystemTime::UNIX_EPOCH)
128 .wrap()
129 .context(error!(
130 "Failed to calculate duration. This should never happen."
131 ))?
132 .as_secs();
133
134 ensure!(
135 time >= self.start_voting_time && time < self.stop_voting_time,
136 "Refusing to vote because we are no longer in the configured vote time window."
137 );
138
139 ensure!(
140 view >= self.start_voting_view && view < self.stop_voting_view,
141 "Refusing to vote because we are no longer in the configured vote view window."
142 );
143
144 ensure!(
146 proposal.data.upgrade_proposal.new_version_hash == *upgrade.hash()
147 && proposal.data.upgrade_proposal.old_version == upgrade.base
148 && proposal.data.upgrade_proposal.new_version == upgrade.target,
149 "Proposal does not match our upgrade target"
150 );
151
152 tracing::info!(
154 "Upgrade proposal received for view: {:?}",
155 proposal.data.view_number()
156 );
157
158 let epoch_upgrade_checks = if upgrade.target >= EPOCH_VERSION
159 && upgrade.base < EPOCH_VERSION
160 {
161 let consensus_reader = self.consensus.read().await;
162
163 let Some((_, last_proposal)) =
164 consensus_reader.last_proposals().last_key_value()
165 else {
166 tracing::error!(
167 "No recent quorum proposals in consensus state -- skipping upgrade \
168 proposal vote."
169 );
170 return Err(error!(
171 "No recent quorum proposals in consensus state -- skipping upgrade \
172 proposal vote."
173 ));
174 };
175
176 let last_proposal_view: u64 = *last_proposal.data.view_number();
177 let last_proposal_block: u64 = last_proposal.data.block_header().block_number();
178
179 drop(consensus_reader);
180
181 let target_start_epoch =
182 epoch_from_block_number(self.epoch_start_block, self.epoch_height);
183 let last_proposal_epoch =
184 epoch_from_block_number(last_proposal_block, self.epoch_height);
185 let upgrade_finish_epoch = epoch_from_block_number(
186 last_proposal_block
187 + (*proposal.data.upgrade_proposal.new_version_first_view
188 - last_proposal_view)
189 + 10,
190 self.epoch_height,
191 );
192
193 target_start_epoch == last_proposal_epoch
194 && last_proposal_epoch == upgrade_finish_epoch
195 } else {
196 true
197 };
198
199 ensure!(
200 epoch_upgrade_checks,
201 error!("Epoch upgrade safety check failed! Refusing to vote on upgrade.")
202 );
203
204 let view = proposal.data.view_number();
205
206 ensure!(
222 self.cur_view != ViewNumber::genesis()
223 && *view >= self.cur_view.saturating_sub(1),
224 warn!(
225 "Discarding old upgrade proposal; the proposal is for view {view}, but \
226 the current view is {}.",
227 self.cur_view
228 )
229 );
230
231 let view_leader_key = self
233 .membership_coordinator
234 .membership_for_epoch(self.cur_epoch)?
235 .leader(view)?;
236 ensure!(
237 view_leader_key == *sender,
238 info!(
239 "Upgrade proposal doesn't have expected leader key for view {} \n Upgrade \
240 proposal is: {:?}",
241 *view, proposal.data
242 )
243 );
244
245 broadcast_event(
250 Event {
251 view_number: self.cur_view,
252 event: EventType::UpgradeProposal {
253 proposal: proposal.clone(),
254 sender: sender.clone(),
255 },
256 },
257 &self.output_event_stream,
258 )
259 .await;
260
261 let vote = UpgradeVote::create_signed_vote(
263 proposal.data.upgrade_proposal.clone(),
264 view,
265 &self.public_key,
266 &self.private_key,
267 &self.upgrade_lock,
268 )?;
269
270 tracing::debug!("Sending upgrade vote {:?}", vote.view_number());
271 broadcast_event(Arc::new(HotShotEvent::UpgradeVoteSend(vote)), &tx).await;
272 },
273 HotShotEvent::UpgradeVoteRecv(vote) => {
274 tracing::debug!("Upgrade vote recv, Main Task {:?}", vote.view_number());
275
276 let view = vote.view_number();
278 let epoch_membership = self
279 .membership_coordinator
280 .membership_for_epoch(self.cur_epoch)?;
281 ensure!(
282 epoch_membership.leader(view)? == self.public_key,
283 debug!(
284 "We are not the leader for view {} are we leader for next view? {}",
285 *view,
286 epoch_membership.leader(view + 1)? == self.public_key
287 )
288 );
289
290 handle_vote(
291 &mut self.vote_collectors,
292 vote,
293 self.public_key.clone(),
294 &epoch_membership,
295 self.id,
296 &event,
297 &tx,
298 &self.upgrade_lock,
299 EpochTransitionIndicator::NotInTransition,
300 )
301 .await?;
302 },
303 HotShotEvent::ViewChange(new_view, epoch_number) => {
304 if *epoch_number > self.cur_epoch {
305 self.cur_epoch = *epoch_number;
306 }
307 ensure!(self.cur_view < *new_view || *self.cur_view == 0);
308
309 self.cur_view = *new_view;
310
311 let view: u64 = *self.cur_view;
312 let time = SystemTime::now()
313 .duration_since(SystemTime::UNIX_EPOCH)
314 .wrap()
315 .context(error!(
316 "Failed to calculate duration. This should never happen."
317 ))?
318 .as_secs();
319
320 let leader = self
321 .membership_coordinator
322 .membership_for_epoch(self.cur_epoch)?
323 .leader(ViewNumber::new(
324 view + TYPES::UPGRADE_CONSTANTS.propose_offset,
325 ))?;
326
327 let old_version_last_view = view + TYPES::UPGRADE_CONSTANTS.begin_offset;
328 let new_version_first_view = view + TYPES::UPGRADE_CONSTANTS.finish_offset;
329 let decide_by = view + TYPES::UPGRADE_CONSTANTS.decide_by_offset;
330
331 let epoch_upgrade_checks = if upgrade.target >= EPOCH_VERSION
332 && upgrade.base < EPOCH_VERSION
333 {
334 let consensus_reader = self.consensus.read().await;
335
336 let Some((_, last_proposal)) =
337 consensus_reader.last_proposals().last_key_value()
338 else {
339 tracing::error!(
340 "No recent quorum proposals in consensus state -- skipping upgrade \
341 proposal."
342 );
343 return Err(error!(
344 "No recent quorum proposals in consensus state -- skipping upgrade \
345 proposal."
346 ));
347 };
348
349 let last_proposal_view: u64 = *last_proposal.data.view_number();
350 let last_proposal_block: u64 = last_proposal.data.block_header().block_number();
351
352 drop(consensus_reader);
353
354 let target_start_epoch =
355 epoch_from_block_number(self.epoch_start_block, self.epoch_height);
356 let last_proposal_epoch =
357 epoch_from_block_number(last_proposal_block, self.epoch_height);
358 let upgrade_finish_epoch = epoch_from_block_number(
359 last_proposal_block + (new_version_first_view - last_proposal_view) + 10,
360 self.epoch_height,
361 );
362
363 target_start_epoch == last_proposal_epoch
364 && last_proposal_epoch == upgrade_finish_epoch
365 } else {
366 true
367 };
368
369 if view >= self.start_proposing_view
371 && view < self.stop_proposing_view
372 && time >= self.start_proposing_time
373 && time < self.stop_proposing_time
374 && !self.upgraded()
375 && epoch_upgrade_checks
376 && leader == self.public_key
377 {
378 let upgrade_proposal_data = UpgradeProposalData {
379 old_version: upgrade.base,
380 new_version: upgrade.target,
381 new_version_hash: upgrade.hash().into(),
382 old_version_last_view: ViewNumber::new(old_version_last_view),
383 new_version_first_view: ViewNumber::new(new_version_first_view),
384 decide_by: ViewNumber::new(decide_by),
385 };
386
387 let upgrade_proposal = UpgradeProposal {
388 upgrade_proposal: upgrade_proposal_data.clone(),
389 view_number: ViewNumber::new(
390 view + TYPES::UPGRADE_CONSTANTS.propose_offset,
391 ),
392 };
393
394 let signature = TYPES::SignatureKey::sign(
395 &self.private_key,
396 upgrade_proposal_data.commit().as_ref(),
397 )
398 .expect("Failed to sign upgrade proposal commitment!");
399
400 tracing::warn!("Sending upgrade proposal:\n\n {upgrade_proposal:?}");
401
402 let message = Proposal {
403 data: upgrade_proposal,
404 signature,
405 _pd: PhantomData,
406 };
407
408 broadcast_event(
409 Arc::new(HotShotEvent::UpgradeProposalSend(
410 message,
411 self.public_key.clone(),
412 )),
413 &tx,
414 )
415 .await;
416 }
417 },
418 _ => {},
419 }
420 Ok(())
421 }
422}
423
424#[async_trait]
426impl<TYPES: NodeType> TaskState for UpgradeTaskState<TYPES> {
427 type Event = HotShotEvent<TYPES>;
428
429 async fn handle_event(
430 &mut self,
431 event: Arc<Self::Event>,
432 sender: &Sender<Arc<Self::Event>>,
433 _receiver: &Receiver<Arc<Self::Event>>,
434 ) -> Result<()> {
435 self.handle(event, sender.clone()).await?;
436
437 Ok(())
438 }
439
440 fn cancel_subtasks(&mut self) {}
441}