hotshot_task_impls/
da.rs

1// Copyright (c) 2021-2024 Espresso Systems (espressosys.com)
2// This file is part of the HotShot repository.
3
4// You should have received a copy of the MIT License
5// along with the HotShot repository. If not, see <https://mit-license.org/>.
6
7use std::{marker::PhantomData, sync::Arc, time::Instant};
8
9use async_broadcast::{Receiver, Sender};
10use async_trait::async_trait;
11use hotshot_task::task::TaskState;
12use hotshot_types::{
13    consensus::{Consensus, OuterConsensus, PayloadWithMetadata},
14    data::{
15        DaProposal2, EpochNumber, PackedBundle, ViewNumber, vid_commitment,
16        vid_disperse::vid_total_weight,
17    },
18    epoch_membership::EpochMembershipCoordinator,
19    event::{Event, EventType},
20    message::{Proposal, UpgradeLock},
21    simple_certificate::DaCertificate2,
22    simple_vote::{DaData2, DaVote2},
23    storage_metrics::StorageMetricsValue,
24    traits::{
25        BlockPayload, EncodeBytes,
26        network::ConnectedNetwork,
27        node_implementation::{NodeImplementation, NodeType},
28        signature_key::SignatureKey,
29        storage::Storage,
30    },
31    utils::{EpochTransitionIndicator, epoch_from_block_number, is_ge_epoch_root, is_last_block},
32    vote::HasViewNumber,
33};
34use hotshot_utils::anytrace::*;
35use sha2::{Digest, Sha256};
36use tokio::{spawn, task::spawn_blocking};
37use tracing::instrument;
38
39use crate::{
40    events::HotShotEvent,
41    helpers::broadcast_event,
42    vote_collection::{VoteCollectorsMap, handle_vote},
43};
44
45/// Tracks state of a DA task
46pub struct DaTaskState<TYPES: NodeType, I: NodeImplementation<TYPES>> {
47    /// Output events to application
48    pub output_event_stream: async_broadcast::Sender<Event<TYPES>>,
49
50    /// View number this view is executing in.
51    pub cur_view: ViewNumber,
52
53    /// Epoch number this node is executing in.
54    pub cur_epoch: Option<EpochNumber>,
55
56    /// Reference to consensus. Leader will require a read lock on this.
57    pub consensus: OuterConsensus<TYPES>,
58
59    /// Membership for the DA committee and quorum committee.
60    /// We need the latter only for calculating the proper VID scheme
61    /// from the number of nodes in the quorum.
62    pub membership_coordinator: EpochMembershipCoordinator<TYPES>,
63
64    /// The underlying network
65    pub network: Arc<I::Network>,
66
67    /// A map of `DaVote` collector tasks.
68    pub vote_collectors: VoteCollectorsMap<TYPES, DaVote2<TYPES>, DaCertificate2<TYPES>>,
69
70    /// This Nodes public key
71    pub public_key: TYPES::SignatureKey,
72
73    /// This Nodes private key
74    pub private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
75
76    /// This state's ID
77    pub id: u64,
78
79    /// This node's storage ref
80    pub storage: I::Storage,
81
82    /// Storage metrics
83    pub storage_metrics: Arc<StorageMetricsValue>,
84
85    /// Lock for a decided upgrade
86    pub upgrade_lock: UpgradeLock<TYPES>,
87}
88
89impl<TYPES: NodeType, I: NodeImplementation<TYPES>> DaTaskState<TYPES, I> {
90    /// main task event handler
91    #[instrument(skip_all, fields(id = self.id, view = *self.cur_view, epoch = self.cur_epoch.map(|x| *x)), name = "DA Main Task", level = "error", target = "DaTaskState")]
92    pub async fn handle(
93        &mut self,
94        event: Arc<HotShotEvent<TYPES>>,
95        event_stream: Sender<Arc<HotShotEvent<TYPES>>>,
96    ) -> Result<()> {
97        match event.as_ref() {
98            HotShotEvent::DaProposalRecv(proposal, sender) => {
99                let sender = sender.clone();
100                tracing::debug!(
101                    "DA proposal received for view: {}",
102                    proposal.data.view_number()
103                );
104                // ED NOTE: Assuming that the next view leader is the one who sends DA proposal for this view
105                let view = proposal.data.view_number();
106
107                // Allow a DA proposal that is one view older, in case we have voted on a quorum
108                // proposal and updated the view.
109                //
110                // Anything older is discarded because it is no longer relevant.
111                ensure!(
112                    self.cur_view <= view + 1,
113                    "Throwing away DA proposal that is more than one view older"
114                );
115
116                if let Some(entry) = self.consensus.read().await.saved_payloads().get(&view) {
117                    ensure!(
118                        entry.payload.encode() == proposal.data.encoded_transactions,
119                        "Received DA proposal for view {view} but we already have a payload for \
120                         that view and they are not identical.  Throwing it away",
121                    );
122                }
123
124                let encoded_transactions_hash = Sha256::digest(&proposal.data.encoded_transactions);
125                let view_leader_key = self
126                    .membership_coordinator
127                    .membership_for_epoch(proposal.data.epoch)
128                    .await
129                    .context(warn!("No stake table for epoch {:?}", proposal.data.epoch))?
130                    .leader(view)
131                    .await?;
132                ensure!(
133                    view_leader_key == sender,
134                    warn!(
135                        "DA proposal doesn't have expected leader key for view {} \n DA proposal \
136                         is: {:?}",
137                        *view,
138                        proposal.data.clone()
139                    )
140                );
141
142                ensure!(
143                    view_leader_key.validate(&proposal.signature, &encoded_transactions_hash),
144                    warn!("Could not verify proposal.")
145                );
146
147                broadcast_event(
148                    Arc::new(HotShotEvent::DaProposalValidated(proposal.clone(), sender)),
149                    &event_stream,
150                )
151                .await;
152            },
153            HotShotEvent::DaProposalValidated(proposal, sender) => {
154                tracing::debug!(
155                    "DA proposal validated for view {}",
156                    proposal.data.view_number()
157                );
158                let cur_view = self.consensus.read().await.cur_view();
159                let view_number = proposal.data.view_number();
160                let epoch_number = proposal.data.epoch;
161                let membership = self
162                    .membership_coordinator
163                    .stake_table_for_epoch(epoch_number)
164                    .await
165                    .context(warn!("No stake table for epoch"))?;
166
167                ensure!(
168                    cur_view <= view_number + 1,
169                    debug!(
170                        "Validated DA proposal for prior view but it's too old now Current view \
171                         {cur_view}, DA Proposal view {}",
172                        proposal.data.view_number()
173                    )
174                );
175
176                // Proposal is fresh and valid, notify the application layer
177                broadcast_event(
178                    Event {
179                        view_number,
180                        event: EventType::DaProposal {
181                            proposal: proposal.clone(),
182                            sender: sender.clone(),
183                        },
184                    },
185                    &self.output_event_stream,
186                )
187                .await;
188
189                ensure!(
190                    membership.has_da_stake(&self.public_key).await,
191                    debug!(
192                        "We were not chosen for consensus committee for view {view_number} in \
193                         epoch {epoch_number:?}"
194                    )
195                );
196                let total_weight =
197                    vid_total_weight::<TYPES>(&membership.stake_table().await, epoch_number);
198
199                let version = self.upgrade_lock.version_infallible(view_number);
200
201                let txns = Arc::clone(&proposal.data.encoded_transactions);
202                let txns_clone = Arc::clone(&txns);
203                let metadata = proposal.data.metadata.encode();
204                let metadata_clone = metadata.clone();
205                let payload_commitment =
206                    spawn_blocking(move || vid_commitment(&txns, &metadata, total_weight, version))
207                        .await;
208                let payload_commitment = payload_commitment.unwrap();
209                let next_epoch_payload_commitment = if matches!(
210                    proposal.data.epoch_transition_indicator,
211                    EpochTransitionIndicator::InTransition
212                ) && self
213                    .upgrade_lock
214                    .epochs_enabled(proposal.data.view_number())
215                    && epoch_number.is_some()
216                {
217                    let next_epoch_total_weight = vid_total_weight::<TYPES>(
218                        &membership
219                            .next_epoch_stake_table()
220                            .await?
221                            .stake_table()
222                            .await,
223                        epoch_number.map(|epoch| epoch + 1),
224                    );
225
226                    let commit_result = spawn_blocking(move || {
227                        vid_commitment(
228                            &txns_clone,
229                            &metadata_clone,
230                            next_epoch_total_weight,
231                            version,
232                        )
233                    })
234                    .await;
235                    Some(commit_result.unwrap())
236                } else {
237                    None
238                };
239
240                let now = Instant::now();
241                self.storage
242                    .append_da2(proposal, payload_commitment)
243                    .await
244                    .wrap()
245                    .context(error!("Failed to append DA proposal to storage"))?;
246                self.storage_metrics
247                    .append_da_duration
248                    .add_point(now.elapsed().as_secs_f64());
249
250                // Generate and send vote
251                let vote = DaVote2::create_signed_vote(
252                    DaData2 {
253                        payload_commit: payload_commitment,
254                        next_epoch_payload_commit: next_epoch_payload_commitment,
255                        epoch: epoch_number,
256                    },
257                    view_number,
258                    &self.public_key,
259                    &self.private_key,
260                    &self.upgrade_lock,
261                )?;
262
263                tracing::debug!("Sending vote to the DA leader {}", vote.view_number());
264
265                broadcast_event(Arc::new(HotShotEvent::DaVoteSend(vote)), &event_stream).await;
266                let mut consensus_writer = self.consensus.write().await;
267
268                // Ensure this view is in the view map for garbage collection.
269
270                if let Err(e) =
271                    consensus_writer.update_da_view(view_number, epoch_number, payload_commitment)
272                {
273                    tracing::trace!("{e:?}");
274                }
275
276                let payload_with_metadata = Arc::new(PayloadWithMetadata {
277                    payload: TYPES::BlockPayload::from_bytes(
278                        proposal.data.encoded_transactions.as_ref(),
279                        &proposal.data.metadata,
280                    ),
281                    metadata: proposal.data.metadata.clone(),
282                });
283
284                // Record the payload we have promised to make available.
285                if let Err(e) =
286                    consensus_writer.update_saved_payloads(view_number, payload_with_metadata)
287                {
288                    tracing::trace!("{e:?}");
289                }
290                drop(consensus_writer);
291
292                // Optimistically calculate and update VID if we know that the primary network is down.
293                if self.network.is_primary_down() {
294                    let my_id = self.id;
295                    let consensus =
296                        OuterConsensus::new(Arc::clone(&self.consensus.inner_consensus));
297                    let pk = self.private_key.clone();
298                    let public_key = self.public_key.clone();
299                    let chan = event_stream.clone();
300                    let upgrade_lock = self.upgrade_lock.clone();
301                    let next_epoch = epoch_number.map(|epoch| epoch + 1);
302
303                    let mut target_epochs = vec![];
304                    if membership.has_stake(&public_key).await {
305                        target_epochs.push(epoch_number);
306                    }
307                    if membership
308                        .next_epoch_stake_table()
309                        .await?
310                        .has_stake(&public_key)
311                        .await
312                    {
313                        target_epochs.push(next_epoch);
314                    }
315                    if target_epochs.is_empty() {
316                        bail!(
317                            "Not calculating VID, the node doesn't belong to the current epoch or \
318                             the next epoch."
319                        );
320                    };
321
322                    tracing::debug!(
323                        "Primary network is down. Optimistically calculate own VID share."
324                    );
325                    let membership = membership.clone();
326                    spawn(async move {
327                        for target_epoch in target_epochs {
328                            Consensus::calculate_and_update_vid(
329                                OuterConsensus::new(Arc::clone(&consensus.inner_consensus)),
330                                view_number,
331                                target_epoch,
332                                membership.coordinator.clone(),
333                                &pk,
334                                &upgrade_lock,
335                            )
336                            .await;
337                            if let Some(vid_share) = consensus
338                                .read()
339                                .await
340                                .vid_shares()
341                                .get(&view_number)
342                                .and_then(|key_map| key_map.get(&public_key))
343                                .and_then(|epoch_map| epoch_map.get(&target_epoch))
344                            {
345                                tracing::debug!(
346                                    "Primary network is down. Calculated own VID share for epoch \
347                                     {target_epoch:?}, my id {my_id}"
348                                );
349                                broadcast_event(
350                                    Arc::new(HotShotEvent::VidShareRecv(
351                                        public_key.clone(),
352                                        vid_share.clone(),
353                                    )),
354                                    &chan,
355                                )
356                                .await;
357                            }
358                        }
359                    });
360                }
361            },
362            HotShotEvent::DaVoteRecv(vote) => {
363                tracing::debug!("DA vote recv, Main Task {}", vote.view_number());
364                // Check if we are the leader and the vote is from the sender.
365                let view = vote.view_number();
366                let epoch = vote.data.epoch;
367                let membership = self
368                    .membership_coordinator
369                    .membership_for_epoch(epoch)
370                    .await
371                    .context(warn!("No stake table for epoch"))?;
372
373                ensure!(
374                    membership.leader(view).await? == self.public_key,
375                    debug!(
376                        "We are not the DA committee leader for view {} are we leader for next \
377                         view? {}",
378                        *view,
379                        membership.leader(view + 1).await? == self.public_key
380                    )
381                );
382
383                handle_vote(
384                    &mut self.vote_collectors,
385                    vote,
386                    self.public_key.clone(),
387                    &membership,
388                    self.id,
389                    &event,
390                    &event_stream,
391                    &self.upgrade_lock,
392                    EpochTransitionIndicator::NotInTransition,
393                )
394                .await?;
395            },
396            HotShotEvent::ViewChange(view, epoch) => {
397                if *epoch > self.cur_epoch {
398                    self.cur_epoch = *epoch;
399                }
400
401                let view = *view;
402                ensure!(
403                    *self.cur_view < *view,
404                    info!("Received a view change to an older view.")
405                );
406
407                if *view - *self.cur_view > 1 {
408                    tracing::info!("View changed by more than 1 going to view {view}");
409                }
410                self.cur_view = view;
411            },
412            HotShotEvent::BlockRecv(packed_bundle) => {
413                let PackedBundle::<TYPES> {
414                    encoded_transactions,
415                    metadata,
416                    view_number,
417                    ..
418                } = packed_bundle;
419                let view_number = *view_number;
420
421                // quick hash the encoded txns with sha256
422                let encoded_transactions_hash = Sha256::digest(encoded_transactions);
423
424                // sign the encoded transactions as opposed to the VID commitment
425                let signature =
426                    TYPES::SignatureKey::sign(&self.private_key, &encoded_transactions_hash)
427                        .wrap()?;
428
429                let epoch = self.cur_epoch;
430                let leader = self
431                    .membership_coordinator
432                    .membership_for_epoch(epoch)
433                    .await
434                    .context(warn!("No stake table for epoch"))?
435                    .leader(view_number)
436                    .await?;
437                if leader != self.public_key {
438                    tracing::debug!(
439                        "We are not the leader in the current epoch. Do not send the DA proposal"
440                    );
441                    return Ok(());
442                }
443                let consensus_reader = self.consensus.read().await;
444                let high_qc_block_number = consensus_reader.high_qc().data.block_number;
445                // We in transition if our high QC is after the epoch root block, not the last block,
446                // And we aren't in an epoch greater than the high qc's epoch.  In other words
447                // we expect to propose to both epochs if the next block after our current high QC is
448                // going to be a transition block.  We most likely will propose the high QC's block height + 1.
449                let epoch_transition_indicator = if self.upgrade_lock.epochs_enabled(view_number) {
450                    match (high_qc_block_number, self.cur_epoch) {
451                        (Some(block_number), Some(cur_epoch)) => {
452                            let epoch = epoch_from_block_number(
453                                block_number,
454                                self.membership_coordinator.epoch_height,
455                            );
456                            if epoch < *cur_epoch {
457                                // We are in a new epoch, we can't be in transition
458                                EpochTransitionIndicator::NotInTransition
459                            } else if !is_last_block(
460                                block_number,
461                                self.membership_coordinator.epoch_height,
462                            ) && is_ge_epoch_root(
463                                block_number,
464                                self.membership_coordinator.epoch_height,
465                            ) {
466                                EpochTransitionIndicator::InTransition
467                            } else {
468                                EpochTransitionIndicator::NotInTransition
469                            }
470                        },
471                        _ => EpochTransitionIndicator::NotInTransition,
472                    }
473                } else {
474                    EpochTransitionIndicator::NotInTransition
475                };
476
477                drop(consensus_reader);
478
479                let data: DaProposal2<TYPES> = DaProposal2 {
480                    encoded_transactions: Arc::clone(encoded_transactions),
481                    metadata: metadata.clone(),
482                    // Upon entering a new view we want to send a DA Proposal for the next view -> Is it always the case that this is cur_view + 1?
483                    view_number,
484                    epoch,
485                    epoch_transition_indicator,
486                };
487
488                let message = Proposal {
489                    data,
490                    signature,
491                    _pd: PhantomData,
492                };
493
494                broadcast_event(
495                    Arc::new(HotShotEvent::DaProposalSend(
496                        message.clone(),
497                        self.public_key.clone(),
498                    )),
499                    &event_stream,
500                )
501                .await;
502                let payload_with_metadata = Arc::new(PayloadWithMetadata {
503                    payload: TYPES::BlockPayload::from_bytes(
504                        encoded_transactions.as_ref(),
505                        metadata,
506                    ),
507                    metadata: metadata.clone(),
508                });
509                // Save the payload early because we might need it to calculate VID for the next epoch nodes.
510                let update_result = self
511                    .consensus
512                    .write()
513                    .await
514                    .update_saved_payloads(view_number, payload_with_metadata);
515                if let Err(e) = update_result {
516                    tracing::trace!("{e:?}");
517                }
518            },
519            _ => {},
520        }
521        Ok(())
522    }
523}
524
525/// task state implementation for DA Task
526#[async_trait]
527impl<TYPES: NodeType, I: NodeImplementation<TYPES>> TaskState for DaTaskState<TYPES, I> {
528    type Event = HotShotEvent<TYPES>;
529
530    async fn handle_event(
531        &mut self,
532        event: Arc<Self::Event>,
533        sender: &Sender<Arc<Self::Event>>,
534        _receiver: &Receiver<Arc<Self::Event>>,
535    ) -> Result<()> {
536        self.handle(event, sender.clone()).await
537    }
538
539    fn cancel_subtasks(&mut self) {}
540}