Skip to main content

hotshot_task_impls/
vid.rs

// Copyright (c) 2021-2024 Espresso Systems (espressosys.com)
// This file is part of the HotShot repository.

// You should have received a copy of the MIT License
// along with the HotShot repository. If not, see <https://mit-license.org/>.
6
7use std::{marker::PhantomData, sync::Arc};
8
9use async_broadcast::{Receiver, Sender};
10use async_trait::async_trait;
11use hotshot_task::task::TaskState;
12use hotshot_types::{
13    consensus::{OuterConsensus, PayloadWithMetadata},
14    data::{EpochNumber, PackedBundle, VidDisperse, VidDisperseAndDuration, ViewNumber},
15    epoch_membership::EpochMembershipCoordinator,
16    message::{Proposal, UpgradeLock},
17    simple_vote::HasEpoch,
18    traits::{
19        BlockPayload,
20        block_contents::BlockHeader,
21        node_implementation::{NodeImplementation, NodeType},
22        signature_key::SignatureKey,
23    },
24    utils::{is_epoch_transition, option_epoch_from_block_number},
25};
26use hotshot_utils::anytrace::Result;
27use tracing::{debug, error, info, instrument};
28
29use crate::{
30    events::{HotShotEvent, HotShotTaskCompleted},
31    helpers::broadcast_event,
32};
33
34/// Tracks state of a VID task
35pub struct VidTaskState<TYPES: NodeType, I: NodeImplementation<TYPES>> {
36    /// View number this view is executing in.
37    pub cur_view: ViewNumber,
38
39    /// Epoch number this node is executing in.
40    pub cur_epoch: Option<EpochNumber>,
41
42    /// Reference to consensus. Leader will require a read lock on this.
43    pub consensus: OuterConsensus<TYPES>,
44
45    /// The underlying network
46    pub network: Arc<I::Network>,
47
48    /// Membership for the quorum
49    pub membership_coordinator: EpochMembershipCoordinator<TYPES>,
50
51    /// This Nodes Public Key
52    pub public_key: TYPES::SignatureKey,
53
54    /// Our Private Key
55    pub private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
56
57    /// This state's ID
58    pub id: u64,
59
60    /// Lock for a decided upgrade
61    pub upgrade_lock: UpgradeLock<TYPES>,
62
63    /// Number of blocks in an epoch, zero means there are no epochs
64    pub epoch_height: u64,
65}
66
67impl<TYPES: NodeType, I: NodeImplementation<TYPES>> VidTaskState<TYPES, I> {
68    /// main task event handler
69    #[instrument(skip_all, fields(id = self.id, view = *self.cur_view, epoch = self.cur_epoch.map(|x| *x)), name = "VID Main Task", level = "error", target = "VidTaskState")]
70    pub async fn handle(
71        &mut self,
72        event: Arc<HotShotEvent<TYPES>>,
73        event_stream: Sender<Arc<HotShotEvent<TYPES>>>,
74    ) -> Option<HotShotTaskCompleted> {
75        match event.as_ref() {
76            HotShotEvent::BlockRecv(packed_bundle) => {
77                let PackedBundle::<TYPES> {
78                    encoded_transactions,
79                    metadata,
80                    view_number,
81                    sequencing_fees,
82                    ..
83                } = packed_bundle;
84                let payload =
85                    <TYPES as NodeType>::BlockPayload::from_bytes(encoded_transactions, metadata);
86                let builder_commitment = payload.builder_commitment(metadata);
87                let epoch = self.cur_epoch;
88                if self
89                    .membership_coordinator
90                    .membership_for_epoch(epoch)
91                    .ok()?
92                    .leader(*view_number)
93                    .ok()?
94                    != self.public_key
95                {
96                    tracing::debug!(
97                        "We are not the leader in the current epoch. Do not send the VID \
98                         dispersal."
99                    );
100                    return None;
101                }
102                let VidDisperseAndDuration {
103                    disperse: vid_disperse,
104                    duration: disperse_duration,
105                } = VidDisperse::calculate_vid_disperse(
106                    &payload,
107                    &self.membership_coordinator,
108                    *view_number,
109                    epoch,
110                    epoch,
111                    metadata,
112                    &self.upgrade_lock,
113                )
114                .await
115                .ok()?;
116                let payload_commitment = vid_disperse.payload_commitment();
117                let payload_with_metadata = Arc::new(PayloadWithMetadata {
118                    payload,
119                    metadata: metadata.clone(),
120                });
121
122                let mut consensus_writer = self.consensus.write().await;
123                consensus_writer
124                    .metrics
125                    .vid_disperse_duration
126                    .add_point(disperse_duration.as_secs_f64());
127                // Make sure we save the payload; we might need it to send the next epoch VID shares.
128                if let Err(e) =
129                    consensus_writer.update_saved_payloads(*view_number, payload_with_metadata)
130                {
131                    tracing::debug!(error=?e);
132                }
133                for share in vid_disperse.clone().to_shares() {
134                    if let Some(share) = share.to_proposal(&self.private_key) {
135                        consensus_writer.update_vid_shares(*view_number, share);
136                    }
137                }
138                drop(consensus_writer);
139
140                // send the commitment and metadata to consensus for block building
141                broadcast_event(
142                    Arc::new(HotShotEvent::SendPayloadCommitmentAndMetadata(
143                        payload_commitment,
144                        builder_commitment,
145                        metadata.clone(),
146                        *view_number,
147                        sequencing_fees.clone(),
148                    )),
149                    &event_stream,
150                )
151                .await;
152
153                let view_number = *view_number;
154                let Ok(signature) = TYPES::SignatureKey::sign(
155                    &self.private_key,
156                    vid_disperse.payload_commitment_ref(),
157                ) else {
158                    error!("VID: failed to sign dispersal payload");
159                    return None;
160                };
161                debug!("publishing VID disperse for view {view_number} and epoch {epoch:?}");
162                broadcast_event(
163                    Arc::new(HotShotEvent::VidDisperseSend(
164                        Proposal {
165                            signature,
166                            data: vid_disperse,
167                            _pd: PhantomData,
168                        },
169                        self.public_key.clone(),
170                    )),
171                    &event_stream,
172                )
173                .await;
174            },
175
176            HotShotEvent::ViewChange(view, epoch) => {
177                if *epoch > self.cur_epoch {
178                    self.cur_epoch = *epoch;
179                }
180
181                let view = *view;
182                if (*view != 0 || *self.cur_view > 0) && *self.cur_view >= *view {
183                    return None;
184                }
185
186                if *view - *self.cur_view > 1 {
187                    info!("View changed by more than 1 going to view {view}");
188                }
189                self.cur_view = view;
190
191                return None;
192            },
193
194            HotShotEvent::QuorumProposalSend(proposal, _) => {
195                let proposed_block_number = proposal.data.block_header().block_number();
196                if proposal.data.epoch().is_none()
197                    || !is_epoch_transition(proposed_block_number, self.epoch_height)
198                {
199                    // This is not the last block in the epoch, do nothing.
200                    return None;
201                }
202                // We just sent a proposal for the last block in the epoch. We need to calculate
203                // and send VID for the nodes in the next epoch so that they can vote.
204                let proposal_view_number = proposal.data.view_number();
205                let sender_epoch =
206                    option_epoch_from_block_number(true, proposed_block_number, self.epoch_height);
207                let target_epoch = sender_epoch.map(|x| x + 1);
208
209                let consensus_reader = self.consensus.read().await;
210                let Some(payload) = consensus_reader.saved_payloads().get(&proposal_view_number)
211                else {
212                    tracing::warn!(
213                        "We need to calculate VID for the nodes in the next epoch but we don't \
214                         have the transactions"
215                    );
216                    return None;
217                };
218                let payload = Arc::clone(payload);
219                drop(consensus_reader);
220
221                let VidDisperseAndDuration {
222                    disperse: next_epoch_vid_disperse,
223                    duration: _,
224                } = VidDisperse::calculate_vid_disperse(
225                    &payload.payload,
226                    &self.membership_coordinator,
227                    proposal_view_number,
228                    target_epoch,
229                    sender_epoch,
230                    &payload.metadata,
231                    &self.upgrade_lock,
232                )
233                .await
234                .ok()?;
235                let Ok(next_epoch_signature) = TYPES::SignatureKey::sign(
236                    &self.private_key,
237                    next_epoch_vid_disperse.payload_commitment().as_ref(),
238                ) else {
239                    error!("VID: failed to sign dispersal payload for the next epoch");
240                    return None;
241                };
242                debug!(
243                    "publishing VID disperse for view {proposal_view_number} and epoch \
244                     {target_epoch:?}"
245                );
246                broadcast_event(
247                    Arc::new(HotShotEvent::VidDisperseSend(
248                        Proposal {
249                            signature: next_epoch_signature,
250                            data: next_epoch_vid_disperse.clone(),
251                            _pd: PhantomData,
252                        },
253                        self.public_key.clone(),
254                    )),
255                    &event_stream,
256                )
257                .await;
258            },
259            HotShotEvent::Shutdown => {
260                return Some(HotShotTaskCompleted);
261            },
262            _ => {},
263        }
264        None
265    }
266}
267
268/// task state implementation for VID Task
269#[async_trait]
270impl<TYPES: NodeType, I: NodeImplementation<TYPES>> TaskState for VidTaskState<TYPES, I> {
271    type Event = HotShotEvent<TYPES>;
272
273    async fn handle_event(
274        &mut self,
275        event: Arc<Self::Event>,
276        sender: &Sender<Arc<Self::Event>>,
277        _receiver: &Receiver<Arc<Self::Event>>,
278    ) -> Result<()> {
279        if self.upgrade_lock.new_protocol_active(self.cur_view) {
280            return Ok(());
281        }
282        self.handle(event, sender.clone()).await;
283        Ok(())
284    }
285
286    fn cancel_subtasks(&mut self) {}
287}