espresso_node/
state.rs

1use core::fmt::Debug;
2use std::{cmp::max, sync::Arc, time::Duration};
3
4use anyhow::{Context, bail, ensure};
5use async_lock::Mutex;
6use either::Either;
7use espresso_types::{
8    BlockMerkleTree, EpochRewardsCalculator, FeeAccount, FeeMerkleTree, Leaf2, ValidatedState,
9    traits::StateCatchup,
10    v0_3::{ChainConfig, RewardAccountV1, RewardMerkleTreeV1},
11    v0_4::Delta,
12};
13use futures::{StreamExt, future::Future};
14use hotshot::traits::ValidatedState as HotShotState;
15use hotshot_query_service::{
16    availability::{AvailabilityDataSource, LeafQueryData},
17    data_source::{Transaction, VersionedDataSource, storage::pruning::PrunedHeightDataSource},
18    merklized_state::{MerklizedStateHeightPersistence, UpdateStateData},
19    status::StatusDataSource,
20    types::HeightIndexed,
21};
22use jf_merkle_tree_compat::{
23    LookupResult, MerkleTreeScheme, ToTraversalPath, UniversalMerkleTreeScheme,
24};
25use tokio::time::sleep;
26use vbs::version::Version;
27use versions::{DRB_AND_HEADER_UPGRADE_VERSION, EPOCH_VERSION};
28
29use crate::{
30    NodeState, SeqTypes,
31    api::RewardMerkleTreeDataSource,
32    catchup::{CatchupStorage, SqlStateCatchup},
33    persistence::ChainConfigPersistence,
34};
35
36pub(crate) async fn compute_state_update(
37    parent_state: &ValidatedState,
38    instance: &NodeState,
39    peers: &impl StateCatchup,
40    parent_leaf: &Leaf2,
41    proposed_leaf: &Leaf2,
42) -> anyhow::Result<(ValidatedState, Delta)> {
43    let header = proposed_leaf.block_header();
44
45    let mut parent_state = parent_state.clone();
46
47    // if the protocol has been upgraded, the new chain_config should be used
48    // as the base chain config for the call to `apply_header`. This mirrors the
49    // `apply_upgrade` step at the start of `apply_header`
50    //
51    // We need to do this here because this loop may need to process historical upgrades
52    // that are no longer recorded in our genesis file. but it's safe, because this loop
53    // only handles decided leaves
54    if proposed_leaf.block_header().version() > parent_leaf.block_header().version() {
55        parent_state.chain_config = proposed_leaf.block_header().chain_config()
56    }
57
58    let (state, delta, total_rewards_distributed) = parent_state
59        .apply_header(
60            instance,
61            peers,
62            parent_leaf,
63            header,
64            header.version(),
65            proposed_leaf.view_number(),
66        )
67        .await?;
68
69    // Check internal consistency.
70    ensure!(
71        state.chain_config.commit() == header.chain_config().commit(),
72        "internal error! in-memory chain config {:?} does not match header {:?}",
73        state.chain_config,
74        header.chain_config(),
75    );
76    ensure!(
77        state.block_merkle_tree.commitment() == header.block_merkle_tree_root(),
78        "internal error! in-memory block tree {} does not match header {}",
79        state.block_merkle_tree.commitment(),
80        header.block_merkle_tree_root()
81    );
82    ensure!(
83        state.fee_merkle_tree.commitment() == header.fee_merkle_tree_root(),
84        "internal error! in-memory fee tree {} does not match header {}",
85        state.fee_merkle_tree.commitment(),
86        header.fee_merkle_tree_root()
87    );
88
89    match header.reward_merkle_tree_root() {
90        Either::Left(v1_root) => {
91            ensure!(
92                state.reward_merkle_tree_v1.commitment() == v1_root,
93                "internal error! in-memory v1 reward tree {} does not match header {}",
94                state.reward_merkle_tree_v1.commitment(),
95                v1_root
96            )
97        },
98        Either::Right(v2_root) => {
99            ensure!(
100                state.reward_merkle_tree_v2.commitment() == v2_root,
101                "internal error! in-memory v2 reward tree {} does not match header {}",
102                state.reward_merkle_tree_v2.commitment(),
103                v2_root
104            )
105        },
106    }
107
108    if header.version() >= DRB_AND_HEADER_UPGRADE_VERSION {
109        let Some(actual_total) = total_rewards_distributed else {
110            bail!(
111                "internal error! total_rewards_distributed is None for version {:?}",
112                header.version()
113            );
114        };
115
116        let Some(proposed_total) = header.total_reward_distributed() else {
117            bail!(
118                "internal error! proposed header.total_reward_distributed() is None for version \
119                 {:?}",
120                header.version()
121            );
122        };
123
124        ensure!(
125            proposed_total == actual_total,
126            "Total rewards mismatch: proposed header has {proposed_total} but actual total is \
127             {actual_total}",
128        );
129    }
130
131    Ok((state, delta))
132}
133
134async fn store_state_update(
135    tx: &mut impl SequencerStateUpdate,
136    block_number: u64,
137    version: Version,
138    state: &ValidatedState,
139    delta: &Delta,
140) -> anyhow::Result<()> {
141    let ValidatedState {
142        fee_merkle_tree,
143        block_merkle_tree,
144        reward_merkle_tree_v1,
145        ..
146    } = state;
147    let Delta {
148        fees_delta,
149        rewards_delta,
150    } = delta;
151
152    // Collect fee merkle tree proofs for batch insertion
153    let fee_proofs: Vec<_> = fees_delta
154        .iter()
155        .map(|delta| {
156            let proof = match fee_merkle_tree.universal_lookup(*delta) {
157                LookupResult::Ok(_, proof) => proof,
158                LookupResult::NotFound(proof) => proof,
159                LookupResult::NotInMemory => bail!("missing merkle path for fee account {delta}"),
160            };
161            let path = FeeAccount::to_traversal_path(delta, fee_merkle_tree.height());
162            Ok((proof, path))
163        })
164        .collect::<anyhow::Result<Vec<_>>>()?;
165
166    tracing::debug!(count = fee_proofs.len(), "inserting fee accounts in batch");
167    UpdateStateData::<SeqTypes, FeeMerkleTree, { FeeMerkleTree::ARITY }>::insert_merkle_nodes_batch(
168        tx,
169        fee_proofs,
170        block_number,
171    )
172    .await
173    .context("failed to store fee merkle nodes")?;
174
175    // Insert block merkle tree nodes
176    let (_, proof) = block_merkle_tree
177        .lookup(block_number - 1)
178        .expect_ok()
179        .context("getting blocks frontier")?;
180    let path = <u64 as ToTraversalPath<{ BlockMerkleTree::ARITY }>>::to_traversal_path(
181        &(block_number - 1),
182        block_merkle_tree.height(),
183    );
184
185    {
186        tracing::debug!("inserting blocks frontier");
187        UpdateStateData::<SeqTypes, BlockMerkleTree, { BlockMerkleTree::ARITY }>::insert_merkle_nodes(
188            tx,
189            proof,
190            path,
191            block_number,
192        )
193        .await
194        .context("failed to store block merkle nodes")?;
195    }
196
197    if version <= EPOCH_VERSION {
198        // Collect reward merkle tree v1 proofs for batch insertion
199        let reward_proofs: Vec<_> = rewards_delta
200            .iter()
201            .map(|delta| {
202                let key = RewardAccountV1::from(*delta);
203                let proof = match reward_merkle_tree_v1.universal_lookup(key) {
204                    LookupResult::Ok(_, proof) => proof,
205                    LookupResult::NotFound(proof) => proof,
206                    LookupResult::NotInMemory => {
207                        bail!("missing merkle path for reward account {delta}")
208                    },
209                };
210                let path = <RewardAccountV1 as ToTraversalPath<
211                        { RewardMerkleTreeV1::ARITY },
212                    >>::to_traversal_path(
213                        &key, reward_merkle_tree_v1.height()
214                    );
215                Ok((proof, path))
216            })
217            .collect::<anyhow::Result<Vec<_>>>()?;
218
219        tracing::debug!(
220            count = reward_proofs.len(),
221            "inserting v1 reward accounts in batch"
222        );
223        UpdateStateData::<SeqTypes, RewardMerkleTreeV1, { RewardMerkleTreeV1::ARITY }>::insert_merkle_nodes_batch(
224            tx,
225            reward_proofs,
226            block_number,
227        )
228        .await
229        .context("failed to store reward merkle nodes")?;
230    }
231
232    Ok(())
233}
234
235#[tracing::instrument(
236    skip_all,
237    fields(
238        node_id = instance.node_id,
239        view = ?parent_leaf.leaf().view_number(),
240        height = parent_leaf.height(),
241    ),
242)]
243async fn update_state_storage<T>(
244    parent_state: &ValidatedState,
245    storage: &Arc<T>,
246    instance: &NodeState,
247    peers: &impl StateCatchup,
248    parent_leaf: &LeafQueryData<SeqTypes>,
249    proposed_leaf: &LeafQueryData<SeqTypes>,
250) -> anyhow::Result<ValidatedState>
251where
252    T: SequencerStateDataSource,
253    for<'a> T::Transaction<'a>: SequencerStateUpdate,
254{
255    let parent_chain_config = parent_state.chain_config;
256    let block_number = proposed_leaf.height();
257    let version = proposed_leaf.header().version();
258
259    let (state, delta) = compute_state_update(
260        parent_state,
261        instance,
262        peers,
263        &parent_leaf.leaf().clone(),
264        &proposed_leaf.leaf().clone(),
265    )
266    .await
267    .context("computing state update")?;
268
269    if version > EPOCH_VERSION && !delta.rewards_delta.is_empty() {
270        storage
271            .save_and_gc_reward_tree_v2(
272                instance,
273                block_number,
274                version,
275                &state.reward_merkle_tree_v2,
276            )
277            .await
278            .context("failed to save and gc reward merkle tree v2")?;
279
280        storage
281            .persist_reward_proofs(instance, block_number, version)
282            .await
283            .context("failed to persist reward proofs")?;
284    }
285
286    tracing::debug!("storing state update");
287    let mut tx = storage
288        .write()
289        .await
290        .context("opening transaction for state update")?;
291
292    store_state_update(&mut tx, block_number, version, &state, &delta).await?;
293
294    tx.commit().await?;
295
296    let mut tx = storage
297        .write()
298        .await
299        .context("opening transaction for state update")?;
300
301    if parent_chain_config != state.chain_config {
302        let cf = state
303            .chain_config
304            .resolve()
305            .context("failed to resolve to chain config")?;
306
307        tx.insert_chain_config(cf).await?;
308    }
309
310    tracing::debug!(block_number, "updating state height");
311    UpdateStateData::<SeqTypes, _, { BlockMerkleTree::ARITY }>::set_last_state_height(
312        &mut tx,
313        block_number as usize,
314    )
315    .await
316    .context("setting state height")?;
317
318    tx.commit().await?;
319
320    Ok(state)
321}
322
323async fn store_genesis_state<T>(
324    mut tx: T,
325    chain_config: ChainConfig,
326    state: &ValidatedState,
327) -> anyhow::Result<()>
328where
329    T: SequencerStateUpdate,
330{
331    ensure!(
332        state.block_merkle_tree.num_leaves() == 0,
333        "genesis state with non-empty block tree is unsupported"
334    );
335
336    // Insert fee merkle tree nodes
337    for (account, _) in state.fee_merkle_tree.iter() {
338        let proof = match state.fee_merkle_tree.universal_lookup(account) {
339            LookupResult::Ok(_, proof) => proof,
340            LookupResult::NotFound(proof) => proof,
341            LookupResult::NotInMemory => bail!("missing merkle path for fee account {account}"),
342        };
343        let path: Vec<usize> =
344            <FeeAccount as ToTraversalPath<{ FeeMerkleTree::ARITY }>>::to_traversal_path(
345                account,
346                state.fee_merkle_tree.height(),
347            );
348
349        UpdateStateData::<SeqTypes, FeeMerkleTree, { FeeMerkleTree::ARITY }>::insert_merkle_nodes(
350            &mut tx, proof, path, 0,
351        )
352        .await
353        .context("failed to store fee merkle nodes")?;
354    }
355
356    tx.insert_chain_config(chain_config).await?;
357
358    tx.commit().await?;
359    Ok(())
360}
361
362#[tracing::instrument(skip_all)]
363pub(crate) async fn update_state_storage_loop<T>(
364    storage: Arc<T>,
365    instance: impl Future<Output = NodeState>,
366) -> anyhow::Result<()>
367where
368    T: SequencerStateDataSource,
369    for<'a> T::Transaction<'a>: SequencerStateUpdate,
370{
371    let mut instance = instance.await;
372    // Use a separate rewards calculator for the state loop so it doesn't
373    // interfere with consensus, which may be on a very different epoch.
374    instance.epoch_rewards_calculator = Arc::new(Mutex::new(EpochRewardsCalculator::new()));
375    let peers = SqlStateCatchup::new(storage.clone(), Default::default());
376
377    // get last saved merklized state
378    let (last_height, parent_leaf, mut leaves) = {
379        let last_height = storage.get_last_state_height().await?;
380        let pruned_height = storage.load_pruned_height().await?;
381
382        let height = match pruned_height {
383            // If `last_height > pruned_height`, start from `last_height`
384            // as it represents the latest state in storage.
385            // If `pruned_height > last_height`, start from `pruned_height`
386            // as data below this height is no longer needed and will be pruned again during the next pruner run.
387            Some(pruned_height) => max(last_height, pruned_height as usize + 1),
388            // if we have not pruned any data then just start from last_height
389            None => last_height,
390        };
391
392        let current_height = storage.block_height().await?;
393        tracing::info!(
394            node_id = instance.node_id,
395            last_height,
396            current_height,
397            "updating state storage"
398        );
399
400        let parent_leaf = storage.get_leaf(height).await;
401        let leaves = storage.subscribe_leaves(height + 1).await;
402        (last_height, parent_leaf, leaves)
403    };
404    // resolve the parent leaf future _after_ dropping our lock on the state, in case it is not
405    // ready yet and another task needs a mutable lock on the state to produce the parent leaf.
406    let mut parent_leaf = parent_leaf.await;
407    let mut parent_state = ValidatedState::from_header(parent_leaf.header());
408
409    if last_height == 0 {
410        // If the last height is 0, we need to insert the genesis state, since this state is
411        // never the result of a state update and thus is not inserted in the loop below.
412        tracing::info!("storing genesis merklized state");
413        let tx = storage
414            .write()
415            .await
416            .context("starting transaction for genesis state")?;
417        store_genesis_state(tx, instance.chain_config, &instance.genesis_state)
418            .await
419            .context("storing genesis state")?;
420    }
421
422    while let Some(leaf) = leaves.next().await {
423        loop {
424            tracing::debug!(
425                height = leaf.height(),
426                node_id = instance.node_id,
427                ?leaf,
428                "updating persistent merklized state"
429            );
430            match update_state_storage(
431                &parent_state,
432                &storage,
433                &instance,
434                &peers,
435                &parent_leaf,
436                &leaf,
437            )
438            .await
439            {
440                Ok(state) => {
441                    parent_leaf = leaf;
442                    parent_state = state;
443                    break;
444                },
445                Err(err) => {
446                    tracing::error!(height = leaf.height(), "failed to update state: {err:#}");
447                    // If we fail, delay for a second and retry.
448                    sleep(Duration::from_secs(1)).await;
449                },
450            }
451        }
452    }
453
454    Ok(())
455}
456
457pub(crate) trait SequencerStateDataSource:
458    'static
459    + Debug
460    + AvailabilityDataSource<SeqTypes>
461    + StatusDataSource
462    + VersionedDataSource
463    + CatchupStorage
464    + RewardMerkleTreeDataSource
465    + PrunedHeightDataSource
466    + MerklizedStateHeightPersistence
467{
468}
469
470impl<T> SequencerStateDataSource for T where
471    T: 'static
472        + Debug
473        + AvailabilityDataSource<SeqTypes>
474        + StatusDataSource
475        + VersionedDataSource
476        + CatchupStorage
477        + RewardMerkleTreeDataSource
478        + PrunedHeightDataSource
479        + MerklizedStateHeightPersistence
480{
481}
482
483pub(crate) trait SequencerStateUpdate:
484    Transaction
485    + UpdateStateData<SeqTypes, FeeMerkleTree, { FeeMerkleTree::ARITY }>
486    + UpdateStateData<SeqTypes, BlockMerkleTree, { BlockMerkleTree::ARITY }>
487    + UpdateStateData<SeqTypes, RewardMerkleTreeV1, { RewardMerkleTreeV1::ARITY }>
488    + ChainConfigPersistence
489{
490}
491
492impl<T> SequencerStateUpdate for T where
493    T: Transaction
494        + UpdateStateData<SeqTypes, FeeMerkleTree, { FeeMerkleTree::ARITY }>
495        + UpdateStateData<SeqTypes, BlockMerkleTree, { BlockMerkleTree::ARITY }>
496        + UpdateStateData<SeqTypes, RewardMerkleTreeV1, { RewardMerkleTreeV1::ARITY }>
497        + ChainConfigPersistence
498{
499}