1use core::fmt::Debug;
2use std::{cmp::max, sync::Arc, time::Duration};
3
4use anyhow::{Context, bail, ensure};
5use async_lock::Mutex;
6use either::Either;
7use espresso_types::{
8 BlockMerkleTree, EpochRewardsCalculator, FeeAccount, FeeMerkleTree, Leaf2, ValidatedState,
9 traits::StateCatchup,
10 v0_3::{ChainConfig, RewardAccountV1, RewardMerkleTreeV1},
11 v0_4::Delta,
12};
13use futures::{StreamExt, future::Future};
14use hotshot::traits::ValidatedState as HotShotState;
15use hotshot_query_service::{
16 availability::{AvailabilityDataSource, LeafQueryData},
17 data_source::{Transaction, VersionedDataSource, storage::pruning::PrunedHeightDataSource},
18 merklized_state::{MerklizedStateHeightPersistence, UpdateStateData},
19 status::StatusDataSource,
20 types::HeightIndexed,
21};
22use jf_merkle_tree_compat::{
23 LookupResult, MerkleTreeScheme, ToTraversalPath, UniversalMerkleTreeScheme,
24};
25use tokio::time::sleep;
26use vbs::version::Version;
27use versions::{DRB_AND_HEADER_UPGRADE_VERSION, EPOCH_VERSION};
28
29use crate::{
30 NodeState, SeqTypes,
31 api::RewardMerkleTreeDataSource,
32 catchup::{CatchupStorage, SqlStateCatchup},
33 persistence::ChainConfigPersistence,
34};
35
36pub(crate) async fn compute_state_update(
37 parent_state: &ValidatedState,
38 instance: &NodeState,
39 peers: &impl StateCatchup,
40 parent_leaf: &Leaf2,
41 proposed_leaf: &Leaf2,
42) -> anyhow::Result<(ValidatedState, Delta)> {
43 let header = proposed_leaf.block_header();
44
45 let mut parent_state = parent_state.clone();
46
47 if proposed_leaf.block_header().version() > parent_leaf.block_header().version() {
55 parent_state.chain_config = proposed_leaf.block_header().chain_config()
56 }
57
58 let (state, delta, total_rewards_distributed) = parent_state
59 .apply_header(
60 instance,
61 peers,
62 parent_leaf,
63 header,
64 header.version(),
65 proposed_leaf.view_number(),
66 )
67 .await?;
68
69 ensure!(
71 state.chain_config.commit() == header.chain_config().commit(),
72 "internal error! in-memory chain config {:?} does not match header {:?}",
73 state.chain_config,
74 header.chain_config(),
75 );
76 ensure!(
77 state.block_merkle_tree.commitment() == header.block_merkle_tree_root(),
78 "internal error! in-memory block tree {} does not match header {}",
79 state.block_merkle_tree.commitment(),
80 header.block_merkle_tree_root()
81 );
82 ensure!(
83 state.fee_merkle_tree.commitment() == header.fee_merkle_tree_root(),
84 "internal error! in-memory fee tree {} does not match header {}",
85 state.fee_merkle_tree.commitment(),
86 header.fee_merkle_tree_root()
87 );
88
89 match header.reward_merkle_tree_root() {
90 Either::Left(v1_root) => {
91 ensure!(
92 state.reward_merkle_tree_v1.commitment() == v1_root,
93 "internal error! in-memory v1 reward tree {} does not match header {}",
94 state.reward_merkle_tree_v1.commitment(),
95 v1_root
96 )
97 },
98 Either::Right(v2_root) => {
99 ensure!(
100 state.reward_merkle_tree_v2.commitment() == v2_root,
101 "internal error! in-memory v2 reward tree {} does not match header {}",
102 state.reward_merkle_tree_v2.commitment(),
103 v2_root
104 )
105 },
106 }
107
108 if header.version() >= DRB_AND_HEADER_UPGRADE_VERSION {
109 let Some(actual_total) = total_rewards_distributed else {
110 bail!(
111 "internal error! total_rewards_distributed is None for version {:?}",
112 header.version()
113 );
114 };
115
116 let Some(proposed_total) = header.total_reward_distributed() else {
117 bail!(
118 "internal error! proposed header.total_reward_distributed() is None for version \
119 {:?}",
120 header.version()
121 );
122 };
123
124 ensure!(
125 proposed_total == actual_total,
126 "Total rewards mismatch: proposed header has {proposed_total} but actual total is \
127 {actual_total}",
128 );
129 }
130
131 Ok((state, delta))
132}
133
134async fn store_state_update(
135 tx: &mut impl SequencerStateUpdate,
136 block_number: u64,
137 version: Version,
138 state: &ValidatedState,
139 delta: &Delta,
140) -> anyhow::Result<()> {
141 let ValidatedState {
142 fee_merkle_tree,
143 block_merkle_tree,
144 reward_merkle_tree_v1,
145 ..
146 } = state;
147 let Delta {
148 fees_delta,
149 rewards_delta,
150 } = delta;
151
152 let fee_proofs: Vec<_> = fees_delta
154 .iter()
155 .map(|delta| {
156 let proof = match fee_merkle_tree.universal_lookup(*delta) {
157 LookupResult::Ok(_, proof) => proof,
158 LookupResult::NotFound(proof) => proof,
159 LookupResult::NotInMemory => bail!("missing merkle path for fee account {delta}"),
160 };
161 let path = FeeAccount::to_traversal_path(delta, fee_merkle_tree.height());
162 Ok((proof, path))
163 })
164 .collect::<anyhow::Result<Vec<_>>>()?;
165
166 tracing::debug!(count = fee_proofs.len(), "inserting fee accounts in batch");
167 UpdateStateData::<SeqTypes, FeeMerkleTree, { FeeMerkleTree::ARITY }>::insert_merkle_nodes_batch(
168 tx,
169 fee_proofs,
170 block_number,
171 )
172 .await
173 .context("failed to store fee merkle nodes")?;
174
175 let (_, proof) = block_merkle_tree
177 .lookup(block_number - 1)
178 .expect_ok()
179 .context("getting blocks frontier")?;
180 let path = <u64 as ToTraversalPath<{ BlockMerkleTree::ARITY }>>::to_traversal_path(
181 &(block_number - 1),
182 block_merkle_tree.height(),
183 );
184
185 {
186 tracing::debug!("inserting blocks frontier");
187 UpdateStateData::<SeqTypes, BlockMerkleTree, { BlockMerkleTree::ARITY }>::insert_merkle_nodes(
188 tx,
189 proof,
190 path,
191 block_number,
192 )
193 .await
194 .context("failed to store block merkle nodes")?;
195 }
196
197 if version <= EPOCH_VERSION {
198 let reward_proofs: Vec<_> = rewards_delta
200 .iter()
201 .map(|delta| {
202 let key = RewardAccountV1::from(*delta);
203 let proof = match reward_merkle_tree_v1.universal_lookup(key) {
204 LookupResult::Ok(_, proof) => proof,
205 LookupResult::NotFound(proof) => proof,
206 LookupResult::NotInMemory => {
207 bail!("missing merkle path for reward account {delta}")
208 },
209 };
210 let path = <RewardAccountV1 as ToTraversalPath<
211 { RewardMerkleTreeV1::ARITY },
212 >>::to_traversal_path(
213 &key, reward_merkle_tree_v1.height()
214 );
215 Ok((proof, path))
216 })
217 .collect::<anyhow::Result<Vec<_>>>()?;
218
219 tracing::debug!(
220 count = reward_proofs.len(),
221 "inserting v1 reward accounts in batch"
222 );
223 UpdateStateData::<SeqTypes, RewardMerkleTreeV1, { RewardMerkleTreeV1::ARITY }>::insert_merkle_nodes_batch(
224 tx,
225 reward_proofs,
226 block_number,
227 )
228 .await
229 .context("failed to store reward merkle nodes")?;
230 }
231
232 Ok(())
233}
234
235#[tracing::instrument(
236 skip_all,
237 fields(
238 node_id = instance.node_id,
239 view = ?parent_leaf.leaf().view_number(),
240 height = parent_leaf.height(),
241 ),
242)]
243async fn update_state_storage<T>(
244 parent_state: &ValidatedState,
245 storage: &Arc<T>,
246 instance: &NodeState,
247 peers: &impl StateCatchup,
248 parent_leaf: &LeafQueryData<SeqTypes>,
249 proposed_leaf: &LeafQueryData<SeqTypes>,
250) -> anyhow::Result<ValidatedState>
251where
252 T: SequencerStateDataSource,
253 for<'a> T::Transaction<'a>: SequencerStateUpdate,
254{
255 let parent_chain_config = parent_state.chain_config;
256 let block_number = proposed_leaf.height();
257 let version = proposed_leaf.header().version();
258
259 let (state, delta) = compute_state_update(
260 parent_state,
261 instance,
262 peers,
263 &parent_leaf.leaf().clone(),
264 &proposed_leaf.leaf().clone(),
265 )
266 .await
267 .context("computing state update")?;
268
269 if version > EPOCH_VERSION && !delta.rewards_delta.is_empty() {
270 storage
271 .save_and_gc_reward_tree_v2(
272 instance,
273 block_number,
274 version,
275 &state.reward_merkle_tree_v2,
276 )
277 .await
278 .context("failed to save and gc reward merkle tree v2")?;
279
280 storage
281 .persist_reward_proofs(instance, block_number, version)
282 .await
283 .context("failed to persist reward proofs")?;
284 }
285
286 tracing::debug!("storing state update");
287 let mut tx = storage
288 .write()
289 .await
290 .context("opening transaction for state update")?;
291
292 store_state_update(&mut tx, block_number, version, &state, &delta).await?;
293
294 tx.commit().await?;
295
296 let mut tx = storage
297 .write()
298 .await
299 .context("opening transaction for state update")?;
300
301 if parent_chain_config != state.chain_config {
302 let cf = state
303 .chain_config
304 .resolve()
305 .context("failed to resolve to chain config")?;
306
307 tx.insert_chain_config(cf).await?;
308 }
309
310 tracing::debug!(block_number, "updating state height");
311 UpdateStateData::<SeqTypes, _, { BlockMerkleTree::ARITY }>::set_last_state_height(
312 &mut tx,
313 block_number as usize,
314 )
315 .await
316 .context("setting state height")?;
317
318 tx.commit().await?;
319
320 Ok(state)
321}
322
323async fn store_genesis_state<T>(
324 mut tx: T,
325 chain_config: ChainConfig,
326 state: &ValidatedState,
327) -> anyhow::Result<()>
328where
329 T: SequencerStateUpdate,
330{
331 ensure!(
332 state.block_merkle_tree.num_leaves() == 0,
333 "genesis state with non-empty block tree is unsupported"
334 );
335
336 for (account, _) in state.fee_merkle_tree.iter() {
338 let proof = match state.fee_merkle_tree.universal_lookup(account) {
339 LookupResult::Ok(_, proof) => proof,
340 LookupResult::NotFound(proof) => proof,
341 LookupResult::NotInMemory => bail!("missing merkle path for fee account {account}"),
342 };
343 let path: Vec<usize> =
344 <FeeAccount as ToTraversalPath<{ FeeMerkleTree::ARITY }>>::to_traversal_path(
345 account,
346 state.fee_merkle_tree.height(),
347 );
348
349 UpdateStateData::<SeqTypes, FeeMerkleTree, { FeeMerkleTree::ARITY }>::insert_merkle_nodes(
350 &mut tx, proof, path, 0,
351 )
352 .await
353 .context("failed to store fee merkle nodes")?;
354 }
355
356 tx.insert_chain_config(chain_config).await?;
357
358 tx.commit().await?;
359 Ok(())
360}
361
/// Long-running task that keeps the persisted merklized state in step with incoming leaves.
///
/// After resolving `instance`, the task determines where to resume, stores the genesis state if
/// no state height has been recorded yet, and then applies every leaf delivered by the leaf
/// subscription through `update_state_storage`, using each successfully stored leaf and state
/// as the parent of the next one.
///
/// A leaf whose update fails is retried indefinitely with a one second pause between attempts;
/// later leaves are not processed until it succeeds.
///
/// # Errors
///
/// Returns an error only if the initial height queries or storing the genesis state fail.
/// Returns `Ok(())` once the leaf stream ends.
#[tracing::instrument(skip_all)]
pub(crate) async fn update_state_storage_loop<T>(
    storage: Arc<T>,
    instance: impl Future<Output = NodeState>,
) -> anyhow::Result<()>
where
    T: SequencerStateDataSource,
    for<'a> T::Transaction<'a>: SequencerStateUpdate,
{
    let mut instance = instance.await;
    // This task works with its own, freshly created rewards calculator instead of the one that
    // came with `instance`.
    // NOTE(review): presumably to avoid sharing calculator state with other tasks; confirm.
    instance.epoch_rewards_calculator = Arc::new(Mutex::new(EpochRewardsCalculator::new()));
    // Catchup source for `update_state_storage`, backed by this node's own storage.
    let peers = SqlStateCatchup::new(storage.clone(), Default::default());

    let (last_height, parent_leaf, mut leaves) = {
        let last_height = storage.get_last_state_height().await?;
        let pruned_height = storage.load_pruned_height().await?;

        // Resume from the last stored state height, unless pruning has already removed data up
        // to that point; in that case start just above the pruned height.
        let height = match pruned_height {
            Some(pruned_height) => max(last_height, pruned_height as usize + 1),
            None => last_height,
        };

        let current_height = storage.block_height().await?;
        tracing::info!(
            node_id = instance.node_id,
            last_height,
            current_height,
            "updating state storage"
        );

        // The parent leaf request is only created here; it is resolved after this block.
        let parent_leaf = storage.get_leaf(height).await;
        // Subscribe starting right after the parent so that no leaf is skipped.
        let leaves = storage.subscribe_leaves(height + 1).await;
        (last_height, parent_leaf, leaves)
    };
    // Wait for the parent leaf only now, after the subscription has been set up.
    // NOTE(review): the deferred await looks deliberate (the leaf may not be available yet);
    // keep this ordering unless the reason is confirmed to no longer apply.
    let mut parent_leaf = parent_leaf.await;
    // The starting state is derived from the parent leaf's header alone.
    let mut parent_state = ValidatedState::from_header(parent_leaf.header());

    if last_height == 0 {
        // No state height recorded yet: store the genesis state explicitly, since the loop
        // below only stores states that result from applying a leaf.
        tracing::info!("storing genesis merklized state");
        let tx = storage
            .write()
            .await
            .context("starting transaction for genesis state")?;
        store_genesis_state(tx, instance.chain_config, &instance.genesis_state)
            .await
            .context("storing genesis state")?;
    }

    while let Some(leaf) = leaves.next().await {
        // Inner loop: keep retrying the same leaf until its state update has been stored.
        loop {
            tracing::debug!(
                height = leaf.height(),
                node_id = instance.node_id,
                ?leaf,
                "updating persistent merklized state"
            );
            match update_state_storage(
                &parent_state,
                &storage,
                &instance,
                &peers,
                &parent_leaf,
                &leaf,
            )
            .await
            {
                Ok(state) => {
                    // Success: this leaf and its state become the parent for the next leaf.
                    parent_leaf = leaf;
                    parent_state = state;
                    break;
                },
                Err(err) => {
                    tracing::error!(height = leaf.height(), "failed to update state: {err:#}");
                    // Back off for a second before retrying the same leaf.
                    sleep(Duration::from_secs(1)).await;
                },
            }
        }
    }

    Ok(())
}
456
/// Shorthand for the full set of capabilities the state update task requires from its storage.
///
/// The trait has no methods of its own; it only bundles the supertraits listed below so that
/// functions such as `update_state_storage` and `update_state_storage_loop` can state a single
/// bound on their storage type.
pub(crate) trait SequencerStateDataSource:
    'static
    + Debug
    + AvailabilityDataSource<SeqTypes>
    + StatusDataSource
    + VersionedDataSource
    + CatchupStorage
    + RewardMerkleTreeDataSource
    + PrunedHeightDataSource
    + MerklizedStateHeightPersistence
{
}
469
// Blanket implementation: every type that satisfies all of the bounds below is automatically a
// `SequencerStateDataSource`, so the trait never has to be implemented by hand.
impl<T> SequencerStateDataSource for T where
    T: 'static
        + Debug
        + AvailabilityDataSource<SeqTypes>
        + StatusDataSource
        + VersionedDataSource
        + CatchupStorage
        + RewardMerkleTreeDataSource
        + PrunedHeightDataSource
        + MerklizedStateHeightPersistence
{
}
482
/// Shorthand for a storage transaction that can persist a sequencer state update.
///
/// The trait has no methods of its own; it bundles the ability to store merkle nodes for the
/// fee, block and v1 reward merkle trees with chain config persistence, on top of the basic
/// [`Transaction`] interface.
pub(crate) trait SequencerStateUpdate:
    Transaction
    + UpdateStateData<SeqTypes, FeeMerkleTree, { FeeMerkleTree::ARITY }>
    + UpdateStateData<SeqTypes, BlockMerkleTree, { BlockMerkleTree::ARITY }>
    + UpdateStateData<SeqTypes, RewardMerkleTreeV1, { RewardMerkleTreeV1::ARITY }>
    + ChainConfigPersistence
{
}
491
// Blanket implementation: every transaction type that satisfies all of the bounds below is
// automatically a `SequencerStateUpdate`.
impl<T> SequencerStateUpdate for T where
    T: Transaction
        + UpdateStateData<SeqTypes, FeeMerkleTree, { FeeMerkleTree::ARITY }>
        + UpdateStateData<SeqTypes, BlockMerkleTree, { BlockMerkleTree::ARITY }>
        + UpdateStateData<SeqTypes, RewardMerkleTreeV1, { RewardMerkleTreeV1::ARITY }>
        + ChainConfigPersistence
{
}