1use core::fmt::Debug;
2use std::{cmp::max, sync::Arc, time::Duration};
3
4use anyhow::{Context, bail, ensure};
5use async_lock::Mutex;
6use either::Either;
7use espresso_types::{
8 BlockMerkleTree, EpochRewardsCalculator, FeeAccount, FeeMerkleTree, Leaf2, ValidatedState,
9 traits::StateCatchup,
10 v0_3::{ChainConfig, RewardAccountV1, RewardMerkleTreeV1},
11 v0_4::Delta,
12};
13use futures::{StreamExt, future::Future};
14use hotshot::traits::ValidatedState as HotShotState;
15use hotshot_query_service::{
16 availability::{AvailabilityDataSource, LeafQueryData},
17 data_source::{Transaction, VersionedDataSource, storage::pruning::PrunedHeightDataSource},
18 merklized_state::{MerklizedStateHeightPersistence, UpdateStateData},
19 status::StatusDataSource,
20 types::HeightIndexed,
21};
22use hotshot_types::utils::is_last_block;
23use jf_merkle_tree_compat::{
24 LookupResult, MerkleTreeScheme, ToTraversalPath, UniversalMerkleTreeScheme,
25};
26use tokio::time::sleep;
27use vbs::version::Version;
28use versions::{DRB_AND_HEADER_UPGRADE_VERSION, EPOCH_REWARD_VERSION, EPOCH_VERSION};
29
30use crate::{
31 NodeState, SeqTypes,
32 api::{RewardMerkleTreeDataSource, RewardMerkleTreeV2Data},
33 catchup::{CatchupStorage, SqlStateCatchup},
34 persistence::ChainConfigPersistence,
35};
36
37pub(crate) async fn compute_state_update(
38 parent_state: &ValidatedState,
39 instance: &NodeState,
40 peers: &impl StateCatchup,
41 parent_leaf: &Leaf2,
42 proposed_leaf: &Leaf2,
43) -> anyhow::Result<(ValidatedState, Delta)> {
44 let header = proposed_leaf.block_header();
45
46 let mut parent_state = parent_state.clone();
47
48 if proposed_leaf.block_header().version() > parent_leaf.block_header().version() {
56 parent_state.chain_config = proposed_leaf.block_header().chain_config()
57 }
58
59 let (state, delta, total_rewards_distributed) = parent_state
60 .apply_header(
61 instance,
62 peers,
63 parent_leaf,
64 header,
65 header.version(),
66 proposed_leaf.view_number(),
67 )
68 .await?;
69
70 ensure!(
72 state.chain_config.commit() == header.chain_config().commit(),
73 "internal error! in-memory chain config {:?} does not match header {:?}",
74 state.chain_config,
75 header.chain_config(),
76 );
77 ensure!(
78 state.block_merkle_tree.commitment() == header.block_merkle_tree_root(),
79 "internal error! in-memory block tree {} does not match header {}",
80 state.block_merkle_tree.commitment(),
81 header.block_merkle_tree_root()
82 );
83 ensure!(
84 state.fee_merkle_tree.commitment() == header.fee_merkle_tree_root(),
85 "internal error! in-memory fee tree {} does not match header {}",
86 state.fee_merkle_tree.commitment(),
87 header.fee_merkle_tree_root()
88 );
89
90 match header.reward_merkle_tree_root() {
91 Either::Left(v1_root) => {
92 ensure!(
93 state.reward_merkle_tree_v1.commitment() == v1_root,
94 "internal error! in-memory v1 reward tree {} does not match header {}",
95 state.reward_merkle_tree_v1.commitment(),
96 v1_root
97 )
98 },
99 Either::Right(v2_root) => {
100 ensure!(
101 state.reward_merkle_tree_v2.commitment() == v2_root,
102 "internal error! in-memory v2 reward tree {} does not match header {}",
103 state.reward_merkle_tree_v2.commitment(),
104 v2_root
105 )
106 },
107 }
108
109 if header.version() >= DRB_AND_HEADER_UPGRADE_VERSION {
110 let Some(actual_total) = total_rewards_distributed else {
111 bail!(
112 "internal error! total_rewards_distributed is None for version {:?}",
113 header.version()
114 );
115 };
116
117 let Some(proposed_total) = header.total_reward_distributed() else {
118 bail!(
119 "internal error! proposed header.total_reward_distributed() is None for version \
120 {:?}",
121 header.version()
122 );
123 };
124
125 ensure!(
126 proposed_total == actual_total,
127 "Total rewards mismatch: proposed header has {proposed_total} but actual total is \
128 {actual_total}",
129 );
130 }
131
132 Ok((state, delta))
133}
134
135async fn store_state_update(
136 tx: &mut impl SequencerStateUpdate,
137 block_number: u64,
138 version: Version,
139 state: &ValidatedState,
140 delta: &Delta,
141) -> anyhow::Result<()> {
142 let ValidatedState {
143 fee_merkle_tree,
144 block_merkle_tree,
145 reward_merkle_tree_v1,
146 ..
147 } = state;
148 let Delta {
149 fees_delta,
150 rewards_delta,
151 } = delta;
152
153 let fee_proofs: Vec<_> = fees_delta
155 .iter()
156 .map(|delta| {
157 let proof = match fee_merkle_tree.universal_lookup(*delta) {
158 LookupResult::Ok(_, proof) => proof,
159 LookupResult::NotFound(proof) => proof,
160 LookupResult::NotInMemory => bail!("missing merkle path for fee account {delta}"),
161 };
162 let path = FeeAccount::to_traversal_path(delta, fee_merkle_tree.height());
163 Ok((proof, path))
164 })
165 .collect::<anyhow::Result<Vec<_>>>()?;
166
167 tracing::debug!(count = fee_proofs.len(), "inserting fee accounts in batch");
168 UpdateStateData::<SeqTypes, FeeMerkleTree, { FeeMerkleTree::ARITY }>::insert_merkle_nodes_batch(
169 tx,
170 fee_proofs,
171 block_number,
172 )
173 .await
174 .context("failed to store fee merkle nodes")?;
175
176 let (_, proof) = block_merkle_tree
178 .lookup(block_number - 1)
179 .expect_ok()
180 .context("getting blocks frontier")?;
181 let path = <u64 as ToTraversalPath<{ BlockMerkleTree::ARITY }>>::to_traversal_path(
182 &(block_number - 1),
183 block_merkle_tree.height(),
184 );
185
186 {
187 tracing::debug!("inserting blocks frontier");
188 UpdateStateData::<SeqTypes, BlockMerkleTree, { BlockMerkleTree::ARITY }>::insert_merkle_nodes(
189 tx,
190 proof,
191 path,
192 block_number,
193 )
194 .await
195 .context("failed to store block merkle nodes")?;
196 }
197
198 if version <= EPOCH_VERSION {
199 let reward_proofs: Vec<_> = rewards_delta
201 .iter()
202 .map(|delta| {
203 let key = RewardAccountV1::from(*delta);
204 let proof = match reward_merkle_tree_v1.universal_lookup(key) {
205 LookupResult::Ok(_, proof) => proof,
206 LookupResult::NotFound(proof) => proof,
207 LookupResult::NotInMemory => {
208 bail!("missing merkle path for reward account {delta}")
209 },
210 };
211 let path = <RewardAccountV1 as ToTraversalPath<
212 { RewardMerkleTreeV1::ARITY },
213 >>::to_traversal_path(
214 &key, reward_merkle_tree_v1.height()
215 );
216 Ok((proof, path))
217 })
218 .collect::<anyhow::Result<Vec<_>>>()?;
219
220 tracing::debug!(
221 count = reward_proofs.len(),
222 "inserting v1 reward accounts in batch"
223 );
224 UpdateStateData::<SeqTypes, RewardMerkleTreeV1, { RewardMerkleTreeV1::ARITY }>::insert_merkle_nodes_batch(
225 tx,
226 reward_proofs,
227 block_number,
228 )
229 .await
230 .context("failed to store reward merkle nodes")?;
231 }
232
233 Ok(())
234}
235
236#[tracing::instrument(
237 skip_all,
238 fields(
239 node_id = instance.node_id,
240 view = ?parent_leaf.leaf().view_number(),
241 height = parent_leaf.height(),
242 ),
243)]
244async fn update_state_storage<T>(
245 parent_state: &ValidatedState,
246 storage: &Arc<T>,
247 instance: &NodeState,
248 peers: &impl StateCatchup,
249 parent_leaf: &LeafQueryData<SeqTypes>,
250 proposed_leaf: &LeafQueryData<SeqTypes>,
251) -> anyhow::Result<ValidatedState>
252where
253 T: SequencerStateDataSource,
254 for<'a> T::Transaction<'a>: SequencerStateUpdate,
255{
256 let parent_chain_config = parent_state.chain_config;
257 let block_number = proposed_leaf.height();
258 let version = proposed_leaf.header().version();
259
260 let (state, delta) = compute_state_update(
261 parent_state,
262 instance,
263 peers,
264 &parent_leaf.leaf().clone(),
265 &proposed_leaf.leaf().clone(),
266 )
267 .await
268 .context("computing state update")?;
269
270 let has_changed_accounts = version > EPOCH_VERSION && !delta.rewards_delta.is_empty();
271 let is_epoch_boundary = version >= EPOCH_REWARD_VERSION
286 && is_last_block(
287 block_number,
288 instance
289 .epoch_height
290 .expect("epoch_height should be set for version > V3"),
291 );
292
293 if has_changed_accounts || is_epoch_boundary {
294 storage
295 .save_and_gc_reward_tree_v2(
296 instance,
297 block_number,
298 version,
299 &state.reward_merkle_tree_v2,
300 )
301 .await
302 .context("failed to save and gc reward merkle tree v2")?;
303 }
304
305 storage
306 .persist_reward_proofs(instance, block_number, version)
307 .await
308 .context("failed to persist reward proofs")?;
309
310 tracing::debug!("storing state update");
311 let mut tx = storage
312 .write()
313 .await
314 .context("opening transaction for state update")?;
315
316 store_state_update(&mut tx, block_number, version, &state, &delta).await?;
317
318 tx.commit().await?;
319
320 let mut tx = storage
321 .write()
322 .await
323 .context("opening transaction for state update")?;
324
325 if parent_chain_config != state.chain_config {
326 let cf = state
327 .chain_config
328 .resolve()
329 .context("failed to resolve to chain config")?;
330
331 tx.insert_chain_config(cf).await?;
332 }
333
334 tracing::debug!(block_number, "updating state height");
335 UpdateStateData::<SeqTypes, _, { BlockMerkleTree::ARITY }>::set_last_state_height(
336 &mut tx,
337 block_number as usize,
338 )
339 .await
340 .context("setting state height")?;
341
342 tx.commit().await?;
343
344 Ok(state)
345}
346
347async fn store_genesis_state<S>(
348 storage: &S,
349 chain_config: ChainConfig,
350 state: &ValidatedState,
351) -> anyhow::Result<()>
352where
353 S: SequencerStateDataSource,
354 for<'a> S::Transaction<'a>: SequencerStateUpdate,
355{
356 ensure!(
357 state.block_merkle_tree.num_leaves() == 0,
358 "genesis state with non-empty block tree is unsupported"
359 );
360
361 let mut tx = storage
362 .write()
363 .await
364 .context("starting transaction for genesis state")?;
365
366 for (account, _) in state.fee_merkle_tree.iter() {
368 let proof = match state.fee_merkle_tree.universal_lookup(account) {
369 LookupResult::Ok(_, proof) => proof,
370 LookupResult::NotFound(proof) => proof,
371 LookupResult::NotInMemory => bail!("missing merkle path for fee account {account}"),
372 };
373 let path: Vec<usize> =
374 <FeeAccount as ToTraversalPath<{ FeeMerkleTree::ARITY }>>::to_traversal_path(
375 account,
376 state.fee_merkle_tree.height(),
377 );
378
379 UpdateStateData::<SeqTypes, FeeMerkleTree, { FeeMerkleTree::ARITY }>::insert_merkle_nodes(
380 &mut tx, proof, path, 0,
381 )
382 .await
383 .context("failed to store fee merkle nodes")?;
384 }
385
386 tx.insert_chain_config(chain_config).await?;
387
388 tx.commit().await?;
389
390 let tree_data: RewardMerkleTreeV2Data = (&state.reward_merkle_tree_v2)
392 .try_into()
393 .context("serializing genesis reward tree")?;
394 let tree_bytes = bincode::serialize(&tree_data).context("serializing genesis reward tree")?;
395 storage
396 .persist_tree(0, tree_bytes)
397 .await
398 .context("storing genesis reward merkle tree")?;
399
400 Ok(())
401}
402
403#[tracing::instrument(skip_all)]
404pub(crate) async fn update_state_storage_loop<T>(
405 storage: Arc<T>,
406 instance: impl Future<Output = NodeState>,
407) -> anyhow::Result<()>
408where
409 T: SequencerStateDataSource,
410 for<'a> T::Transaction<'a>: SequencerStateUpdate,
411{
412 let mut instance = instance.await;
413 instance.epoch_rewards_calculator = Arc::new(Mutex::new(EpochRewardsCalculator::new()));
416 let peers = SqlStateCatchup::new(storage.clone(), Default::default());
417
418 let (last_height, parent_leaf, mut leaves) = {
420 let last_height = storage.get_last_state_height().await?;
421 let pruned_height = storage.load_pruned_height().await?;
422
423 let height = match pruned_height {
424 Some(pruned_height) => max(last_height, pruned_height as usize + 1),
429 None => last_height,
431 };
432
433 let height =
435 if let Ok(env_height) = std::env::var("ESPRESSO_NODE_STATE_STORAGE_INITIAL_HEIGHT") {
436 match env_height.parse::<usize>() {
437 Ok(override_height) => {
438 tracing::error!(
439 node_id = instance.node_id,
440 calculated_height = height,
441 override_height,
442 "overriding initial state storage height from environment variable"
443 );
444 override_height
445 },
446 Err(e) => {
447 tracing::error!(
448 "failed to parse ESPRESSO_NODE_STATE_STORAGE_INITIAL_HEIGHT: {e}, \
449 using calculated height {height}"
450 );
451 height
452 },
453 }
454 } else {
455 height
456 };
457
458 let current_height = storage.block_height().await?;
459 tracing::info!(
460 node_id = instance.node_id,
461 last_height,
462 height,
463 current_height,
464 "updating state storage"
465 );
466
467 let parent_leaf = AvailabilityDataSource::get_leaf(&*storage, height).await;
468 let leaves = storage.subscribe_leaves(height + 1).await;
469 (last_height, parent_leaf, leaves)
470 };
471 let mut parent_leaf = parent_leaf.await;
474 let mut parent_state = ValidatedState::from_header(parent_leaf.header());
475
476 if last_height == 0 {
477 tracing::info!("storing genesis merklized state");
480 store_genesis_state(&*storage, instance.chain_config, &instance.genesis_state)
481 .await
482 .context("storing genesis state")?;
483 }
484
485 while let Some(leaf) = leaves.next().await {
486 loop {
487 tracing::debug!(
488 height = leaf.height(),
489 node_id = instance.node_id,
490 ?leaf,
491 "updating persistent merklized state"
492 );
493 match update_state_storage(
494 &parent_state,
495 &storage,
496 &instance,
497 &peers,
498 &parent_leaf,
499 &leaf,
500 )
501 .await
502 {
503 Ok(state) => {
504 parent_leaf = leaf;
505 parent_state = state;
506 break;
507 },
508 Err(err) => {
509 tracing::error!(height = leaf.height(), "failed to update state: {err:#}");
510 sleep(Duration::from_secs(1)).await;
512 },
513 }
514 }
515 }
516
517 Ok(())
518}
519
520pub(crate) trait SequencerStateDataSource:
521 'static
522 + Debug
523 + AvailabilityDataSource<SeqTypes>
524 + StatusDataSource
525 + VersionedDataSource
526 + CatchupStorage
527 + RewardMerkleTreeDataSource
528 + PrunedHeightDataSource
529 + MerklizedStateHeightPersistence
530{
531}
532
533impl<T> SequencerStateDataSource for T where
534 T: 'static
535 + Debug
536 + AvailabilityDataSource<SeqTypes>
537 + StatusDataSource
538 + VersionedDataSource
539 + CatchupStorage
540 + RewardMerkleTreeDataSource
541 + PrunedHeightDataSource
542 + MerklizedStateHeightPersistence
543{
544}
545
546pub(crate) trait SequencerStateUpdate:
547 Transaction
548 + UpdateStateData<SeqTypes, FeeMerkleTree, { FeeMerkleTree::ARITY }>
549 + UpdateStateData<SeqTypes, BlockMerkleTree, { BlockMerkleTree::ARITY }>
550 + UpdateStateData<SeqTypes, RewardMerkleTreeV1, { RewardMerkleTreeV1::ARITY }>
551 + ChainConfigPersistence
552{
553}
554
555impl<T> SequencerStateUpdate for T where
556 T: Transaction
557 + UpdateStateData<SeqTypes, FeeMerkleTree, { FeeMerkleTree::ARITY }>
558 + UpdateStateData<SeqTypes, BlockMerkleTree, { BlockMerkleTree::ARITY }>
559 + UpdateStateData<SeqTypes, RewardMerkleTreeV1, { RewardMerkleTreeV1::ARITY }>
560 + ChainConfigPersistence
561{
562}