1use std::{marker::PhantomData, sync::Arc, time::Instant};
8
9use async_broadcast::{Receiver, Sender};
10use async_trait::async_trait;
11use hotshot_task::task::TaskState;
12use hotshot_types::{
13 consensus::{Consensus, OuterConsensus, PayloadWithMetadata},
14 data::{
15 DaProposal2, EpochNumber, PackedBundle, ViewNumber, vid_commitment,
16 vid_disperse::vid_total_weight,
17 },
18 epoch_membership::EpochMembershipCoordinator,
19 event::{Event, EventType},
20 message::{Proposal, UpgradeLock},
21 simple_certificate::DaCertificate2,
22 simple_vote::{DaData2, DaVote2},
23 storage_metrics::StorageMetricsValue,
24 traits::{
25 BlockPayload, EncodeBytes,
26 network::ConnectedNetwork,
27 node_implementation::{NodeImplementation, NodeType},
28 signature_key::SignatureKey,
29 storage::Storage,
30 },
31 utils::{EpochTransitionIndicator, epoch_from_block_number, is_ge_epoch_root, is_last_block},
32 vote::HasViewNumber,
33};
34use hotshot_utils::anytrace::*;
35use sha2::{Digest, Sha256};
36use tokio::{spawn, task::spawn_blocking};
37use tracing::instrument;
38
39use crate::{
40 events::HotShotEvent,
41 helpers::broadcast_event,
42 vote_collection::{VoteCollectorsMap, handle_vote},
43};
44
45pub struct DaTaskState<TYPES: NodeType, I: NodeImplementation<TYPES>> {
47 pub output_event_stream: async_broadcast::Sender<Event<TYPES>>,
49
50 pub cur_view: ViewNumber,
52
53 pub cur_epoch: Option<EpochNumber>,
55
56 pub consensus: OuterConsensus<TYPES>,
58
59 pub membership_coordinator: EpochMembershipCoordinator<TYPES>,
63
64 pub network: Arc<I::Network>,
66
67 pub vote_collectors: VoteCollectorsMap<TYPES, DaVote2<TYPES>, DaCertificate2<TYPES>>,
69
70 pub public_key: TYPES::SignatureKey,
72
73 pub private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
75
76 pub id: u64,
78
79 pub storage: I::Storage,
81
82 pub storage_metrics: Arc<StorageMetricsValue>,
84
85 pub upgrade_lock: UpgradeLock<TYPES>,
87}
88
89impl<TYPES: NodeType, I: NodeImplementation<TYPES>> DaTaskState<TYPES, I> {
90 #[instrument(skip_all, fields(id = self.id, view = *self.cur_view, epoch = self.cur_epoch.map(|x| *x)), name = "DA Main Task", level = "error", target = "DaTaskState")]
92 pub async fn handle(
93 &mut self,
94 event: Arc<HotShotEvent<TYPES>>,
95 event_stream: Sender<Arc<HotShotEvent<TYPES>>>,
96 ) -> Result<()> {
97 match event.as_ref() {
98 HotShotEvent::DaProposalRecv(proposal, sender) => {
99 let sender = sender.clone();
100 tracing::debug!(
101 "DA proposal received for view: {}",
102 proposal.data.view_number()
103 );
104 let view = proposal.data.view_number();
106
107 ensure!(
112 self.cur_view <= view + 1,
113 "Throwing away DA proposal that is more than one view older"
114 );
115
116 if let Some(entry) = self.consensus.read().await.saved_payloads().get(&view) {
117 ensure!(
118 entry.payload.encode() == proposal.data.encoded_transactions,
119 "Received DA proposal for view {view} but we already have a payload for \
120 that view and they are not identical. Throwing it away",
121 );
122 }
123
124 let encoded_transactions_hash = Sha256::digest(&proposal.data.encoded_transactions);
125 let view_leader_key = self
126 .membership_coordinator
127 .membership_for_epoch(proposal.data.epoch)
128 .context(warn!("No stake table for epoch {:?}", proposal.data.epoch))?
129 .leader(view)?;
130 ensure!(
131 view_leader_key == sender,
132 warn!(
133 "DA proposal doesn't have expected leader key for view {} \n DA proposal \
134 is: {:?}",
135 *view,
136 proposal.data.clone()
137 )
138 );
139
140 ensure!(
141 view_leader_key.validate(&proposal.signature, &encoded_transactions_hash),
142 warn!("Could not verify proposal.")
143 );
144
145 broadcast_event(
146 Arc::new(HotShotEvent::DaProposalValidated(proposal.clone(), sender)),
147 &event_stream,
148 )
149 .await;
150 },
151 HotShotEvent::DaProposalValidated(proposal, sender) => {
152 tracing::debug!(
153 "DA proposal validated for view {}",
154 proposal.data.view_number()
155 );
156 let cur_view = self.consensus.read().await.cur_view();
157 let view_number = proposal.data.view_number();
158 let epoch_number = proposal.data.epoch;
159 let membership = self
160 .membership_coordinator
161 .stake_table_for_epoch(epoch_number)
162 .context(warn!("No stake table for epoch"))?;
163
164 ensure!(
165 cur_view <= view_number + 1,
166 debug!(
167 "Validated DA proposal for prior view but it's too old now Current view \
168 {cur_view}, DA Proposal view {}",
169 proposal.data.view_number()
170 )
171 );
172
173 broadcast_event(
175 Event {
176 view_number,
177 event: EventType::DaProposal {
178 proposal: proposal.clone(),
179 sender: sender.clone(),
180 },
181 },
182 &self.output_event_stream,
183 )
184 .await;
185
186 ensure!(
187 membership.has_da_stake(&self.public_key),
188 debug!(
189 "We were not chosen for consensus committee for view {view_number} in \
190 epoch {epoch_number:?}"
191 )
192 );
193 let total_weight = vid_total_weight(membership.stake_table(), epoch_number);
194
195 let version = self.upgrade_lock.version_infallible(view_number);
196
197 let txns = Arc::clone(&proposal.data.encoded_transactions);
198 let txns_clone = Arc::clone(&txns);
199 let metadata = proposal.data.metadata.encode();
200 let metadata_clone = metadata.clone();
201 let payload_commitment =
202 spawn_blocking(move || vid_commitment(&txns, &metadata, total_weight, version))
203 .await;
204 let payload_commitment = payload_commitment.unwrap();
205 let next_epoch_payload_commitment = if matches!(
206 proposal.data.epoch_transition_indicator,
207 EpochTransitionIndicator::InTransition
208 ) && self
209 .upgrade_lock
210 .epochs_enabled(proposal.data.view_number())
211 && epoch_number.is_some()
212 {
213 let next_stake_table = membership.next_epoch_stake_table()?;
214 let next_epoch_total_weight = vid_total_weight(
215 next_stake_table.stake_table(),
216 epoch_number.map(|epoch| epoch + 1),
217 );
218
219 let commit_result = spawn_blocking(move || {
220 vid_commitment(
221 &txns_clone,
222 &metadata_clone,
223 next_epoch_total_weight,
224 version,
225 )
226 })
227 .await;
228 Some(commit_result.unwrap())
229 } else {
230 None
231 };
232
233 let now = Instant::now();
234 self.storage
235 .append_da2(proposal, payload_commitment)
236 .await
237 .wrap()
238 .context(error!("Failed to append DA proposal to storage"))?;
239 self.storage_metrics
240 .append_da_duration
241 .add_point(now.elapsed().as_secs_f64());
242
243 let vote = DaVote2::create_signed_vote(
245 DaData2 {
246 payload_commit: payload_commitment,
247 next_epoch_payload_commit: next_epoch_payload_commitment,
248 epoch: epoch_number,
249 },
250 view_number,
251 &self.public_key,
252 &self.private_key,
253 &self.upgrade_lock,
254 )?;
255
256 tracing::debug!("Sending vote to the DA leader {}", vote.view_number());
257
258 broadcast_event(Arc::new(HotShotEvent::DaVoteSend(vote)), &event_stream).await;
259 let mut consensus_writer = self.consensus.write().await;
260
261 if let Err(e) =
264 consensus_writer.update_da_view(view_number, epoch_number, payload_commitment)
265 {
266 tracing::trace!("{e:?}");
267 }
268
269 let payload_with_metadata = Arc::new(PayloadWithMetadata {
270 payload: TYPES::BlockPayload::from_bytes(
271 proposal.data.encoded_transactions.as_ref(),
272 &proposal.data.metadata,
273 ),
274 metadata: proposal.data.metadata.clone(),
275 });
276
277 if let Err(e) =
279 consensus_writer.update_saved_payloads(view_number, payload_with_metadata)
280 {
281 tracing::trace!("{e:?}");
282 }
283 drop(consensus_writer);
284
285 if self.network.is_primary_down() {
287 let my_id = self.id;
288 let consensus =
289 OuterConsensus::new(Arc::clone(&self.consensus.inner_consensus));
290 let pk = self.private_key.clone();
291 let public_key = self.public_key.clone();
292 let chan = event_stream.clone();
293 let upgrade_lock = self.upgrade_lock.clone();
294 let next_epoch = epoch_number.map(|epoch| epoch + 1);
295
296 let mut target_epochs = vec![];
297 if membership.has_stake(&public_key) {
298 target_epochs.push(epoch_number);
299 }
300 if membership.next_epoch_stake_table()?.has_stake(&public_key) {
301 target_epochs.push(next_epoch);
302 }
303 if target_epochs.is_empty() {
304 bail!(
305 "Not calculating VID, the node doesn't belong to the current epoch or \
306 the next epoch."
307 );
308 };
309
310 tracing::debug!(
311 "Primary network is down. Optimistically calculate own VID share."
312 );
313 let membership = membership.clone();
314 spawn(async move {
315 for target_epoch in target_epochs {
316 Consensus::calculate_and_update_vid(
317 OuterConsensus::new(Arc::clone(&consensus.inner_consensus)),
318 view_number,
319 target_epoch,
320 membership.coordinator.clone(),
321 &pk,
322 &upgrade_lock,
323 )
324 .await;
325 if let Some(vid_share) = consensus
326 .read()
327 .await
328 .vid_shares()
329 .get(&view_number)
330 .and_then(|key_map| key_map.get(&public_key))
331 .and_then(|epoch_map| epoch_map.get(&target_epoch))
332 {
333 tracing::debug!(
334 "Primary network is down. Calculated own VID share for epoch \
335 {target_epoch:?}, my id {my_id}"
336 );
337 broadcast_event(
338 Arc::new(HotShotEvent::VidShareRecv(
339 public_key.clone(),
340 vid_share.clone(),
341 )),
342 &chan,
343 )
344 .await;
345 }
346 }
347 });
348 }
349 },
350 HotShotEvent::DaVoteRecv(vote) => {
351 tracing::debug!("DA vote recv, Main Task {}", vote.view_number());
352 let view = vote.view_number();
354 let epoch = vote.data.epoch;
355 let membership = self
356 .membership_coordinator
357 .membership_for_epoch(epoch)
358 .context(warn!("No stake table for epoch"))?;
359
360 ensure!(
361 membership.leader(view)? == self.public_key,
362 debug!(
363 "We are not the DA committee leader for view {} are we leader for next \
364 view? {}",
365 *view,
366 membership.leader(view + 1)? == self.public_key
367 )
368 );
369
370 handle_vote(
371 &mut self.vote_collectors,
372 vote,
373 self.public_key.clone(),
374 &membership,
375 self.id,
376 &event,
377 &event_stream,
378 &self.upgrade_lock,
379 EpochTransitionIndicator::NotInTransition,
380 )
381 .await?;
382 },
383 HotShotEvent::ViewChange(view, epoch) => {
384 if *epoch > self.cur_epoch {
385 self.cur_epoch = *epoch;
386 }
387
388 let view = *view;
389 ensure!(
390 *self.cur_view < *view,
391 info!("Received a view change to an older view.")
392 );
393
394 if *view - *self.cur_view > 1 {
395 tracing::info!("View changed by more than 1 going to view {view}");
396 }
397 self.cur_view = view;
398 },
399 HotShotEvent::BlockRecv(packed_bundle) => {
400 let PackedBundle::<TYPES> {
401 encoded_transactions,
402 metadata,
403 view_number,
404 ..
405 } = packed_bundle;
406 let view_number = *view_number;
407
408 let encoded_transactions_hash = Sha256::digest(encoded_transactions);
410
411 let signature =
413 TYPES::SignatureKey::sign(&self.private_key, &encoded_transactions_hash)
414 .wrap()?;
415
416 let epoch = self.cur_epoch;
417 let leader = self
418 .membership_coordinator
419 .membership_for_epoch(epoch)
420 .context(warn!("No stake table for epoch"))?
421 .leader(view_number)?;
422 if leader != self.public_key {
423 tracing::debug!(
424 "We are not the leader in the current epoch. Do not send the DA proposal"
425 );
426 return Ok(());
427 }
428 let consensus_reader = self.consensus.read().await;
429 let high_qc_block_number = consensus_reader.high_qc().data.block_number;
430 let epoch_transition_indicator = if self.upgrade_lock.epochs_enabled(view_number) {
435 match (high_qc_block_number, self.cur_epoch) {
436 (Some(block_number), Some(cur_epoch)) => {
437 let epoch = epoch_from_block_number(
438 block_number,
439 *self.membership_coordinator.epoch_height(),
440 );
441 if epoch < *cur_epoch {
442 EpochTransitionIndicator::NotInTransition
444 } else if !is_last_block(
445 block_number,
446 *self.membership_coordinator.epoch_height(),
447 ) && is_ge_epoch_root(
448 block_number,
449 *self.membership_coordinator.epoch_height(),
450 ) {
451 EpochTransitionIndicator::InTransition
452 } else {
453 EpochTransitionIndicator::NotInTransition
454 }
455 },
456 _ => EpochTransitionIndicator::NotInTransition,
457 }
458 } else {
459 EpochTransitionIndicator::NotInTransition
460 };
461
462 drop(consensus_reader);
463
464 let data: DaProposal2<TYPES> = DaProposal2 {
465 encoded_transactions: Arc::clone(encoded_transactions),
466 metadata: metadata.clone(),
467 view_number,
469 epoch,
470 epoch_transition_indicator,
471 };
472
473 let message = Proposal {
474 data,
475 signature,
476 _pd: PhantomData,
477 };
478
479 broadcast_event(
480 Arc::new(HotShotEvent::DaProposalSend(
481 message.clone(),
482 self.public_key.clone(),
483 )),
484 &event_stream,
485 )
486 .await;
487 let payload_with_metadata = Arc::new(PayloadWithMetadata {
488 payload: TYPES::BlockPayload::from_bytes(
489 encoded_transactions.as_ref(),
490 metadata,
491 ),
492 metadata: metadata.clone(),
493 });
494 let update_result = self
496 .consensus
497 .write()
498 .await
499 .update_saved_payloads(view_number, payload_with_metadata);
500 if let Err(e) = update_result {
501 tracing::trace!("{e:?}");
502 }
503 },
504 _ => {},
505 }
506 Ok(())
507 }
508}
509
510#[async_trait]
512impl<TYPES: NodeType, I: NodeImplementation<TYPES>> TaskState for DaTaskState<TYPES, I> {
513 type Event = HotShotEvent<TYPES>;
514
515 async fn handle_event(
516 &mut self,
517 event: Arc<Self::Event>,
518 sender: &Sender<Arc<Self::Event>>,
519 _receiver: &Receiver<Arc<Self::Event>>,
520 ) -> Result<()> {
521 if self.upgrade_lock.new_protocol_active(self.cur_view) {
522 return Ok(());
523 }
524 self.handle(event, sender.clone()).await
525 }
526
527 fn cancel_subtasks(&mut self) {}
528}