Skip to main content

hotshot_task_impls/
da.rs

1// Copyright (c) 2021-2024 Espresso Systems (espressosys.com)
2// This file is part of the HotShot repository.
3
4// You should have received a copy of the MIT License
5// along with the HotShot repository. If not, see <https://mit-license.org/>.
6
7use std::{marker::PhantomData, sync::Arc, time::Instant};
8
9use async_broadcast::{Receiver, Sender};
10use async_trait::async_trait;
11use hotshot_task::task::TaskState;
12use hotshot_types::{
13    consensus::{Consensus, OuterConsensus, PayloadWithMetadata},
14    data::{
15        DaProposal2, EpochNumber, PackedBundle, ViewNumber, vid_commitment,
16        vid_disperse::vid_total_weight,
17    },
18    epoch_membership::EpochMembershipCoordinator,
19    event::{Event, EventType},
20    message::{Proposal, UpgradeLock},
21    simple_certificate::DaCertificate2,
22    simple_vote::{DaData2, DaVote2},
23    storage_metrics::StorageMetricsValue,
24    traits::{
25        BlockPayload, EncodeBytes,
26        network::ConnectedNetwork,
27        node_implementation::{NodeImplementation, NodeType},
28        signature_key::SignatureKey,
29        storage::Storage,
30    },
31    utils::{EpochTransitionIndicator, epoch_from_block_number, is_ge_epoch_root, is_last_block},
32    vote::HasViewNumber,
33};
34use hotshot_utils::anytrace::*;
35use sha2::{Digest, Sha256};
36use tokio::{spawn, task::spawn_blocking};
37use tracing::instrument;
38
39use crate::{
40    events::HotShotEvent,
41    helpers::broadcast_event,
42    vote_collection::{VoteCollectorsMap, handle_vote},
43};
44
45/// Tracks state of a DA task
46pub struct DaTaskState<TYPES: NodeType, I: NodeImplementation<TYPES>> {
47    /// Output events to application
48    pub output_event_stream: async_broadcast::Sender<Event<TYPES>>,
49
50    /// View number this view is executing in.
51    pub cur_view: ViewNumber,
52
53    /// Epoch number this node is executing in.
54    pub cur_epoch: Option<EpochNumber>,
55
56    /// Reference to consensus. Leader will require a read lock on this.
57    pub consensus: OuterConsensus<TYPES>,
58
59    /// Membership for the DA committee and quorum committee.
60    /// We need the latter only for calculating the proper VID scheme
61    /// from the number of nodes in the quorum.
62    pub membership_coordinator: EpochMembershipCoordinator<TYPES>,
63
64    /// The underlying network
65    pub network: Arc<I::Network>,
66
67    /// A map of `DaVote` collector tasks.
68    pub vote_collectors: VoteCollectorsMap<TYPES, DaVote2<TYPES>, DaCertificate2<TYPES>>,
69
70    /// This Nodes public key
71    pub public_key: TYPES::SignatureKey,
72
73    /// This Nodes private key
74    pub private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
75
76    /// This state's ID
77    pub id: u64,
78
79    /// This node's storage ref
80    pub storage: I::Storage,
81
82    /// Storage metrics
83    pub storage_metrics: Arc<StorageMetricsValue>,
84
85    /// Lock for a decided upgrade
86    pub upgrade_lock: UpgradeLock<TYPES>,
87}
88
89impl<TYPES: NodeType, I: NodeImplementation<TYPES>> DaTaskState<TYPES, I> {
90    /// main task event handler
91    #[instrument(skip_all, fields(id = self.id, view = *self.cur_view, epoch = self.cur_epoch.map(|x| *x)), name = "DA Main Task", level = "error", target = "DaTaskState")]
92    pub async fn handle(
93        &mut self,
94        event: Arc<HotShotEvent<TYPES>>,
95        event_stream: Sender<Arc<HotShotEvent<TYPES>>>,
96    ) -> Result<()> {
97        match event.as_ref() {
98            HotShotEvent::DaProposalRecv(proposal, sender) => {
99                let sender = sender.clone();
100                tracing::debug!(
101                    "DA proposal received for view: {}",
102                    proposal.data.view_number()
103                );
104                // ED NOTE: Assuming that the next view leader is the one who sends DA proposal for this view
105                let view = proposal.data.view_number();
106
107                // Allow a DA proposal that is one view older, in case we have voted on a quorum
108                // proposal and updated the view.
109                //
110                // Anything older is discarded because it is no longer relevant.
111                ensure!(
112                    self.cur_view <= view + 1,
113                    "Throwing away DA proposal that is more than one view older"
114                );
115
116                if let Some(entry) = self.consensus.read().await.saved_payloads().get(&view) {
117                    ensure!(
118                        entry.payload.encode() == proposal.data.encoded_transactions,
119                        "Received DA proposal for view {view} but we already have a payload for \
120                         that view and they are not identical.  Throwing it away",
121                    );
122                }
123
124                let encoded_transactions_hash = Sha256::digest(&proposal.data.encoded_transactions);
125                let view_leader_key = self
126                    .membership_coordinator
127                    .membership_for_epoch(proposal.data.epoch)
128                    .context(warn!("No stake table for epoch {:?}", proposal.data.epoch))?
129                    .leader(view)?;
130                ensure!(
131                    view_leader_key == sender,
132                    warn!(
133                        "DA proposal doesn't have expected leader key for view {} \n DA proposal \
134                         is: {:?}",
135                        *view,
136                        proposal.data.clone()
137                    )
138                );
139
140                ensure!(
141                    view_leader_key.validate(&proposal.signature, &encoded_transactions_hash),
142                    warn!("Could not verify proposal.")
143                );
144
145                broadcast_event(
146                    Arc::new(HotShotEvent::DaProposalValidated(proposal.clone(), sender)),
147                    &event_stream,
148                )
149                .await;
150            },
151            HotShotEvent::DaProposalValidated(proposal, sender) => {
152                tracing::debug!(
153                    "DA proposal validated for view {}",
154                    proposal.data.view_number()
155                );
156                let cur_view = self.consensus.read().await.cur_view();
157                let view_number = proposal.data.view_number();
158                let epoch_number = proposal.data.epoch;
159                let membership = self
160                    .membership_coordinator
161                    .stake_table_for_epoch(epoch_number)
162                    .context(warn!("No stake table for epoch"))?;
163
164                ensure!(
165                    cur_view <= view_number + 1,
166                    debug!(
167                        "Validated DA proposal for prior view but it's too old now Current view \
168                         {cur_view}, DA Proposal view {}",
169                        proposal.data.view_number()
170                    )
171                );
172
173                // Proposal is fresh and valid, notify the application layer
174                broadcast_event(
175                    Event {
176                        view_number,
177                        event: EventType::DaProposal {
178                            proposal: proposal.clone(),
179                            sender: sender.clone(),
180                        },
181                    },
182                    &self.output_event_stream,
183                )
184                .await;
185
186                ensure!(
187                    membership.has_da_stake(&self.public_key),
188                    debug!(
189                        "We were not chosen for consensus committee for view {view_number} in \
190                         epoch {epoch_number:?}"
191                    )
192                );
193                let total_weight = vid_total_weight(membership.stake_table(), epoch_number);
194
195                let version = self.upgrade_lock.version_infallible(view_number);
196
197                let txns = Arc::clone(&proposal.data.encoded_transactions);
198                let txns_clone = Arc::clone(&txns);
199                let metadata = proposal.data.metadata.encode();
200                let metadata_clone = metadata.clone();
201                let payload_commitment =
202                    spawn_blocking(move || vid_commitment(&txns, &metadata, total_weight, version))
203                        .await;
204                let payload_commitment = payload_commitment.unwrap();
205                let next_epoch_payload_commitment = if matches!(
206                    proposal.data.epoch_transition_indicator,
207                    EpochTransitionIndicator::InTransition
208                ) && self
209                    .upgrade_lock
210                    .epochs_enabled(proposal.data.view_number())
211                    && epoch_number.is_some()
212                {
213                    let next_stake_table = membership.next_epoch_stake_table()?;
214                    let next_epoch_total_weight = vid_total_weight(
215                        next_stake_table.stake_table(),
216                        epoch_number.map(|epoch| epoch + 1),
217                    );
218
219                    let commit_result = spawn_blocking(move || {
220                        vid_commitment(
221                            &txns_clone,
222                            &metadata_clone,
223                            next_epoch_total_weight,
224                            version,
225                        )
226                    })
227                    .await;
228                    Some(commit_result.unwrap())
229                } else {
230                    None
231                };
232
233                let now = Instant::now();
234                self.storage
235                    .append_da2(proposal, payload_commitment)
236                    .await
237                    .wrap()
238                    .context(error!("Failed to append DA proposal to storage"))?;
239                self.storage_metrics
240                    .append_da_duration
241                    .add_point(now.elapsed().as_secs_f64());
242
243                // Generate and send vote
244                let vote = DaVote2::create_signed_vote(
245                    DaData2 {
246                        payload_commit: payload_commitment,
247                        next_epoch_payload_commit: next_epoch_payload_commitment,
248                        epoch: epoch_number,
249                    },
250                    view_number,
251                    &self.public_key,
252                    &self.private_key,
253                    &self.upgrade_lock,
254                )?;
255
256                tracing::debug!("Sending vote to the DA leader {}", vote.view_number());
257
258                broadcast_event(Arc::new(HotShotEvent::DaVoteSend(vote)), &event_stream).await;
259                let mut consensus_writer = self.consensus.write().await;
260
261                // Ensure this view is in the view map for garbage collection.
262
263                if let Err(e) =
264                    consensus_writer.update_da_view(view_number, epoch_number, payload_commitment)
265                {
266                    tracing::trace!("{e:?}");
267                }
268
269                let payload_with_metadata = Arc::new(PayloadWithMetadata {
270                    payload: TYPES::BlockPayload::from_bytes(
271                        proposal.data.encoded_transactions.as_ref(),
272                        &proposal.data.metadata,
273                    ),
274                    metadata: proposal.data.metadata.clone(),
275                });
276
277                // Record the payload we have promised to make available.
278                if let Err(e) =
279                    consensus_writer.update_saved_payloads(view_number, payload_with_metadata)
280                {
281                    tracing::trace!("{e:?}");
282                }
283                drop(consensus_writer);
284
285                // Optimistically calculate and update VID if we know that the primary network is down.
286                if self.network.is_primary_down() {
287                    let my_id = self.id;
288                    let consensus =
289                        OuterConsensus::new(Arc::clone(&self.consensus.inner_consensus));
290                    let pk = self.private_key.clone();
291                    let public_key = self.public_key.clone();
292                    let chan = event_stream.clone();
293                    let upgrade_lock = self.upgrade_lock.clone();
294                    let next_epoch = epoch_number.map(|epoch| epoch + 1);
295
296                    let mut target_epochs = vec![];
297                    if membership.has_stake(&public_key) {
298                        target_epochs.push(epoch_number);
299                    }
300                    if membership.next_epoch_stake_table()?.has_stake(&public_key) {
301                        target_epochs.push(next_epoch);
302                    }
303                    if target_epochs.is_empty() {
304                        bail!(
305                            "Not calculating VID, the node doesn't belong to the current epoch or \
306                             the next epoch."
307                        );
308                    };
309
310                    tracing::debug!(
311                        "Primary network is down. Optimistically calculate own VID share."
312                    );
313                    let membership = membership.clone();
314                    spawn(async move {
315                        for target_epoch in target_epochs {
316                            Consensus::calculate_and_update_vid(
317                                OuterConsensus::new(Arc::clone(&consensus.inner_consensus)),
318                                view_number,
319                                target_epoch,
320                                membership.coordinator.clone(),
321                                &pk,
322                                &upgrade_lock,
323                            )
324                            .await;
325                            if let Some(vid_share) = consensus
326                                .read()
327                                .await
328                                .vid_shares()
329                                .get(&view_number)
330                                .and_then(|key_map| key_map.get(&public_key))
331                                .and_then(|epoch_map| epoch_map.get(&target_epoch))
332                            {
333                                tracing::debug!(
334                                    "Primary network is down. Calculated own VID share for epoch \
335                                     {target_epoch:?}, my id {my_id}"
336                                );
337                                broadcast_event(
338                                    Arc::new(HotShotEvent::VidShareRecv(
339                                        public_key.clone(),
340                                        vid_share.clone(),
341                                    )),
342                                    &chan,
343                                )
344                                .await;
345                            }
346                        }
347                    });
348                }
349            },
350            HotShotEvent::DaVoteRecv(vote) => {
351                tracing::debug!("DA vote recv, Main Task {}", vote.view_number());
352                // Check if we are the leader and the vote is from the sender.
353                let view = vote.view_number();
354                let epoch = vote.data.epoch;
355                let membership = self
356                    .membership_coordinator
357                    .membership_for_epoch(epoch)
358                    .context(warn!("No stake table for epoch"))?;
359
360                ensure!(
361                    membership.leader(view)? == self.public_key,
362                    debug!(
363                        "We are not the DA committee leader for view {} are we leader for next \
364                         view? {}",
365                        *view,
366                        membership.leader(view + 1)? == self.public_key
367                    )
368                );
369
370                handle_vote(
371                    &mut self.vote_collectors,
372                    vote,
373                    self.public_key.clone(),
374                    &membership,
375                    self.id,
376                    &event,
377                    &event_stream,
378                    &self.upgrade_lock,
379                    EpochTransitionIndicator::NotInTransition,
380                )
381                .await?;
382            },
383            HotShotEvent::ViewChange(view, epoch) => {
384                if *epoch > self.cur_epoch {
385                    self.cur_epoch = *epoch;
386                }
387
388                let view = *view;
389                ensure!(
390                    *self.cur_view < *view,
391                    info!("Received a view change to an older view.")
392                );
393
394                if *view - *self.cur_view > 1 {
395                    tracing::info!("View changed by more than 1 going to view {view}");
396                }
397                self.cur_view = view;
398            },
399            HotShotEvent::BlockRecv(packed_bundle) => {
400                let PackedBundle::<TYPES> {
401                    encoded_transactions,
402                    metadata,
403                    view_number,
404                    ..
405                } = packed_bundle;
406                let view_number = *view_number;
407
408                // quick hash the encoded txns with sha256
409                let encoded_transactions_hash = Sha256::digest(encoded_transactions);
410
411                // sign the encoded transactions as opposed to the VID commitment
412                let signature =
413                    TYPES::SignatureKey::sign(&self.private_key, &encoded_transactions_hash)
414                        .wrap()?;
415
416                let epoch = self.cur_epoch;
417                let leader = self
418                    .membership_coordinator
419                    .membership_for_epoch(epoch)
420                    .context(warn!("No stake table for epoch"))?
421                    .leader(view_number)?;
422                if leader != self.public_key {
423                    tracing::debug!(
424                        "We are not the leader in the current epoch. Do not send the DA proposal"
425                    );
426                    return Ok(());
427                }
428                let consensus_reader = self.consensus.read().await;
429                let high_qc_block_number = consensus_reader.high_qc().data.block_number;
430                // We in transition if our high QC is after the epoch root block, not the last block,
431                // And we aren't in an epoch greater than the high qc's epoch.  In other words
432                // we expect to propose to both epochs if the next block after our current high QC is
433                // going to be a transition block.  We most likely will propose the high QC's block height + 1.
434                let epoch_transition_indicator = if self.upgrade_lock.epochs_enabled(view_number) {
435                    match (high_qc_block_number, self.cur_epoch) {
436                        (Some(block_number), Some(cur_epoch)) => {
437                            let epoch = epoch_from_block_number(
438                                block_number,
439                                *self.membership_coordinator.epoch_height(),
440                            );
441                            if epoch < *cur_epoch {
442                                // We are in a new epoch, we can't be in transition
443                                EpochTransitionIndicator::NotInTransition
444                            } else if !is_last_block(
445                                block_number,
446                                *self.membership_coordinator.epoch_height(),
447                            ) && is_ge_epoch_root(
448                                block_number,
449                                *self.membership_coordinator.epoch_height(),
450                            ) {
451                                EpochTransitionIndicator::InTransition
452                            } else {
453                                EpochTransitionIndicator::NotInTransition
454                            }
455                        },
456                        _ => EpochTransitionIndicator::NotInTransition,
457                    }
458                } else {
459                    EpochTransitionIndicator::NotInTransition
460                };
461
462                drop(consensus_reader);
463
464                let data: DaProposal2<TYPES> = DaProposal2 {
465                    encoded_transactions: Arc::clone(encoded_transactions),
466                    metadata: metadata.clone(),
467                    // Upon entering a new view we want to send a DA Proposal for the next view -> Is it always the case that this is cur_view + 1?
468                    view_number,
469                    epoch,
470                    epoch_transition_indicator,
471                };
472
473                let message = Proposal {
474                    data,
475                    signature,
476                    _pd: PhantomData,
477                };
478
479                broadcast_event(
480                    Arc::new(HotShotEvent::DaProposalSend(
481                        message.clone(),
482                        self.public_key.clone(),
483                    )),
484                    &event_stream,
485                )
486                .await;
487                let payload_with_metadata = Arc::new(PayloadWithMetadata {
488                    payload: TYPES::BlockPayload::from_bytes(
489                        encoded_transactions.as_ref(),
490                        metadata,
491                    ),
492                    metadata: metadata.clone(),
493                });
494                // Save the payload early because we might need it to calculate VID for the next epoch nodes.
495                let update_result = self
496                    .consensus
497                    .write()
498                    .await
499                    .update_saved_payloads(view_number, payload_with_metadata);
500                if let Err(e) = update_result {
501                    tracing::trace!("{e:?}");
502                }
503            },
504            _ => {},
505        }
506        Ok(())
507    }
508}
509
510/// task state implementation for DA Task
511#[async_trait]
512impl<TYPES: NodeType, I: NodeImplementation<TYPES>> TaskState for DaTaskState<TYPES, I> {
513    type Event = HotShotEvent<TYPES>;
514
515    async fn handle_event(
516        &mut self,
517        event: Arc<Self::Event>,
518        sender: &Sender<Arc<Self::Event>>,
519        _receiver: &Receiver<Arc<Self::Event>>,
520    ) -> Result<()> {
521        if self.upgrade_lock.new_protocol_active(self.cur_view) {
522            return Ok(());
523        }
524        self.handle(event, sender.clone()).await
525    }
526
527    fn cancel_subtasks(&mut self) {}
528}