hotshot_task_impls/
upgrade.rs

1// Copyright (c) 2021-2024 Espresso Systems (espressosys.com)
2// This file is part of the HotShot repository.
3
4// You should have received a copy of the MIT License
5// along with the HotShot repository. If not, see <https://mit-license.org/>.
6
7use std::{marker::PhantomData, sync::Arc, time::SystemTime};
8
9use async_broadcast::{Receiver, Sender};
10use async_trait::async_trait;
11use committable::Committable;
12use hotshot_task::task::TaskState;
13use hotshot_types::{
14    consensus::OuterConsensus,
15    data::{EpochNumber, UpgradeProposal, ViewNumber},
16    epoch_membership::EpochMembershipCoordinator,
17    event::{Event, EventType},
18    message::{Proposal, UpgradeLock},
19    simple_certificate::UpgradeCertificate,
20    simple_vote::{UpgradeProposalData, UpgradeVote},
21    traits::{
22        block_contents::BlockHeader, node_implementation::NodeType, signature_key::SignatureKey,
23    },
24    utils::{EpochTransitionIndicator, epoch_from_block_number},
25    vote::HasViewNumber,
26};
27use hotshot_utils::anytrace::*;
28use tracing::instrument;
29use versions::EPOCH_VERSION;
30
31use crate::{
32    events::HotShotEvent,
33    helpers::broadcast_event,
34    vote_collection::{VoteCollectorsMap, handle_vote},
35};
36
/// State of the upgrade task.
///
/// The task reacts to three events (see `handle`): it votes on received upgrade
/// proposals, collects upgrade votes when this node is the leader for the vote's
/// view, and on each view change decides whether to send an upgrade proposal.
pub struct UpgradeTaskState<TYPES: NodeType> {
    /// Output events to application
    pub output_event_stream: async_broadcast::Sender<Event<TYPES>>,

    /// View number this node is currently executing in; updated on `ViewChange`.
    pub cur_view: ViewNumber,

    /// Epoch number this node is executing in; only ever moved forward on `ViewChange`.
    pub cur_epoch: Option<EpochNumber>,

    /// Membership for Quorum Certs/votes; used here to look up view leaders.
    pub membership_coordinator: EpochMembershipCoordinator<TYPES>,

    /// A map of `UpgradeVote` collector tasks
    pub vote_collectors: VoteCollectorsMap<TYPES, UpgradeVote<TYPES>, UpgradeCertificate<TYPES>>,

    /// This node's public key
    pub public_key: TYPES::SignatureKey,

    /// This node's private key
    pub private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,

    /// This state's ID
    pub id: u64,

    /// Target block for the epoch upgrade
    pub epoch_start_block: u64,

    /// First view (inclusive) in which we may propose an upgrade
    pub start_proposing_view: u64,

    /// View (exclusive) at which we stop proposing an upgrade
    pub stop_proposing_view: u64,

    /// First view (inclusive) in which we may vote on an upgrade
    pub start_voting_view: u64,

    /// View (exclusive) at which we stop voting on an upgrade
    pub stop_voting_view: u64,

    /// Unix time in seconds (inclusive) at which we start proposing an upgrade
    pub start_proposing_time: u64,

    /// Unix time in seconds (exclusive) at which we stop proposing an upgrade
    pub stop_proposing_time: u64,

    /// Unix time in seconds (inclusive) at which we start voting on an upgrade
    pub start_voting_time: u64,

    /// Unix time in seconds (exclusive) at which we stop voting on an upgrade
    pub stop_voting_time: u64,

    /// Lock for a decided upgrade
    pub upgrade_lock: UpgradeLock<TYPES>,

    /// Reference to consensus. The replica will require a write lock on this.
    pub consensus: OuterConsensus<TYPES>,

    /// Number of blocks in an epoch, zero means there are no epochs
    pub epoch_height: u64,
}
99
100impl<TYPES: NodeType> UpgradeTaskState<TYPES> {
101    /// Check if we have decided on an upgrade certificate
102    async fn upgraded(&self) -> bool {
103        self.upgrade_lock.decided_upgrade_cert().is_some()
104    }
105
106    /// main task event handler
107    #[instrument(skip_all, fields(id = self.id, view = *self.cur_view, epoch = self.cur_epoch.map(|x| *x)), name = "Upgrade Task", level = "error")]
108    pub async fn handle(
109        &mut self,
110        event: Arc<HotShotEvent<TYPES>>,
111        tx: Sender<Arc<HotShotEvent<TYPES>>>,
112    ) -> Result<()> {
113        let upgrade = self.upgrade_lock.upgrade();
114        match event.as_ref() {
115            HotShotEvent::UpgradeProposalRecv(proposal, sender) => {
116                tracing::info!("Received upgrade proposal: {proposal:?}");
117
118                let view = *proposal.data.view_number();
119
120                // Skip voting if the version has already been upgraded.
121                ensure!(
122                    !self.upgraded().await,
123                    info!("Already upgraded to {upgrade:?}; not voting.")
124                );
125
126                let time = SystemTime::now()
127                    .duration_since(SystemTime::UNIX_EPOCH)
128                    .wrap()
129                    .context(error!(
130                        "Failed to calculate duration. This should never happen."
131                    ))?
132                    .as_secs();
133
134                ensure!(
135                    time >= self.start_voting_time && time < self.stop_voting_time,
136                    "Refusing to vote because we are no longer in the configured vote time window."
137                );
138
139                ensure!(
140                    view >= self.start_voting_view && view < self.stop_voting_view,
141                    "Refusing to vote because we are no longer in the configured vote view window."
142                );
143
144                // If the proposal does not match our upgrade target, we immediately exit.
145                ensure!(
146                    proposal.data.upgrade_proposal.new_version_hash == *upgrade.hash()
147                        && proposal.data.upgrade_proposal.old_version == upgrade.base
148                        && proposal.data.upgrade_proposal.new_version == upgrade.target,
149                    "Proposal does not match our upgrade target"
150                );
151
152                // If we have an upgrade target, we validate that the proposal is relevant for the current view.
153                tracing::info!(
154                    "Upgrade proposal received for view: {:?}",
155                    proposal.data.view_number()
156                );
157
158                let epoch_upgrade_checks = if upgrade.target >= EPOCH_VERSION
159                    && upgrade.base < EPOCH_VERSION
160                {
161                    let consensus_reader = self.consensus.read().await;
162
163                    let Some((_, last_proposal)) =
164                        consensus_reader.last_proposals().last_key_value()
165                    else {
166                        tracing::error!(
167                            "No recent quorum proposals in consensus state -- skipping upgrade \
168                             proposal vote."
169                        );
170                        return Err(error!(
171                            "No recent quorum proposals in consensus state -- skipping upgrade \
172                             proposal vote."
173                        ));
174                    };
175
176                    let last_proposal_view: u64 = *last_proposal.data.view_number();
177                    let last_proposal_block: u64 = last_proposal.data.block_header().block_number();
178
179                    drop(consensus_reader);
180
181                    let target_start_epoch =
182                        epoch_from_block_number(self.epoch_start_block, self.epoch_height);
183                    let last_proposal_epoch =
184                        epoch_from_block_number(last_proposal_block, self.epoch_height);
185                    let upgrade_finish_epoch = epoch_from_block_number(
186                        last_proposal_block
187                            + (*proposal.data.upgrade_proposal.new_version_first_view
188                                - last_proposal_view)
189                            + 10,
190                        self.epoch_height,
191                    );
192
193                    target_start_epoch == last_proposal_epoch
194                        && last_proposal_epoch == upgrade_finish_epoch
195                } else {
196                    true
197                };
198
199                ensure!(
200                    epoch_upgrade_checks,
201                    error!("Epoch upgrade safety check failed! Refusing to vote on upgrade.")
202                );
203
204                let view = proposal.data.view_number();
205
206                // At this point, we could choose to validate
207                // that the proposal was issued by the correct leader
208                // for the indicated view.
209                //
210                // We choose not to, because we don't gain that much from it.
211                // The certificate itself is only useful to the leader for that view anyway,
212                // and from the node's perspective it doesn't matter who the sender is.
213                // All we'd save is the cost of signing the vote, and we'd lose some flexibility.
214
215                // Allow an upgrade proposal that is one view older, in case we have voted on a quorum
216                // proposal and updated the view.
217                // `self.cur_view` should be at least 1 since there is a view change before getting
218                // the `UpgradeProposalRecv` event. Otherwise, the view number subtraction below will
219                // cause an overflow error.
220                // TODO Come back to this - we probably don't need this, but we should also never receive a UpgradeCertificate where this fails, investigate block ready so it doesn't make one for the genesis block
221                ensure!(
222                    self.cur_view != ViewNumber::genesis()
223                        && *view >= self.cur_view.saturating_sub(1),
224                    warn!(
225                        "Discarding old upgrade proposal; the proposal is for view {view}, but \
226                         the current view is {}.",
227                        self.cur_view
228                    )
229                );
230
231                // We then validate that the proposal was issued by the leader for the view.
232                let view_leader_key = self
233                    .membership_coordinator
234                    .membership_for_epoch(self.cur_epoch)
235                    .await?
236                    .leader(view)
237                    .await?;
238                ensure!(
239                    view_leader_key == *sender,
240                    info!(
241                        "Upgrade proposal doesn't have expected leader key for view {} \n Upgrade \
242                         proposal is: {:?}",
243                        *view, proposal.data
244                    )
245                );
246
247                // At this point, we've checked that:
248                //   * the proposal was expected,
249                //   * the proposal is valid, and
250                // so we notify the application layer
251                broadcast_event(
252                    Event {
253                        view_number: self.cur_view,
254                        event: EventType::UpgradeProposal {
255                            proposal: proposal.clone(),
256                            sender: sender.clone(),
257                        },
258                    },
259                    &self.output_event_stream,
260                )
261                .await;
262
263                // If everything is fine up to here, we generate and send a vote on the proposal.
264                let vote = UpgradeVote::create_signed_vote(
265                    proposal.data.upgrade_proposal.clone(),
266                    view,
267                    &self.public_key,
268                    &self.private_key,
269                    &self.upgrade_lock,
270                )?;
271
272                tracing::debug!("Sending upgrade vote {:?}", vote.view_number());
273                broadcast_event(Arc::new(HotShotEvent::UpgradeVoteSend(vote)), &tx).await;
274            },
275            HotShotEvent::UpgradeVoteRecv(vote) => {
276                tracing::debug!("Upgrade vote recv, Main Task {:?}", vote.view_number());
277
278                // Check if we are the leader.
279                let view = vote.view_number();
280                let epoch_membership = self
281                    .membership_coordinator
282                    .membership_for_epoch(self.cur_epoch)
283                    .await?;
284                ensure!(
285                    epoch_membership.leader(view).await? == self.public_key,
286                    debug!(
287                        "We are not the leader for view {} are we leader for next view? {}",
288                        *view,
289                        epoch_membership.leader(view + 1).await? == self.public_key
290                    )
291                );
292
293                handle_vote(
294                    &mut self.vote_collectors,
295                    vote,
296                    self.public_key.clone(),
297                    &epoch_membership,
298                    self.id,
299                    &event,
300                    &tx,
301                    &self.upgrade_lock,
302                    EpochTransitionIndicator::NotInTransition,
303                )
304                .await?;
305            },
306            HotShotEvent::ViewChange(new_view, epoch_number) => {
307                if *epoch_number > self.cur_epoch {
308                    self.cur_epoch = *epoch_number;
309                }
310                ensure!(self.cur_view < *new_view || *self.cur_view == 0);
311
312                self.cur_view = *new_view;
313
314                let view: u64 = *self.cur_view;
315                let time = SystemTime::now()
316                    .duration_since(SystemTime::UNIX_EPOCH)
317                    .wrap()
318                    .context(error!(
319                        "Failed to calculate duration. This should never happen."
320                    ))?
321                    .as_secs();
322
323                let leader = self
324                    .membership_coordinator
325                    .membership_for_epoch(self.cur_epoch)
326                    .await?
327                    .leader(ViewNumber::new(
328                        view + TYPES::UPGRADE_CONSTANTS.propose_offset,
329                    ))
330                    .await?;
331
332                let old_version_last_view = view + TYPES::UPGRADE_CONSTANTS.begin_offset;
333                let new_version_first_view = view + TYPES::UPGRADE_CONSTANTS.finish_offset;
334                let decide_by = view + TYPES::UPGRADE_CONSTANTS.decide_by_offset;
335
336                let epoch_upgrade_checks = if upgrade.target >= EPOCH_VERSION
337                    && upgrade.base < EPOCH_VERSION
338                {
339                    let consensus_reader = self.consensus.read().await;
340
341                    let Some((_, last_proposal)) =
342                        consensus_reader.last_proposals().last_key_value()
343                    else {
344                        tracing::error!(
345                            "No recent quorum proposals in consensus state -- skipping upgrade \
346                             proposal."
347                        );
348                        return Err(error!(
349                            "No recent quorum proposals in consensus state -- skipping upgrade \
350                             proposal."
351                        ));
352                    };
353
354                    let last_proposal_view: u64 = *last_proposal.data.view_number();
355                    let last_proposal_block: u64 = last_proposal.data.block_header().block_number();
356
357                    drop(consensus_reader);
358
359                    let target_start_epoch =
360                        epoch_from_block_number(self.epoch_start_block, self.epoch_height);
361                    let last_proposal_epoch =
362                        epoch_from_block_number(last_proposal_block, self.epoch_height);
363                    let upgrade_finish_epoch = epoch_from_block_number(
364                        last_proposal_block + (new_version_first_view - last_proposal_view) + 10,
365                        self.epoch_height,
366                    );
367
368                    target_start_epoch == last_proposal_epoch
369                        && last_proposal_epoch == upgrade_finish_epoch
370                } else {
371                    true
372                };
373
374                // We try to form a certificate 5 views before we're leader.
375                if view >= self.start_proposing_view
376                    && view < self.stop_proposing_view
377                    && time >= self.start_proposing_time
378                    && time < self.stop_proposing_time
379                    && !self.upgraded().await
380                    && epoch_upgrade_checks
381                    && leader == self.public_key
382                {
383                    let upgrade_proposal_data = UpgradeProposalData {
384                        old_version: upgrade.base,
385                        new_version: upgrade.target,
386                        new_version_hash: upgrade.hash().into(),
387                        old_version_last_view: ViewNumber::new(old_version_last_view),
388                        new_version_first_view: ViewNumber::new(new_version_first_view),
389                        decide_by: ViewNumber::new(decide_by),
390                    };
391
392                    let upgrade_proposal = UpgradeProposal {
393                        upgrade_proposal: upgrade_proposal_data.clone(),
394                        view_number: ViewNumber::new(
395                            view + TYPES::UPGRADE_CONSTANTS.propose_offset,
396                        ),
397                    };
398
399                    let signature = TYPES::SignatureKey::sign(
400                        &self.private_key,
401                        upgrade_proposal_data.commit().as_ref(),
402                    )
403                    .expect("Failed to sign upgrade proposal commitment!");
404
405                    tracing::warn!("Sending upgrade proposal:\n\n {upgrade_proposal:?}");
406
407                    let message = Proposal {
408                        data: upgrade_proposal,
409                        signature,
410                        _pd: PhantomData,
411                    };
412
413                    broadcast_event(
414                        Arc::new(HotShotEvent::UpgradeProposalSend(
415                            message,
416                            self.public_key.clone(),
417                        )),
418                        &tx,
419                    )
420                    .await;
421                }
422            },
423            _ => {},
424        }
425        Ok(())
426    }
427}
428
429/// task state implementation for the upgrade task
430#[async_trait]
431impl<TYPES: NodeType> TaskState for UpgradeTaskState<TYPES> {
432    type Event = HotShotEvent<TYPES>;
433
434    async fn handle_event(
435        &mut self,
436        event: Arc<Self::Event>,
437        sender: &Sender<Arc<Self::Event>>,
438        _receiver: &Receiver<Arc<Self::Event>>,
439    ) -> Result<()> {
440        self.handle(event, sender.clone()).await?;
441
442        Ok(())
443    }
444
445    fn cancel_subtasks(&mut self) {}
446}