1use std::{
2 collections::{BTreeMap, HashMap, HashSet, VecDeque},
3 sync::Arc,
4 time::Duration,
5};
6
7use committable::{Commitment, Committable};
8use hotshot::traits::{BlockPayload, ValidatedState as _};
9use hotshot_types::{
10 consensus::PayloadWithMetadata,
11 data::{
12 EpochNumber, Leaf2, VidCommitment, ViewNumber, vid_commitment,
13 vid_disperse::vid_total_weight,
14 },
15 epoch_membership::EpochMembershipCoordinator,
16 message::UpgradeLock,
17 traits::{
18 EncodeBytes,
19 block_contents::{BuilderFee, Transaction},
20 node_implementation::NodeType,
21 signature_key::BuilderSignatureKey,
22 },
23 utils::BuilderCommitment,
24};
25use tokio::{
26 task::{AbortHandle, JoinSet},
27 time::sleep,
28};
29use tracing::{error, warn};
30
31use crate::{
32 consensus::ConsensusInput,
33 helpers::proposal_commitment,
34 message::{DedupManifest, Proposal, TransactionMessage},
35 state::HeaderRequest,
36};
37
38#[derive(Debug, thiserror::Error)]
39pub enum BlockError {
40 #[error("payload construction failed: {0}")]
41 PayloadConstruction(String),
42 #[error("stake table unavailable")]
43 StakeTableUnavailable,
44 #[error("builder signature failed")]
45 BuilderSignature,
46}
47
48#[derive(Clone, Eq, PartialEq, Debug)]
49pub struct BlockAndHeaderRequest<T: NodeType> {
50 pub view: ViewNumber,
51 pub epoch: EpochNumber,
52 pub parent_proposal: Proposal<T>,
53}
54
55pub struct BlockBuilderOutput<T: NodeType> {
56 pub view: ViewNumber,
57 pub epoch: EpochNumber,
58 pub payload: PayloadWithMetadata<T>,
59 pub parent_proposal: Proposal<T>,
60 pub builder_commitment: BuilderCommitment,
61 pub builder_fee: BuilderFee<T>,
62 pub payload_commitment: VidCommitment,
63 pub manifest: DedupManifest<T>,
64}
65
66pub struct BlockBuilderConfig {
67 pub max_retry_bytes: u64,
68 pub max_leader_bytes: u64,
69 pub ttl: u64,
70 pub dedup_window_size: u64,
71}
72
73impl Default for BlockBuilderConfig {
74 fn default() -> Self {
75 Self {
76 max_retry_bytes: 100 * 1024 * 1024,
77 max_leader_bytes: 2 * 1024 * 1024,
78 ttl: 50,
79 dedup_window_size: 10,
80 }
81 }
82}
83
84struct RetryEntry<T: NodeType> {
85 tx: T::Transaction,
86 valid_until: ViewNumber,
87 size: u64,
88}
89
90pub struct BlockBuilder<T: NodeType> {
91 instance: Arc<T::InstanceState>,
92 membership: EpochMembershipCoordinator<T>,
93 retry_pending: HashMap<Commitment<T::Transaction>, RetryEntry<T>>,
94 retry_total_bytes: u64,
95 leader_buffer: HashMap<Commitment<T::Transaction>, T::Transaction>,
96 leader_total_bytes: u64,
97 dedup_set: HashSet<Commitment<T::Transaction>>,
98 dedup_views: VecDeque<(ViewNumber, Vec<Commitment<T::Transaction>>)>,
99 config: BlockBuilderConfig,
100 upgrade_lock: UpgradeLock<T>,
101 current_view: ViewNumber,
102 calculations: BTreeMap<(ViewNumber, Commitment<Leaf2<T>>), AbortHandle>,
107 tasks: JoinSet<Result<BlockBuilderOutput<T>, BlockError>>,
108}
109
110impl<T: NodeType> BlockBuilder<T> {
111 pub fn new(
112 instance: Arc<T::InstanceState>,
113 membership: EpochMembershipCoordinator<T>,
114 config: BlockBuilderConfig,
115 upgrade_lock: UpgradeLock<T>,
116 ) -> Self {
117 Self {
118 instance,
119 membership,
120 config,
121 upgrade_lock,
122 retry_pending: HashMap::new(),
123 retry_total_bytes: 0,
124 leader_buffer: HashMap::new(),
125 leader_total_bytes: 0,
126 dedup_set: HashSet::new(),
127 dedup_views: VecDeque::new(),
128 current_view: ViewNumber::genesis(),
129 calculations: BTreeMap::new(),
130 tasks: JoinSet::new(),
131 }
132 }
133
134 pub fn request_block(&mut self, request: BlockAndHeaderRequest<T>) {
135 let view = request.view;
136 let parent_commitment = proposal_commitment(&request.parent_proposal);
137 if self.calculations.contains_key(&(view, parent_commitment)) {
138 return;
139 }
140 let Ok(version) = self.upgrade_lock.version(view) else {
141 warn!(%view, "unsupported version");
142 return;
143 };
144 let epoch = request.epoch;
145 let buffer = std::mem::take(&mut self.leader_buffer);
146 self.leader_total_bytes = 0;
147 let instance = self.instance.clone();
148 let membership = self.membership.clone();
149
150 let handle = self.tasks.spawn(async move {
151 if buffer.is_empty() {
155 sleep(Duration::from_millis(500)).await;
156 }
157 let (hashes, txs): (Vec<_>, Vec<_>) = buffer.into_iter().unzip();
158 let manifest = DedupManifest {
159 view,
160 epoch,
161 hashes,
162 };
163
164 let validated_state =
165 T::ValidatedState::from_header(&request.parent_proposal.block_header);
166 let (payload, metadata) =
167 T::BlockPayload::from_transactions(txs, &validated_state, &instance)
168 .await
169 .map_err(|e| BlockError::PayloadConstruction(e.to_string()))?;
170 let payload: PayloadWithMetadata<T> = PayloadWithMetadata { payload, metadata };
171
172 let payload_bytes = payload.payload.encode();
173 let metadata_bytes = payload.metadata.encode();
174
175 let total_weight = {
176 let target_mem = membership
177 .stake_table_for_epoch(Some(epoch))
178 .map_err(|_| BlockError::StakeTableUnavailable)?;
179 vid_total_weight(target_mem.stake_table(), Some(epoch))
180 };
181 let payload_commitment = {
182 vid_commitment(
183 payload_bytes.as_ref(),
184 metadata_bytes.as_ref(),
185 total_weight,
186 version,
187 )
188 };
189
190 let builder_commitment = payload.payload.builder_commitment(&payload.metadata);
191 let (builder_key, builder_private_key) =
192 T::BuilderSignatureKey::generated_from_seed_indexed([0u8; 32], 0);
193 let block_size = payload_bytes.len() as u64;
194 let offered_fee = block_size;
195 let builder_fee = BuilderFee {
196 fee_amount: offered_fee,
197 fee_account: builder_key,
198 fee_signature: T::BuilderSignatureKey::sign_fee(
199 &builder_private_key,
200 offered_fee,
201 &payload.metadata,
202 )
203 .map_err(|_| BlockError::BuilderSignature)?,
204 };
205 Ok(BlockBuilderOutput {
206 view,
207 epoch,
208 payload,
209 parent_proposal: request.parent_proposal,
210 builder_commitment,
211 builder_fee,
212 payload_commitment,
213 manifest,
214 })
215 });
216 self.calculations.insert((view, parent_commitment), handle);
217 }
218
219 pub async fn next(&mut self) -> Option<Result<BlockBuilderOutput<T>, BlockError>> {
220 loop {
221 match self.tasks.join_next().await {
222 Some(Ok(result)) => return Some(result),
223 Some(Err(err)) => {
224 if err.is_panic() {
225 error!(%err, "block builder task panicked");
226 }
227 },
228 None => return None,
229 }
230 }
231 }
232
233 pub fn gc(&mut self, view_number: ViewNumber) {
234 self.calculations.retain(|(view, _), handle| {
235 if *view < view_number {
236 handle.abort();
237 false
238 } else {
239 true
240 }
241 });
242 }
243
244 pub fn on_submit_transaction(&mut self, tx: T::Transaction) {
245 let hash = tx.commit();
246
247 if self.retry_pending.contains_key(&hash) {
248 return;
249 }
250
251 let size = tx.minimum_block_size();
252 if self.retry_total_bytes + size > self.config.max_retry_bytes {
253 warn!("retry buffer full, rejecting {hash}");
254 return;
255 }
256
257 let valid_until = self.current_view + self.config.ttl;
258
259 self.retry_total_bytes += size;
260 self.retry_pending.insert(
261 hash,
262 RetryEntry {
263 tx,
264 valid_until,
265 size,
266 },
267 );
268 }
269
270 pub fn on_transactions(&mut self, msg: TransactionMessage<T>) {
271 for tx in msg.transactions {
272 let hash = tx.commit();
273
274 if self.dedup_set.contains(&hash) {
275 continue;
276 }
277
278 if self.leader_buffer.contains_key(&hash) {
279 continue;
280 }
281
282 let size = tx.minimum_block_size();
283 if self.leader_total_bytes + size > self.config.max_leader_bytes {
284 continue;
285 }
286
287 self.leader_total_bytes += size;
288 self.leader_buffer.insert(hash, tx);
289 }
290 }
291
292 pub fn on_dedup_manifest(&mut self, manifest: DedupManifest<T>) {
293 let DedupManifest { view, hashes, .. } = manifest;
294
295 for hash in &hashes {
296 if let Some(tx) = self.leader_buffer.remove(hash) {
297 self.leader_total_bytes -= tx.minimum_block_size();
298 }
299 }
300
301 self.dedup_set.extend(hashes.iter().copied());
302 self.dedup_views.push_back((view, hashes));
303
304 let current = self.current_view.u64();
305 let window = self.config.dedup_window_size;
306
307 while let Some((view, hashes)) = self.dedup_views.pop_front() {
308 if current.saturating_sub(view.u64()) <= window {
309 self.dedup_views.push_front((view, hashes));
310 break;
311 }
312 for hash in &hashes {
313 self.dedup_set.remove(hash);
314 }
315 }
316 }
317
318 pub fn on_view_changed(
319 &mut self,
320 view: ViewNumber,
321 _epoch: EpochNumber,
322 ) -> Vec<T::Transaction> {
323 self.current_view = view;
324
325 let mut expired_bytes = 0u64;
326 self.retry_pending.retain(|_, entry| {
327 if view > entry.valid_until {
328 expired_bytes += entry.size;
329 false
330 } else {
331 true
332 }
333 });
334 self.retry_total_bytes -= expired_bytes;
335
336 self.retry_pending
337 .values()
338 .map(|entry| entry.tx.clone())
339 .collect()
340 }
341
342 pub fn on_block_reconstructed(&mut self, tx_commitments: Vec<Commitment<T::Transaction>>) {
343 for hash in tx_commitments {
344 if let Some(entry) = self.retry_pending.remove(&hash) {
345 self.retry_total_bytes = self.retry_total_bytes.saturating_sub(entry.size);
346 }
347 }
348 }
349
350 pub fn drain(
351 &mut self,
352 view: ViewNumber,
353 epoch: EpochNumber,
354 ) -> (Vec<T::Transaction>, DedupManifest<T>) {
355 let (hashes, txs) = self.leader_buffer.drain().unzip();
356 self.leader_total_bytes = 0;
357
358 let manifest = DedupManifest {
359 view,
360 epoch,
361 hashes,
362 };
363
364 (txs, manifest)
365 }
366}
367
368impl<T: NodeType> From<&BlockBuilderOutput<T>> for HeaderRequest<T> {
369 fn from(output: &BlockBuilderOutput<T>) -> Self {
370 HeaderRequest {
371 view: output.view,
372 epoch: output.epoch,
373 parent_proposal: output.parent_proposal.clone(),
374 payload_commitment: output.payload_commitment,
375 builder_commitment: output.builder_commitment.clone(),
376 metadata: output.payload.metadata.clone(),
377 builder_fee: output.builder_fee.clone(),
378 }
379 }
380}
381
382impl<T: NodeType> From<BlockBuilderOutput<T>> for ConsensusInput<T> {
383 fn from(output: BlockBuilderOutput<T>) -> Self {
384 ConsensusInput::BlockBuilt {
385 view: output.view,
386 epoch: output.epoch,
387 payload: output.payload.payload,
388 metadata: output.payload.metadata,
389 }
390 }
391}