Skip to main content

espresso_node/api/
sql.rs

1use std::collections::{HashMap, HashSet, VecDeque};
2
3use anyhow::{Context, bail, ensure};
4use async_trait::async_trait;
5use committable::{Commitment, Committable};
6use espresso_types::{
7    BlockMerkleTree, ChainConfig, FeeAccount, FeeMerkleTree, Leaf2, NodeState, ValidatedState,
8    get_l1_deposits,
9    v0_1::IterableFeeInfo,
10    v0_3::{
11        REWARD_MERKLE_TREE_V1_HEIGHT, RewardAccountProofV1, RewardAccountQueryDataV1,
12        RewardAccountV1, RewardMerkleTreeV1,
13    },
14    v0_4::{PermittedRewardMerkleTreeV2, RewardAccountV2, RewardMerkleTreeV2},
15    v0_6::RewardAccountQueryDataV2,
16};
17use futures::future::Future;
18use hotshot::traits::ValidatedState as _;
19use hotshot_query_service::{
20    Resolvable,
21    availability::LeafId,
22    data_source::{
23        VersionedDataSource,
24        sql::{Config, SqlDataSource, Transaction},
25        storage::{
26            AvailabilityStorage, MerklizedStateStorage, NodeStorage, SqlStorage,
27            pruning::PrunerConfig,
28            sql::{Db, TransactionMode, Write, query_as},
29        },
30    },
31    merklized_state::Snapshot,
32};
33use hotshot_types::{
34    data::{EpochNumber, QuorumProposalWrapper, ViewNumber},
35    message::Proposal,
36    traits::election::MembershipSnapshot,
37    utils::{epoch_from_block_number, is_last_block},
38    vote::HasViewNumber,
39};
40use jf_merkle_tree_compat::{
41    ForgetableMerkleTreeScheme, ForgetableUniversalMerkleTreeScheme, LookupResult,
42    MerkleTreeScheme, prelude::MerkleNode,
43};
44use sqlx::{Encode, Row, Type};
45use vbs::version::Version;
46use versions::{
47    DRB_AND_HEADER_UPGRADE_VERSION, EPOCH_REWARD_VERSION, EPOCH_VERSION, NEW_PROTOCOL_VERSION,
48};
49
50use super::{
51    BlocksFrontier,
52    data_source::{Provider, SequencerDataSource},
53};
54use crate::{
55    SeqTypes,
56    api::RewardMerkleTreeDataSource,
57    catchup::{CatchupStorage, NullStateCatchup},
58    persistence::{ChainConfigPersistence, sql::Options},
59    state::compute_state_update,
60    util::BoundedJoinSet,
61};
62
/// The SQL data source used by this module: [`SqlDataSource`] specialised to [`SeqTypes`] with
/// [`Provider`] as its second type parameter.
pub type DataSource = SqlDataSource<SeqTypes, Provider>;
64
65#[async_trait]
66impl SequencerDataSource for DataSource {
67    type Options = Options;
68
69    async fn create(opt: Self::Options, provider: Provider, reset: bool) -> anyhow::Result<Self> {
70        let fetch_limit = opt.fetch_rate_limit;
71        let active_fetch_delay = opt.active_fetch_delay;
72        let chunk_fetch_delay = opt.chunk_fetch_delay;
73        let mut cfg = Config::try_from(&opt)?;
74
75        if reset {
76            cfg = cfg.reset_schema();
77        }
78
79        let mut builder = cfg.builder(provider).await?;
80
81        if let Some(limit) = fetch_limit {
82            builder = builder.with_rate_limit(limit);
83        }
84
85        if opt.lightweight {
86            tracing::warn!("enabling light weight mode..");
87            builder = builder.leaf_only();
88        }
89
90        if let Some(delay) = active_fetch_delay {
91            builder = builder.with_active_fetch_delay(delay);
92        }
93        if let Some(delay) = chunk_fetch_delay {
94            builder = builder.with_chunk_fetch_delay(delay);
95        }
96        if let Some(chunk_size) = opt.sync_status_chunk_size {
97            builder = builder.with_sync_status_chunk_size(chunk_size);
98        }
99        if let Some(ttl) = opt.sync_status_ttl {
100            builder = builder.with_sync_status_ttl(ttl);
101        }
102        if let Some(chunk_size) = opt.proactive_scan_chunk_size {
103            builder = builder.with_proactive_range_chunk_size(chunk_size);
104        }
105        if let Some(interval) = opt.proactive_scan_interval {
106            builder = builder.with_proactive_interval(interval);
107        }
108        if opt.disable_proactive_fetching {
109            builder = builder.disable_proactive_fetching();
110        }
111
112        builder.build().await
113    }
114}
115
116impl RewardMerkleTreeDataSource for SqlStorage {
117    async fn load_v1_reward_account_proof(
118        &self,
119        height: u64,
120        account: RewardAccountV1,
121    ) -> anyhow::Result<RewardAccountQueryDataV1> {
122        let mut tx = self.read().await.context(format!(
123            "opening transaction to fetch v1 reward account {account:?}; height {height}"
124        ))?;
125
126        let block_height = NodeStorage::<SeqTypes>::block_height(&mut tx)
127            .await
128            .context("getting block height")? as u64;
129        ensure!(
130            block_height > 0,
131            "cannot get accounts for height {height}: no blocks available"
132        );
133
134        // Check if we have the desired state snapshot. If so, we can load the desired accounts
135        // directly.
136        if height < block_height {
137            let (tree, _) = load_v1_reward_accounts(self, height, &[account])
138                .await
139                .with_context(|| {
140                    format!("failed to load v1 reward account {account:?} at height {height}")
141                })?;
142
143            let (proof, balance) = RewardAccountProofV1::prove(&tree, account.into())
144                .with_context(|| {
145                    format!("reward account {account:?} not available at height {height}")
146                })?;
147
148            Ok(RewardAccountQueryDataV1 { balance, proof })
149        } else {
150            bail!(
151                "requested height {height} is not yet available (latest block height: \
152                 {block_height})"
153            );
154        }
155    }
156
157    fn persist_tree(
158        &self,
159        height: u64,
160        merkle_tree: Vec<u8>,
161    ) -> impl Send + Future<Output = anyhow::Result<()>> {
162        async move {
163            let mut tx = self
164                .write()
165                .await
166                .context("opening transaction for reward merkle tree v2")?;
167
168            tx.upsert(
169                "reward_merkle_tree_v2_data",
170                ["height", "balances"],
171                ["height"],
172                [(height as i64, merkle_tree)],
173            )
174            .await?;
175
176            hotshot_query_service::data_source::Transaction::commit(tx)
177                .await
178                .context("Transaction to store reward merkle tree v2 failed.")
179        }
180    }
181
182    fn load_tree(&self, height: u64) -> impl Send + Future<Output = anyhow::Result<Vec<u8>>> {
183        async move {
184            let mut tx = self
185                .read()
186                .await
187                .context("opening transaction for state update")?;
188
189            let row = sqlx::query(
190                r#"
191                SELECT balances
192                FROM reward_merkle_tree_v2_data
193                WHERE height = $1
194                "#,
195            )
196            .bind(height as i64)
197            .fetch_optional(tx.as_mut())
198            .await?
199            .context(format!(
200                "No reward merkle tree for height {} in storage",
201                height
202            ))?;
203
204            row.try_get::<Vec<u8>, _>("balances")
205                .context("Missing field balances from row; this should never happen")
206        }
207    }
208
209    fn load_latest_tree(
210        &self,
211        height: u64,
212    ) -> impl Send + Future<Output = anyhow::Result<Vec<u8>>> {
213        async move {
214            let mut tx = self
215                .read()
216                .await
217                .context("opening transaction for load_latest_tree")?;
218
219            let row = sqlx::query(
220                r#"
221                SELECT balances
222                FROM reward_merkle_tree_v2_data
223                WHERE height <= $1
224                ORDER BY height DESC
225                LIMIT 1
226                "#,
227            )
228            .bind(height as i64)
229            .fetch_optional(tx.as_mut())
230            .await?
231            .context(format!(
232                "No reward merkle tree at or below height {} in storage",
233                height
234            ))?;
235
236            row.try_get::<Vec<u8>, _>("balances")
237                .context("Missing field balances from row; this should never happen")
238        }
239    }
240
241    fn persist_proofs(
242        &self,
243        height: u64,
244        proofs: impl Iterator<Item = (Vec<u8>, Vec<u8>)> + Send,
245    ) -> impl Send + Future<Output = anyhow::Result<()>> {
246        async move {
247            let mut iter = proofs.map(|(account, proof)| (height as i64, account, proof));
248
249            loop {
250                let mut chunk = Vec::with_capacity(20);
251
252                for _ in 0..20 {
253                    let Some(row) = iter.next() else {
254                        continue;
255                    };
256                    chunk.push(row);
257                }
258
259                if chunk.is_empty() {
260                    break;
261                }
262
263                let mut tx = self
264                    .write()
265                    .await
266                    .context("opening transaction for state update")?;
267
268                tokio::spawn(async move {
269                    tx.upsert(
270                        "reward_merkle_tree_v2_proofs",
271                        ["height", "account", "proof"],
272                        ["height", "account"],
273                        chunk,
274                    )
275                    .await?;
276
277                    hotshot_query_service::data_source::Transaction::commit(tx)
278                        .await
279                        .context("Transaction to store reward merkle tree failed.")?;
280
281                    Ok::<_, anyhow::Error>(())
282                });
283            }
284            Ok(())
285        }
286    }
287
288    /// Load a reward proof for a given height and account.
289    ///
290    /// For V5+ (epoch rewards), if the requested height is not an epoch boundary,
291    /// resolves to the previous epoch's last block. Verifies the boundary block is
292    /// V5+ to handle V4→V5 upgrades.
293    /// For V4 (per-block rewards), proofs are stored at every block height.
294    fn load_proof(
295        &self,
296        height: u64,
297        account: Vec<u8>,
298        epoch_height: u64,
299    ) -> impl Send + Future<Output = anyhow::Result<Vec<u8>>> {
300        async move {
301            let mut tx = self
302                .read()
303                .await
304                .context("opening transaction for load_proof")?;
305
306            let leaf = tx
307                .get_leaf(LeafId::<SeqTypes>::from(height as usize))
308                .await
309                .context(format!("leaf {height} not available"))?;
310
311            // For V5+, resolve to the last epoch boundary where proofs were stored.
312            // For V4, proofs exist at every height.
313            let proof_height = if leaf.header().version() < EPOCH_REWARD_VERSION
314                || is_last_block(height, epoch_height)
315            {
316                height
317            } else {
318                let prev_epoch_last_block =
319                    epoch_from_block_number(height, epoch_height).saturating_sub(1) * epoch_height;
320                let boundary_leaf = tx
321                    .get_leaf(LeafId::<SeqTypes>::from(prev_epoch_last_block as usize))
322                    .await
323                    .context(format!(
324                        "leaf at epoch boundary {prev_epoch_last_block} not available"
325                    ))?;
326                ensure!(
327                    boundary_leaf.header().version() >= EPOCH_REWARD_VERSION,
328                    "no epoch reward proofs available at boundary {prev_epoch_last_block}"
329                );
330                prev_epoch_last_block
331            };
332
333            sqlx::query_scalar(
334                "SELECT proof FROM reward_merkle_tree_v2_proofs WHERE height = $1 AND account = $2",
335            )
336            .bind(proof_height as i64)
337            .bind(account)
338            .fetch_optional(tx.as_mut())
339            .await?
340            .context(format!(
341                "Missing proofs at height {proof_height} (resolved from {height})"
342            ))
343        }
344    }
345
346    fn load_latest_proof(
347        &self,
348        account: Vec<u8>,
349    ) -> impl Send + Future<Output = anyhow::Result<Vec<u8>>> {
350        async move {
351            let mut tx = self
352                .read()
353                .await
354                .context("opening transaction for state update")?;
355
356            let row = sqlx::query(
357                r#"
358                SELECT proof
359                FROM reward_merkle_tree_v2_proofs
360                WHERE account = $1
361                ORDER BY height DESC
362                LIMIT 1
363                "#,
364            )
365            .bind(account)
366            .fetch_optional(tx.as_mut())
367            .await?
368            .context("Missing proofs")?;
369
370            row.try_get::<Vec<u8>, _>("proof")
371                .context("Missing field proof from row; this should never happen")
372        }
373    }
374
375    fn garbage_collect(&self, height: u64) -> impl Send + Future<Output = anyhow::Result<()>> {
376        async move {
377            let batch_size = self.get_pruning_config().unwrap_or_default().batch_size();
378
379            // Postgres supports improved performance by using `FOR UPDATE` in a sub-select to
380            // acquire exclusive locks on visited rows, which are later deleted. SQLite does not
381            // support this syntax.
382            #[cfg(not(feature = "embedded-db"))]
383            let for_update = "FOR UPDATE";
384            #[cfg(feature = "embedded-db")]
385            let for_update = "";
386
387            // Delete batches from the merkle data table until there is nothing left to delete.
388            loop {
389                let mut tx = self
390                    .write()
391                    .await
392                    .context("opening transaction for state update")?;
393
394                let res = sqlx::query(&format!(
395                    "
396                    WITH delete_batch AS (
397                        SELECT d.height FROM reward_merkle_tree_v2_data AS d
398                            WHERE d.height < $1
399                            ORDER BY d.height DESC
400                            LIMIT $2
401                            {for_update}
402                    )
403                    DELETE FROM reward_merkle_tree_v2_data AS del
404                    WHERE del.height IN (SELECT * FROM delete_batch)
405                "
406                ))
407                .bind(height as i64)
408                .bind(batch_size as i64)
409                .execute(tx.as_mut())
410                .await?;
411
412                hotshot_query_service::data_source::Transaction::commit(tx)
413                    .await
414                    .context(
415                        "Transaction to garbage collect reward merkle trees from storage failed.",
416                    )?;
417
418                if res.rows_affected() == 0 {
419                    break;
420                } else {
421                    tracing::debug!(
422                        "deleted {} rows from reward_merkle_tree_v2_data",
423                        res.rows_affected()
424                    );
425                }
426            }
427
428            // Delete batches from the proofs table until there is nothing left to delete.
429            loop {
430                let mut tx = self
431                    .write()
432                    .await
433                    .context("opening transaction for state update")?;
434
435                let res = sqlx::query(&format!(
436                    "
437                    WITH delete_batch AS (
438                        SELECT d.height, d.account FROM reward_merkle_tree_v2_proofs AS d
439                            WHERE d.height < $1
440                            ORDER BY d.height, d.account DESC
441                            LIMIT $2
442                            {for_update}
443                    )
444                    DELETE FROM reward_merkle_tree_v2_proofs AS del
445                    WHERE (del.height, del.account) IN (SELECT * FROM delete_batch)
446                ",
447                ))
448                .bind(height as i64)
449                .bind(batch_size as i64)
450                .execute(tx.as_mut())
451                .await?;
452
453                hotshot_query_service::data_source::Transaction::commit(tx)
454                    .await
455                    .context("Transaction to garbage collect proofs from storage failed.")?;
456
457                if res.rows_affected() == 0 {
458                    break;
459                } else {
460                    tracing::debug!(
461                        "deleted {} rows from reward_merkle_tree_v2_proofs",
462                        res.rows_affected()
463                    );
464                }
465            }
466
467            Ok(())
468        }
469    }
470
471    fn proof_exists(&self, height: u64) -> impl Send + Future<Output = bool> {
472        async move {
473            let Ok(mut tx) = self.write().await else {
474                return false;
475            };
476
477            sqlx::query_as(
478                r#"
479                SELECT EXISTS(
480                SELECT 1 FROM reward_merkle_tree_v2_proofs
481                WHERE height = $1
482                )
483                "#,
484            )
485            .bind(height as i64)
486            .fetch_one(tx.as_mut())
487            .await
488            .ok()
489            .unwrap_or((false,))
490            .0
491        }
492    }
493    /// Generate and persist reward proofs for the current L1-finalized height.
494    ///
495    /// For V5+ (epoch rewards), the reward tree is only stored at epoch boundaries.
496    /// We resolve to the nearest epoch boundary to load the tree, verify it's V5+
497    /// (for V4→V5 upgrades), and store the generated proofs at the same boundary height.
498    /// For V4 (per-block rewards), the tree exists at every block height.
499    fn persist_reward_proofs(
500        &self,
501        node_state: &NodeState,
502        height: u64,
503        version: Version,
504    ) -> impl Send + Future<Output = anyhow::Result<()>> {
505        async move {
506            // Only attempt to persist proofs every 30th block.
507            //
508            // In tests, we persist proofs at every block height so
509            // reward claim tests can query proofs at arbitrary heights. This can be
510            // removed once all reward claim tests are updated
511            if !cfg!(any(test, feature = "testing"))
512                && !(height + node_state.node_id).is_multiple_of(30)
513            {
514                return Ok(());
515            }
516
517            // In tests, we use the current block height directly instead
518            // of querying the L1 finalized HotShot height, because test environments don't
519            // deploy a light client contract
520            let finalized_hotshot_height = if cfg!(any(test, feature = "testing")) {
521                height
522            } else {
523                match node_state.finalized_hotshot_height().await {
524                    Ok(h) => h,
525                    Err(err) => {
526                        tracing::warn!("failed to get finalized hotshot height: {err:#}");
527                        return Ok(());
528                    },
529                }
530            };
531
532            // Resolve which height to load the reward tree from.
533            // V4: tree exists at every height. V5+: only at epoch boundaries.
534            let mut tree_height = finalized_hotshot_height;
535            let mut proof_height = finalized_hotshot_height;
536            if version >= EPOCH_REWARD_VERSION {
537                let epoch_height = node_state
538                    .epoch_height
539                    .context("epoch_height not set in node state")?;
540                if !is_last_block(finalized_hotshot_height, epoch_height) {
541                    tree_height = epoch_from_block_number(finalized_hotshot_height, epoch_height)
542                        .saturating_sub(1)
543                        * epoch_height;
544                    if tree_height == 0 {
545                        return Ok(());
546                    }
547                    // During V4→V5 upgrades the previous epoch boundary may be V4.
548                    let mut tx = self.read().await.context("opening read transaction")?;
549                    let leaf = tx
550                        .get_leaf(LeafId::<SeqTypes>::from(tree_height as usize))
551                        .await
552                        .context(format!(
553                            "leaf at epoch boundary {tree_height} not available"
554                        ))?;
555                    if leaf.header().version() < EPOCH_REWARD_VERSION {
556                        return Ok(());
557                    }
558                }
559                proof_height = tree_height;
560            }
561
562            if self.proof_exists(proof_height).await {
563                return Ok(());
564            }
565
566            let permitted_tree = match self.load_reward_merkle_tree_v2(tree_height).await {
567                Ok(tree) => tree,
568                Err(err) => {
569                    tracing::warn!(tree_height, "failed to load reward merkle tree: {err:#}");
570                    return Ok(());
571                },
572            };
573
574            let tree = permitted_tree.tree;
575            let iter = tree
576                .iter()
577                .filter_map(|(account, balance): (&RewardAccountV2, _)| {
578                    let proof = espresso_types::v0_6::RewardAccountProofV2::prove(
579                        &tree,
580                        (*account).into(),
581                    )?;
582                    let proof = RewardAccountQueryDataV2 {
583                        balance: (*balance).into(),
584                        proof: proof.0,
585                    };
586                    Some((
587                        bincode::serialize(&account).ok()?,
588                        bincode::serialize(&proof).ok()?,
589                    ))
590                });
591
592            if let Err(err) = self.persist_proofs(proof_height, iter).await {
593                tracing::warn!(proof_height, "failed to persist proofs: {err:#}");
594            }
595
596            Ok(())
597        }
598    }
599}
600
601impl CatchupStorage for SqlStorage {
602    async fn get_reward_accounts_v1(
603        &self,
604        instance: &NodeState,
605        height: u64,
606        view: ViewNumber,
607        accounts: &[RewardAccountV1],
608    ) -> anyhow::Result<(RewardMerkleTreeV1, Leaf2)> {
609        let mut tx = self.read().await.context(format!(
610            "opening transaction to fetch v1 reward account {accounts:?}; height {height}"
611        ))?;
612
613        let block_height = NodeStorage::<SeqTypes>::block_height(&mut tx)
614            .await
615            .context("getting block height")? as u64;
616        ensure!(
617            block_height > 0,
618            "cannot get accounts for height {height}: no blocks available"
619        );
620
621        // Check if we have the desired state snapshot. If so, we can load the desired accounts
622        // directly.
623        if height < block_height {
624            load_v1_reward_accounts(self, height, accounts).await
625        } else {
626            let accounts: Vec<_> = accounts
627                .iter()
628                .map(|acct| RewardAccountV2::from(*acct))
629                .collect();
630            // If we do not have the exact snapshot we need, we can try going back to the last
631            // snapshot we _do_ have and replaying subsequent blocks to compute the desired state.
632            let (state, leaf) = reconstruct_state(
633                instance,
634                self,
635                &mut tx,
636                block_height - 1,
637                view,
638                &[],
639                &accounts,
640            )
641            .await?;
642            Ok((state.reward_merkle_tree_v1, leaf))
643        }
644    }
645
646    async fn get_reward_accounts_v2(
647        &self,
648        instance: &NodeState,
649        height: u64,
650        view: ViewNumber,
651        accounts: &[RewardAccountV2],
652    ) -> anyhow::Result<(RewardMerkleTreeV2, Leaf2)> {
653        let mut tx = self.read().await.context(format!(
654            "opening transaction to fetch reward account {accounts:?}; height {height}"
655        ))?;
656
657        let block_height = NodeStorage::<SeqTypes>::block_height(&mut tx)
658            .await
659            .context("getting block height")? as u64;
660        ensure!(
661            block_height > 0,
662            "cannot get accounts for height {height}: no blocks available"
663        );
664
665        // Check if we have the desired state snapshot. If so, we can load the desired accounts
666        // directly.
667        if height < block_height {
668            load_reward_merkle_tree_v2(self, height)
669                .await
670                .map(|(permitted_tree, leaf)| (permitted_tree.tree, leaf))
671        } else {
672            // If we do not have the exact snapshot we need, we can try going back to the last
673            // snapshot we _do_ have and replaying subsequent blocks to compute the desired state.
674            let (state, leaf) = reconstruct_state(
675                instance,
676                self,
677                &mut tx,
678                block_height - 1,
679                view,
680                &[],
681                accounts,
682            )
683            .await?;
684            Ok((state.reward_merkle_tree_v2, leaf))
685        }
686    }
687
688    async fn get_accounts(
689        &self,
690        instance: &NodeState,
691        height: u64,
692        view: ViewNumber,
693        accounts: &[FeeAccount],
694    ) -> anyhow::Result<(FeeMerkleTree, Leaf2)> {
695        let mut tx = self.read().await.context(format!(
696            "opening transaction to fetch account {accounts:?}; height {height}"
697        ))?;
698
699        let block_height = NodeStorage::<SeqTypes>::block_height(&mut tx)
700            .await
701            .context("getting block height")? as u64;
702        ensure!(
703            block_height > 0,
704            "cannot get accounts for height {height}: no blocks available"
705        );
706
707        // Check if we have the desired state snapshot. If so, we can load the desired accounts
708        // directly.
709        if height < block_height {
710            load_accounts(&mut tx, height, accounts).await
711        } else {
712            // If we do not have the exact snapshot we need, we can try going back to the last
713            // snapshot we _do_ have and replaying subsequent blocks to compute the desired state.
714            let (state, leaf) = reconstruct_state(
715                instance,
716                self,
717                &mut tx,
718                block_height - 1,
719                view,
720                accounts,
721                &[],
722            )
723            .await?;
724            Ok((state.fee_merkle_tree, leaf))
725        }
726    }
727
728    async fn get_frontier(
729        &self,
730        instance: &NodeState,
731        height: u64,
732        view: ViewNumber,
733    ) -> anyhow::Result<BlocksFrontier> {
734        let mut tx = self.read().await.context(format!(
735            "opening transaction to fetch frontier at height {height}"
736        ))?;
737
738        let block_height = NodeStorage::<SeqTypes>::block_height(&mut tx)
739            .await
740            .context("getting block height")? as u64;
741        ensure!(
742            block_height > 0,
743            "cannot get frontier for height {height}: no blocks available"
744        );
745
746        // Check if we have the desired state snapshot. If so, we can load the desired frontier
747        // directly.
748        if height < block_height {
749            load_frontier(&mut tx, height).await
750        } else {
751            // If we do not have the exact snapshot we need, we can try going back to the last
752            // snapshot we _do_ have and replaying subsequent blocks to compute the desired state.
753            let (state, _) =
754                reconstruct_state(instance, self, &mut tx, block_height - 1, view, &[], &[])
755                    .await?;
756            match state.block_merkle_tree.lookup(height - 1) {
757                LookupResult::Ok(_, proof) => Ok(proof),
758                _ => {
759                    bail!(
760                        "state snapshot {view:?},{height} was found but does not contain frontier \
761                         at height {}; this should not be possible",
762                        height - 1
763                    );
764                },
765            }
766        }
767    }
768
769    async fn get_chain_config(
770        &self,
771        commitment: Commitment<ChainConfig>,
772    ) -> anyhow::Result<ChainConfig> {
773        let mut tx = self.read().await.context(format!(
774            "opening transaction to fetch chain config {commitment}"
775        ))?;
776        load_chain_config(&mut tx, commitment).await
777    }
778
779    async fn get_leaf_chain(&self, height: u64) -> anyhow::Result<Vec<Leaf2>> {
780        let mut tx = self
781            .read()
782            .await
783            .context(format!("opening transaction to fetch leaf at {height}"))?;
784        let leaf: Leaf2 = tx
785            .get_leaf((height as usize).into())
786            .await
787            .context(format!("leaf {height} not available"))?
788            .leaf()
789            .clone();
790
791        // New protocol: cert2 alone proves finality. Return the leaf range up to the finalizing
792        // cert2's height
793        if leaf.block_header().version() >= NEW_PROTOCOL_VERSION {
794            let cert2: espresso_types::Certificate2<SeqTypes> =
795                tx.load_earliest_cert2(height).await?.context(format!(
796                    "no cert2 available for new-protocol leaf at {height}"
797                ))?;
798            let cert2_height = cert2.data.block_number;
799            let mut leaves = vec![leaf];
800            if height < cert2_height {
801                let descendants = tx
802                    .get_leaf_range((height as usize + 1)..=cert2_height as usize)
803                    .await?;
804                leaves.extend(descendants.into_iter().flatten().map(|l| l.leaf().clone()));
805            }
806            return Ok(leaves);
807        }
808
809        // Legacy protocol: build a 3-chain that decides `height`.
810        let mut last_leaf = leaf;
811        let mut chain = vec![last_leaf.clone()];
812        let mut h = height + 1;
813
814        loop {
815            let lqd = tx.get_leaf((h as usize).into()).await?;
816            let leaf = lqd.leaf();
817
818            if leaf.justify_qc().view_number() == last_leaf.view_number() {
819                chain.push(leaf.clone());
820            } else {
821                h += 1;
822                continue;
823            }
824
825            // just one away from deciding
826            if leaf.view_number() == last_leaf.view_number() + 1 {
827                last_leaf = leaf.clone();
828                h += 1;
829                break;
830            }
831            h += 1;
832            last_leaf = leaf.clone();
833        }
834
835        loop {
836            let lqd = tx.get_leaf((h as usize).into()).await?;
837            let leaf = lqd.leaf();
838            if leaf.justify_qc().view_number() == last_leaf.view_number() {
839                chain.push(leaf.clone());
840                break;
841            }
842            h += 1;
843        }
844
845        Ok(chain)
846    }
847
848    async fn load_earliest_cert2(
849        &self,
850        height: u64,
851    ) -> anyhow::Result<Option<espresso_types::Certificate2<SeqTypes>>> {
852        let mut tx = self
853            .read()
854            .await
855            .context("opening transaction to fetch cert2")?;
856        Ok(tx.load_earliest_cert2(height).await?)
857    }
858
859    async fn get_leaf(&self, height: u64) -> anyhow::Result<Leaf2> {
860        let mut tx = self
861            .read()
862            .await
863            .context(format!("opening transaction to fetch leaf at {height}"))?;
864        let lqd = tx
865            .get_leaf((height as usize).into())
866            .await
867            .context(format!("leaf {height} not available"))?;
868        Ok(lqd.leaf().clone())
869    }
870}
871
872impl RewardMerkleTreeDataSource for DataSource {
873    async fn load_v1_reward_account_proof(
874        &self,
875        height: u64,
876        account: RewardAccountV1,
877    ) -> anyhow::Result<RewardAccountQueryDataV1> {
878        self.as_ref()
879            .load_v1_reward_account_proof(height, account)
880            .await
881    }
882
883    fn persist_tree(
884        &self,
885        height: u64,
886        merkle_tree: Vec<u8>,
887    ) -> impl Send + Future<Output = anyhow::Result<()>> {
888        async move { self.as_ref().persist_tree(height, merkle_tree).await }
889    }
890
891    fn load_tree(&self, height: u64) -> impl Send + Future<Output = anyhow::Result<Vec<u8>>> {
892        async move { self.as_ref().load_tree(height).await }
893    }
894
895    fn load_latest_tree(
896        &self,
897        height: u64,
898    ) -> impl Send + Future<Output = anyhow::Result<Vec<u8>>> {
899        async move { self.as_ref().load_latest_tree(height).await }
900    }
901
902    fn garbage_collect(&self, height: u64) -> impl Send + Future<Output = anyhow::Result<()>> {
903        async move { self.as_ref().garbage_collect(height).await }
904    }
905
906    fn persist_proofs(
907        &self,
908        height: u64,
909        proofs: impl Iterator<Item = (Vec<u8>, Vec<u8>)> + Send,
910    ) -> impl Send + Future<Output = anyhow::Result<()>> {
911        async move { self.as_ref().persist_proofs(height, proofs).await }
912    }
913
914    fn load_proof(
915        &self,
916        height: u64,
917        account: Vec<u8>,
918        epoch_height: u64,
919    ) -> impl Send + Future<Output = anyhow::Result<Vec<u8>>> {
920        async move {
921            self.as_ref()
922                .load_proof(height, account, epoch_height)
923                .await
924        }
925    }
926
927    fn load_latest_proof(
928        &self,
929        account: Vec<u8>,
930    ) -> impl Send + Future<Output = anyhow::Result<Vec<u8>>> {
931        async move { self.as_ref().load_latest_proof(account).await }
932    }
933
934    fn proof_exists(&self, height: u64) -> impl Send + Future<Output = bool> {
935        async move { self.as_ref().proof_exists(height).await }
936    }
937
938    fn persist_reward_proofs(
939        &self,
940        node_state: &NodeState,
941        height: u64,
942        version: Version,
943    ) -> impl Send + Future<Output = anyhow::Result<()>> {
944        async move {
945            self.as_ref()
946                .persist_reward_proofs(node_state, height, version)
947                .await
948        }
949    }
950}
951
952impl CatchupStorage for DataSource {
953    async fn get_accounts(
954        &self,
955        instance: &NodeState,
956        height: u64,
957        view: ViewNumber,
958        accounts: &[FeeAccount],
959    ) -> anyhow::Result<(FeeMerkleTree, Leaf2)> {
960        self.as_ref()
961            .get_accounts(instance, height, view, accounts)
962            .await
963    }
964
965    async fn get_reward_accounts_v2(
966        &self,
967        instance: &NodeState,
968        height: u64,
969        view: ViewNumber,
970        accounts: &[RewardAccountV2],
971    ) -> anyhow::Result<(RewardMerkleTreeV2, Leaf2)> {
972        self.as_ref()
973            .get_reward_accounts_v2(instance, height, view, accounts)
974            .await
975    }
976
977    async fn get_reward_accounts_v1(
978        &self,
979        instance: &NodeState,
980        height: u64,
981        view: ViewNumber,
982        accounts: &[RewardAccountV1],
983    ) -> anyhow::Result<(RewardMerkleTreeV1, Leaf2)> {
984        self.as_ref()
985            .get_reward_accounts_v1(instance, height, view, accounts)
986            .await
987    }
988
989    async fn get_frontier(
990        &self,
991        instance: &NodeState,
992        height: u64,
993        view: ViewNumber,
994    ) -> anyhow::Result<BlocksFrontier> {
995        self.as_ref().get_frontier(instance, height, view).await
996    }
997
998    async fn get_chain_config(
999        &self,
1000        commitment: Commitment<ChainConfig>,
1001    ) -> anyhow::Result<ChainConfig> {
1002        self.as_ref().get_chain_config(commitment).await
1003    }
1004    async fn get_leaf_chain(&self, height: u64) -> anyhow::Result<Vec<Leaf2>> {
1005        self.as_ref().get_leaf_chain(height).await
1006    }
1007
1008    async fn load_earliest_cert2(
1009        &self,
1010        height: u64,
1011    ) -> anyhow::Result<Option<espresso_types::Certificate2<SeqTypes>>> {
1012        self.as_ref().load_earliest_cert2(height).await
1013    }
1014
1015    async fn get_leaf(&self, height: u64) -> anyhow::Result<Leaf2> {
1016        self.as_ref().get_leaf(height).await
1017    }
1018}
1019
1020#[async_trait]
1021impl ChainConfigPersistence for Transaction<Write> {
1022    async fn insert_chain_config(&mut self, chain_config: ChainConfig) -> anyhow::Result<()> {
1023        let commitment = chain_config.commitment();
1024        let data = bincode::serialize(&chain_config)?;
1025        self.upsert(
1026            "chain_config",
1027            ["commitment", "data"],
1028            ["commitment"],
1029            [(commitment.to_string(), data)],
1030        )
1031        .await
1032    }
1033}
1034
1035impl super::data_source::PruningDataSource for SqlStorage {
1036    async fn get_oldest_block(
1037        &self,
1038    ) -> anyhow::Result<Option<hotshot_query_service::availability::BlockQueryData<SeqTypes>>> {
1039        let mut tx = self
1040            .read()
1041            .await
1042            .context("opening transaction to fetch oldest block")?;
1043        let row = sqlx::query("SELECT MIN(height) AS height FROM header")
1044            .fetch_one(tx.as_mut())
1045            .await
1046            .context("failed to query oldest block height")?;
1047        let height: Option<i64> = row.try_get("height")?;
1048        match height {
1049            None => Ok(None),
1050            Some(h) => {
1051                let h = usize::try_from(h).context("block height out of range")?;
1052                Ok(Some(
1053                    tx.get_block(hotshot_query_service::availability::BlockId::<SeqTypes>::from(h))
1054                        .await
1055                        .context(format!("block {h} not available"))?,
1056                ))
1057            },
1058        }
1059    }
1060
1061    async fn get_oldest_leaf(
1062        &self,
1063    ) -> anyhow::Result<Option<hotshot_query_service::availability::LeafQueryData<SeqTypes>>> {
1064        let mut tx = self
1065            .read()
1066            .await
1067            .context("opening transaction to fetch oldest leaf")?;
1068        let row = sqlx::query("SELECT MIN(height) AS height FROM leaf2")
1069            .fetch_one(tx.as_mut())
1070            .await
1071            .context("failed to query oldest leaf height")?;
1072        let height: Option<i64> = row.try_get("height")?;
1073        match height {
1074            None => Ok(None),
1075            Some(h) => {
1076                let h = usize::try_from(h).context("leaf height out of range")?;
1077                Ok(Some(
1078                    tx.get_leaf(LeafId::<SeqTypes>::from(h))
1079                        .await
1080                        .context(format!("leaf {h} not available"))?,
1081                ))
1082            },
1083        }
1084    }
1085}
1086
1087impl super::data_source::DatabaseMetadataSource for SqlStorage {
1088    async fn get_table_sizes(&self) -> anyhow::Result<Vec<super::data_source::TableSize>> {
1089        let mut tx = self
1090            .read()
1091            .await
1092            .context("opening transaction to fetch table sizes")?;
1093
1094        #[cfg(not(feature = "embedded-db"))]
1095        {
1096            let query = r#"
1097                SELECT
1098                    schemaname || '.' || relname AS table_name,
1099                    n_live_tup AS row_count,
1100                    pg_total_relation_size(relid) AS total_size_bytes
1101                FROM pg_stat_user_tables
1102                ORDER BY pg_total_relation_size(relid) DESC
1103            "#;
1104
1105            let rows = sqlx::query(query)
1106                .fetch_all(tx.as_mut())
1107                .await
1108                .context("failed to query table sizes")?;
1109
1110            let mut table_sizes = Vec::new();
1111            for row in rows {
1112                let table_name: String = row.try_get("table_name")?;
1113                let row_count: i64 = row.try_get("row_count").unwrap_or(-1);
1114                let total_size_bytes: Option<i64> = row.try_get("total_size_bytes").ok();
1115
1116                table_sizes.push(super::data_source::TableSize {
1117                    table_name,
1118                    row_count,
1119                    total_size_bytes,
1120                });
1121            }
1122
1123            Ok(table_sizes)
1124        }
1125
1126        #[cfg(feature = "embedded-db")]
1127        {
1128            // First, get all table names from sqlite_master
1129            let table_names_query = r#"
1130                SELECT name
1131                FROM sqlite_master
1132                WHERE type = 'table'
1133                AND name NOT LIKE 'sqlite_%'
1134                ORDER BY name
1135            "#;
1136
1137            let table_rows = sqlx::query(table_names_query)
1138                .fetch_all(tx.as_mut())
1139                .await
1140                .context("failed to query table names")?;
1141
1142            let mut table_sizes = Vec::new();
1143
1144            // For each table, get the row count
1145            for row in table_rows {
1146                let table_name: String = row.try_get("name")?;
1147
1148                // Run SELECT COUNT(*) for this specific table
1149                // Use format! here since we need to dynamically insert the table name
1150                let count_query = format!("SELECT COUNT(*) as count FROM \"{}\"", table_name);
1151                let count_row = sqlx::query(&count_query)
1152                    .fetch_one(tx.as_mut())
1153                    .await
1154                    .context(format!(
1155                        "failed to query row count for table {}",
1156                        table_name
1157                    ))?;
1158
1159                let row_count: i64 = count_row.try_get("count").unwrap_or(0);
1160
1161                table_sizes.push(super::data_source::TableSize {
1162                    table_name,
1163                    row_count,
1164                    total_size_bytes: None,
1165                });
1166            }
1167
1168            Ok(table_sizes)
1169        }
1170    }
1171}
1172
1173impl super::data_source::DatabaseMetadataSource for DataSource {
1174    async fn get_table_sizes(&self) -> anyhow::Result<Vec<super::data_source::TableSize>> {
1175        self.as_ref().get_table_sizes().await
1176    }
1177}
1178
1179impl super::data_source::PruningDataSource for DataSource {
1180    async fn get_oldest_block(
1181        &self,
1182    ) -> anyhow::Result<Option<hotshot_query_service::availability::BlockQueryData<SeqTypes>>> {
1183        self.as_ref().get_oldest_block().await
1184    }
1185
1186    async fn get_oldest_leaf(
1187        &self,
1188    ) -> anyhow::Result<Option<hotshot_query_service::availability::LeafQueryData<SeqTypes>>> {
1189        self.as_ref().get_oldest_leaf().await
1190    }
1191}
1192
1193async fn load_frontier<Mode: TransactionMode>(
1194    tx: &mut Transaction<Mode>,
1195    height: u64,
1196) -> anyhow::Result<BlocksFrontier> {
1197    tx.get_path(
1198        Snapshot::<SeqTypes, BlockMerkleTree, { BlockMerkleTree::ARITY }>::Index(height),
1199        height
1200            .checked_sub(1)
1201            .ok_or(anyhow::anyhow!("Subtract with overflow ({height})!"))?,
1202    )
1203    .await
1204    .context(format!("fetching frontier at height {height}"))
1205}
1206
1207async fn load_v1_reward_accounts(
1208    db: &SqlStorage,
1209    height: u64,
1210    accounts: &[RewardAccountV1],
1211) -> anyhow::Result<(RewardMerkleTreeV1, Leaf2)> {
1212    // Open a new read transaction to get the leaf
1213    let mut tx = db
1214        .read()
1215        .await
1216        .with_context(|| "failed to open read transaction")?;
1217
1218    // Get the leaf from the database
1219    let leaf = tx
1220        .get_leaf(LeafId::<SeqTypes>::from(height as usize))
1221        .await
1222        .context(format!("leaf {height} not available"))?;
1223    let header = leaf.header();
1224
1225    if header.version() < EPOCH_VERSION || header.version() >= DRB_AND_HEADER_UPGRADE_VERSION {
1226        return Ok((
1227            RewardMerkleTreeV1::new(REWARD_MERKLE_TREE_V1_HEIGHT),
1228            leaf.leaf().clone(),
1229        ));
1230    }
1231
1232    // Get the merkle root from the header and create a snapshot from it
1233    let merkle_root = header.reward_merkle_tree_root().unwrap_left();
1234    let mut snapshot = RewardMerkleTreeV1::from_commitment(merkle_root);
1235
1236    // Create a bounded join set with 10 concurrent tasks
1237    let mut join_set = BoundedJoinSet::new(10);
1238
1239    // Create a map from task ID to account
1240    let mut task_id_to_account = HashMap::new();
1241
1242    // Loop through each account, spawning a task to get the path for the account
1243    for account in accounts {
1244        // Clone things we will need in the closure
1245        let db_clone = db.clone();
1246        let account_clone = *account;
1247        let header_height = header.height();
1248
1249        // Create the closure that will get the path for the account
1250        let func = async move {
1251            // Open a new transaction
1252            let mut tx = db_clone
1253                .read()
1254                .await
1255                .with_context(|| "failed to open read transaction")?;
1256
1257            // Get the path for the account
1258            let proof = tx
1259                .get_path(
1260                    Snapshot::<SeqTypes, RewardMerkleTreeV1, { RewardMerkleTreeV1::ARITY }>::Index(
1261                        header_height,
1262                    ),
1263                    account_clone,
1264                )
1265                .await
1266                .with_context(|| {
1267                    format!(
1268                        "failed to get path for v1 reward account {account_clone:?}; height \
1269                         {height}"
1270                    )
1271                })?;
1272
1273            Ok::<_, anyhow::Error>(proof)
1274        };
1275
1276        // Spawn the task
1277        let id = join_set.spawn(func).id();
1278
1279        // Add the task ID to the account map
1280        task_id_to_account.insert(id, account);
1281    }
1282
1283    // Wait for each task to complete
1284    while let Some(result) = join_set.join_next_with_id().await {
1285        // Get the inner result (past the join error)
1286        let (id, result) = result.with_context(|| "failed to join task")?;
1287
1288        // Get the proof from the result
1289        let proof = result?;
1290
1291        // Get the account from the task ID to account map
1292        let account = task_id_to_account
1293            .remove(&id)
1294            .with_context(|| "task ID for spawned task not found")?;
1295
1296        match proof.proof.first().with_context(|| {
1297            format!("empty proof for v1 reward account {account:?}; height {height}")
1298        })? {
1299            MerkleNode::Leaf { pos, elem, .. } => {
1300                snapshot.remember(*pos, *elem, proof)?;
1301            },
1302            MerkleNode::Empty => {
1303                snapshot.non_membership_remember(*account, proof)?;
1304            },
1305            _ => {
1306                bail!("invalid proof for v1 reward account {account:?}; height {height}");
1307            },
1308        }
1309    }
1310
1311    Ok((snapshot, leaf.leaf().clone()))
1312}
1313
1314/// Loads reward accounts for new reward merkle tree (V4).
1315async fn load_reward_merkle_tree_v2(
1316    db: &SqlStorage,
1317    height: u64,
1318) -> anyhow::Result<(PermittedRewardMerkleTreeV2, Leaf2)> {
1319    // Open a new read transaction to get the leaf
1320    let mut tx = db
1321        .read()
1322        .await
1323        .with_context(|| "failed to open read transaction")?;
1324
1325    // Get the leaf from the database
1326    let leaf = tx
1327        .get_leaf(LeafId::<SeqTypes>::from(height as usize))
1328        .await
1329        .with_context(|| format!("leaf {height} not available"))?;
1330
1331    let snapshot = db.load_reward_merkle_tree_v2(height).await?;
1332
1333    Ok((snapshot, leaf.leaf().clone()))
1334}
1335
1336async fn load_accounts<Mode: TransactionMode>(
1337    tx: &mut Transaction<Mode>,
1338    height: u64,
1339    accounts: &[FeeAccount],
1340) -> anyhow::Result<(FeeMerkleTree, Leaf2)> {
1341    let leaf = tx
1342        .get_leaf(LeafId::<SeqTypes>::from(height as usize))
1343        .await
1344        .context(format!("leaf {height} not available"))?;
1345    let header = leaf.header();
1346
1347    let mut snapshot = FeeMerkleTree::from_commitment(header.fee_merkle_tree_root());
1348    for account in accounts {
1349        let proof = tx
1350            .get_path(
1351                Snapshot::<SeqTypes, FeeMerkleTree, { FeeMerkleTree::ARITY }>::Index(
1352                    header.height(),
1353                ),
1354                *account,
1355            )
1356            .await
1357            .context(format!(
1358                "fetching account {account}; height {}",
1359                header.height()
1360            ))?;
1361        match proof.proof.first().context(format!(
1362            "empty proof for account {account}; height {}",
1363            header.height()
1364        ))? {
1365            MerkleNode::Leaf { pos, elem, .. } => {
1366                snapshot.remember(*pos, *elem, proof)?;
1367            },
1368            MerkleNode::Empty => {
1369                snapshot.non_membership_remember(*account, proof)?;
1370            },
1371            _ => {
1372                bail!("Invalid proof");
1373            },
1374        }
1375    }
1376
1377    Ok((snapshot, leaf.leaf().clone()))
1378}
1379
1380async fn load_chain_config<Mode: TransactionMode>(
1381    tx: &mut Transaction<Mode>,
1382    commitment: Commitment<ChainConfig>,
1383) -> anyhow::Result<ChainConfig> {
1384    let (data,) = query_as::<(Vec<u8>,)>("SELECT data from chain_config where commitment = $1")
1385        .bind(commitment.to_string())
1386        .fetch_one(tx.as_mut())
1387        .await
1388        .unwrap();
1389
1390    bincode::deserialize(&data[..]).context("failed to deserialize")
1391}
1392
1393/// Reconstructs the `ValidatedState` from a specific block height up to a given view.
1394///
1395/// This loads all required fee and reward accounts into the Merkle tree before applying the
1396/// State Transition Function (STF), preventing recursive catchup during STF replay.
1397///
1398/// Note: Even if the primary goal is to catch up the block Merkle tree,
1399/// fee and reward header dependencies must still be present beforehand
1400/// This is because reconstructing the `ValidatedState` involves replaying the STF over a
1401/// range of leaves, and the STF requires all associated data to be present in the `ValidatedState`;
1402/// otherwise, it will attempt to trigger catchup itself.
1403#[tracing::instrument(skip(instance, db, tx))]
1404pub(crate) async fn reconstruct_state<Mode: TransactionMode>(
1405    instance: &NodeState,
1406    db: &SqlStorage,
1407    tx: &mut Transaction<Mode>,
1408    from_height: u64,
1409    to_view: ViewNumber,
1410    fee_accounts: &[FeeAccount],
1411    reward_accounts: &[RewardAccountV2],
1412) -> anyhow::Result<(ValidatedState, Leaf2)> {
1413    tracing::info!("attempting to reconstruct fee state");
1414    let from_leaf = tx
1415        .get_leaf((from_height as usize).into())
1416        .await
1417        .context(format!("leaf {from_height} not available"))?;
1418    let from_leaf: Leaf2 = from_leaf.leaf().clone();
1419    ensure!(
1420        from_leaf.view_number() < to_view,
1421        "state reconstruction: starting state {:?} must be before ending state {to_view:?}",
1422        from_leaf.view_number(),
1423    );
1424
1425    // Get the sequence of headers we will be applying to compute the latest state.
1426    let mut leaves = VecDeque::new();
1427    let to_leaf = get_leaf_from_proposal(tx, "view = $1", &(to_view.u64() as i64))
1428        .await
1429        .context(format!(
1430            "unable to reconstruct state because leaf {to_view:?} is not available"
1431        ))?;
1432    let mut parent = to_leaf.parent_commitment();
1433    tracing::debug!(?to_leaf, ?parent, view = ?to_view, "have required leaf");
1434    leaves.push_front(to_leaf.clone());
1435    while parent != Committable::commit(&from_leaf) {
1436        let leaf = get_leaf_from_proposal(tx, "leaf_hash = $1", &parent.to_string())
1437            .await
1438            .context(format!(
1439                "unable to reconstruct state because leaf {parent} is not available"
1440            ))?;
1441        parent = leaf.parent_commitment();
1442        tracing::debug!(?leaf, ?parent, "have required leaf");
1443        leaves.push_front(leaf);
1444    }
1445
1446    // Get the initial state.
1447    let mut parent = from_leaf;
1448    let mut state = ValidatedState::from_header(parent.block_header());
1449
1450    // Pre-load the state with the accounts we care about to ensure they will be present in the
1451    // final state.
1452    let mut catchup = NullStateCatchup::default();
1453
1454    let mut fee_accounts = fee_accounts.iter().copied().collect::<HashSet<_>>();
1455    // Add in all the accounts we will need to replay any of the headers, to ensure that we don't
1456    // need to do catchup recursively.
1457    tracing::info!(
1458        "reconstructing fee accounts state for from height {from_height} to view {to_view}"
1459    );
1460
1461    let dependencies =
1462        fee_header_dependencies(&mut catchup, tx, instance, &parent, &leaves).await?;
1463    fee_accounts.extend(dependencies);
1464    let fee_accounts = fee_accounts.into_iter().collect::<Vec<_>>();
1465    state.fee_merkle_tree = load_accounts(tx, from_height, &fee_accounts)
1466        .await
1467        .context("unable to reconstruct state because accounts are not available at origin")?
1468        .0;
1469    ensure!(
1470        state.fee_merkle_tree.commitment() == parent.block_header().fee_merkle_tree_root(),
1471        "loaded fee state does not match parent header"
1472    );
1473
1474    tracing::info!(
1475        "reconstructing reward accounts for from height {from_height} to view {to_view}"
1476    );
1477
1478    let mut reward_accounts = reward_accounts.iter().copied().collect::<HashSet<_>>();
1479
1480    // Collect all reward account dependencies needed for replaying the STF.
1481    // These accounts must be preloaded into the reward Merkle tree to prevent recursive catchups.
1482    let dependencies = reward_header_dependencies(instance, &leaves).await?;
1483    reward_accounts.extend(dependencies);
1484    let reward_accounts = reward_accounts.into_iter().collect::<Vec<_>>();
1485
1486    // Load all required reward accounts and update the reward Merkle tree.
1487    match parent.block_header().reward_merkle_tree_root() {
1488        either::Either::Left(expected_root) => {
1489            let accts = reward_accounts
1490                .into_iter()
1491                .map(RewardAccountV1::from)
1492                .collect::<Vec<_>>();
1493            state.reward_merkle_tree_v1 = load_v1_reward_accounts(db, from_height, &accts)
1494                .await
1495                .context(
1496                    "unable to reconstruct state because v1 reward accounts are not available at \
1497                     origin",
1498                )?
1499                .0;
1500            ensure!(
1501                state.reward_merkle_tree_v1.commitment() == expected_root,
1502                "loaded v1 reward state does not match parent header"
1503            );
1504        },
1505        either::Either::Right(expected_root) => {
1506            let version = parent.block_header().version();
1507            let epoch_height = instance
1508                .epoch_height
1509                .context("epoch_height not set but parent has V2 reward tree")?;
1510
1511            // V5+ only stores the reward merkle tree at epoch's last block, so if we're
1512            // reconstructing at a non boundary height there is no row at from_height.
1513            // Load the latest tree instead
1514            // the commitment check below will catch it if
1515            // we ended up with a tree from an older epoch.
1516            // But this should never happen as we don't garbage collect the latest tree
1517            if version >= EPOCH_REWARD_VERSION && !is_last_block(from_height, epoch_height) {
1518                let tree = db
1519                    .load_latest_reward_merkle_tree_v2(from_height)
1520                    .await
1521                    .context("RewardMerkleTreeV2 not available at or below origin")?;
1522                state.reward_merkle_tree_v2 = tree.tree;
1523            } else {
1524                state.reward_merkle_tree_v2 = load_reward_merkle_tree_v2(db, from_height)
1525                    .await
1526                    .context("RewardMerkleTreeV2 not available at origin")?
1527                    .0
1528                    .tree;
1529            }
1530            ensure!(
1531                state.reward_merkle_tree_v2.commitment() == expected_root,
1532                "loaded reward state does not match parent header"
1533            );
1534        },
1535    }
1536
1537    // We need the blocks frontier as well, to apply the STF.
1538    let frontier = load_frontier(tx, from_height)
1539        .await
1540        .context("unable to reconstruct state because frontier is not available at origin")?;
1541    match frontier
1542        .proof
1543        .first()
1544        .context("empty proof for frontier at origin")?
1545    {
1546        MerkleNode::Leaf { pos, elem, .. } => state
1547            .block_merkle_tree
1548            .remember(*pos, *elem, frontier)
1549            .context("failed to remember frontier")?,
1550        _ => bail!("invalid frontier proof"),
1551    }
1552
1553    // Apply subsequent headers to compute the later state.
1554    for proposal in leaves {
1555        state = compute_state_update(&state, instance, &catchup, &parent, &proposal)
1556            .await
1557            .context(format!(
1558                "unable to reconstruct state because state update {} failed",
1559                proposal.height(),
1560            ))?
1561            .0;
1562        parent = proposal;
1563    }
1564
1565    tracing::info!(from_height, ?to_view, "successfully reconstructed state");
1566    Ok((state, to_leaf))
1567}
1568
1569/// Get the dependencies needed to apply the STF to the given list of headers.
1570///
1571/// Returns
1572/// * A state catchup implementation seeded with all the chain configs required to apply the headers
1573///   in `leaves`
1574/// * The set of accounts that must be preloaded to apply these headers
1575async fn fee_header_dependencies<Mode: TransactionMode>(
1576    catchup: &mut NullStateCatchup,
1577    tx: &mut Transaction<Mode>,
1578    instance: &NodeState,
1579    mut parent: &Leaf2,
1580    leaves: impl IntoIterator<Item = &Leaf2>,
1581) -> anyhow::Result<HashSet<FeeAccount>> {
1582    let mut accounts = HashSet::default();
1583
1584    for proposal in leaves {
1585        let header = proposal.block_header();
1586        let height = header.height();
1587        let view = proposal.view_number();
1588        tracing::debug!(height, ?view, "fetching dependencies for proposal");
1589
1590        let header_cf = header.chain_config();
1591        let chain_config = if header_cf.commit() == instance.chain_config.commit() {
1592            instance.chain_config
1593        } else {
1594            match header_cf.resolve() {
1595                Some(cf) => cf,
1596                None => {
1597                    tracing::info!(
1598                        height,
1599                        ?view,
1600                        commit = %header_cf.commit(),
1601                        "chain config not available, attempting to load from storage",
1602                    );
1603                    let cf = load_chain_config(tx, header_cf.commit())
1604                        .await
1605                        .context(format!(
1606                            "loading chain config {} for header {},{:?}",
1607                            header_cf.commit(),
1608                            header.height(),
1609                            proposal.view_number()
1610                        ))?;
1611
1612                    // If we had to fetch a chain config now, store it in the catchup implementation
1613                    // so the STF will be able to look it up later.
1614                    catchup.add_chain_config(cf);
1615                    cf
1616                },
1617            }
1618        };
1619
1620        accounts.insert(chain_config.fee_recipient);
1621        accounts.extend(
1622            get_l1_deposits(instance, header, parent, chain_config.fee_contract)
1623                .await
1624                .into_iter()
1625                .map(|fee| fee.account()),
1626        );
1627        accounts.extend(header.fee_info().accounts());
1628        parent = proposal;
1629    }
1630    Ok(accounts)
1631}
1632
1633/// Identifies all reward accounts required to replay the State Transition Function
1634/// for the given leaf proposals. These accounts should be present in the Merkle tree
1635/// *before* applying the STF to avoid recursive catchup (i.e., STF triggering another catchup).
1636async fn reward_header_dependencies(
1637    instance: &NodeState,
1638    leaves: impl IntoIterator<Item = &Leaf2>,
1639) -> anyhow::Result<HashSet<RewardAccountV2>> {
1640    let mut reward_accounts = HashSet::default();
1641    let epoch_height = instance.epoch_height;
1642
1643    let Some(epoch_height) = epoch_height else {
1644        tracing::info!("epoch height is not set. returning empty reward_header_dependencies");
1645        return Ok(HashSet::new());
1646    };
1647
1648    let coordinator = instance.coordinator.clone();
1649    let first_epoch = coordinator.membership().first_epoch();
1650    // add all the chain configs needed to apply STF to headers to the catchup
1651    for proposal in leaves {
1652        let header = proposal.block_header();
1653
1654        let height = header.height();
1655        let view = proposal.view_number();
1656        tracing::debug!(height, ?view, "fetching dependencies for proposal");
1657
1658        let version = header.version();
1659        // Skip if version is less than epoch version
1660        if version < EPOCH_VERSION || version >= EPOCH_REWARD_VERSION {
1661            continue;
1662        }
1663
1664        let first_epoch = first_epoch.context("first epoch not found")?;
1665
1666        let proposal_epoch = EpochNumber::new(epoch_from_block_number(height, epoch_height));
1667
1668        // reward distribution starts third epoch onwards
1669        if proposal_epoch <= first_epoch + 1 {
1670            continue;
1671        }
1672
1673        let epoch_membership = match coordinator.membership_for_epoch(Some(proposal_epoch)) {
1674            Ok(e) => e,
1675            Err(err) => {
1676                tracing::info!(
1677                    "failed to get membership for epoch={proposal_epoch:?}. err={err:#}"
1678                );
1679
1680                coordinator
1681                    .wait_for_catchup(proposal_epoch)
1682                    .await
1683                    .context(format!("failed to catchup for epoch={proposal_epoch}"))?
1684            },
1685        };
1686
1687        let snapshot = epoch_membership
1688            .snapshot()
1689            .with_context(|| format!("no committee for epoch={proposal_epoch}"))?;
1690        let leader = snapshot.lookup_leader(proposal.view_number())?;
1691        let validator = snapshot.validator_config(&leader)?;
1692
1693        reward_accounts.insert(RewardAccountV2(validator.account));
1694
1695        let delegators: Vec<RewardAccountV2> = validator
1696            .delegators
1697            .keys()
1698            .map(|d| RewardAccountV2(*d))
1699            .collect();
1700
1701        reward_accounts.extend(delegators);
1702    }
1703    Ok(reward_accounts)
1704}
1705
1706async fn get_leaf_from_proposal<Mode, P>(
1707    tx: &mut Transaction<Mode>,
1708    where_clause: &str,
1709    param: P,
1710) -> anyhow::Result<Leaf2>
1711where
1712    P: Type<Db> + for<'q> Encode<'q, Db>,
1713{
1714    let (data,) = query_as::<(Vec<u8>,)>(&format!(
1715        "SELECT data FROM quorum_proposals2 WHERE {where_clause} LIMIT 1",
1716    ))
1717    .bind(param)
1718    .fetch_one(tx.as_mut())
1719    .await?;
1720    let proposal: Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>> =
1721        bincode::deserialize(&data)?;
1722    Ok(Leaf2::from_quorum_proposal(&proposal.data))
1723}
1724
1725#[cfg(any(test, feature = "testing"))]
1726pub(crate) mod impl_testable_data_source {
1727
1728    use hotshot_query_service::data_source::storage::sql::testing::TmpDb;
1729    use light_client::state::LightClientOptions;
1730
1731    use super::*;
1732    use crate::api::{self, data_source::testing::TestableSequencerDataSource, options::Query};
1733
1734    pub fn tmp_options(db: &TmpDb) -> Options {
1735        #[cfg(not(feature = "embedded-db"))]
1736        {
1737            let opt = crate::persistence::sql::PostgresOptions {
1738                port: Some(db.port()),
1739                host: Some(db.host()),
1740                user: Some("postgres".into()),
1741                password: Some("password".into()),
1742                ..Default::default()
1743            };
1744
1745            opt.into()
1746        }
1747
1748        #[cfg(feature = "embedded-db")]
1749        {
1750            let opt = crate::persistence::sql::SqliteOptions { path: db.path() };
1751            opt.into()
1752        }
1753    }
1754
1755    #[async_trait]
1756    impl TestableSequencerDataSource for DataSource {
1757        type Storage = TmpDb;
1758
1759        async fn create_storage() -> Self::Storage {
1760            TmpDb::init().await
1761        }
1762
1763        fn persistence_options(storage: &Self::Storage) -> Self::Options {
1764            tmp_options(storage)
1765        }
1766
1767        fn leaf_only_ds_options(
1768            storage: &Self::Storage,
1769            opt: api::Options,
1770        ) -> anyhow::Result<api::Options> {
1771            let mut ds_opts = tmp_options(storage);
1772            ds_opts.lightweight = true;
1773            Ok(opt.query_sql(Default::default(), ds_opts))
1774        }
1775
1776        fn options(storage: &Self::Storage, opt: api::Options) -> api::Options {
1777            opt.query_sql(
1778                Query {
1779                    light_client: LightClientOptions {
1780                        // Enable testnet features when running in tests.
1781                        decaf: true,
1782                        ..Default::default()
1783                    },
1784                    ..Default::default()
1785                },
1786                tmp_options(storage),
1787            )
1788        }
1789    }
1790}
1791
1792#[cfg(test)]
1793mod tests {
1794    use alloy::primitives::Address;
1795    use espresso_types::{
1796        v0_3::RewardAmount,
1797        v0_4::{REWARD_MERKLE_TREE_V2_HEIGHT, RewardAccountV2, RewardMerkleTreeV2},
1798    };
1799    use hotshot_query_service::{
1800        data_source::{
1801            Transaction, VersionedDataSource,
1802            sql::Config,
1803            storage::{
1804                MerklizedStateStorage, UpdateAvailabilityStorage,
1805                sql::{
1806                    SqlStorage, StorageConnectionType, Transaction as SqlTransaction, Write,
1807                    testing::TmpDb,
1808                },
1809            },
1810        },
1811        merklized_state::{MerklizedState, Snapshot, UpdateStateData},
1812    };
1813    use jf_merkle_tree_compat::{
1814        LookupResult, MerkleTreeScheme, ToTraversalPath, UniversalMerkleTreeScheme,
1815    };
1816    use light_client::testing::{leaf_chain, leaf_chain_with_upgrade};
1817    use versions::{DRB_AND_HEADER_UPGRADE_VERSION, EPOCH_REWARD_VERSION, Upgrade};
1818
1819    use super::impl_testable_data_source::tmp_options;
1820    use crate::{SeqTypes, api::RewardMerkleTreeDataSource};
1821
1822    fn make_reward_account(i: usize) -> RewardAccountV2 {
1823        let mut addr_bytes = [0u8; 20];
1824        addr_bytes[16..20].copy_from_slice(&(i as u32).to_be_bytes());
1825        RewardAccountV2(Address::from(addr_bytes))
1826    }
1827
1828    async fn insert_test_header(
1829        tx: &mut SqlTransaction<Write>,
1830        block_height: u64,
1831        reward_tree: &RewardMerkleTreeV2,
1832    ) {
1833        let reward_commitment = serde_json::to_value(reward_tree.commitment()).unwrap();
1834        let test_data = serde_json::json!({
1835            "block_merkle_tree_root": format!("block_root_{}", block_height),
1836            "fee_merkle_tree_root": format!("fee_root_{}", block_height),
1837            "fields": {
1838                RewardMerkleTreeV2::header_state_commitment_field(): reward_commitment
1839            }
1840        });
1841        tx.upsert(
1842            "header",
1843            [
1844                "height",
1845                "hash",
1846                "payload_hash",
1847                "timestamp",
1848                "data",
1849                "ns_table",
1850            ],
1851            ["height"],
1852            [(
1853                block_height as i64,
1854                format!("hash_{}", block_height),
1855                format!("payload_{}", block_height),
1856                block_height as i64,
1857                test_data,
1858                "ns_table".to_string(),
1859            )],
1860        )
1861        .await
1862        .unwrap();
1863    }
1864
1865    async fn batch_insert_proofs(
1866        tx: &mut SqlTransaction<Write>,
1867        reward_tree: &RewardMerkleTreeV2,
1868        accounts: &[RewardAccountV2],
1869        block_height: u64,
1870    ) {
1871        let proofs_and_paths: Vec<_> = accounts
1872            .iter()
1873            .map(|account| {
1874                let proof = match reward_tree.universal_lookup(*account) {
1875                    LookupResult::Ok(_, proof) => proof,
1876                    LookupResult::NotInMemory => panic!("account not in memory"),
1877                    LookupResult::NotFound(proof) => proof,
1878                };
1879                let traversal_path = <RewardAccountV2 as ToTraversalPath<
1880                    { RewardMerkleTreeV2::ARITY },
1881                >>::to_traversal_path(
1882                    account, reward_tree.height()
1883                );
1884                (proof, traversal_path)
1885            })
1886            .collect();
1887
1888        UpdateStateData::<SeqTypes, RewardMerkleTreeV2, { RewardMerkleTreeV2::ARITY }>::insert_merkle_nodes_batch(
1889            tx,
1890            proofs_and_paths,
1891            block_height,
1892        )
1893        .await
1894        .expect("failed to batch insert proofs");
1895    }
1896
1897    #[test_log::test(tokio::test(flavor = "multi_thread"))]
1898    async fn test_reward_accounts_batch_insertion() {
1899        // Batch insertion of 1000 accounts at height 1
1900        // Balance updates for some accounts at height 2
1901        // New accounts added at height 2
1902        // More balance updates at height 3
1903        // Querying correct balances at each height snapshot
1904
1905        let db = TmpDb::init().await;
1906        let opt = tmp_options(&db);
1907        let cfg = Config::try_from(&opt).expect("failed to create config from options");
1908        let storage = SqlStorage::connect(cfg, StorageConnectionType::Query)
1909            .await
1910            .expect("failed to connect to storage");
1911
1912        let num_initial_accounts = 1000usize;
1913
1914        let initial_accounts: Vec<RewardAccountV2> =
1915            (0..num_initial_accounts).map(make_reward_account).collect();
1916
1917        tracing::info!(
1918            "Height 1: Inserting {} initial accounts",
1919            num_initial_accounts
1920        );
1921
1922        let mut reward_tree_h1 = RewardMerkleTreeV2::new(REWARD_MERKLE_TREE_V2_HEIGHT);
1923
1924        for (i, account) in initial_accounts.iter().enumerate() {
1925            let reward_amount = RewardAmount::from(((i + 1) * 1000) as u64);
1926            reward_tree_h1.update(*account, reward_amount).unwrap();
1927        }
1928
1929        let mut tx = storage.write().await.unwrap();
1930        insert_test_header(&mut tx, 1, &reward_tree_h1).await;
1931        batch_insert_proofs(&mut tx, &reward_tree_h1, &initial_accounts, 1).await;
1932
1933        UpdateStateData::<SeqTypes, RewardMerkleTreeV2, { RewardMerkleTreeV2::ARITY }>::set_last_state_height(&mut tx, 1)
1934            .await
1935            .unwrap();
1936        tx.commit().await.unwrap();
1937
1938        tracing::info!("Height 2: Updating balances and adding new accounts");
1939
1940        let mut reward_tree_h2 = reward_tree_h1.clone();
1941
1942        // Update balances for accounts 0-99
1943        let updated_accounts_h2: Vec<RewardAccountV2> = (0..100).map(make_reward_account).collect();
1944        for (i, account) in updated_accounts_h2.iter().enumerate() {
1945            let new_reward = RewardAmount::from(((i + 1) * 2000) as u64);
1946            reward_tree_h2.update(*account, new_reward).unwrap();
1947        }
1948
1949        // Add 100 new accounts (1000..1099)
1950        let new_accounts_h2: Vec<RewardAccountV2> = (1000..1100).map(make_reward_account).collect();
1951        for (i, account) in new_accounts_h2.iter().enumerate() {
1952            let reward_amount = RewardAmount::from(((i + 1001) * 500) as u64);
1953            reward_tree_h2.update(*account, reward_amount).unwrap();
1954        }
1955
1956        let mut changed_accounts_h2 = updated_accounts_h2.clone();
1957        changed_accounts_h2.extend(new_accounts_h2.clone());
1958
1959        let mut tx = storage.write().await.unwrap();
1960        insert_test_header(&mut tx, 2, &reward_tree_h2).await;
1961        batch_insert_proofs(&mut tx, &reward_tree_h2, &changed_accounts_h2, 2).await;
1962
1963        UpdateStateData::<SeqTypes, RewardMerkleTreeV2, { RewardMerkleTreeV2::ARITY }>::set_last_state_height(&mut tx, 2)
1964            .await
1965            .unwrap();
1966        tx.commit().await.unwrap();
1967
1968        tracing::info!("Height 3: More balance updates");
1969
1970        let mut reward_tree_h3 = reward_tree_h2.clone();
1971
1972        // Update balances for accounts 500-599
1973        let updated_accounts_h3: Vec<RewardAccountV2> =
1974            (500..600).map(make_reward_account).collect();
1975        for (i, account) in updated_accounts_h3.iter().enumerate() {
1976            let new_reward = RewardAmount::from(((500 + i + 1) * 3000) as u64);
1977            reward_tree_h3.update(*account, new_reward).unwrap();
1978        }
1979
1980        let mut tx = storage.write().await.unwrap();
1981        insert_test_header(&mut tx, 3, &reward_tree_h3).await;
1982        batch_insert_proofs(&mut tx, &reward_tree_h3, &updated_accounts_h3, 3).await;
1983
1984        UpdateStateData::<SeqTypes, RewardMerkleTreeV2, { RewardMerkleTreeV2::ARITY }>::set_last_state_height(&mut tx, 3)
1985            .await
1986            .unwrap();
1987        tx.commit().await.unwrap();
1988
1989        tracing::info!("Verifying all account proofs at each height");
1990
1991        // Verify height=1
1992        // All 1000 initial accounts
1993        let snapshot_h1 =
1994            Snapshot::<SeqTypes, RewardMerkleTreeV2, { RewardMerkleTreeV2::ARITY }>::Index(1);
1995        for i in 0..num_initial_accounts {
1996            let account = make_reward_account(i);
1997            let proof = storage
1998                .read()
1999                .await
2000                .unwrap()
2001                .get_path(snapshot_h1, account)
2002                .await
2003                .unwrap_or_else(|e| panic!("failed to get path for account {i} at h1: {e}"));
2004
2005            let expected_reward = RewardAmount::from(((i + 1) * 1000) as u64);
2006            let actual_reward = proof.elem().expect("account should exist");
2007            assert_eq!(*actual_reward, expected_reward,);
2008
2009            assert!(
2010                RewardMerkleTreeV2::verify(reward_tree_h1.commitment(), account, proof)
2011                    .unwrap()
2012                    .is_ok(),
2013            );
2014        }
2015        tracing::info!("Verified height=1 {num_initial_accounts} accounts with proofs",);
2016
2017        // Verify accounts 1000-1099 don't exist at height 1
2018        for i in 1000..1100 {
2019            let account = make_reward_account(i);
2020            let proof = storage
2021                .read()
2022                .await
2023                .unwrap()
2024                .get_path(snapshot_h1, account)
2025                .await
2026                .unwrap();
2027            assert!(proof.elem().is_none(),);
2028
2029            // Verify non-membership proof
2030            assert!(
2031                RewardMerkleTreeV2::non_membership_verify(
2032                    reward_tree_h1.commitment(),
2033                    account,
2034                    proof
2035                )
2036                .unwrap(),
2037            );
2038        }
2039        tracing::info!("Height 1: Verified 100 non-membership proofs");
2040
2041        // Verify height 2
2042        let snapshot_h2 =
2043            Snapshot::<SeqTypes, RewardMerkleTreeV2, { RewardMerkleTreeV2::ARITY }>::Index(2);
2044
2045        // Accounts 0-99
2046        for i in 0..100 {
2047            let account = make_reward_account(i);
2048            let proof = storage
2049                .read()
2050                .await
2051                .unwrap()
2052                .get_path(snapshot_h2, account)
2053                .await
2054                .unwrap_or_else(|e| panic!("failed to get path for account {i} at h2: {e}"));
2055
2056            let expected_reward = RewardAmount::from(((i + 1) * 2000) as u64);
2057            let actual_reward = proof.elem().expect("account should exist");
2058            assert_eq!(*actual_reward, expected_reward,);
2059
2060            assert!(
2061                RewardMerkleTreeV2::verify(reward_tree_h2.commitment(), account, proof)
2062                    .unwrap()
2063                    .is_ok(),
2064            );
2065        }
2066
2067        // Accounts 100-999: original rewards
2068        for i in 100..1000 {
2069            let account = make_reward_account(i);
2070            let proof = storage
2071                .read()
2072                .await
2073                .unwrap()
2074                .get_path(snapshot_h2, account)
2075                .await
2076                .unwrap_or_else(|e| panic!("failed to get path for account {i} at h2: {e}"));
2077
2078            let expected_reward = RewardAmount::from(((i + 1) * 1000) as u64);
2079            let actual_reward = proof.elem().expect("account should exist");
2080            assert_eq!(*actual_reward, expected_reward,);
2081
2082            assert!(
2083                RewardMerkleTreeV2::verify(reward_tree_h2.commitment(), account, proof)
2084                    .unwrap()
2085                    .is_ok(),
2086            );
2087        }
2088
2089        // Accounts 1000-1099
2090        // new accounts
2091        for i in 1000..1100 {
2092            let account = make_reward_account(i);
2093            let proof = storage
2094                .read()
2095                .await
2096                .unwrap()
2097                .get_path(snapshot_h2, account)
2098                .await
2099                .unwrap_or_else(|e| panic!("failed to get path for account {i} at h2: {e}"));
2100
2101            let expected_reward = RewardAmount::from(((i + 1) * 500) as u64);
2102            let actual_reward = proof.elem().expect("account should exist");
2103            assert_eq!(*actual_reward, expected_reward,);
2104
2105            assert!(
2106                RewardMerkleTreeV2::verify(reward_tree_h2.commitment(), account, proof)
2107                    .unwrap()
2108                    .is_ok(),
2109            );
2110        }
2111        tracing::info!("Height 2: Verified all 1100 accounts with proofs");
2112
2113        // Verify HEIGHT 3: All accounts
2114        let snapshot_h3 =
2115            Snapshot::<SeqTypes, RewardMerkleTreeV2, { RewardMerkleTreeV2::ARITY }>::Index(3);
2116
2117        // Accounts 0-99
2118        for i in 0..100 {
2119            let account = make_reward_account(i);
2120            let proof = storage
2121                .read()
2122                .await
2123                .unwrap()
2124                .get_path(snapshot_h3, account)
2125                .await
2126                .unwrap_or_else(|e| panic!("failed to get path for account {i} at h3: {e}"));
2127
2128            let expected_reward = RewardAmount::from(((i + 1) * 2000) as u64);
2129            let actual_reward = proof.elem().expect("account should exist");
2130            assert_eq!(*actual_reward, expected_reward,);
2131
2132            assert!(
2133                RewardMerkleTreeV2::verify(reward_tree_h3.commitment(), account, proof)
2134                    .unwrap()
2135                    .is_ok(),
2136            );
2137        }
2138
2139        for i in 100..500 {
2140            let account = make_reward_account(i);
2141            let proof = storage
2142                .read()
2143                .await
2144                .unwrap()
2145                .get_path(snapshot_h3, account)
2146                .await
2147                .unwrap_or_else(|e| panic!("failed to get path for account {i} at h3: {e}"));
2148
2149            let expected_reward = RewardAmount::from(((i + 1) * 1000) as u64);
2150            let actual_reward = proof.elem().expect("account should exist");
2151            assert_eq!(*actual_reward, expected_reward,);
2152
2153            assert!(
2154                RewardMerkleTreeV2::verify(reward_tree_h3.commitment(), account, proof)
2155                    .unwrap()
2156                    .is_ok(),
2157            );
2158        }
2159
2160        // Accounts 500-599
2161        for i in 500..600 {
2162            let account = make_reward_account(i);
2163            let proof = storage
2164                .read()
2165                .await
2166                .unwrap()
2167                .get_path(snapshot_h3, account)
2168                .await
2169                .unwrap_or_else(|e| panic!("failed to get path for account {i} at h3: {e}"));
2170
2171            let expected_reward = RewardAmount::from(((i + 1) * 3000) as u64);
2172            let actual_reward = proof.elem().expect("account should exist");
2173            assert_eq!(*actual_reward, expected_reward,);
2174
2175            assert!(
2176                RewardMerkleTreeV2::verify(reward_tree_h3.commitment(), account, proof)
2177                    .unwrap()
2178                    .is_ok(),
2179            );
2180        }
2181
2182        // Accounts 600-999
2183        for i in 600..1000 {
2184            let account = make_reward_account(i);
2185            let proof = storage
2186                .read()
2187                .await
2188                .unwrap()
2189                .get_path(snapshot_h3, account)
2190                .await
2191                .unwrap_or_else(|e| panic!("failed to get path for account {i} at h3: {e}"));
2192
2193            let expected_reward = RewardAmount::from(((i + 1) * 1000) as u64);
2194            let actual_reward = proof.elem().expect("account should exist");
2195            assert_eq!(*actual_reward, expected_reward,);
2196
2197            assert!(
2198                RewardMerkleTreeV2::verify(reward_tree_h3.commitment(), account, proof)
2199                    .unwrap()
2200                    .is_ok(),
2201            );
2202        }
2203
2204        // Accounts 1000-1099: new accounts (from h2)
2205        for i in 1000..1100 {
2206            let account = make_reward_account(i);
2207            let proof = storage
2208                .read()
2209                .await
2210                .unwrap()
2211                .get_path(snapshot_h3, account)
2212                .await
2213                .unwrap_or_else(|e| panic!("failed to get path for account {i} at h3: {e}"));
2214
2215            let expected_reward = RewardAmount::from(((i + 1) * 500) as u64);
2216            let actual_reward = proof.elem().expect("account should exist");
2217            assert_eq!(*actual_reward, expected_reward,);
2218
2219            assert!(
2220                RewardMerkleTreeV2::verify(reward_tree_h3.commitment(), account, proof)
2221                    .unwrap()
2222                    .is_ok(),
2223            );
2224        }
2225        tracing::info!("Height 3: Verified all 1100 accounts with proofs");
2226
2227        // Verify non-membership proofs for accounts that never existed
2228        for i in 1100..1110 {
2229            let account = make_reward_account(i);
2230            let proof = storage
2231                .read()
2232                .await
2233                .unwrap()
2234                .get_path(snapshot_h3, account)
2235                .await
2236                .unwrap();
2237
2238            assert!(
2239                proof.elem().is_none(),
2240                "Account {i} should not exist at height 3"
2241            );
2242
2243            assert!(
2244                RewardMerkleTreeV2::non_membership_verify(
2245                    reward_tree_h3.commitment(),
2246                    account,
2247                    proof
2248                )
2249                .unwrap(),
2250            );
2251        }
2252        tracing::info!("Height 3: Verified 10 non-membership proofs");
2253    }
2254
2255    #[tokio::test]
2256    #[test_log::test]
2257    async fn test_merkle_proof_gc() {
2258        let db = TmpDb::init().await;
2259        let opt = tmp_options(&db);
2260        let cfg = Config::try_from(&opt).expect("failed to create config from options");
2261        let storage = SqlStorage::connect(cfg, StorageConnectionType::Query)
2262            .await
2263            .expect("failed to connect to storage");
2264
2265        let account = vec![0; 32];
2266
2267        // Insert mock leaves at heights 0-2 so load_proof can check leaf version.
2268        let leaves = leaf_chain(0..=2, DRB_AND_HEADER_UPGRADE_VERSION).await;
2269        let mut tx = storage.write().await.unwrap();
2270        for leaf in &leaves {
2271            tx.insert_leaf(leaf).await.unwrap();
2272        }
2273        Transaction::commit(tx).await.unwrap();
2274
2275        // Insert some mock proofs at heights 0-2.
2276        for h in 0..=2 {
2277            storage
2278                .persist_proofs(h, [(account.clone(), h.to_le_bytes().to_vec())].into_iter())
2279                .await
2280                .unwrap();
2281        }
2282
2283        // Test garbage collection.
2284        storage.garbage_collect(2).await.unwrap();
2285
2286        // Make sure the proof at height 2 is still available.
2287        assert_eq!(
2288            storage.load_proof(2, account.clone(), 0).await.unwrap(),
2289            2u64.to_le_bytes()
2290        );
2291
2292        // Meanwhile, the proofs at heights 0-1 have been garbage collected.
2293        for h in 0..2 {
2294            let err = storage.load_proof(h, account.clone(), 0).await.unwrap_err();
2295            assert!(err.to_string().contains("Missing proof"), "{err:#}");
2296        }
2297
2298        // Garbage collect the remaining proof.
2299        storage.garbage_collect(3).await.unwrap();
2300        let err = storage.load_proof(2, account, 0).await.unwrap_err();
2301        assert!(err.to_string().contains("Missing proof"), "{err:#}");
2302    }
2303
2304    #[tokio::test]
2305    #[test_log::test]
2306    async fn test_load_proof_v5_epoch_boundary() {
2307        let db = TmpDb::init().await;
2308        let opt = tmp_options(&db);
2309        let cfg = Config::try_from(&opt).expect("failed to create config from options");
2310        let storage = SqlStorage::connect(cfg, StorageConnectionType::Query)
2311            .await
2312            .expect("failed to connect to storage");
2313
2314        let epoch_height = 10u64;
2315        let account = vec![0; 32];
2316
2317        // Create V5 leaves at heights 0..=15.
2318        let leaves = leaf_chain(0..=15, EPOCH_REWARD_VERSION).await;
2319        let mut tx = storage.write().await.unwrap();
2320        for leaf in &leaves {
2321            tx.insert_leaf(leaf).await.unwrap();
2322        }
2323        Transaction::commit(tx).await.unwrap();
2324
2325        // Store proofs only at epoch boundaries (height 10).
2326        let boundary_proof = b"proof_at_10".to_vec();
2327        {
2328            let mut tx = storage.write().await.unwrap();
2329            tx.upsert(
2330                "reward_merkle_tree_v2_proofs",
2331                ["height", "account", "proof"],
2332                ["height", "account"],
2333                [(10i64, account.clone(), boundary_proof.clone())],
2334            )
2335            .await
2336            .unwrap();
2337            Transaction::commit(tx).await.unwrap();
2338        }
2339
2340        // Querying at the epoch boundary itself should return the proof directly.
2341        assert_eq!(
2342            storage
2343                .load_proof(10, account.clone(), epoch_height)
2344                .await
2345                .unwrap(),
2346            boundary_proof,
2347        );
2348
2349        // Querying at a non-boundary V5 height (e.g. 15) should resolve to the
2350        // previous epoch boundary (10) and return its proof.
2351        assert_eq!(
2352            storage
2353                .load_proof(15, account.clone(), epoch_height)
2354                .await
2355                .unwrap(),
2356            boundary_proof,
2357        );
2358    }
2359
2360    #[tokio::test]
2361    #[test_log::test]
2362    async fn test_load_proof_v4_to_v5_upgrade_boundary() {
2363        let db = TmpDb::init().await;
2364        let opt = tmp_options(&db);
2365        let cfg = Config::try_from(&opt).expect("failed to create config from options");
2366        let storage = SqlStorage::connect(cfg, StorageConnectionType::Query)
2367            .await
2368            .expect("failed to connect to storage");
2369
2370        let epoch_height = 10u64;
2371        let account = vec![0; 32];
2372
2373        // V4 leaves at heights 0..=10, V5 from height 11 onward.
2374        // The upgrade happens at height 11.
2375        let upgrade = Upgrade::new(DRB_AND_HEADER_UPGRADE_VERSION, EPOCH_REWARD_VERSION);
2376        let leaves = leaf_chain_with_upgrade(0..=15, 11, upgrade).await;
2377        {
2378            let mut tx = storage.write().await.unwrap();
2379            for leaf in &leaves {
2380                tx.insert_leaf(leaf).await.unwrap();
2381            }
2382            Transaction::commit(tx).await.unwrap();
2383        }
2384
2385        // Querying at V5 height 15 resolves to prev epoch boundary
2386        // But leaf at height 10 is V4, so this should fail.
2387        storage
2388            .load_proof(15, account.clone(), epoch_height)
2389            .await
2390            .unwrap_err();
2391
2392        let v4_proof = b"v4_proof_at_5".to_vec();
2393        {
2394            let mut tx = storage.write().await.unwrap();
2395            tx.upsert(
2396                "reward_merkle_tree_v2_proofs",
2397                ["height", "account", "proof"],
2398                ["height", "account"],
2399                [(5i64, account.clone(), v4_proof.clone())],
2400            )
2401            .await
2402            .unwrap();
2403            Transaction::commit(tx).await.unwrap();
2404        }
2405        assert_eq!(
2406            storage.load_proof(5, account.clone(), 0).await.unwrap(),
2407            v4_proof,
2408        );
2409    }
2410
2411    #[test_log::test(tokio::test(flavor = "multi_thread"))]
2412    async fn test_get_table_sizes() {
2413        use super::super::data_source::DatabaseMetadataSource;
2414
2415        let db = TmpDb::init().await;
2416        let opt = tmp_options(&db);
2417        let cfg = Config::try_from(&opt).expect("failed to create config from options");
2418        let storage = SqlStorage::connect(cfg, StorageConnectionType::Query)
2419            .await
2420            .expect("failed to connect to storage");
2421
2422        // Insert some test data to ensure tables have rows
2423        let mut tx = storage.write().await.unwrap();
2424
2425        // Insert a test header
2426        let reward_tree = RewardMerkleTreeV2::new(REWARD_MERKLE_TREE_V2_HEIGHT);
2427        insert_test_header(&mut tx, 1, &reward_tree).await;
2428
2429        tx.commit().await.unwrap();
2430
2431        // Call get_table_sizes and verify it doesn't error
2432        let table_sizes = storage
2433            .get_table_sizes()
2434            .await
2435            .expect("get_table_sizes should succeed");
2436
2437        // Verify we got some tables back
2438        assert!(
2439            !table_sizes.is_empty(),
2440            "should have at least one table in the database"
2441        );
2442    }
2443}