1use std::{marker::PhantomData, sync::Arc, time::SystemTime};
8
9use async_broadcast::{Receiver, Sender};
10use async_trait::async_trait;
11use committable::Committable;
12use hotshot_task::task::TaskState;
13use hotshot_types::{
14 consensus::OuterConsensus,
15 data::{EpochNumber, UpgradeProposal, ViewNumber},
16 epoch_membership::EpochMembershipCoordinator,
17 event::{Event, EventType},
18 message::{Proposal, UpgradeLock},
19 simple_certificate::UpgradeCertificate,
20 simple_vote::{UpgradeProposalData, UpgradeVote},
21 traits::{
22 block_contents::BlockHeader, node_implementation::NodeType, signature_key::SignatureKey,
23 },
24 utils::{EpochTransitionIndicator, epoch_from_block_number},
25 vote::HasViewNumber,
26};
27use hotshot_utils::anytrace::*;
28use tracing::instrument;
29use versions::EPOCH_VERSION;
30
31use crate::{
32 events::HotShotEvent,
33 helpers::broadcast_event,
34 vote_collection::{VoteCollectorsMap, handle_vote},
35};
36
37pub struct UpgradeTaskState<TYPES: NodeType> {
39 pub output_event_stream: async_broadcast::Sender<Event<TYPES>>,
41
42 pub cur_view: ViewNumber,
44
45 pub cur_epoch: Option<EpochNumber>,
47
48 pub membership_coordinator: EpochMembershipCoordinator<TYPES>,
50
51 pub vote_collectors: VoteCollectorsMap<TYPES, UpgradeVote<TYPES>, UpgradeCertificate<TYPES>>,
53
54 pub public_key: TYPES::SignatureKey,
56
57 pub private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
59
60 pub id: u64,
62
63 pub epoch_start_block: u64,
65
66 pub start_proposing_view: u64,
68
69 pub stop_proposing_view: u64,
71
72 pub start_voting_view: u64,
74
75 pub stop_voting_view: u64,
77
78 pub start_proposing_time: u64,
80
81 pub stop_proposing_time: u64,
83
84 pub start_voting_time: u64,
86
87 pub stop_voting_time: u64,
89
90 pub upgrade_lock: UpgradeLock<TYPES>,
92
93 pub consensus: OuterConsensus<TYPES>,
95
96 pub epoch_height: u64,
98}
99
100impl<TYPES: NodeType> UpgradeTaskState<TYPES> {
101 async fn upgraded(&self) -> bool {
103 self.upgrade_lock.decided_upgrade_cert().is_some()
104 }
105
106 #[instrument(skip_all, fields(id = self.id, view = *self.cur_view, epoch = self.cur_epoch.map(|x| *x)), name = "Upgrade Task", level = "error")]
108 pub async fn handle(
109 &mut self,
110 event: Arc<HotShotEvent<TYPES>>,
111 tx: Sender<Arc<HotShotEvent<TYPES>>>,
112 ) -> Result<()> {
113 let upgrade = self.upgrade_lock.upgrade();
114 match event.as_ref() {
115 HotShotEvent::UpgradeProposalRecv(proposal, sender) => {
116 tracing::info!("Received upgrade proposal: {proposal:?}");
117
118 let view = *proposal.data.view_number();
119
120 ensure!(
122 !self.upgraded().await,
123 info!("Already upgraded to {upgrade:?}; not voting.")
124 );
125
126 let time = SystemTime::now()
127 .duration_since(SystemTime::UNIX_EPOCH)
128 .wrap()
129 .context(error!(
130 "Failed to calculate duration. This should never happen."
131 ))?
132 .as_secs();
133
134 ensure!(
135 time >= self.start_voting_time && time < self.stop_voting_time,
136 "Refusing to vote because we are no longer in the configured vote time window."
137 );
138
139 ensure!(
140 view >= self.start_voting_view && view < self.stop_voting_view,
141 "Refusing to vote because we are no longer in the configured vote view window."
142 );
143
144 ensure!(
146 proposal.data.upgrade_proposal.new_version_hash == *upgrade.hash()
147 && proposal.data.upgrade_proposal.old_version == upgrade.base
148 && proposal.data.upgrade_proposal.new_version == upgrade.target,
149 "Proposal does not match our upgrade target"
150 );
151
152 tracing::info!(
154 "Upgrade proposal received for view: {:?}",
155 proposal.data.view_number()
156 );
157
158 let epoch_upgrade_checks = if upgrade.target >= EPOCH_VERSION
159 && upgrade.base < EPOCH_VERSION
160 {
161 let consensus_reader = self.consensus.read().await;
162
163 let Some((_, last_proposal)) =
164 consensus_reader.last_proposals().last_key_value()
165 else {
166 tracing::error!(
167 "No recent quorum proposals in consensus state -- skipping upgrade \
168 proposal vote."
169 );
170 return Err(error!(
171 "No recent quorum proposals in consensus state -- skipping upgrade \
172 proposal vote."
173 ));
174 };
175
176 let last_proposal_view: u64 = *last_proposal.data.view_number();
177 let last_proposal_block: u64 = last_proposal.data.block_header().block_number();
178
179 drop(consensus_reader);
180
181 let target_start_epoch =
182 epoch_from_block_number(self.epoch_start_block, self.epoch_height);
183 let last_proposal_epoch =
184 epoch_from_block_number(last_proposal_block, self.epoch_height);
185 let upgrade_finish_epoch = epoch_from_block_number(
186 last_proposal_block
187 + (*proposal.data.upgrade_proposal.new_version_first_view
188 - last_proposal_view)
189 + 10,
190 self.epoch_height,
191 );
192
193 target_start_epoch == last_proposal_epoch
194 && last_proposal_epoch == upgrade_finish_epoch
195 } else {
196 true
197 };
198
199 ensure!(
200 epoch_upgrade_checks,
201 error!("Epoch upgrade safety check failed! Refusing to vote on upgrade.")
202 );
203
204 let view = proposal.data.view_number();
205
206 ensure!(
222 self.cur_view != ViewNumber::genesis()
223 && *view >= self.cur_view.saturating_sub(1),
224 warn!(
225 "Discarding old upgrade proposal; the proposal is for view {view}, but \
226 the current view is {}.",
227 self.cur_view
228 )
229 );
230
231 let view_leader_key = self
233 .membership_coordinator
234 .membership_for_epoch(self.cur_epoch)
235 .await?
236 .leader(view)
237 .await?;
238 ensure!(
239 view_leader_key == *sender,
240 info!(
241 "Upgrade proposal doesn't have expected leader key for view {} \n Upgrade \
242 proposal is: {:?}",
243 *view, proposal.data
244 )
245 );
246
247 broadcast_event(
252 Event {
253 view_number: self.cur_view,
254 event: EventType::UpgradeProposal {
255 proposal: proposal.clone(),
256 sender: sender.clone(),
257 },
258 },
259 &self.output_event_stream,
260 )
261 .await;
262
263 let vote = UpgradeVote::create_signed_vote(
265 proposal.data.upgrade_proposal.clone(),
266 view,
267 &self.public_key,
268 &self.private_key,
269 &self.upgrade_lock,
270 )?;
271
272 tracing::debug!("Sending upgrade vote {:?}", vote.view_number());
273 broadcast_event(Arc::new(HotShotEvent::UpgradeVoteSend(vote)), &tx).await;
274 },
275 HotShotEvent::UpgradeVoteRecv(vote) => {
276 tracing::debug!("Upgrade vote recv, Main Task {:?}", vote.view_number());
277
278 let view = vote.view_number();
280 let epoch_membership = self
281 .membership_coordinator
282 .membership_for_epoch(self.cur_epoch)
283 .await?;
284 ensure!(
285 epoch_membership.leader(view).await? == self.public_key,
286 debug!(
287 "We are not the leader for view {} are we leader for next view? {}",
288 *view,
289 epoch_membership.leader(view + 1).await? == self.public_key
290 )
291 );
292
293 handle_vote(
294 &mut self.vote_collectors,
295 vote,
296 self.public_key.clone(),
297 &epoch_membership,
298 self.id,
299 &event,
300 &tx,
301 &self.upgrade_lock,
302 EpochTransitionIndicator::NotInTransition,
303 )
304 .await?;
305 },
306 HotShotEvent::ViewChange(new_view, epoch_number) => {
307 if *epoch_number > self.cur_epoch {
308 self.cur_epoch = *epoch_number;
309 }
310 ensure!(self.cur_view < *new_view || *self.cur_view == 0);
311
312 self.cur_view = *new_view;
313
314 let view: u64 = *self.cur_view;
315 let time = SystemTime::now()
316 .duration_since(SystemTime::UNIX_EPOCH)
317 .wrap()
318 .context(error!(
319 "Failed to calculate duration. This should never happen."
320 ))?
321 .as_secs();
322
323 let leader = self
324 .membership_coordinator
325 .membership_for_epoch(self.cur_epoch)
326 .await?
327 .leader(ViewNumber::new(
328 view + TYPES::UPGRADE_CONSTANTS.propose_offset,
329 ))
330 .await?;
331
332 let old_version_last_view = view + TYPES::UPGRADE_CONSTANTS.begin_offset;
333 let new_version_first_view = view + TYPES::UPGRADE_CONSTANTS.finish_offset;
334 let decide_by = view + TYPES::UPGRADE_CONSTANTS.decide_by_offset;
335
336 let epoch_upgrade_checks = if upgrade.target >= EPOCH_VERSION
337 && upgrade.base < EPOCH_VERSION
338 {
339 let consensus_reader = self.consensus.read().await;
340
341 let Some((_, last_proposal)) =
342 consensus_reader.last_proposals().last_key_value()
343 else {
344 tracing::error!(
345 "No recent quorum proposals in consensus state -- skipping upgrade \
346 proposal."
347 );
348 return Err(error!(
349 "No recent quorum proposals in consensus state -- skipping upgrade \
350 proposal."
351 ));
352 };
353
354 let last_proposal_view: u64 = *last_proposal.data.view_number();
355 let last_proposal_block: u64 = last_proposal.data.block_header().block_number();
356
357 drop(consensus_reader);
358
359 let target_start_epoch =
360 epoch_from_block_number(self.epoch_start_block, self.epoch_height);
361 let last_proposal_epoch =
362 epoch_from_block_number(last_proposal_block, self.epoch_height);
363 let upgrade_finish_epoch = epoch_from_block_number(
364 last_proposal_block + (new_version_first_view - last_proposal_view) + 10,
365 self.epoch_height,
366 );
367
368 target_start_epoch == last_proposal_epoch
369 && last_proposal_epoch == upgrade_finish_epoch
370 } else {
371 true
372 };
373
374 if view >= self.start_proposing_view
376 && view < self.stop_proposing_view
377 && time >= self.start_proposing_time
378 && time < self.stop_proposing_time
379 && !self.upgraded().await
380 && epoch_upgrade_checks
381 && leader == self.public_key
382 {
383 let upgrade_proposal_data = UpgradeProposalData {
384 old_version: upgrade.base,
385 new_version: upgrade.target,
386 new_version_hash: upgrade.hash().into(),
387 old_version_last_view: ViewNumber::new(old_version_last_view),
388 new_version_first_view: ViewNumber::new(new_version_first_view),
389 decide_by: ViewNumber::new(decide_by),
390 };
391
392 let upgrade_proposal = UpgradeProposal {
393 upgrade_proposal: upgrade_proposal_data.clone(),
394 view_number: ViewNumber::new(
395 view + TYPES::UPGRADE_CONSTANTS.propose_offset,
396 ),
397 };
398
399 let signature = TYPES::SignatureKey::sign(
400 &self.private_key,
401 upgrade_proposal_data.commit().as_ref(),
402 )
403 .expect("Failed to sign upgrade proposal commitment!");
404
405 tracing::warn!("Sending upgrade proposal:\n\n {upgrade_proposal:?}");
406
407 let message = Proposal {
408 data: upgrade_proposal,
409 signature,
410 _pd: PhantomData,
411 };
412
413 broadcast_event(
414 Arc::new(HotShotEvent::UpgradeProposalSend(
415 message,
416 self.public_key.clone(),
417 )),
418 &tx,
419 )
420 .await;
421 }
422 },
423 _ => {},
424 }
425 Ok(())
426 }
427}
428
429#[async_trait]
431impl<TYPES: NodeType> TaskState for UpgradeTaskState<TYPES> {
432 type Event = HotShotEvent<TYPES>;
433
434 async fn handle_event(
435 &mut self,
436 event: Arc<Self::Event>,
437 sender: &Sender<Arc<Self::Event>>,
438 _receiver: &Receiver<Arc<Self::Event>>,
439 ) -> Result<()> {
440 self.handle(event, sender.clone()).await?;
441
442 Ok(())
443 }
444
445 fn cancel_subtasks(&mut self) {}
446}