1use std::{
2 collections::{BTreeMap, HashMap},
3 sync::Arc,
4};
5
6use committable::{Commitment, Committable};
7use hotshot::traits::{BlockPayload, ValidatedState};
8use hotshot_types::{
9 data::{BlockNumber, EpochNumber, Leaf2, VidCommitment, ViewNumber},
10 message::UpgradeLock,
11 traits::{
12 block_contents::{BlockHeader, BuilderFee},
13 node_implementation::NodeType,
14 },
15 utils::BuilderCommitment,
16 vote::HasViewNumber,
17};
18use tokio::task::{AbortHandle, JoinSet};
19use tracing::{error, warn};
20
21use crate::{helpers::proposal_commitment, message::Proposal};
22
23pub struct UpdateLeaf<T: NodeType> {
24 pub view: ViewNumber,
25 pub leaf: Leaf2<T>,
26 pub state: Arc<T::ValidatedState>,
27 pub delta: Option<Delta<T>>,
28}
29
30#[derive(Clone, Debug, Eq, PartialEq)]
31pub struct StateRequest<T: NodeType> {
32 pub view: ViewNumber,
33 pub parent_view: ViewNumber,
34 pub epoch: EpochNumber,
35 pub block: BlockNumber,
36 pub proposal: Proposal<T>,
37 pub parent_commitment: Commitment<Leaf2<T>>,
38 pub payload_size: u32,
39}
40
41#[derive(Clone, Debug, Eq, PartialEq)]
42pub struct HeaderRequest<T: NodeType> {
43 pub view: ViewNumber,
44 pub epoch: EpochNumber,
45 pub parent_proposal: Proposal<T>,
46 pub payload_commitment: VidCommitment,
47 pub builder_commitment: BuilderCommitment,
48 pub metadata: <T::BlockPayload as BlockPayload<T>>::Metadata,
49 pub builder_fee: BuilderFee<T>,
50}
51
52#[derive(Clone, Debug, Eq, PartialEq)]
53pub struct StateResponse<T: NodeType> {
54 pub view: ViewNumber,
55 pub commitment: Commitment<Leaf2<T>>,
56 pub state: Arc<T::ValidatedState>,
57 pub delta: Option<Delta<T>>,
58}
59
60#[derive(Clone, Debug, Eq, PartialEq)]
61pub struct HeaderResponse<T: NodeType> {
62 pub view: ViewNumber,
63 pub epoch: EpochNumber,
64 pub parent_proposal: Proposal<T>,
65}
66
67#[derive(Clone, Debug, Eq, PartialEq)]
68#[allow(clippy::large_enum_variant)]
69pub enum StateManagerOutput<T: NodeType> {
70 State {
71 response: StateResponse<T>,
72 validated: bool,
73 },
74 Header {
75 response: HeaderResponse<T>,
76 header: Option<T::BlockHeader>,
77 },
78}
79
/// Shared (`Arc`) handle to the `Delta` associated type of `T`'s validated state.
type Delta<T> = Arc<<<T as NodeType>::ValidatedState as ValidatedState<T>>::Delta>;
81
82#[derive(Clone)]
83pub struct StateEntry<T: NodeType> {
84 pub state: Arc<T::ValidatedState>,
85 pub delta: Option<Delta<T>>,
86 pub leaf: Leaf2<T>,
87}
88
89pub struct StateManager<T: NodeType> {
90 instance: Arc<T::InstanceState>,
91 validated_states: BTreeMap<Commitment<Leaf2<T>>, StateEntry<T>>,
92 state_requests: HashMap<Commitment<Leaf2<T>>, (AbortHandle, ViewNumber)>,
93 header_requests: HashMap<(ViewNumber, Commitment<Leaf2<T>>), AbortHandle>,
94 pending_requests: HashMap<Commitment<Leaf2<T>>, Vec<Pending<T>>>,
95 upgrade_lock: UpgradeLock<T>,
96 tasks: JoinSet<Completed<T>>,
97}
98
99enum Pending<T: NodeType> {
100 State(StateRequest<T>),
101 Header(HeaderRequest<T>),
102}
103
104enum Completed<T: NodeType> {
105 State {
106 response: StateResponse<T>,
107 leaf: Option<Leaf2<T>>,
108 },
109 Header {
110 response: HeaderResponse<T>,
111 header: Option<T::BlockHeader>,
112 },
113}
114
115impl<T: NodeType> StateManager<T> {
116 pub fn new(instance: Arc<T::InstanceState>, upgrade_lock: UpgradeLock<T>) -> Self {
117 Self {
118 instance,
119 validated_states: BTreeMap::new(),
120 state_requests: HashMap::new(),
121 header_requests: HashMap::new(),
122 pending_requests: HashMap::new(),
123 upgrade_lock,
124 tasks: JoinSet::new(),
125 }
126 }
127
128 pub fn get_state(&self, view: ViewNumber) -> Option<&StateEntry<T>> {
130 self.validated_states
131 .iter()
132 .find(|(_, entry)| entry.leaf.view_number() == view)
133 .map(|(_, entry)| entry)
134 }
135
136 pub fn get_leaf(&self, view: ViewNumber) -> Option<Leaf2<T>> {
138 self.validated_states
139 .iter()
140 .find(|(_, entry)| entry.leaf.view_number() == view)
141 .map(|(_, entry)| entry.leaf.clone())
142 }
143
144 pub fn seed_state(&mut self, view: ViewNumber, state: Arc<T::ValidatedState>, leaf: Leaf2<T>) {
145 self.insert_state(view, state, None, leaf);
146 }
147
148 pub fn request_state(&mut self, request: StateRequest<T>) {
149 let commitment = proposal_commitment(&request.proposal);
150 if self.state_requests.contains_key(&commitment) {
151 return;
152 }
153
154 if self.state_requests.contains_key(&request.parent_commitment) {
155 self.pending_requests
156 .entry(request.parent_commitment)
157 .or_default()
158 .push(Pending::State(request));
159 return;
160 }
161
162 let Some(parent_entry) = self
163 .validated_states
164 .get(&request.parent_commitment)
165 .cloned()
166 else {
167 self.insert_empty_state(request.proposal);
168 self.start_pending(commitment);
169 return;
170 };
171
172 let instance = self.instance.clone();
173 let header = request.proposal.block_header.clone();
174 let view = request.view;
175 let payload_size = request.payload_size;
176
177 let Ok(upgrade_lock) = self.upgrade_lock.version(view) else {
178 error!(%view, "unsupported version");
179 return;
180 };
181
182 let handle = self.tasks.spawn(async move {
183 let result = parent_entry
184 .state
185 .validate_and_apply_header(
186 &instance,
187 &parent_entry.leaf,
188 &header,
189 payload_size,
190 upgrade_lock,
191 *view,
192 )
193 .await
194 .map(|(state, delta)| StateResponse {
195 view,
196 commitment,
197 state: Arc::new(state),
198 delta: Some(Arc::new(delta)),
199 });
200 match result {
201 Ok(response) => Completed::State {
202 response,
203 leaf: Some(request.proposal.into()),
204 },
205 Err(err) => {
206 warn!(%err, "state validation failed");
207 Completed::State {
208 response: StateResponse {
209 view,
210 commitment,
211 state: Arc::new(T::ValidatedState::from_header(&header)),
212 delta: None,
213 },
214 leaf: None,
215 }
216 },
217 }
218 });
219
220 self.state_requests.insert(commitment, (handle, view));
221 }
222
223 pub fn request_header(&mut self, request: HeaderRequest<T>) {
224 let parent_commitment = proposal_commitment(&request.parent_proposal);
225 if self
226 .header_requests
227 .contains_key(&(request.view, parent_commitment))
228 {
229 return;
230 }
231
232 if self.state_requests.contains_key(&parent_commitment) {
233 self.pending_requests
234 .entry(parent_commitment)
235 .or_default()
236 .push(Pending::Header(request));
237 return;
238 }
239
240 let Some(parent_entry) = self.validated_states.get(&parent_commitment).cloned() else {
241 self.pending_requests
245 .entry(parent_commitment)
246 .or_default()
247 .push(Pending::Header(request));
248 return;
249 };
250
251 let instance = self.instance.clone();
252 let view = request.view;
253 let epoch = request.epoch;
254 let parent_proposal = request.parent_proposal;
255
256 let Ok(version) = self.upgrade_lock.version(view) else {
257 error!(%view, "unsupported version");
258 return;
259 };
260
261 let handle = self.tasks.spawn(async move {
262 let result = T::BlockHeader::new(
263 &parent_entry.state,
264 &instance,
265 &parent_entry.leaf,
266 request.payload_commitment,
267 request.builder_commitment,
268 request.metadata,
269 request.builder_fee,
270 version,
271 *view,
272 )
273 .await;
274 match result {
275 Ok(header) => Completed::Header {
276 response: HeaderResponse {
277 view,
278 epoch,
279 parent_proposal,
280 },
281 header: Some(header),
282 },
283 Err(err) => {
284 warn!(%err, "header creation failed");
285 Completed::Header {
286 response: HeaderResponse {
287 view,
288 epoch,
289 parent_proposal,
290 },
291 header: None,
292 }
293 },
294 }
295 });
296
297 self.header_requests
298 .insert((view, parent_commitment), handle);
299 }
300
301 pub fn update_state(&mut self, update: UpdateLeaf<T>) {
303 let UpdateLeaf {
304 view,
305 leaf,
306 state,
307 delta,
308 } = update;
309 let commitment = leaf.commit();
310 self.insert_state(view, state, delta, leaf);
311 if let Some((task, _)) = self.state_requests.remove(&commitment) {
312 task.abort();
313 }
314 self.start_pending(commitment);
315 }
316
317 pub async fn next(&mut self) -> Option<StateManagerOutput<T>> {
319 loop {
320 match self.tasks.join_next().await {
321 Some(Ok(result)) => match result {
322 Completed::State {
323 response,
324 leaf: leaf2,
325 } => {
326 if self.state_requests.remove(&response.commitment).is_none() {
327 continue;
328 }
329 if let Some(leaf) = leaf2 {
330 self.insert_state(
331 response.view,
332 response.state.clone(),
333 response.delta.clone(),
334 leaf,
335 );
336 self.start_pending(response.commitment);
337 return Some(StateManagerOutput::State {
338 response,
339 validated: true,
340 });
341 } else {
342 self.pending_requests.remove(&response.commitment);
343 return Some(StateManagerOutput::State {
344 response,
345 validated: false,
346 });
347 }
348 },
349 Completed::Header { response, header } => {
350 let key = (
351 response.view,
352 proposal_commitment(&response.parent_proposal),
353 );
354 if self.header_requests.remove(&key).is_none() {
355 continue;
356 }
357 return Some(StateManagerOutput::Header { response, header });
358 },
359 },
360 Some(Err(err)) => {
361 if err.is_panic() {
362 error!(%err, "task panicked");
363 }
364 },
365 None => return None,
366 }
367 }
368 }
369
370 pub fn gc(&mut self, view_number: ViewNumber) {
371 self.validated_states
372 .retain(|_, entry| entry.leaf.view_number() >= view_number);
373 for (task, view) in self.state_requests.values() {
374 if *view < view_number {
375 task.abort();
376 }
377 }
378 self.state_requests
379 .retain(|_, (_, view)| *view >= view_number);
380 }
381
382 fn start_pending(&mut self, finished_commitment: Commitment<Leaf2<T>>) {
383 let Some(pending) = self.pending_requests.remove(&finished_commitment) else {
384 return;
385 };
386 for p in pending {
387 match p {
388 Pending::State(r) => self.request_state(r),
389 Pending::Header(r) => self.request_header(r),
390 }
391 }
392 }
393
394 fn insert_state(
401 &mut self,
402 view: ViewNumber,
403 state: Arc<T::ValidatedState>,
404 delta: Option<Delta<T>>,
405 leaf: Leaf2<T>,
406 ) {
407 if let Some(existing) = self.validated_states.get(&leaf.commit())
408 && existing.delta.is_some()
409 && delta.is_none()
410 {
411 warn!(
412 ?view,
413 "Skipping state update to not override a state with a delta"
414 );
415 return;
416 }
417 self.validated_states
418 .insert(leaf.commit(), StateEntry { state, delta, leaf });
419 }
420
421 fn insert_empty_state(&mut self, proposal: Proposal<T>) {
422 let state = T::ValidatedState::from_header(&proposal.block_header);
423 self.insert_state(
424 proposal.view_number(),
425 Arc::new(state),
426 None,
427 proposal.into(),
428 );
429 }
430
/// Test helper: whether any stored entry has a leaf at view `v`.
#[cfg(test)]
pub(crate) fn validated_contains_view(&self, v: ViewNumber) -> bool {
    let mut entries = self.validated_states.values();
    entries.any(|entry| entry.leaf.view_number() == v)
}
437
/// Test helper: whether any request is parked on commitment `c`.
#[cfg(test)]
pub(crate) fn pending_contains_commitment(&self, c: &Commitment<Leaf2<T>>) -> bool {
    self.pending_requests.get(c).is_some()
}
442}