1use std::{marker::PhantomData, sync::Arc};
8
9use async_broadcast::{Receiver, Sender};
10use async_trait::async_trait;
11use hotshot_task::task::TaskState;
12use hotshot_types::{
13 consensus::{OuterConsensus, PayloadWithMetadata},
14 data::{EpochNumber, PackedBundle, VidDisperse, VidDisperseAndDuration, ViewNumber},
15 epoch_membership::EpochMembershipCoordinator,
16 message::{Proposal, UpgradeLock},
17 simple_vote::HasEpoch,
18 traits::{
19 BlockPayload,
20 block_contents::BlockHeader,
21 node_implementation::{NodeImplementation, NodeType},
22 signature_key::SignatureKey,
23 },
24 utils::{is_epoch_transition, option_epoch_from_block_number},
25};
26use hotshot_utils::anytrace::Result;
27use tracing::{debug, error, info, instrument};
28
29use crate::{
30 events::{HotShotEvent, HotShotTaskCompleted},
31 helpers::broadcast_event,
32};
33
34pub struct VidTaskState<TYPES: NodeType, I: NodeImplementation<TYPES>> {
36 pub cur_view: ViewNumber,
38
39 pub cur_epoch: Option<EpochNumber>,
41
42 pub consensus: OuterConsensus<TYPES>,
44
45 pub network: Arc<I::Network>,
47
48 pub membership_coordinator: EpochMembershipCoordinator<TYPES>,
50
51 pub public_key: TYPES::SignatureKey,
53
54 pub private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
56
57 pub id: u64,
59
60 pub upgrade_lock: UpgradeLock<TYPES>,
62
63 pub epoch_height: u64,
65}
66
67impl<TYPES: NodeType, I: NodeImplementation<TYPES>> VidTaskState<TYPES, I> {
68 #[instrument(skip_all, fields(id = self.id, view = *self.cur_view, epoch = self.cur_epoch.map(|x| *x)), name = "VID Main Task", level = "error", target = "VidTaskState")]
70 pub async fn handle(
71 &mut self,
72 event: Arc<HotShotEvent<TYPES>>,
73 event_stream: Sender<Arc<HotShotEvent<TYPES>>>,
74 ) -> Option<HotShotTaskCompleted> {
75 match event.as_ref() {
76 HotShotEvent::BlockRecv(packed_bundle) => {
77 let PackedBundle::<TYPES> {
78 encoded_transactions,
79 metadata,
80 view_number,
81 sequencing_fees,
82 ..
83 } = packed_bundle;
84 let payload =
85 <TYPES as NodeType>::BlockPayload::from_bytes(encoded_transactions, metadata);
86 let builder_commitment = payload.builder_commitment(metadata);
87 let epoch = self.cur_epoch;
88 if self
89 .membership_coordinator
90 .membership_for_epoch(epoch)
91 .ok()?
92 .leader(*view_number)
93 .ok()?
94 != self.public_key
95 {
96 tracing::debug!(
97 "We are not the leader in the current epoch. Do not send the VID \
98 dispersal."
99 );
100 return None;
101 }
102 let VidDisperseAndDuration {
103 disperse: vid_disperse,
104 duration: disperse_duration,
105 } = VidDisperse::calculate_vid_disperse(
106 &payload,
107 &self.membership_coordinator,
108 *view_number,
109 epoch,
110 epoch,
111 metadata,
112 &self.upgrade_lock,
113 )
114 .await
115 .ok()?;
116 let payload_commitment = vid_disperse.payload_commitment();
117 let payload_with_metadata = Arc::new(PayloadWithMetadata {
118 payload,
119 metadata: metadata.clone(),
120 });
121
122 let mut consensus_writer = self.consensus.write().await;
123 consensus_writer
124 .metrics
125 .vid_disperse_duration
126 .add_point(disperse_duration.as_secs_f64());
127 if let Err(e) =
129 consensus_writer.update_saved_payloads(*view_number, payload_with_metadata)
130 {
131 tracing::debug!(error=?e);
132 }
133 for share in vid_disperse.clone().to_shares() {
134 if let Some(share) = share.to_proposal(&self.private_key) {
135 consensus_writer.update_vid_shares(*view_number, share);
136 }
137 }
138 drop(consensus_writer);
139
140 broadcast_event(
142 Arc::new(HotShotEvent::SendPayloadCommitmentAndMetadata(
143 payload_commitment,
144 builder_commitment,
145 metadata.clone(),
146 *view_number,
147 sequencing_fees.clone(),
148 )),
149 &event_stream,
150 )
151 .await;
152
153 let view_number = *view_number;
154 let Ok(signature) = TYPES::SignatureKey::sign(
155 &self.private_key,
156 vid_disperse.payload_commitment_ref(),
157 ) else {
158 error!("VID: failed to sign dispersal payload");
159 return None;
160 };
161 debug!("publishing VID disperse for view {view_number} and epoch {epoch:?}");
162 broadcast_event(
163 Arc::new(HotShotEvent::VidDisperseSend(
164 Proposal {
165 signature,
166 data: vid_disperse,
167 _pd: PhantomData,
168 },
169 self.public_key.clone(),
170 )),
171 &event_stream,
172 )
173 .await;
174 },
175
176 HotShotEvent::ViewChange(view, epoch) => {
177 if *epoch > self.cur_epoch {
178 self.cur_epoch = *epoch;
179 }
180
181 let view = *view;
182 if (*view != 0 || *self.cur_view > 0) && *self.cur_view >= *view {
183 return None;
184 }
185
186 if *view - *self.cur_view > 1 {
187 info!("View changed by more than 1 going to view {view}");
188 }
189 self.cur_view = view;
190
191 return None;
192 },
193
194 HotShotEvent::QuorumProposalSend(proposal, _) => {
195 let proposed_block_number = proposal.data.block_header().block_number();
196 if proposal.data.epoch().is_none()
197 || !is_epoch_transition(proposed_block_number, self.epoch_height)
198 {
199 return None;
201 }
202 let proposal_view_number = proposal.data.view_number();
205 let sender_epoch =
206 option_epoch_from_block_number(true, proposed_block_number, self.epoch_height);
207 let target_epoch = sender_epoch.map(|x| x + 1);
208
209 let consensus_reader = self.consensus.read().await;
210 let Some(payload) = consensus_reader.saved_payloads().get(&proposal_view_number)
211 else {
212 tracing::warn!(
213 "We need to calculate VID for the nodes in the next epoch but we don't \
214 have the transactions"
215 );
216 return None;
217 };
218 let payload = Arc::clone(payload);
219 drop(consensus_reader);
220
221 let VidDisperseAndDuration {
222 disperse: next_epoch_vid_disperse,
223 duration: _,
224 } = VidDisperse::calculate_vid_disperse(
225 &payload.payload,
226 &self.membership_coordinator,
227 proposal_view_number,
228 target_epoch,
229 sender_epoch,
230 &payload.metadata,
231 &self.upgrade_lock,
232 )
233 .await
234 .ok()?;
235 let Ok(next_epoch_signature) = TYPES::SignatureKey::sign(
236 &self.private_key,
237 next_epoch_vid_disperse.payload_commitment().as_ref(),
238 ) else {
239 error!("VID: failed to sign dispersal payload for the next epoch");
240 return None;
241 };
242 debug!(
243 "publishing VID disperse for view {proposal_view_number} and epoch \
244 {target_epoch:?}"
245 );
246 broadcast_event(
247 Arc::new(HotShotEvent::VidDisperseSend(
248 Proposal {
249 signature: next_epoch_signature,
250 data: next_epoch_vid_disperse.clone(),
251 _pd: PhantomData,
252 },
253 self.public_key.clone(),
254 )),
255 &event_stream,
256 )
257 .await;
258 },
259 HotShotEvent::Shutdown => {
260 return Some(HotShotTaskCompleted);
261 },
262 _ => {},
263 }
264 None
265 }
266}
267
268#[async_trait]
270impl<TYPES: NodeType, I: NodeImplementation<TYPES>> TaskState for VidTaskState<TYPES, I> {
271 type Event = HotShotEvent<TYPES>;
272
273 async fn handle_event(
274 &mut self,
275 event: Arc<Self::Event>,
276 sender: &Sender<Arc<Self::Event>>,
277 _receiver: &Receiver<Arc<Self::Event>>,
278 ) -> Result<()> {
279 if self.upgrade_lock.new_protocol_active(self.cur_view) {
280 return Ok(());
281 }
282 self.handle(event, sender.clone()).await;
283 Ok(())
284 }
285
286 fn cancel_subtasks(&mut self) {}
287}