Skip to main content

espresso_node/
state.rs

1use core::fmt::Debug;
2use std::{cmp::max, sync::Arc, time::Duration};
3
4use anyhow::{Context, bail, ensure};
5use async_lock::Mutex;
6use either::Either;
7use espresso_types::{
8    BlockMerkleTree, EpochRewardsCalculator, FeeAccount, FeeMerkleTree, Leaf2, ValidatedState,
9    traits::StateCatchup,
10    v0_3::{ChainConfig, RewardAccountV1, RewardMerkleTreeV1},
11    v0_4::Delta,
12};
13use futures::{StreamExt, future::Future};
14use hotshot::traits::ValidatedState as HotShotState;
15use hotshot_query_service::{
16    availability::{AvailabilityDataSource, LeafQueryData},
17    data_source::{Transaction, VersionedDataSource, storage::pruning::PrunedHeightDataSource},
18    merklized_state::{MerklizedStateHeightPersistence, UpdateStateData},
19    status::StatusDataSource,
20    types::HeightIndexed,
21};
22use hotshot_types::utils::is_last_block;
23use jf_merkle_tree_compat::{
24    LookupResult, MerkleTreeScheme, ToTraversalPath, UniversalMerkleTreeScheme,
25};
26use tokio::time::sleep;
27use vbs::version::Version;
28use versions::{DRB_AND_HEADER_UPGRADE_VERSION, EPOCH_REWARD_VERSION, EPOCH_VERSION};
29
30use crate::{
31    NodeState, SeqTypes,
32    api::{RewardMerkleTreeDataSource, RewardMerkleTreeV2Data},
33    catchup::{CatchupStorage, SqlStateCatchup},
34    persistence::ChainConfigPersistence,
35};
36
37pub(crate) async fn compute_state_update(
38    parent_state: &ValidatedState,
39    instance: &NodeState,
40    peers: &impl StateCatchup,
41    parent_leaf: &Leaf2,
42    proposed_leaf: &Leaf2,
43) -> anyhow::Result<(ValidatedState, Delta)> {
44    let header = proposed_leaf.block_header();
45
46    let mut parent_state = parent_state.clone();
47
48    // if the protocol has been upgraded, the new chain_config should be used
49    // as the base chain config for the call to `apply_header`. This mirrors the
50    // `apply_upgrade` step at the start of `apply_header`
51    //
52    // We need to do this here because this loop may need to process historical upgrades
53    // that are no longer recorded in our genesis file. but it's safe, because this loop
54    // only handles decided leaves
55    if proposed_leaf.block_header().version() > parent_leaf.block_header().version() {
56        parent_state.chain_config = proposed_leaf.block_header().chain_config()
57    }
58
59    let (state, delta, total_rewards_distributed) = parent_state
60        .apply_header(
61            instance,
62            peers,
63            parent_leaf,
64            header,
65            header.version(),
66            proposed_leaf.view_number(),
67        )
68        .await?;
69
70    // Check internal consistency.
71    ensure!(
72        state.chain_config.commit() == header.chain_config().commit(),
73        "internal error! in-memory chain config {:?} does not match header {:?}",
74        state.chain_config,
75        header.chain_config(),
76    );
77    ensure!(
78        state.block_merkle_tree.commitment() == header.block_merkle_tree_root(),
79        "internal error! in-memory block tree {} does not match header {}",
80        state.block_merkle_tree.commitment(),
81        header.block_merkle_tree_root()
82    );
83    ensure!(
84        state.fee_merkle_tree.commitment() == header.fee_merkle_tree_root(),
85        "internal error! in-memory fee tree {} does not match header {}",
86        state.fee_merkle_tree.commitment(),
87        header.fee_merkle_tree_root()
88    );
89
90    match header.reward_merkle_tree_root() {
91        Either::Left(v1_root) => {
92            ensure!(
93                state.reward_merkle_tree_v1.commitment() == v1_root,
94                "internal error! in-memory v1 reward tree {} does not match header {}",
95                state.reward_merkle_tree_v1.commitment(),
96                v1_root
97            )
98        },
99        Either::Right(v2_root) => {
100            ensure!(
101                state.reward_merkle_tree_v2.commitment() == v2_root,
102                "internal error! in-memory v2 reward tree {} does not match header {}",
103                state.reward_merkle_tree_v2.commitment(),
104                v2_root
105            )
106        },
107    }
108
109    if header.version() >= DRB_AND_HEADER_UPGRADE_VERSION {
110        let Some(actual_total) = total_rewards_distributed else {
111            bail!(
112                "internal error! total_rewards_distributed is None for version {:?}",
113                header.version()
114            );
115        };
116
117        let Some(proposed_total) = header.total_reward_distributed() else {
118            bail!(
119                "internal error! proposed header.total_reward_distributed() is None for version \
120                 {:?}",
121                header.version()
122            );
123        };
124
125        ensure!(
126            proposed_total == actual_total,
127            "Total rewards mismatch: proposed header has {proposed_total} but actual total is \
128             {actual_total}",
129        );
130    }
131
132    Ok((state, delta))
133}
134
135async fn store_state_update(
136    tx: &mut impl SequencerStateUpdate,
137    block_number: u64,
138    version: Version,
139    state: &ValidatedState,
140    delta: &Delta,
141) -> anyhow::Result<()> {
142    let ValidatedState {
143        fee_merkle_tree,
144        block_merkle_tree,
145        reward_merkle_tree_v1,
146        ..
147    } = state;
148    let Delta {
149        fees_delta,
150        rewards_delta,
151    } = delta;
152
153    // Collect fee merkle tree proofs for batch insertion
154    let fee_proofs: Vec<_> = fees_delta
155        .iter()
156        .map(|delta| {
157            let proof = match fee_merkle_tree.universal_lookup(*delta) {
158                LookupResult::Ok(_, proof) => proof,
159                LookupResult::NotFound(proof) => proof,
160                LookupResult::NotInMemory => bail!("missing merkle path for fee account {delta}"),
161            };
162            let path = FeeAccount::to_traversal_path(delta, fee_merkle_tree.height());
163            Ok((proof, path))
164        })
165        .collect::<anyhow::Result<Vec<_>>>()?;
166
167    tracing::debug!(count = fee_proofs.len(), "inserting fee accounts in batch");
168    UpdateStateData::<SeqTypes, FeeMerkleTree, { FeeMerkleTree::ARITY }>::insert_merkle_nodes_batch(
169        tx,
170        fee_proofs,
171        block_number,
172    )
173    .await
174    .context("failed to store fee merkle nodes")?;
175
176    // Insert block merkle tree nodes
177    let (_, proof) = block_merkle_tree
178        .lookup(block_number - 1)
179        .expect_ok()
180        .context("getting blocks frontier")?;
181    let path = <u64 as ToTraversalPath<{ BlockMerkleTree::ARITY }>>::to_traversal_path(
182        &(block_number - 1),
183        block_merkle_tree.height(),
184    );
185
186    {
187        tracing::debug!("inserting blocks frontier");
188        UpdateStateData::<SeqTypes, BlockMerkleTree, { BlockMerkleTree::ARITY }>::insert_merkle_nodes(
189            tx,
190            proof,
191            path,
192            block_number,
193        )
194        .await
195        .context("failed to store block merkle nodes")?;
196    }
197
198    if version <= EPOCH_VERSION {
199        // Collect reward merkle tree v1 proofs for batch insertion
200        let reward_proofs: Vec<_> = rewards_delta
201            .iter()
202            .map(|delta| {
203                let key = RewardAccountV1::from(*delta);
204                let proof = match reward_merkle_tree_v1.universal_lookup(key) {
205                    LookupResult::Ok(_, proof) => proof,
206                    LookupResult::NotFound(proof) => proof,
207                    LookupResult::NotInMemory => {
208                        bail!("missing merkle path for reward account {delta}")
209                    },
210                };
211                let path = <RewardAccountV1 as ToTraversalPath<
212                        { RewardMerkleTreeV1::ARITY },
213                    >>::to_traversal_path(
214                        &key, reward_merkle_tree_v1.height()
215                    );
216                Ok((proof, path))
217            })
218            .collect::<anyhow::Result<Vec<_>>>()?;
219
220        tracing::debug!(
221            count = reward_proofs.len(),
222            "inserting v1 reward accounts in batch"
223        );
224        UpdateStateData::<SeqTypes, RewardMerkleTreeV1, { RewardMerkleTreeV1::ARITY }>::insert_merkle_nodes_batch(
225            tx,
226            reward_proofs,
227            block_number,
228        )
229        .await
230        .context("failed to store reward merkle nodes")?;
231    }
232
233    Ok(())
234}
235
236#[tracing::instrument(
237    skip_all,
238    fields(
239        node_id = instance.node_id,
240        view = ?parent_leaf.leaf().view_number(),
241        height = parent_leaf.height(),
242    ),
243)]
244async fn update_state_storage<T>(
245    parent_state: &ValidatedState,
246    storage: &Arc<T>,
247    instance: &NodeState,
248    peers: &impl StateCatchup,
249    parent_leaf: &LeafQueryData<SeqTypes>,
250    proposed_leaf: &LeafQueryData<SeqTypes>,
251) -> anyhow::Result<ValidatedState>
252where
253    T: SequencerStateDataSource,
254    for<'a> T::Transaction<'a>: SequencerStateUpdate,
255{
256    let parent_chain_config = parent_state.chain_config;
257    let block_number = proposed_leaf.height();
258    let version = proposed_leaf.header().version();
259
260    let (state, delta) = compute_state_update(
261        parent_state,
262        instance,
263        peers,
264        &parent_leaf.leaf().clone(),
265        &proposed_leaf.leaf().clone(),
266    )
267    .await
268    .context("computing state update")?;
269
270    let has_changed_accounts = version > EPOCH_VERSION && !delta.rewards_delta.is_empty();
271    // For EPOCH_REWARD_VERSION+ we must persist the reward tree at every epoch
272    // boundary, even when no rewards were distributed. During a V4→V5 upgrade
273    // the first post upgrade epoch boundary skips rewards (the previous epoch's
274    // header is pre-V5), leaving rewards_delta empty. Without saving here, the
275    // tree would be missing from storage and catchup requests from peers or
276    // subsequent epoch reward calculations would fail.
277    //
278    // Example: V4→V5 upgrade at block 9756, epoch_height=3000.
279    // At block 12000 (first epoch boundary post upgrade), handle_epoch_rewards
280    // skips rewards because the previous epoch's boundary (block 9000) is pre-V5,
281    // so rewards_delta is empty. Without saving here, the tree is never persisted
282    // at height 12000. Later at block 15000, the epoch 4 reward calculation calls
283    // fetch_reward_merkle_tree_v2(height=12000) to catch up missing accounts and
284    // fails because no tree exists in storage at that height.
285    let is_epoch_boundary = version >= EPOCH_REWARD_VERSION
286        && is_last_block(
287            block_number,
288            instance
289                .epoch_height
290                .expect("epoch_height should be set for version > V3"),
291        );
292
293    if has_changed_accounts || is_epoch_boundary {
294        storage
295            .save_and_gc_reward_tree_v2(
296                instance,
297                block_number,
298                version,
299                &state.reward_merkle_tree_v2,
300            )
301            .await
302            .context("failed to save and gc reward merkle tree v2")?;
303    }
304
305    storage
306        .persist_reward_proofs(instance, block_number, version)
307        .await
308        .context("failed to persist reward proofs")?;
309
310    tracing::debug!("storing state update");
311    let mut tx = storage
312        .write()
313        .await
314        .context("opening transaction for state update")?;
315
316    store_state_update(&mut tx, block_number, version, &state, &delta).await?;
317
318    tx.commit().await?;
319
320    let mut tx = storage
321        .write()
322        .await
323        .context("opening transaction for state update")?;
324
325    if parent_chain_config != state.chain_config {
326        let cf = state
327            .chain_config
328            .resolve()
329            .context("failed to resolve to chain config")?;
330
331        tx.insert_chain_config(cf).await?;
332    }
333
334    tracing::debug!(block_number, "updating state height");
335    UpdateStateData::<SeqTypes, _, { BlockMerkleTree::ARITY }>::set_last_state_height(
336        &mut tx,
337        block_number as usize,
338    )
339    .await
340    .context("setting state height")?;
341
342    tx.commit().await?;
343
344    Ok(state)
345}
346
347async fn store_genesis_state<S>(
348    storage: &S,
349    chain_config: ChainConfig,
350    state: &ValidatedState,
351) -> anyhow::Result<()>
352where
353    S: SequencerStateDataSource,
354    for<'a> S::Transaction<'a>: SequencerStateUpdate,
355{
356    ensure!(
357        state.block_merkle_tree.num_leaves() == 0,
358        "genesis state with non-empty block tree is unsupported"
359    );
360
361    let mut tx = storage
362        .write()
363        .await
364        .context("starting transaction for genesis state")?;
365
366    // Insert fee merkle tree nodes
367    for (account, _) in state.fee_merkle_tree.iter() {
368        let proof = match state.fee_merkle_tree.universal_lookup(account) {
369            LookupResult::Ok(_, proof) => proof,
370            LookupResult::NotFound(proof) => proof,
371            LookupResult::NotInMemory => bail!("missing merkle path for fee account {account}"),
372        };
373        let path: Vec<usize> =
374            <FeeAccount as ToTraversalPath<{ FeeMerkleTree::ARITY }>>::to_traversal_path(
375                account,
376                state.fee_merkle_tree.height(),
377            );
378
379        UpdateStateData::<SeqTypes, FeeMerkleTree, { FeeMerkleTree::ARITY }>::insert_merkle_nodes(
380            &mut tx, proof, path, 0,
381        )
382        .await
383        .context("failed to store fee merkle nodes")?;
384    }
385
386    tx.insert_chain_config(chain_config).await?;
387
388    tx.commit().await?;
389
390    // Store the genesis reward tree at height 0 so catchup can find it.
391    let tree_data: RewardMerkleTreeV2Data = (&state.reward_merkle_tree_v2)
392        .try_into()
393        .context("serializing genesis reward tree")?;
394    let tree_bytes = bincode::serialize(&tree_data).context("serializing genesis reward tree")?;
395    storage
396        .persist_tree(0, tree_bytes)
397        .await
398        .context("storing genesis reward merkle tree")?;
399
400    Ok(())
401}
402
403#[tracing::instrument(skip_all)]
404pub(crate) async fn update_state_storage_loop<T>(
405    storage: Arc<T>,
406    instance: impl Future<Output = NodeState>,
407) -> anyhow::Result<()>
408where
409    T: SequencerStateDataSource,
410    for<'a> T::Transaction<'a>: SequencerStateUpdate,
411{
412    let mut instance = instance.await;
413    // Use a separate rewards calculator for the state loop so it doesn't
414    // interfere with consensus, which may be on a very different epoch.
415    instance.epoch_rewards_calculator = Arc::new(Mutex::new(EpochRewardsCalculator::new()));
416    let peers = SqlStateCatchup::new(storage.clone(), Default::default());
417
418    // get last saved merklized state
419    let (last_height, parent_leaf, mut leaves) = {
420        let last_height = storage.get_last_state_height().await?;
421        let pruned_height = storage.load_pruned_height().await?;
422
423        let height = match pruned_height {
424            // If `last_height > pruned_height`, start from `last_height`
425            // as it represents the latest state in storage.
426            // If `pruned_height > last_height`, start from `pruned_height`
427            // as data below this height is no longer needed and will be pruned again during the next pruner run.
428            Some(pruned_height) => max(last_height, pruned_height as usize + 1),
429            // if we have not pruned any data then just start from last_height
430            None => last_height,
431        };
432
433        // Check for environment variable override
434        let height =
435            if let Ok(env_height) = std::env::var("ESPRESSO_NODE_STATE_STORAGE_INITIAL_HEIGHT") {
436                match env_height.parse::<usize>() {
437                    Ok(override_height) => {
438                        tracing::error!(
439                            node_id = instance.node_id,
440                            calculated_height = height,
441                            override_height,
442                            "overriding initial state storage height from environment variable"
443                        );
444                        override_height
445                    },
446                    Err(e) => {
447                        tracing::error!(
448                            "failed to parse ESPRESSO_NODE_STATE_STORAGE_INITIAL_HEIGHT: {e}, \
449                             using calculated height {height}"
450                        );
451                        height
452                    },
453                }
454            } else {
455                height
456            };
457
458        let current_height = storage.block_height().await?;
459        tracing::info!(
460            node_id = instance.node_id,
461            last_height,
462            height,
463            current_height,
464            "updating state storage"
465        );
466
467        let parent_leaf = AvailabilityDataSource::get_leaf(&*storage, height).await;
468        let leaves = storage.subscribe_leaves(height + 1).await;
469        (last_height, parent_leaf, leaves)
470    };
471    // resolve the parent leaf future _after_ dropping our lock on the state, in case it is not
472    // ready yet and another task needs a mutable lock on the state to produce the parent leaf.
473    let mut parent_leaf = parent_leaf.await;
474    let mut parent_state = ValidatedState::from_header(parent_leaf.header());
475
476    if last_height == 0 {
477        // If the last height is 0, we need to insert the genesis state, since this state is
478        // never the result of a state update and thus is not inserted in the loop below.
479        tracing::info!("storing genesis merklized state");
480        store_genesis_state(&*storage, instance.chain_config, &instance.genesis_state)
481            .await
482            .context("storing genesis state")?;
483    }
484
485    while let Some(leaf) = leaves.next().await {
486        loop {
487            tracing::debug!(
488                height = leaf.height(),
489                node_id = instance.node_id,
490                ?leaf,
491                "updating persistent merklized state"
492            );
493            match update_state_storage(
494                &parent_state,
495                &storage,
496                &instance,
497                &peers,
498                &parent_leaf,
499                &leaf,
500            )
501            .await
502            {
503                Ok(state) => {
504                    parent_leaf = leaf;
505                    parent_state = state;
506                    break;
507                },
508                Err(err) => {
509                    tracing::error!(height = leaf.height(), "failed to update state: {err:#}");
510                    // If we fail, delay for a second and retry.
511                    sleep(Duration::from_secs(1)).await;
512                },
513            }
514        }
515    }
516
517    Ok(())
518}
519
520pub(crate) trait SequencerStateDataSource:
521    'static
522    + Debug
523    + AvailabilityDataSource<SeqTypes>
524    + StatusDataSource
525    + VersionedDataSource
526    + CatchupStorage
527    + RewardMerkleTreeDataSource
528    + PrunedHeightDataSource
529    + MerklizedStateHeightPersistence
530{
531}
532
533impl<T> SequencerStateDataSource for T where
534    T: 'static
535        + Debug
536        + AvailabilityDataSource<SeqTypes>
537        + StatusDataSource
538        + VersionedDataSource
539        + CatchupStorage
540        + RewardMerkleTreeDataSource
541        + PrunedHeightDataSource
542        + MerklizedStateHeightPersistence
543{
544}
545
546pub(crate) trait SequencerStateUpdate:
547    Transaction
548    + UpdateStateData<SeqTypes, FeeMerkleTree, { FeeMerkleTree::ARITY }>
549    + UpdateStateData<SeqTypes, BlockMerkleTree, { BlockMerkleTree::ARITY }>
550    + UpdateStateData<SeqTypes, RewardMerkleTreeV1, { RewardMerkleTreeV1::ARITY }>
551    + ChainConfigPersistence
552{
553}
554
555impl<T> SequencerStateUpdate for T where
556    T: Transaction
557        + UpdateStateData<SeqTypes, FeeMerkleTree, { FeeMerkleTree::ARITY }>
558        + UpdateStateData<SeqTypes, BlockMerkleTree, { BlockMerkleTree::ARITY }>
559        + UpdateStateData<SeqTypes, RewardMerkleTreeV1, { RewardMerkleTreeV1::ARITY }>
560        + ChainConfigPersistence
561{
562}