Skip to main content

hotshot_new_protocol/
state.rs

1use std::{
2    collections::{BTreeMap, HashMap},
3    sync::Arc,
4};
5
6use committable::{Commitment, Committable};
7use hotshot::traits::{BlockPayload, ValidatedState};
8use hotshot_types::{
9    data::{BlockNumber, EpochNumber, Leaf2, VidCommitment, ViewNumber},
10    message::UpgradeLock,
11    traits::{
12        block_contents::{BlockHeader, BuilderFee},
13        node_implementation::NodeType,
14    },
15    utils::BuilderCommitment,
16    vote::HasViewNumber,
17};
18use tokio::task::{AbortHandle, JoinSet};
19use tracing::{error, warn};
20
21use crate::{helpers::proposal_commitment, message::Proposal};
22
23pub struct UpdateLeaf<T: NodeType> {
24    pub view: ViewNumber,
25    pub leaf: Leaf2<T>,
26    pub state: Arc<T::ValidatedState>,
27    pub delta: Option<Delta<T>>,
28}
29
30#[derive(Clone, Debug, Eq, PartialEq)]
31pub struct StateRequest<T: NodeType> {
32    pub view: ViewNumber,
33    pub parent_view: ViewNumber,
34    pub epoch: EpochNumber,
35    pub block: BlockNumber,
36    pub proposal: Proposal<T>,
37    pub parent_commitment: Commitment<Leaf2<T>>,
38    pub payload_size: u32,
39}
40
41#[derive(Clone, Debug, Eq, PartialEq)]
42pub struct HeaderRequest<T: NodeType> {
43    pub view: ViewNumber,
44    pub epoch: EpochNumber,
45    pub parent_proposal: Proposal<T>,
46    pub payload_commitment: VidCommitment,
47    pub builder_commitment: BuilderCommitment,
48    pub metadata: <T::BlockPayload as BlockPayload<T>>::Metadata,
49    pub builder_fee: BuilderFee<T>,
50}
51
52#[derive(Clone, Debug, Eq, PartialEq)]
53pub struct StateResponse<T: NodeType> {
54    pub view: ViewNumber,
55    pub commitment: Commitment<Leaf2<T>>,
56    pub state: Arc<T::ValidatedState>,
57    pub delta: Option<Delta<T>>,
58}
59
60#[derive(Clone, Debug, Eq, PartialEq)]
61pub struct HeaderResponse<T: NodeType> {
62    pub view: ViewNumber,
63    pub epoch: EpochNumber,
64    pub parent_proposal: Proposal<T>,
65}
66
67#[derive(Clone, Debug, Eq, PartialEq)]
68#[allow(clippy::large_enum_variant)]
69pub enum StateManagerOutput<T: NodeType> {
70    State {
71        response: StateResponse<T>,
72        validated: bool,
73    },
74    Header {
75        response: HeaderResponse<T>,
76        header: Option<T::BlockHeader>,
77    },
78}
79
80type Delta<T> = Arc<<<T as NodeType>::ValidatedState as ValidatedState<T>>::Delta>;
81
82#[derive(Clone)]
83pub struct StateEntry<T: NodeType> {
84    pub state: Arc<T::ValidatedState>,
85    pub delta: Option<Delta<T>>,
86    pub leaf: Leaf2<T>,
87}
88
89pub struct StateManager<T: NodeType> {
90    instance: Arc<T::InstanceState>,
91    validated_states: BTreeMap<Commitment<Leaf2<T>>, StateEntry<T>>,
92    state_requests: HashMap<Commitment<Leaf2<T>>, (AbortHandle, ViewNumber)>,
93    header_requests: HashMap<(ViewNumber, Commitment<Leaf2<T>>), AbortHandle>,
94    pending_requests: HashMap<Commitment<Leaf2<T>>, Vec<Pending<T>>>,
95    upgrade_lock: UpgradeLock<T>,
96    tasks: JoinSet<Completed<T>>,
97}
98
99enum Pending<T: NodeType> {
100    State(StateRequest<T>),
101    Header(HeaderRequest<T>),
102}
103
104enum Completed<T: NodeType> {
105    State {
106        response: StateResponse<T>,
107        leaf: Option<Leaf2<T>>,
108    },
109    Header {
110        response: HeaderResponse<T>,
111        header: Option<T::BlockHeader>,
112    },
113}
114
115impl<T: NodeType> StateManager<T> {
116    pub fn new(instance: Arc<T::InstanceState>, upgrade_lock: UpgradeLock<T>) -> Self {
117        Self {
118            instance,
119            validated_states: BTreeMap::new(),
120            state_requests: HashMap::new(),
121            header_requests: HashMap::new(),
122            pending_requests: HashMap::new(),
123            upgrade_lock,
124            tasks: JoinSet::new(),
125        }
126    }
127
128    /// Get the validated state for a given view.
129    pub fn get_state(&self, view: ViewNumber) -> Option<&StateEntry<T>> {
130        self.validated_states
131            .iter()
132            .find(|(_, entry)| entry.leaf.view_number() == view)
133            .map(|(_, entry)| entry)
134    }
135
136    /// Get the leaf for a given view
137    pub fn get_leaf(&self, view: ViewNumber) -> Option<Leaf2<T>> {
138        self.validated_states
139            .iter()
140            .find(|(_, entry)| entry.leaf.view_number() == view)
141            .map(|(_, entry)| entry.leaf.clone())
142    }
143
144    pub fn seed_state(&mut self, view: ViewNumber, state: Arc<T::ValidatedState>, leaf: Leaf2<T>) {
145        self.insert_state(view, state, None, leaf);
146    }
147
148    pub fn request_state(&mut self, request: StateRequest<T>) {
149        let commitment = proposal_commitment(&request.proposal);
150        if self.state_requests.contains_key(&commitment) {
151            return;
152        }
153
154        if self.state_requests.contains_key(&request.parent_commitment) {
155            self.pending_requests
156                .entry(request.parent_commitment)
157                .or_default()
158                .push(Pending::State(request));
159            return;
160        }
161
162        let Some(parent_entry) = self
163            .validated_states
164            .get(&request.parent_commitment)
165            .cloned()
166        else {
167            self.insert_empty_state(request.proposal);
168            self.start_pending(commitment);
169            return;
170        };
171
172        let instance = self.instance.clone();
173        let header = request.proposal.block_header.clone();
174        let view = request.view;
175        let payload_size = request.payload_size;
176
177        let Ok(upgrade_lock) = self.upgrade_lock.version(view) else {
178            error!(%view, "unsupported version");
179            return;
180        };
181
182        let handle = self.tasks.spawn(async move {
183            let result = parent_entry
184                .state
185                .validate_and_apply_header(
186                    &instance,
187                    &parent_entry.leaf,
188                    &header,
189                    payload_size,
190                    upgrade_lock,
191                    *view,
192                )
193                .await
194                .map(|(state, delta)| StateResponse {
195                    view,
196                    commitment,
197                    state: Arc::new(state),
198                    delta: Some(Arc::new(delta)),
199                });
200            match result {
201                Ok(response) => Completed::State {
202                    response,
203                    leaf: Some(request.proposal.into()),
204                },
205                Err(err) => {
206                    warn!(%err, "state validation failed");
207                    Completed::State {
208                        response: StateResponse {
209                            view,
210                            commitment,
211                            state: Arc::new(T::ValidatedState::from_header(&header)),
212                            delta: None,
213                        },
214                        leaf: None,
215                    }
216                },
217            }
218        });
219
220        self.state_requests.insert(commitment, (handle, view));
221    }
222
223    pub fn request_header(&mut self, request: HeaderRequest<T>) {
224        let parent_commitment = proposal_commitment(&request.parent_proposal);
225        if self
226            .header_requests
227            .contains_key(&(request.view, parent_commitment))
228        {
229            return;
230        }
231
232        if self.state_requests.contains_key(&parent_commitment) {
233            self.pending_requests
234                .entry(parent_commitment)
235                .or_default()
236                .push(Pending::Header(request));
237            return;
238        }
239
240        let Some(parent_entry) = self.validated_states.get(&parent_commitment).cloned() else {
241            // Parent state not available yet (e.g. its proposal is still
242            // being validated).  Queue the request so it is retried once
243            // the state for the parent view is inserted.
244            self.pending_requests
245                .entry(parent_commitment)
246                .or_default()
247                .push(Pending::Header(request));
248            return;
249        };
250
251        let instance = self.instance.clone();
252        let view = request.view;
253        let epoch = request.epoch;
254        let parent_proposal = request.parent_proposal;
255
256        let Ok(version) = self.upgrade_lock.version(view) else {
257            error!(%view, "unsupported version");
258            return;
259        };
260
261        let handle = self.tasks.spawn(async move {
262            let result = T::BlockHeader::new(
263                &parent_entry.state,
264                &instance,
265                &parent_entry.leaf,
266                request.payload_commitment,
267                request.builder_commitment,
268                request.metadata,
269                request.builder_fee,
270                version,
271                *view,
272            )
273            .await;
274            match result {
275                Ok(header) => Completed::Header {
276                    response: HeaderResponse {
277                        view,
278                        epoch,
279                        parent_proposal,
280                    },
281                    header: Some(header),
282                },
283                Err(err) => {
284                    warn!(%err, "header creation failed");
285                    Completed::Header {
286                        response: HeaderResponse {
287                            view,
288                            epoch,
289                            parent_proposal,
290                        },
291                        header: None,
292                    }
293                },
294            }
295        });
296
297        self.header_requests
298            .insert((view, parent_commitment), handle);
299    }
300
301    /// Provide an externally-obtained validated state.
302    pub fn update_state(&mut self, update: UpdateLeaf<T>) {
303        let UpdateLeaf {
304            view,
305            leaf,
306            state,
307            delta,
308        } = update;
309        let commitment = leaf.commit();
310        self.insert_state(view, state, delta, leaf);
311        if let Some((task, _)) = self.state_requests.remove(&commitment) {
312            task.abort();
313        }
314        self.start_pending(commitment);
315    }
316
317    /// Get the next output.
318    pub async fn next(&mut self) -> Option<StateManagerOutput<T>> {
319        loop {
320            match self.tasks.join_next().await {
321                Some(Ok(result)) => match result {
322                    Completed::State {
323                        response,
324                        leaf: leaf2,
325                    } => {
326                        if self.state_requests.remove(&response.commitment).is_none() {
327                            continue;
328                        }
329                        if let Some(leaf) = leaf2 {
330                            self.insert_state(
331                                response.view,
332                                response.state.clone(),
333                                response.delta.clone(),
334                                leaf,
335                            );
336                            self.start_pending(response.commitment);
337                            return Some(StateManagerOutput::State {
338                                response,
339                                validated: true,
340                            });
341                        } else {
342                            self.pending_requests.remove(&response.commitment);
343                            return Some(StateManagerOutput::State {
344                                response,
345                                validated: false,
346                            });
347                        }
348                    },
349                    Completed::Header { response, header } => {
350                        let key = (
351                            response.view,
352                            proposal_commitment(&response.parent_proposal),
353                        );
354                        if self.header_requests.remove(&key).is_none() {
355                            continue;
356                        }
357                        return Some(StateManagerOutput::Header { response, header });
358                    },
359                },
360                Some(Err(err)) => {
361                    if err.is_panic() {
362                        error!(%err, "task panicked");
363                    }
364                },
365                None => return None,
366            }
367        }
368    }
369
370    pub fn gc(&mut self, view_number: ViewNumber) {
371        self.validated_states
372            .retain(|_, entry| entry.leaf.view_number() >= view_number);
373        for (task, view) in self.state_requests.values() {
374            if *view < view_number {
375                task.abort();
376            }
377        }
378        self.state_requests
379            .retain(|_, (_, view)| *view >= view_number);
380    }
381
382    fn start_pending(&mut self, finished_commitment: Commitment<Leaf2<T>>) {
383        let Some(pending) = self.pending_requests.remove(&finished_commitment) else {
384            return;
385        };
386        for p in pending {
387            match p {
388                Pending::State(r) => self.request_state(r),
389                Pending::Header(r) => self.request_header(r),
390            }
391        }
392    }
393
394    /// Insert a state into the validated states map.
395    ///
396    /// States created via `from_header`
397    /// have no delta. States produced by `validate_and_apply_header` carry a delta representing
398    /// the state transition. This method prevents a `from_header` state from overwriting a
399    /// fully validated state that already has a delta.
400    fn insert_state(
401        &mut self,
402        view: ViewNumber,
403        state: Arc<T::ValidatedState>,
404        delta: Option<Delta<T>>,
405        leaf: Leaf2<T>,
406    ) {
407        if let Some(existing) = self.validated_states.get(&leaf.commit())
408            && existing.delta.is_some()
409            && delta.is_none()
410        {
411            warn!(
412                ?view,
413                "Skipping state update to not override a state with a delta"
414            );
415            return;
416        }
417        self.validated_states
418            .insert(leaf.commit(), StateEntry { state, delta, leaf });
419    }
420
421    fn insert_empty_state(&mut self, proposal: Proposal<T>) {
422        let state = T::ValidatedState::from_header(&proposal.block_header);
423        self.insert_state(
424            proposal.view_number(),
425            Arc::new(state),
426            None,
427            proposal.into(),
428        );
429    }
430
431    #[cfg(test)]
432    pub(crate) fn validated_contains_view(&self, v: ViewNumber) -> bool {
433        self.validated_states
434            .iter()
435            .any(|(_, entry)| entry.leaf.view_number() == v)
436    }
437
438    #[cfg(test)]
439    pub(crate) fn pending_contains_commitment(&self, c: &Commitment<Leaf2<T>>) -> bool {
440        self.pending_requests.contains_key(c)
441    }
442}