1use std::{
8 sync::Arc,
9 time::{Duration, Instant},
10};
11
12use async_broadcast::{Receiver, Sender};
13use async_trait::async_trait;
14use futures::{StreamExt, stream::FuturesUnordered};
15use hotshot_builder_api::v0_1::block_info::AvailableBlockInfo;
16use hotshot_task::task::TaskState;
17use hotshot_types::{
18 consensus::OuterConsensus,
19 data::{EpochNumber, PackedBundle, VidCommitment, ViewNumber, null_block},
20 epoch_membership::EpochMembershipCoordinator,
21 event::{Event, EventType},
22 message::UpgradeLock,
23 traits::{
24 BlockPayload,
25 block_contents::{BlockHeader, BuilderFee, EncodeBytes},
26 node_implementation::NodeType,
27 signature_key::{BuilderSignatureKey, SignatureKey},
28 },
29 utils::{ViewInner, is_epoch_transition, is_last_block},
30};
31use hotshot_utils::anytrace::*;
32use tokio::time::{sleep, timeout};
33use tracing::instrument;
34use vbs::version::Version;
35use versions::{DRB_AND_HEADER_UPGRADE_VERSION, EPOCH_VERSION};
36
37use crate::{
38 builder::v0_1::BuilderClient as BuilderClientBase,
39 events::{HotShotEvent, HotShotTaskCompleted},
40 helpers::broadcast_event,
41};
42
/// Numerator of the fraction of builders whose responses `get_available_blocks`
/// collects during its first phase.
const BUILDER_MAIN_BATCH_THRESHOLD_DIVIDEND: usize = 2;
/// Denominator of that fraction; the first-phase response count is
/// `ceil(number_of_builders * DIVIDEND / DIVISOR)`.
const BUILDER_MAIN_BATCH_THRESHOLD_DIVISOR: usize = 3;
/// Once this much time has passed since querying started, the first phase of
/// `get_available_blocks` stops after the next response it receives.
const BUILDER_MAIN_BATCH_CUTOFF: Duration = Duration::from_millis(700);
/// Fraction of the time already spent querying that is granted as additional
/// waiting time for the builders that have not answered yet.
const BUILDER_ADDITIONAL_TIME_MULTIPLIER: f32 = 0.2;
/// Lower bound on the overall query window: the additional wait is at least
/// long enough for this much time to have passed since querying started.
const BUILDER_MINIMUM_QUERY_TIME: Duration = Duration::from_millis(300);
/// Pause between attempts in the retry loops of `last_vid_commitment_retry`
/// and `wait_for_block`.
const RETRY_DELAY: Duration = Duration::from_millis(100);
58
/// A block successfully claimed from a builder, as returned by
/// `block_from_builder`.
pub struct BuilderResponse<TYPES: NodeType> {
    /// Fee attached to the block: the amount offered in the builder's block
    /// info, with the account and signature taken from the claimed header input.
    pub fee: BuilderFee<TYPES>,

    /// Payload of the claimed block.
    pub block_payload: TYPES::BlockPayload,

    /// Metadata the builder returned together with `block_payload`.
    pub metadata: <TYPES::BlockPayload as BlockPayload<TYPES>>::Metadata,
}
70
/// State of the task that reacts to view changes and validated proposals by
/// fetching a block from the builders (or falling back to an empty block) and
/// broadcasting it as a `BlockRecv` event.
pub struct TransactionTaskState<TYPES: NodeType> {
    /// Total time budget for finding the parent commitment and obtaining a
    /// block from the builders in `wait_for_block`.
    pub builder_timeout: Duration,

    /// Channel on which received transactions are re-broadcast as
    /// `EventType::Transactions` events.
    pub output_event_stream: async_broadcast::Sender<Event<TYPES>>,

    /// Latest view this task has moved to; updated on `ViewChange` and
    /// `QuorumProposalValidated` events.
    pub cur_view: ViewNumber,

    /// Epoch recorded from the most recent accepted `ViewChange` event.
    pub cur_epoch: Option<EpochNumber>,

    /// Consensus state; read for the high QC, highest block, transition QC,
    /// validated state map and saved leaves, and written to bump metrics.
    pub consensus: OuterConsensus<TYPES>,

    /// Used to look up the leader of a view and the stake table of an epoch.
    pub membership_coordinator: EpochMembershipCoordinator<TYPES>,

    /// Clients for the builders that are queried for available blocks and
    /// from which blocks are claimed.
    pub builder_clients: Vec<BuilderClientBase<TYPES>>,

    /// This node's public key; compared against the view leader and sent to
    /// builders with every request.
    pub public_key: TYPES::SignatureKey,

    /// This node's private key; signs the parent commitment and the hash of
    /// the block being claimed.
    pub private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,

    /// Shared handle to the instance state (not read by any method visible in
    /// this file).
    pub instance_state: Arc<TYPES::InstanceState>,

    /// Identifier recorded in the tracing spans of this task's methods.
    pub id: u64,

    /// Used to resolve the protocol version of a view and to inspect the
    /// decided upgrade certificate.
    pub upgrade_lock: UpgradeLock<TYPES>,

    /// Epoch height passed to `is_epoch_transition` and `is_last_block`.
    pub epoch_height: u64,
}
112
113impl<TYPES: NodeType> TransactionTaskState<TYPES> {
114 pub async fn handle_view_change(
116 &mut self,
117 event_stream: &Sender<Arc<HotShotEvent<TYPES>>>,
118 block_view: ViewNumber,
119 block_epoch: Option<EpochNumber>,
120 vid: Option<VidCommitment>,
121 ) -> Option<HotShotTaskCompleted> {
122 self.handle_view_change_legacy(event_stream, block_view, block_epoch, vid)
123 .await
124 }
125
126 #[instrument(skip_all, fields(id = self.id, view = *self.cur_view), name = "Transaction task", level = "error", target = "TransactionTaskState")]
128 pub async fn handle_view_change_legacy(
129 &mut self,
130 event_stream: &Sender<Arc<HotShotEvent<TYPES>>>,
131 block_view: ViewNumber,
132 block_epoch: Option<EpochNumber>,
133 vid: Option<VidCommitment>,
134 ) -> Option<HotShotTaskCompleted> {
135 let version = match self.upgrade_lock.version(block_view) {
136 Ok(v) => v,
137 Err(err) => {
138 tracing::error!(
139 "Upgrade certificate requires unsupported version, refusing to request \
140 blocks: {err}"
141 );
142 return None;
143 },
144 };
145
146 if version >= EPOCH_VERSION {
149 let Some(epoch) = block_epoch else {
150 tracing::error!("Epoch is required for epoch-based view change");
151 return None;
152 };
153 let high_qc = self.consensus.read().await.high_qc().clone();
154 let mut high_qc_block_number = if let Some(bn) = high_qc.data.block_number {
155 bn
156 } else {
157 if block_view
160 > self
161 .upgrade_lock
162 .upgrade_view()
163 .unwrap_or(ViewNumber::new(0))
164 + 1
165 {
166 tracing::warn!("High QC in epoch version and not the first QC after upgrade");
167 self.send_empty_block(event_stream, block_view, block_epoch, version)
168 .await;
169 return None;
170 }
171 0
173 };
174 high_qc_block_number = std::cmp::max(
175 high_qc_block_number,
176 self.consensus.read().await.highest_block,
177 );
178 if self
179 .consensus
180 .read()
181 .await
182 .transition_qc()
183 .is_some_and(|qc| {
184 let Some(e) = qc.0.data.epoch else {
185 return false;
186 };
187 e == epoch
188 })
189 || is_epoch_transition(high_qc_block_number, self.epoch_height)
190 {
191 if !is_last_block(high_qc_block_number, self.epoch_height) {
193 tracing::info!(
194 "Sending empty block event. View number: {block_view}. Parent Block \
195 number: {high_qc_block_number}"
196 );
197 self.send_empty_block(event_stream, block_view, block_epoch, version)
198 .await;
199 return None;
200 }
201 }
202 }
203
204 let block = {
206 if self
207 .upgrade_lock
208 .decided_upgrade_cert()
209 .as_ref()
210 .is_some_and(|cert| cert.upgrading_in(block_view))
211 {
212 None
213 } else {
214 self.wait_for_block(block_view, vid).await
215 }
216 };
217
218 if let Some(BuilderResponse {
219 block_payload,
220 metadata,
221 fee,
222 }) = block
223 {
224 broadcast_event(
225 Arc::new(HotShotEvent::BlockRecv(PackedBundle::new(
226 block_payload.encode(),
227 metadata,
228 block_view,
229 block_epoch,
230 vec1::vec1![fee],
231 ))),
232 event_stream,
233 )
234 .await;
235 } else {
236 self.send_empty_block(event_stream, block_view, block_epoch, version)
237 .await;
238 };
239
240 return None;
241 }
242
243 async fn send_empty_block(
245 &self,
246 event_stream: &Sender<Arc<HotShotEvent<TYPES>>>,
247 block_view: ViewNumber,
248 block_epoch: Option<EpochNumber>,
249 version: Version,
250 ) {
251 tracing::info!("Failed to get a block for view {block_view}, proposing empty block");
253
254 self.consensus
256 .write()
257 .await
258 .metrics
259 .number_of_empty_blocks_proposed
260 .add(1);
261
262 let num_storage_nodes = match self
263 .membership_coordinator
264 .stake_table_for_epoch(block_epoch)
265 {
266 Ok(epoch_stake_table) => epoch_stake_table.total_nodes(),
267 Err(e) => {
268 tracing::warn!("Failed to get num_storage_nodes for epoch {block_epoch:?}: {e}");
269 return;
270 },
271 };
272
273 let Some(null_fee) = null_block::builder_fee::<TYPES>(num_storage_nodes, version) else {
274 tracing::error!("Failed to get null fee");
275 return;
276 };
277
278 let (_, metadata) = <TYPES as NodeType>::BlockPayload::empty();
280
281 broadcast_event(
283 Arc::new(HotShotEvent::BlockRecv(PackedBundle::new(
284 vec![].into(),
285 metadata,
286 block_view,
287 block_epoch,
288 vec1::vec1![null_fee],
289 ))),
290 event_stream,
291 )
292 .await;
293 }
294
295 pub async fn null_block(
297 &self,
298 block_view: ViewNumber,
299 block_epoch: Option<EpochNumber>,
300 version: Version,
301 num_storage_nodes: usize,
302 ) -> Option<PackedBundle<TYPES>> {
303 let Some(null_fee) = null_block::builder_fee::<TYPES>(num_storage_nodes, version) else {
304 tracing::error!("Failed to calculate null block fee.");
305 return None;
306 };
307
308 let (_, metadata) = <TYPES as NodeType>::BlockPayload::empty();
310
311 Some(PackedBundle::new(
312 vec![].into(),
313 metadata,
314 block_view,
315 block_epoch,
316 vec1::vec1![null_fee],
317 ))
318 }
319
320 #[instrument(skip_all, fields(id = self.id, view = *self.cur_view, epoch = self.cur_epoch.map(|x| *x)), name = "Transaction task", level = "error", target = "TransactionTaskState")]
322 pub async fn handle(
323 &mut self,
324 event: Arc<HotShotEvent<TYPES>>,
325 event_stream: Sender<Arc<HotShotEvent<TYPES>>>,
326 ) -> Result<()> {
327 match event.as_ref() {
328 HotShotEvent::TransactionsRecv(transactions) => {
329 broadcast_event(
330 Event {
331 view_number: self.cur_view,
332 event: EventType::Transactions {
333 transactions: transactions.clone(),
334 },
335 },
336 &self.output_event_stream,
337 )
338 .await;
339 },
340 HotShotEvent::ViewChange(view, epoch) => {
341 let view = ViewNumber::new(std::cmp::max(1, **view));
342 ensure!(
343 *view > *self.cur_view && *epoch >= self.cur_epoch,
344 debug!(
345 "Received a view change to an older view and epoch: tried to change view \
346 to {view}and epoch {epoch:?} though we are at view {} and epoch {:?}",
347 self.cur_view, self.cur_epoch
348 )
349 );
350 self.cur_view = view;
351 self.cur_epoch = *epoch;
352
353 let leader = self
354 .membership_coordinator
355 .membership_for_epoch(*epoch)?
356 .leader(view)?;
357 if leader == self.public_key {
358 self.handle_view_change(&event_stream, view, *epoch, None)
359 .await;
360 return Ok(());
361 }
362 },
363 HotShotEvent::QuorumProposalValidated(proposal, _leaf) => {
364 let view_number = proposal.data.view_number();
365 let next_view = view_number + 1;
366
367 let version = match self.upgrade_lock.version(next_view) {
368 Ok(v) => v,
369 Err(e) => {
370 tracing::error!("Failed to calculate version: {e:?}");
371 return Ok(());
372 },
373 };
374
375 if version < DRB_AND_HEADER_UPGRADE_VERSION {
376 return Ok(());
377 }
378
379 let vid = proposal.data.block_header().payload_commitment();
380 let block_height = proposal.data.block_header().block_number();
381 if is_epoch_transition(block_height, self.epoch_height) {
382 return Ok(());
383 }
384 if next_view <= self.cur_view {
385 return Ok(());
386 }
387 self.cur_view = next_view;
389
390 let leader = self
391 .membership_coordinator
392 .membership_for_epoch(self.cur_epoch)?
393 .leader(next_view)?;
394 if leader == self.public_key {
395 self.handle_view_change(&event_stream, next_view, self.cur_epoch, Some(vid))
396 .await;
397 return Ok(());
398 }
399 },
400 _ => {},
401 }
402 Ok(())
403 }
404
405 #[instrument(skip_all, target = "TransactionTaskState", fields(id = self.id, cur_view = *self.cur_view, block_view = *block_view))]
408 async fn last_vid_commitment_retry(
409 &self,
410 block_view: ViewNumber,
411 task_start_time: Instant,
412 ) -> Result<(ViewNumber, VidCommitment)> {
413 loop {
414 match self.last_vid_commitment(block_view).await {
415 Ok((view, comm)) => break Ok((view, comm)),
416 Err(e) if task_start_time.elapsed() >= self.builder_timeout => break Err(e),
417 _ => {
418 sleep(RETRY_DELAY).await;
420 continue;
421 },
422 }
423 }
424 }
425
426 #[instrument(skip_all, target = "TransactionTaskState", fields(id = self.id, cur_view = *self.cur_view, block_view = *block_view))]
429 async fn last_vid_commitment(
430 &self,
431 block_view: ViewNumber,
432 ) -> Result<(ViewNumber, VidCommitment)> {
433 let consensus_reader = self.consensus.read().await;
434 let mut target_view = ViewNumber::new(block_view.saturating_sub(1));
435
436 loop {
437 let view_data = consensus_reader
438 .validated_state_map()
439 .get(&target_view)
440 .context(info!(
441 "Missing record for view {target_view} in validated state",
442 ))?;
443
444 match &view_data.view_inner {
445 ViewInner::Da {
446 payload_commitment, ..
447 } => return Ok((target_view, *payload_commitment)),
448 ViewInner::Leaf {
449 leaf: leaf_commitment,
450 ..
451 } => {
452 let leaf = consensus_reader
453 .saved_leaves()
454 .get(leaf_commitment)
455 .context(info!(
456 "Missing leaf with commitment {leaf_commitment} for view \
457 {target_view} in saved_leaves",
458 ))?;
459 return Ok((target_view, leaf.payload_commitment()));
460 },
461 ViewInner::Failed => {
462 target_view = ViewNumber::new(target_view.checked_sub(1).context(warn!(
464 "Reached genesis. Something is wrong -- have we not decided any blocks \
465 since genesis?"
466 ))?);
467 continue;
468 },
469 }
470 }
471 }
472
473 #[instrument(skip_all, fields(id = self.id, cur_view = *self.cur_view, block_view = *block_view), name = "wait_for_block", level = "error")]
474 async fn wait_for_block(
475 &self,
476 block_view: ViewNumber,
477 vid: Option<VidCommitment>,
478 ) -> Option<BuilderResponse<TYPES>> {
479 let task_start_time = Instant::now();
480
481 let (parent_view, parent_comm) = if let Some(vid) = vid {
483 (block_view - 1, vid)
484 } else {
485 match self
486 .last_vid_commitment_retry(block_view, task_start_time)
487 .await
488 {
489 Ok((v, c)) => (v, c),
490 Err(e) => {
491 tracing::warn!("Failed to find last vid commitment in time: {e}");
492 return None;
493 },
494 }
495 };
496
497 let parent_comm_sig = match <<TYPES as NodeType>::SignatureKey as SignatureKey>::sign(
498 &self.private_key,
499 parent_comm.as_ref(),
500 ) {
501 Ok(sig) => sig,
502 Err(err) => {
503 tracing::error!(%err, "Failed to sign block hash");
504 return None;
505 },
506 };
507
508 while task_start_time.elapsed() < self.builder_timeout {
509 match timeout(
510 self.builder_timeout
511 .saturating_sub(task_start_time.elapsed()),
512 self.block_from_builder(parent_comm, parent_view, &parent_comm_sig),
513 )
514 .await
515 {
516 Ok(Ok(block)) => {
518 return Some(block);
519 },
520
521 Ok(Err(err)) => {
523 tracing::info!("Couldn't get a block: {err:#}");
524 sleep(RETRY_DELAY).await;
526 continue;
527 },
528
529 Err(err) => {
531 tracing::info!(%err, "Timeout while getting available blocks");
532 return None;
533 },
534 }
535 }
536
537 tracing::warn!("could not get a block from the builder in time");
538 None
539 }
540
    /// Asks every builder client which blocks it has available and returns
    /// each offer paired with the index (into `self.builder_clients`) of the
    /// builder that made it.
    ///
    /// Responses are gathered in two phases:
    /// 1. Up to `ceil(builders * DIVIDEND / DIVISOR)` responses are collected;
    ///    this phase also ends once a response arrives after
    ///    `BUILDER_MAIN_BATCH_CUTOFF` has passed.
    /// 2. The remaining builders get an additional window of
    ///    `max(elapsed * BUILDER_ADDITIONAL_TIME_MULTIPLIER,
    ///    BUILDER_MINIMUM_QUERY_TIME - elapsed)`.
    ///
    /// Failed responses are dropped silently; builders that have not answered
    /// by the end of the second phase are not included.
    async fn get_available_blocks(
        &self,
        parent_comm: VidCommitment,
        view_number: ViewNumber,
        parent_comm_sig: &<<TYPES as NodeType>::SignatureKey as SignatureKey>::PureAssembledSignatureType,
    ) -> Vec<(AvailableBlockInfo<TYPES>, usize)> {
        // One request per builder. On success every returned block info is
        // tagged with the index of the builder it came from.
        let tasks = self
            .builder_clients
            .iter()
            .enumerate()
            .map(|(builder_idx, client)| async move {
                client
                    .available_blocks(
                        parent_comm,
                        view_number.u64(),
                        self.public_key.clone(),
                        parent_comm_sig,
                    )
                    .await
                    .map(move |blocks| {
                        blocks
                            .into_iter()
                            .map(move |block_info| (block_info, builder_idx))
                    })
            })
            .collect::<FuturesUnordered<_>>();
        let mut results = Vec::with_capacity(self.builder_clients.len());
        let query_start = Instant::now();
        // Number of responses the first phase waits for.
        let threshold = (self.builder_clients.len() * BUILDER_MAIN_BATCH_THRESHOLD_DIVIDEND)
            .div_ceil(BUILDER_MAIN_BATCH_THRESHOLD_DIVISOR);
        // Phase 1: take responses until the threshold is reached. The cutoff is
        // only checked after a response arrives, so it cannot interrupt a wait.
        let mut tasks = tasks.take(threshold);
        while let Some(result) = tasks.next().await {
            results.push(result);
            if query_start.elapsed() > BUILDER_MAIN_BATCH_CUTOFF {
                break;
            }
        }
        // Phase 2: extra time for the stragglers, proportional to the time
        // spent so far but long enough to reach the minimum query time.
        let timeout = sleep(std::cmp::max(
            query_start
                .elapsed()
                .mul_f32(BUILDER_ADDITIONAL_TIME_MULTIPLIER),
            BUILDER_MINIMUM_QUERY_TIME.saturating_sub(query_start.elapsed()),
        ));
        futures::pin_mut!(timeout);
        // `into_inner` recovers the underlying stream with the requests that
        // phase 1 did not consume.
        let mut tasks = tasks.into_inner().take_until(timeout);
        while let Some(result) = tasks.next().await {
            results.push(result);
        }
        // Drop failed responses and flatten the per-builder lists.
        results
            .into_iter()
            .filter_map(|result| result.ok())
            .flatten()
            .collect::<Vec<_>>()
    }
597
598 #[instrument(skip_all, fields(id = self.id, view = *self.cur_view), name = "block_from_builder", level = "error")]
606 async fn block_from_builder(
607 &self,
608 parent_comm: VidCommitment,
609 view_number: ViewNumber,
610 parent_comm_sig: &<<TYPES as NodeType>::SignatureKey as SignatureKey>::PureAssembledSignatureType,
611 ) -> Result<BuilderResponse<TYPES>> {
612 let mut available_blocks = self
613 .get_available_blocks(parent_comm, view_number, parent_comm_sig)
614 .await;
615
616 available_blocks.sort_by(|(l, _), (r, _)| {
617 (u128::from(l.offered_fee) * u128::from(r.block_size))
625 .cmp(&(u128::from(r.offered_fee) * u128::from(l.block_size)))
626 });
627
628 if available_blocks.is_empty() {
629 tracing::info!("No available blocks");
630 bail!("No available blocks");
631 }
632
633 for (block_info, builder_idx) in available_blocks {
634 if !block_info.sender.validate_block_info_signature(
636 &block_info.signature,
637 block_info.block_size,
638 block_info.offered_fee,
639 &block_info.block_hash,
640 ) {
641 tracing::warn!("Failed to verify available block info response message signature");
642 continue;
643 }
644
645 let request_signature = match <<TYPES as NodeType>::SignatureKey as SignatureKey>::sign(
646 &self.private_key,
647 block_info.block_hash.as_ref(),
648 ) {
649 Ok(request_signature) => request_signature,
650 Err(err) => {
651 tracing::error!(%err, "Failed to sign block hash");
652 continue;
653 },
654 };
655
656 let response = {
657 let client = &self.builder_clients[builder_idx];
658
659 let (block, either_header_input) = futures::join! {
660 client.claim_block(block_info.block_hash.clone(), view_number.u64(), self.public_key.clone(), &request_signature),
661 client.claim_either_block_header_input(block_info.block_hash.clone(), view_number.u64(), self.public_key.clone(), &request_signature)
662 };
663
664 let block_data = match block {
665 Ok(block_data) => block_data,
666 Err(err) => {
667 tracing::warn!(%err, "Error claiming block data");
668 continue;
669 },
670 };
671
672 let Ok(either_header_input) = either_header_input
673 .inspect_err(|err| tracing::warn!(%err, "Error claiming header input"))
674 else {
675 continue;
676 };
677
678 let Some(header_input) = either_header_input
679 .validate_signature_and_get_input(block_info.offered_fee, &block_data.metadata)
680 else {
681 tracing::warn!(
682 "Failed to verify available new or legacy block header input data \
683 response message signature"
684 );
685 continue;
686 };
687
688 if !block_data.validate_signature() {
690 tracing::warn!(
691 "Failed to verify available block data response message signature"
692 );
693 continue;
694 }
695
696 let fee = BuilderFee {
697 fee_amount: block_info.offered_fee,
698 fee_account: header_input.sender,
699 fee_signature: header_input.fee_signature,
700 };
701
702 BuilderResponse {
703 fee,
704 block_payload: block_data.block_payload,
705 metadata: block_data.metadata,
706 }
707 };
708
709 return Ok(response);
710 }
711
712 bail!("Couldn't claim a block from any of the builders");
713 }
714}
715
716#[async_trait]
717impl<TYPES: NodeType> TaskState for TransactionTaskState<TYPES> {
719 type Event = HotShotEvent<TYPES>;
720
721 async fn handle_event(
722 &mut self,
723 event: Arc<Self::Event>,
724 sender: &Sender<Arc<Self::Event>>,
725 _receiver: &Receiver<Arc<Self::Event>>,
726 ) -> Result<()> {
727 if self.upgrade_lock.new_protocol_active(self.cur_view) {
728 return Ok(());
729 }
730 self.handle(event, sender.clone()).await
731 }
732
733 fn cancel_subtasks(&mut self) {}
734}