// Source: hotshot_new_protocol/block.rs

1use std::{
2    collections::{BTreeMap, HashMap, HashSet, VecDeque},
3    sync::Arc,
4    time::Duration,
5};
6
7use committable::{Commitment, Committable};
8use hotshot::traits::{BlockPayload, ValidatedState as _};
9use hotshot_types::{
10    consensus::PayloadWithMetadata,
11    data::{
12        EpochNumber, Leaf2, VidCommitment, ViewNumber, vid_commitment,
13        vid_disperse::vid_total_weight,
14    },
15    epoch_membership::EpochMembershipCoordinator,
16    message::UpgradeLock,
17    traits::{
18        EncodeBytes,
19        block_contents::{BuilderFee, Transaction},
20        node_implementation::NodeType,
21        signature_key::BuilderSignatureKey,
22    },
23    utils::BuilderCommitment,
24};
25use tokio::{
26    task::{AbortHandle, JoinSet},
27    time::sleep,
28};
29use tracing::{error, warn};
30
31use crate::{
32    consensus::ConsensusInput,
33    helpers::proposal_commitment,
34    message::{DedupManifest, Proposal, TransactionMessage},
35    state::HeaderRequest,
36};
37
/// Failures a block-building task can report to the caller of `BlockBuilder::next`.
#[derive(Debug, thiserror::Error)]
pub enum BlockError {
    /// Building the payload from the buffered transactions failed; carries the
    /// underlying error rendered as a string.
    #[error("payload construction failed: {0}")]
    PayloadConstruction(String),
    /// The stake table for the requested epoch could not be obtained from the
    /// membership coordinator.
    #[error("stake table unavailable")]
    StakeTableUnavailable,
    /// Signing the builder fee failed.
    #[error("builder signature failed")]
    BuilderSignature,
}
47
/// Request to build a block for `view`/`epoch` on top of `parent_proposal`.
#[derive(Clone, Eq, PartialEq, Debug)]
pub struct BlockAndHeaderRequest<T: NodeType> {
    /// View the block is built for; also selects the protocol version.
    pub view: ViewNumber,
    /// Epoch whose stake table is used when computing the payload commitment.
    pub epoch: EpochNumber,
    /// Proposal the new block extends. Its block header seeds the validated
    /// state used for payload construction, and its commitment is part of the
    /// key that deduplicates in-flight build requests.
    pub parent_proposal: Proposal<T>,
}
54
/// Everything produced by one successful block-building task.
pub struct BlockBuilderOutput<T: NodeType> {
    /// View the block was built for (copied from the request).
    pub view: ViewNumber,
    /// Epoch the block was built for (copied from the request).
    pub epoch: EpochNumber,
    /// The constructed payload together with its metadata.
    pub payload: PayloadWithMetadata<T>,
    /// Parent proposal the request was issued against.
    pub parent_proposal: Proposal<T>,
    /// Builder commitment derived from the payload and its metadata.
    pub builder_commitment: BuilderCommitment,
    /// Signed builder fee; the fee amount equals the encoded payload length in bytes.
    pub builder_fee: BuilderFee<T>,
    /// VID commitment over the encoded payload and metadata.
    pub payload_commitment: VidCommitment,
    /// Commitments of the transactions that were taken from the leader buffer
    /// for this block.
    pub manifest: DedupManifest<T>,
}
65
/// Size and lifetime limits for the block builder's transaction buffers.
///
/// Derives `Debug`, `Clone`, `PartialEq` and `Eq` so the configuration can be
/// logged, duplicated and compared in tests.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BlockBuilderConfig {
    /// Upper bound on the summed `minimum_block_size()` of transactions held
    /// in the retry buffer; submissions that would exceed it are rejected.
    pub max_retry_bytes: u64,
    /// Upper bound on the summed `minimum_block_size()` of transactions held
    /// in the leader buffer; transactions that would exceed it are skipped.
    pub max_leader_bytes: u64,
    /// Number of views added to the current view to obtain the last view for
    /// which a submitted transaction is kept for retry.
    pub ttl: u64,
    /// A dedup manifest is evicted once the current view is more than this
    /// many views ahead of the manifest's view.
    pub dedup_window_size: u64,
}

impl Default for BlockBuilderConfig {
    /// Defaults: 100 MiB retry buffer, 2 MiB leader buffer, 50-view TTL and a
    /// 10-view dedup window.
    fn default() -> Self {
        const MIB: u64 = 1024 * 1024;

        Self {
            max_retry_bytes: 100 * MIB,
            max_leader_bytes: 2 * MIB,
            ttl: 50,
            dedup_window_size: 10,
        }
    }
}
83
/// A submitted transaction kept in the retry buffer until it expires or its
/// commitment shows up in a reconstructed block.
struct RetryEntry<T: NodeType> {
    /// The transaction itself; cloned out on every view change.
    tx: T::Transaction,
    /// Last view for which the entry is kept; it is dropped once the current
    /// view is greater than this.
    valid_until: ViewNumber,
    /// `minimum_block_size()` of `tx`, cached for the retry byte accounting.
    size: u64,
}
89
/// Buffers transactions, tracks dedup manifests and runs block-building tasks.
pub struct BlockBuilder<T: NodeType> {
    /// Instance state handed to payload construction.
    instance: Arc<T::InstanceState>,
    /// Source of the per-epoch stake table used for the VID commitment.
    membership: EpochMembershipCoordinator<T>,
    /// Transactions from `on_submit_transaction`, keyed by commitment, that
    /// are handed back on every view change until they expire or are seen in
    /// a reconstructed block.
    retry_pending: HashMap<Commitment<T::Transaction>, RetryEntry<T>>,
    /// Sum of `RetryEntry::size` over `retry_pending`.
    retry_total_bytes: u64,
    /// Transactions from `on_transactions`, keyed by commitment, waiting to be
    /// put into the next block.
    leader_buffer: HashMap<Commitment<T::Transaction>, T::Transaction>,
    /// Sum of `minimum_block_size()` over `leader_buffer`.
    leader_total_bytes: u64,
    /// Commitments listed by the manifests currently held in `dedup_views`;
    /// matching transactions are not buffered again.
    dedup_set: HashSet<Commitment<T::Transaction>>,
    /// Received manifests (view plus listed commitments) in arrival order.
    dedup_views: VecDeque<(ViewNumber, Vec<Commitment<T::Transaction>>)>,
    /// Buffer limits, retry TTL and dedup window.
    config: BlockBuilderConfig,
    /// Used to look up the protocol version for a view.
    upgrade_lock: UpgradeLock<T>,
    /// Most recent view passed to `on_view_changed`.
    current_view: ViewNumber,
    // Keyed by (view, parent_proposal commitment) so that two requests for
    // the same view but different parents (e.g. one from
    // `handle_proposal_with_vid_share` and one from
    // `handle_timeout_certificate`) don't dedup against each other.
    calculations: BTreeMap<(ViewNumber, Commitment<Leaf2<T>>), AbortHandle>,
    /// Running block-building tasks; results are collected through `next`.
    tasks: JoinSet<Result<BlockBuilderOutput<T>, BlockError>>,
}
109
110impl<T: NodeType> BlockBuilder<T> {
111    pub fn new(
112        instance: Arc<T::InstanceState>,
113        membership: EpochMembershipCoordinator<T>,
114        config: BlockBuilderConfig,
115        upgrade_lock: UpgradeLock<T>,
116    ) -> Self {
117        Self {
118            instance,
119            membership,
120            config,
121            upgrade_lock,
122            retry_pending: HashMap::new(),
123            retry_total_bytes: 0,
124            leader_buffer: HashMap::new(),
125            leader_total_bytes: 0,
126            dedup_set: HashSet::new(),
127            dedup_views: VecDeque::new(),
128            current_view: ViewNumber::genesis(),
129            calculations: BTreeMap::new(),
130            tasks: JoinSet::new(),
131        }
132    }
133
134    pub fn request_block(&mut self, request: BlockAndHeaderRequest<T>) {
135        let view = request.view;
136        let parent_commitment = proposal_commitment(&request.parent_proposal);
137        if self.calculations.contains_key(&(view, parent_commitment)) {
138            return;
139        }
140        let Ok(version) = self.upgrade_lock.version(view) else {
141            warn!(%view, "unsupported version");
142            return;
143        };
144        let epoch = request.epoch;
145        let buffer = std::mem::take(&mut self.leader_buffer);
146        self.leader_total_bytes = 0;
147        let instance = self.instance.clone();
148        let membership = self.membership.clone();
149
150        let handle = self.tasks.spawn(async move {
151            // Throttle empty block production: when no transactions are pending,
152            // sleep so the coordinator's event queue doesnot overflow
153            // because if there are no transactions then the block production is way too fast
154            if buffer.is_empty() {
155                sleep(Duration::from_millis(500)).await;
156            }
157            let (hashes, txs): (Vec<_>, Vec<_>) = buffer.into_iter().unzip();
158            let manifest = DedupManifest {
159                view,
160                epoch,
161                hashes,
162            };
163
164            let validated_state =
165                T::ValidatedState::from_header(&request.parent_proposal.block_header);
166            let (payload, metadata) =
167                T::BlockPayload::from_transactions(txs, &validated_state, &instance)
168                    .await
169                    .map_err(|e| BlockError::PayloadConstruction(e.to_string()))?;
170            let payload: PayloadWithMetadata<T> = PayloadWithMetadata { payload, metadata };
171
172            let payload_bytes = payload.payload.encode();
173            let metadata_bytes = payload.metadata.encode();
174
175            let total_weight = {
176                let target_mem = membership
177                    .stake_table_for_epoch(Some(epoch))
178                    .map_err(|_| BlockError::StakeTableUnavailable)?;
179                vid_total_weight(target_mem.stake_table(), Some(epoch))
180            };
181            let payload_commitment = {
182                vid_commitment(
183                    payload_bytes.as_ref(),
184                    metadata_bytes.as_ref(),
185                    total_weight,
186                    version,
187                )
188            };
189
190            let builder_commitment = payload.payload.builder_commitment(&payload.metadata);
191            let (builder_key, builder_private_key) =
192                T::BuilderSignatureKey::generated_from_seed_indexed([0u8; 32], 0);
193            let block_size = payload_bytes.len() as u64;
194            let offered_fee = block_size;
195            let builder_fee = BuilderFee {
196                fee_amount: offered_fee,
197                fee_account: builder_key,
198                fee_signature: T::BuilderSignatureKey::sign_fee(
199                    &builder_private_key,
200                    offered_fee,
201                    &payload.metadata,
202                )
203                .map_err(|_| BlockError::BuilderSignature)?,
204            };
205            Ok(BlockBuilderOutput {
206                view,
207                epoch,
208                payload,
209                parent_proposal: request.parent_proposal,
210                builder_commitment,
211                builder_fee,
212                payload_commitment,
213                manifest,
214            })
215        });
216        self.calculations.insert((view, parent_commitment), handle);
217    }
218
219    pub async fn next(&mut self) -> Option<Result<BlockBuilderOutput<T>, BlockError>> {
220        loop {
221            match self.tasks.join_next().await {
222                Some(Ok(result)) => return Some(result),
223                Some(Err(err)) => {
224                    if err.is_panic() {
225                        error!(%err, "block builder task panicked");
226                    }
227                },
228                None => return None,
229            }
230        }
231    }
232
233    pub fn gc(&mut self, view_number: ViewNumber) {
234        self.calculations.retain(|(view, _), handle| {
235            if *view < view_number {
236                handle.abort();
237                false
238            } else {
239                true
240            }
241        });
242    }
243
244    pub fn on_submit_transaction(&mut self, tx: T::Transaction) {
245        let hash = tx.commit();
246
247        if self.retry_pending.contains_key(&hash) {
248            return;
249        }
250
251        let size = tx.minimum_block_size();
252        if self.retry_total_bytes + size > self.config.max_retry_bytes {
253            warn!("retry buffer full, rejecting {hash}");
254            return;
255        }
256
257        let valid_until = self.current_view + self.config.ttl;
258
259        self.retry_total_bytes += size;
260        self.retry_pending.insert(
261            hash,
262            RetryEntry {
263                tx,
264                valid_until,
265                size,
266            },
267        );
268    }
269
270    pub fn on_transactions(&mut self, msg: TransactionMessage<T>) {
271        for tx in msg.transactions {
272            let hash = tx.commit();
273
274            if self.dedup_set.contains(&hash) {
275                continue;
276            }
277
278            if self.leader_buffer.contains_key(&hash) {
279                continue;
280            }
281
282            let size = tx.minimum_block_size();
283            if self.leader_total_bytes + size > self.config.max_leader_bytes {
284                continue;
285            }
286
287            self.leader_total_bytes += size;
288            self.leader_buffer.insert(hash, tx);
289        }
290    }
291
292    pub fn on_dedup_manifest(&mut self, manifest: DedupManifest<T>) {
293        let DedupManifest { view, hashes, .. } = manifest;
294
295        for hash in &hashes {
296            if let Some(tx) = self.leader_buffer.remove(hash) {
297                self.leader_total_bytes -= tx.minimum_block_size();
298            }
299        }
300
301        self.dedup_set.extend(hashes.iter().copied());
302        self.dedup_views.push_back((view, hashes));
303
304        let current = self.current_view.u64();
305        let window = self.config.dedup_window_size;
306
307        while let Some((view, hashes)) = self.dedup_views.pop_front() {
308            if current.saturating_sub(view.u64()) <= window {
309                self.dedup_views.push_front((view, hashes));
310                break;
311            }
312            for hash in &hashes {
313                self.dedup_set.remove(hash);
314            }
315        }
316    }
317
318    pub fn on_view_changed(
319        &mut self,
320        view: ViewNumber,
321        _epoch: EpochNumber,
322    ) -> Vec<T::Transaction> {
323        self.current_view = view;
324
325        let mut expired_bytes = 0u64;
326        self.retry_pending.retain(|_, entry| {
327            if view > entry.valid_until {
328                expired_bytes += entry.size;
329                false
330            } else {
331                true
332            }
333        });
334        self.retry_total_bytes -= expired_bytes;
335
336        self.retry_pending
337            .values()
338            .map(|entry| entry.tx.clone())
339            .collect()
340    }
341
342    pub fn on_block_reconstructed(&mut self, tx_commitments: Vec<Commitment<T::Transaction>>) {
343        for hash in tx_commitments {
344            if let Some(entry) = self.retry_pending.remove(&hash) {
345                self.retry_total_bytes = self.retry_total_bytes.saturating_sub(entry.size);
346            }
347        }
348    }
349
350    pub fn drain(
351        &mut self,
352        view: ViewNumber,
353        epoch: EpochNumber,
354    ) -> (Vec<T::Transaction>, DedupManifest<T>) {
355        let (hashes, txs) = self.leader_buffer.drain().unzip();
356        self.leader_total_bytes = 0;
357
358        let manifest = DedupManifest {
359            view,
360            epoch,
361            hashes,
362        };
363
364        (txs, manifest)
365    }
366}
367
368impl<T: NodeType> From<&BlockBuilderOutput<T>> for HeaderRequest<T> {
369    fn from(output: &BlockBuilderOutput<T>) -> Self {
370        HeaderRequest {
371            view: output.view,
372            epoch: output.epoch,
373            parent_proposal: output.parent_proposal.clone(),
374            payload_commitment: output.payload_commitment,
375            builder_commitment: output.builder_commitment.clone(),
376            metadata: output.payload.metadata.clone(),
377            builder_fee: output.builder_fee.clone(),
378        }
379    }
380}
381
382impl<T: NodeType> From<BlockBuilderOutput<T>> for ConsensusInput<T> {
383    fn from(output: BlockBuilderOutput<T>) -> Self {
384        ConsensusInput::BlockBuilt {
385            view: output.view,
386            epoch: output.epoch,
387            payload: output.payload.payload,
388            metadata: output.payload.metadata,
389        }
390    }
391}