espresso_node/api/
sql.rs

1use std::collections::{HashMap, HashSet, VecDeque};
2
3use anyhow::{Context, bail, ensure};
4use async_trait::async_trait;
5use committable::{Commitment, Committable};
6use espresso_types::{
7    BlockMerkleTree, ChainConfig, FeeAccount, FeeMerkleTree, Leaf2, NodeState, ValidatedState,
8    get_l1_deposits,
9    v0_1::IterableFeeInfo,
10    v0_3::{
11        REWARD_MERKLE_TREE_V1_HEIGHT, RewardAccountProofV1, RewardAccountQueryDataV1,
12        RewardAccountV1, RewardMerkleTreeV1,
13    },
14    v0_4::{PermittedRewardMerkleTreeV2, RewardAccountV2, RewardMerkleTreeV2},
15    v0_6::RewardAccountQueryDataV2,
16};
17use futures::future::Future;
18use hotshot::traits::ValidatedState as _;
19use hotshot_query_service::{
20    Resolvable,
21    availability::LeafId,
22    data_source::{
23        VersionedDataSource,
24        sql::{Config, SqlDataSource, Transaction},
25        storage::{
26            AvailabilityStorage, MerklizedStateStorage, NodeStorage, SqlStorage,
27            pruning::PrunerConfig,
28            sql::{Db, TransactionMode, Write, query_as},
29        },
30    },
31    merklized_state::Snapshot,
32};
33use hotshot_types::{
34    data::{EpochNumber, QuorumProposalWrapper, ViewNumber},
35    message::Proposal,
36    utils::{epoch_from_block_number, is_last_block},
37    vote::HasViewNumber,
38};
39use jf_merkle_tree_compat::{
40    ForgetableMerkleTreeScheme, ForgetableUniversalMerkleTreeScheme, LookupResult,
41    MerkleTreeScheme, prelude::MerkleNode,
42};
43use sqlx::{Encode, Row, Type};
44use vbs::version::Version;
45use versions::{DRB_AND_HEADER_UPGRADE_VERSION, EPOCH_REWARD_VERSION, EPOCH_VERSION};
46
47use super::{
48    BlocksFrontier,
49    data_source::{Provider, SequencerDataSource},
50};
51use crate::{
52    SeqTypes,
53    api::RewardMerkleTreeDataSource,
54    catchup::{CatchupStorage, NullStateCatchup},
55    persistence::{ChainConfigPersistence, sql::Options},
56    state::compute_state_update,
57    util::BoundedJoinSet,
58};
59
/// SQL-backed data source used by the sequencer API: the query service's [`SqlDataSource`]
/// instantiated with this crate's [`SeqTypes`] and the [`Provider`] type.
pub type DataSource = SqlDataSource<SeqTypes, Provider>;
61
62#[async_trait]
63impl SequencerDataSource for DataSource {
64    type Options = Options;
65
66    async fn create(opt: Self::Options, provider: Provider, reset: bool) -> anyhow::Result<Self> {
67        let fetch_limit = opt.fetch_rate_limit;
68        let active_fetch_delay = opt.active_fetch_delay;
69        let chunk_fetch_delay = opt.chunk_fetch_delay;
70        let mut cfg = Config::try_from(&opt)?;
71
72        if reset {
73            cfg = cfg.reset_schema();
74        }
75
76        let mut builder = cfg.builder(provider).await?;
77
78        if let Some(limit) = fetch_limit {
79            builder = builder.with_rate_limit(limit);
80        }
81
82        if opt.lightweight {
83            tracing::warn!("enabling light weight mode..");
84            builder = builder.leaf_only();
85        }
86
87        if let Some(delay) = active_fetch_delay {
88            builder = builder.with_active_fetch_delay(delay);
89        }
90        if let Some(delay) = chunk_fetch_delay {
91            builder = builder.with_chunk_fetch_delay(delay);
92        }
93        if let Some(chunk_size) = opt.sync_status_chunk_size {
94            builder = builder.with_sync_status_chunk_size(chunk_size);
95        }
96        if let Some(ttl) = opt.sync_status_ttl {
97            builder = builder.with_sync_status_ttl(ttl);
98        }
99        if let Some(chunk_size) = opt.proactive_scan_chunk_size {
100            builder = builder.with_proactive_range_chunk_size(chunk_size);
101        }
102        if let Some(interval) = opt.proactive_scan_interval {
103            builder = builder.with_proactive_interval(interval);
104        }
105        if opt.disable_proactive_fetching {
106            builder = builder.disable_proactive_fetching();
107        }
108
109        builder.build().await
110    }
111}
112
113impl RewardMerkleTreeDataSource for SqlStorage {
114    async fn load_v1_reward_account_proof(
115        &self,
116        height: u64,
117        account: RewardAccountV1,
118    ) -> anyhow::Result<RewardAccountQueryDataV1> {
119        let mut tx = self.read().await.context(format!(
120            "opening transaction to fetch v1 reward account {account:?}; height {height}"
121        ))?;
122
123        let block_height = NodeStorage::<SeqTypes>::block_height(&mut tx)
124            .await
125            .context("getting block height")? as u64;
126        ensure!(
127            block_height > 0,
128            "cannot get accounts for height {height}: no blocks available"
129        );
130
131        // Check if we have the desired state snapshot. If so, we can load the desired accounts
132        // directly.
133        if height < block_height {
134            let (tree, _) = load_v1_reward_accounts(self, height, &[account])
135                .await
136                .with_context(|| {
137                    format!("failed to load v1 reward account {account:?} at height {height}")
138                })?;
139
140            let (proof, balance) = RewardAccountProofV1::prove(&tree, account.into())
141                .with_context(|| {
142                    format!("reward account {account:?} not available at height {height}")
143                })?;
144
145            Ok(RewardAccountQueryDataV1 { balance, proof })
146        } else {
147            bail!(
148                "requested height {height} is not yet available (latest block height: \
149                 {block_height})"
150            );
151        }
152    }
153
154    fn persist_tree(
155        &self,
156        height: u64,
157        merkle_tree: Vec<u8>,
158    ) -> impl Send + Future<Output = anyhow::Result<()>> {
159        async move {
160            let mut tx = self
161                .write()
162                .await
163                .context("opening transaction for reward merkle tree v2")?;
164
165            tx.upsert(
166                "reward_merkle_tree_v2_data",
167                ["height", "balances"],
168                ["height"],
169                [(height as i64, merkle_tree)],
170            )
171            .await?;
172
173            hotshot_query_service::data_source::Transaction::commit(tx)
174                .await
175                .context("Transaction to store reward merkle tree v2 failed.")
176        }
177    }
178
179    fn load_tree(&self, height: u64) -> impl Send + Future<Output = anyhow::Result<Vec<u8>>> {
180        async move {
181            let mut tx = self
182                .read()
183                .await
184                .context("opening transaction for state update")?;
185
186            let row = sqlx::query(
187                r#"
188                SELECT balances
189                FROM reward_merkle_tree_v2_data
190                WHERE height = $1
191                "#,
192            )
193            .bind(height as i64)
194            .fetch_optional(tx.as_mut())
195            .await?
196            .context(format!(
197                "No reward merkle tree for height {} in storage",
198                height
199            ))?;
200
201            row.try_get::<Vec<u8>, _>("balances")
202                .context("Missing field balances from row; this should never happen")
203        }
204    }
205
206    fn persist_proofs(
207        &self,
208        height: u64,
209        proofs: impl Iterator<Item = (Vec<u8>, Vec<u8>)> + Send,
210    ) -> impl Send + Future<Output = anyhow::Result<()>> {
211        async move {
212            let mut iter = proofs.map(|(account, proof)| (height as i64, account, proof));
213
214            loop {
215                let mut chunk = Vec::with_capacity(20);
216
217                for _ in 0..20 {
218                    let Some(row) = iter.next() else {
219                        continue;
220                    };
221                    chunk.push(row);
222                }
223
224                if chunk.is_empty() {
225                    break;
226                }
227
228                let mut tx = self
229                    .write()
230                    .await
231                    .context("opening transaction for state update")?;
232
233                tokio::spawn(async move {
234                    tx.upsert(
235                        "reward_merkle_tree_v2_proofs",
236                        ["height", "account", "proof"],
237                        ["height", "account"],
238                        chunk,
239                    )
240                    .await?;
241
242                    hotshot_query_service::data_source::Transaction::commit(tx)
243                        .await
244                        .context("Transaction to store reward merkle tree failed.")?;
245
246                    Ok::<_, anyhow::Error>(())
247                });
248            }
249            Ok(())
250        }
251    }
252
253    /// Load a reward proof for a given height and account.
254    ///
255    /// For V5+ (epoch rewards), if the requested height is not an epoch boundary,
256    /// resolves to the previous epoch's last block. Verifies the boundary block is
257    /// V5+ to handle V4→V5 upgrades.
258    /// For V4 (per-block rewards), proofs are stored at every block height.
259    fn load_proof(
260        &self,
261        height: u64,
262        account: Vec<u8>,
263        epoch_height: u64,
264    ) -> impl Send + Future<Output = anyhow::Result<Vec<u8>>> {
265        async move {
266            let mut tx = self
267                .read()
268                .await
269                .context("opening transaction for load_proof")?;
270
271            let leaf = tx
272                .get_leaf(LeafId::<SeqTypes>::from(height as usize))
273                .await
274                .context(format!("leaf {height} not available"))?;
275
276            // For V5+, resolve to the last epoch boundary where proofs were stored.
277            // For V4, proofs exist at every height.
278            let proof_height = if leaf.header().version() < EPOCH_REWARD_VERSION
279                || is_last_block(height, epoch_height)
280            {
281                height
282            } else {
283                let prev_epoch_last_block =
284                    epoch_from_block_number(height, epoch_height).saturating_sub(1) * epoch_height;
285                let boundary_leaf = tx
286                    .get_leaf(LeafId::<SeqTypes>::from(prev_epoch_last_block as usize))
287                    .await
288                    .context(format!(
289                        "leaf at epoch boundary {prev_epoch_last_block} not available"
290                    ))?;
291                ensure!(
292                    boundary_leaf.header().version() >= EPOCH_REWARD_VERSION,
293                    "no epoch reward proofs available at boundary {prev_epoch_last_block}"
294                );
295                prev_epoch_last_block
296            };
297
298            sqlx::query_scalar(
299                "SELECT proof FROM reward_merkle_tree_v2_proofs WHERE height = $1 AND account = $2",
300            )
301            .bind(proof_height as i64)
302            .bind(account)
303            .fetch_optional(tx.as_mut())
304            .await?
305            .context(format!(
306                "Missing proofs at height {proof_height} (resolved from {height})"
307            ))
308        }
309    }
310
311    fn load_latest_proof(
312        &self,
313        account: Vec<u8>,
314    ) -> impl Send + Future<Output = anyhow::Result<Vec<u8>>> {
315        async move {
316            let mut tx = self
317                .read()
318                .await
319                .context("opening transaction for state update")?;
320
321            let row = sqlx::query(
322                r#"
323                SELECT proof 
324                FROM reward_merkle_tree_v2_proofs
325                WHERE account = $1
326                ORDER BY height DESC
327                LIMIT 1
328                "#,
329            )
330            .bind(account)
331            .fetch_optional(tx.as_mut())
332            .await?
333            .context("Missing proofs")?;
334
335            row.try_get::<Vec<u8>, _>("proof")
336                .context("Missing field proof from row; this should never happen")
337        }
338    }
339
340    fn garbage_collect(&self, height: u64) -> impl Send + Future<Output = anyhow::Result<()>> {
341        async move {
342            let batch_size = self.get_pruning_config().unwrap_or_default().batch_size();
343
344            // Postgres supports improved performance by using `FOR UPDATE` in a sub-select to
345            // acquire exclusive locks on visited rows, which are later deleted. SQLite does not
346            // support this syntax.
347            #[cfg(not(feature = "embedded-db"))]
348            let for_update = "FOR UPDATE";
349            #[cfg(feature = "embedded-db")]
350            let for_update = "";
351
352            // Delete batches from the merkle data table until there is nothing left to delete.
353            loop {
354                let mut tx = self
355                    .write()
356                    .await
357                    .context("opening transaction for state update")?;
358
359                let res = sqlx::query(&format!(
360                    "
361                    WITH delete_batch AS (
362                        SELECT d.height FROM reward_merkle_tree_v2_data AS d
363                            WHERE d.height < $1
364                            ORDER BY d.height DESC
365                            LIMIT $2
366                            {for_update}
367                    )
368                    DELETE FROM reward_merkle_tree_v2_data AS del
369                    WHERE del.height IN (SELECT * FROM delete_batch)
370                "
371                ))
372                .bind(height as i64)
373                .bind(batch_size as i64)
374                .execute(tx.as_mut())
375                .await?;
376
377                hotshot_query_service::data_source::Transaction::commit(tx)
378                    .await
379                    .context(
380                        "Transaction to garbage collect reward merkle trees from storage failed.",
381                    )?;
382
383                if res.rows_affected() == 0 {
384                    break;
385                } else {
386                    tracing::debug!(
387                        "deleted {} rows from reward_merkle_tree_v2_data",
388                        res.rows_affected()
389                    );
390                }
391            }
392
393            // Delete batches from the proofs table until there is nothing left to delete.
394            loop {
395                let mut tx = self
396                    .write()
397                    .await
398                    .context("opening transaction for state update")?;
399
400                let res = sqlx::query(&format!(
401                    "
402                    WITH delete_batch AS (
403                        SELECT d.height, d.account FROM reward_merkle_tree_v2_proofs AS d
404                            WHERE d.height < $1
405                            ORDER BY d.height, d.account DESC
406                            LIMIT $2
407                            {for_update}
408                    )
409                    DELETE FROM reward_merkle_tree_v2_proofs AS del
410                    WHERE (del.height, del.account) IN (SELECT * FROM delete_batch)
411                ",
412                ))
413                .bind(height as i64)
414                .bind(batch_size as i64)
415                .execute(tx.as_mut())
416                .await?;
417
418                hotshot_query_service::data_source::Transaction::commit(tx)
419                    .await
420                    .context("Transaction to garbage collect proofs from storage failed.")?;
421
422                if res.rows_affected() == 0 {
423                    break;
424                } else {
425                    tracing::debug!(
426                        "deleted {} rows from reward_merkle_tree_v2_proofs",
427                        res.rows_affected()
428                    );
429                }
430            }
431
432            Ok(())
433        }
434    }
435
436    fn proof_exists(&self, height: u64) -> impl Send + Future<Output = bool> {
437        async move {
438            let Ok(mut tx) = self.write().await else {
439                return false;
440            };
441
442            sqlx::query_as(
443                r#"
444                SELECT EXISTS(
445                SELECT 1 FROM reward_merkle_tree_v2_proofs
446                WHERE height = $1
447                )
448                "#,
449            )
450            .bind(height as i64)
451            .fetch_one(tx.as_mut())
452            .await
453            .ok()
454            .unwrap_or((false,))
455            .0
456        }
457    }
458    /// Generate and persist reward proofs for the current L1-finalized height.
459    ///
460    /// For V5+ (epoch rewards), the reward tree is only stored at epoch boundaries.
461    /// We resolve to the nearest epoch boundary to load the tree, verify it's V5+
462    /// (for V4→V5 upgrades), and store the generated proofs at `finalized_hotshot_height`.
463    /// For V4 (per-block rewards), the tree exists at every block height.
464    fn persist_reward_proofs(
465        &self,
466        node_state: &NodeState,
467        height: u64,
468        version: Version,
469    ) -> impl Send + Future<Output = anyhow::Result<()>> {
470        async move {
471            // For V4 (per-block rewards), only persist proofs every 30th block.
472            // For V5+ (epoch rewards), this is only called at epoch boundaries.
473            //
474            // In tests, we persist proofs at every block height so
475            // reward claim tests can query proofs at arbitrary heights. This can be
476            // removed once all reward claim tests are updated
477            if !cfg!(any(test, feature = "testing"))
478                && version < EPOCH_REWARD_VERSION
479                && !(height + node_state.node_id).is_multiple_of(30)
480            {
481                return Ok(());
482            }
483
484            // In tests, we use the current block height directly instead
485            // of querying the L1 finalized HotShot height, because test environments don't
486            // deploy a light client contract
487            let finalized_hotshot_height = if cfg!(any(test, feature = "testing")) {
488                height
489            } else {
490                match node_state.finalized_hotshot_height().await {
491                    Ok(h) => h,
492                    Err(err) => {
493                        tracing::warn!("failed to get finalized hotshot height: {err:#}");
494                        return Ok(());
495                    },
496                }
497            };
498
499            if self.proof_exists(finalized_hotshot_height).await {
500                return Ok(());
501            }
502
503            // Resolve which height to load the reward tree from.
504            // V4: tree exists at every height. V5+: only at epoch boundaries.
505            let mut tree_height = finalized_hotshot_height;
506            if version >= EPOCH_REWARD_VERSION {
507                let epoch_height = node_state
508                    .epoch_height
509                    .context("epoch_height not set in node state")?;
510                if !is_last_block(finalized_hotshot_height, epoch_height) {
511                    tree_height = epoch_from_block_number(finalized_hotshot_height, epoch_height)
512                        .saturating_sub(1)
513                        * epoch_height;
514                    if tree_height == 0 {
515                        return Ok(());
516                    }
517                    // During V4→V5 upgrades the previous epoch boundary may be V4.
518                    let mut tx = self.read().await.context("opening read transaction")?;
519                    let leaf = tx
520                        .get_leaf(LeafId::<SeqTypes>::from(tree_height as usize))
521                        .await
522                        .context(format!(
523                            "leaf at epoch boundary {tree_height} not available"
524                        ))?;
525                    if leaf.header().version() < EPOCH_REWARD_VERSION {
526                        return Ok(());
527                    }
528                }
529            }
530
531            let permitted_tree = match self.load_reward_merkle_tree_v2(tree_height).await {
532                Ok(tree) => tree,
533                Err(err) => {
534                    tracing::warn!(tree_height, "failed to load reward merkle tree: {err:#}");
535                    return Ok(());
536                },
537            };
538
539            let tree = permitted_tree.tree;
540            let iter = tree
541                .iter()
542                .filter_map(|(account, balance): (&RewardAccountV2, _)| {
543                    let proof = espresso_types::v0_6::RewardAccountProofV2::prove(
544                        &tree,
545                        (*account).into(),
546                    )?;
547                    let proof = RewardAccountQueryDataV2 {
548                        balance: (*balance).into(),
549                        proof: proof.0,
550                    };
551                    Some((
552                        bincode::serialize(&account).ok()?,
553                        bincode::serialize(&proof).ok()?,
554                    ))
555                });
556
557            if let Err(err) = self.persist_proofs(finalized_hotshot_height, iter).await {
558                tracing::warn!(
559                    finalized_hotshot_height,
560                    "failed to persist proofs: {err:#}"
561                );
562            }
563
564            Ok(())
565        }
566    }
567}
568
569impl CatchupStorage for SqlStorage {
570    async fn get_reward_accounts_v1(
571        &self,
572        instance: &NodeState,
573        height: u64,
574        view: ViewNumber,
575        accounts: &[RewardAccountV1],
576    ) -> anyhow::Result<(RewardMerkleTreeV1, Leaf2)> {
577        let mut tx = self.read().await.context(format!(
578            "opening transaction to fetch v1 reward account {accounts:?}; height {height}"
579        ))?;
580
581        let block_height = NodeStorage::<SeqTypes>::block_height(&mut tx)
582            .await
583            .context("getting block height")? as u64;
584        ensure!(
585            block_height > 0,
586            "cannot get accounts for height {height}: no blocks available"
587        );
588
589        // Check if we have the desired state snapshot. If so, we can load the desired accounts
590        // directly.
591        if height < block_height {
592            load_v1_reward_accounts(self, height, accounts).await
593        } else {
594            let accounts: Vec<_> = accounts
595                .iter()
596                .map(|acct| RewardAccountV2::from(*acct))
597                .collect();
598            // If we do not have the exact snapshot we need, we can try going back to the last
599            // snapshot we _do_ have and replaying subsequent blocks to compute the desired state.
600            let (state, leaf) = reconstruct_state(
601                instance,
602                self,
603                &mut tx,
604                block_height - 1,
605                view,
606                &[],
607                &accounts,
608            )
609            .await?;
610            Ok((state.reward_merkle_tree_v1, leaf))
611        }
612    }
613
614    async fn get_reward_accounts_v2(
615        &self,
616        instance: &NodeState,
617        height: u64,
618        view: ViewNumber,
619        accounts: &[RewardAccountV2],
620    ) -> anyhow::Result<(RewardMerkleTreeV2, Leaf2)> {
621        let mut tx = self.read().await.context(format!(
622            "opening transaction to fetch reward account {accounts:?}; height {height}"
623        ))?;
624
625        let block_height = NodeStorage::<SeqTypes>::block_height(&mut tx)
626            .await
627            .context("getting block height")? as u64;
628        ensure!(
629            block_height > 0,
630            "cannot get accounts for height {height}: no blocks available"
631        );
632
633        // Check if we have the desired state snapshot. If so, we can load the desired accounts
634        // directly.
635        if height < block_height {
636            load_reward_merkle_tree_v2(self, height)
637                .await
638                .map(|(permitted_tree, leaf)| (permitted_tree.tree, leaf))
639        } else {
640            // If we do not have the exact snapshot we need, we can try going back to the last
641            // snapshot we _do_ have and replaying subsequent blocks to compute the desired state.
642            let (state, leaf) = reconstruct_state(
643                instance,
644                self,
645                &mut tx,
646                block_height - 1,
647                view,
648                &[],
649                accounts,
650            )
651            .await?;
652            Ok((state.reward_merkle_tree_v2, leaf))
653        }
654    }
655
656    async fn get_accounts(
657        &self,
658        instance: &NodeState,
659        height: u64,
660        view: ViewNumber,
661        accounts: &[FeeAccount],
662    ) -> anyhow::Result<(FeeMerkleTree, Leaf2)> {
663        let mut tx = self.read().await.context(format!(
664            "opening transaction to fetch account {accounts:?}; height {height}"
665        ))?;
666
667        let block_height = NodeStorage::<SeqTypes>::block_height(&mut tx)
668            .await
669            .context("getting block height")? as u64;
670        ensure!(
671            block_height > 0,
672            "cannot get accounts for height {height}: no blocks available"
673        );
674
675        // Check if we have the desired state snapshot. If so, we can load the desired accounts
676        // directly.
677        if height < block_height {
678            load_accounts(&mut tx, height, accounts).await
679        } else {
680            // If we do not have the exact snapshot we need, we can try going back to the last
681            // snapshot we _do_ have and replaying subsequent blocks to compute the desired state.
682            let (state, leaf) = reconstruct_state(
683                instance,
684                self,
685                &mut tx,
686                block_height - 1,
687                view,
688                accounts,
689                &[],
690            )
691            .await?;
692            Ok((state.fee_merkle_tree, leaf))
693        }
694    }
695
696    async fn get_frontier(
697        &self,
698        instance: &NodeState,
699        height: u64,
700        view: ViewNumber,
701    ) -> anyhow::Result<BlocksFrontier> {
702        let mut tx = self.read().await.context(format!(
703            "opening transaction to fetch frontier at height {height}"
704        ))?;
705
706        let block_height = NodeStorage::<SeqTypes>::block_height(&mut tx)
707            .await
708            .context("getting block height")? as u64;
709        ensure!(
710            block_height > 0,
711            "cannot get frontier for height {height}: no blocks available"
712        );
713
714        // Check if we have the desired state snapshot. If so, we can load the desired frontier
715        // directly.
716        if height < block_height {
717            load_frontier(&mut tx, height).await
718        } else {
719            // If we do not have the exact snapshot we need, we can try going back to the last
720            // snapshot we _do_ have and replaying subsequent blocks to compute the desired state.
721            let (state, _) =
722                reconstruct_state(instance, self, &mut tx, block_height - 1, view, &[], &[])
723                    .await?;
724            match state.block_merkle_tree.lookup(height - 1) {
725                LookupResult::Ok(_, proof) => Ok(proof),
726                _ => {
727                    bail!(
728                        "state snapshot {view:?},{height} was found but does not contain frontier \
729                         at height {}; this should not be possible",
730                        height - 1
731                    );
732                },
733            }
734        }
735    }
736
737    async fn get_chain_config(
738        &self,
739        commitment: Commitment<ChainConfig>,
740    ) -> anyhow::Result<ChainConfig> {
741        let mut tx = self.read().await.context(format!(
742            "opening transaction to fetch chain config {commitment}"
743        ))?;
744        load_chain_config(&mut tx, commitment).await
745    }
746
747    async fn get_leaf_chain(&self, height: u64) -> anyhow::Result<Vec<Leaf2>> {
748        let mut tx = self
749            .read()
750            .await
751            .context(format!("opening transaction to fetch leaf at {height}"))?;
752        let leaf = tx
753            .get_leaf((height as usize).into())
754            .await
755            .context(format!("leaf {height} not available"))?;
756        let mut last_leaf: Leaf2 = leaf.leaf().clone();
757        let mut chain = vec![last_leaf.clone()];
758        let mut h = height + 1;
759
760        loop {
761            let lqd = tx.get_leaf((h as usize).into()).await?;
762            let leaf = lqd.leaf();
763
764            if leaf.justify_qc().view_number() == last_leaf.view_number() {
765                chain.push(leaf.clone());
766            } else {
767                h += 1;
768                continue;
769            }
770
771            // just one away from deciding
772            if leaf.view_number() == last_leaf.view_number() + 1 {
773                last_leaf = leaf.clone();
774                h += 1;
775                break;
776            }
777            h += 1;
778            last_leaf = leaf.clone();
779        }
780
781        loop {
782            let lqd = tx.get_leaf((h as usize).into()).await?;
783            let leaf = lqd.leaf();
784            if leaf.justify_qc().view_number() == last_leaf.view_number() {
785                chain.push(leaf.clone());
786                break;
787            }
788            h += 1;
789        }
790
791        Ok(chain)
792    }
793}
794
795impl RewardMerkleTreeDataSource for DataSource {
796    async fn load_v1_reward_account_proof(
797        &self,
798        height: u64,
799        account: RewardAccountV1,
800    ) -> anyhow::Result<RewardAccountQueryDataV1> {
801        self.as_ref()
802            .load_v1_reward_account_proof(height, account)
803            .await
804    }
805
806    fn persist_tree(
807        &self,
808        height: u64,
809        merkle_tree: Vec<u8>,
810    ) -> impl Send + Future<Output = anyhow::Result<()>> {
811        async move { self.as_ref().persist_tree(height, merkle_tree).await }
812    }
813
814    fn load_tree(&self, height: u64) -> impl Send + Future<Output = anyhow::Result<Vec<u8>>> {
815        async move { self.as_ref().load_tree(height).await }
816    }
817
818    fn garbage_collect(&self, height: u64) -> impl Send + Future<Output = anyhow::Result<()>> {
819        async move { self.as_ref().garbage_collect(height).await }
820    }
821
822    fn persist_proofs(
823        &self,
824        height: u64,
825        proofs: impl Iterator<Item = (Vec<u8>, Vec<u8>)> + Send,
826    ) -> impl Send + Future<Output = anyhow::Result<()>> {
827        async move { self.as_ref().persist_proofs(height, proofs).await }
828    }
829
830    fn load_proof(
831        &self,
832        height: u64,
833        account: Vec<u8>,
834        epoch_height: u64,
835    ) -> impl Send + Future<Output = anyhow::Result<Vec<u8>>> {
836        async move {
837            self.as_ref()
838                .load_proof(height, account, epoch_height)
839                .await
840        }
841    }
842
843    fn load_latest_proof(
844        &self,
845        account: Vec<u8>,
846    ) -> impl Send + Future<Output = anyhow::Result<Vec<u8>>> {
847        async move { self.as_ref().load_latest_proof(account).await }
848    }
849
850    fn proof_exists(&self, height: u64) -> impl Send + Future<Output = bool> {
851        async move { self.as_ref().proof_exists(height).await }
852    }
853
854    fn persist_reward_proofs(
855        &self,
856        node_state: &NodeState,
857        height: u64,
858        version: Version,
859    ) -> impl Send + Future<Output = anyhow::Result<()>> {
860        async move {
861            self.as_ref()
862                .persist_reward_proofs(node_state, height, version)
863                .await
864        }
865    }
866}
867
868impl CatchupStorage for DataSource {
869    async fn get_accounts(
870        &self,
871        instance: &NodeState,
872        height: u64,
873        view: ViewNumber,
874        accounts: &[FeeAccount],
875    ) -> anyhow::Result<(FeeMerkleTree, Leaf2)> {
876        self.as_ref()
877            .get_accounts(instance, height, view, accounts)
878            .await
879    }
880
881    async fn get_reward_accounts_v2(
882        &self,
883        instance: &NodeState,
884        height: u64,
885        view: ViewNumber,
886        accounts: &[RewardAccountV2],
887    ) -> anyhow::Result<(RewardMerkleTreeV2, Leaf2)> {
888        self.as_ref()
889            .get_reward_accounts_v2(instance, height, view, accounts)
890            .await
891    }
892
893    async fn get_reward_accounts_v1(
894        &self,
895        instance: &NodeState,
896        height: u64,
897        view: ViewNumber,
898        accounts: &[RewardAccountV1],
899    ) -> anyhow::Result<(RewardMerkleTreeV1, Leaf2)> {
900        self.as_ref()
901            .get_reward_accounts_v1(instance, height, view, accounts)
902            .await
903    }
904
905    async fn get_frontier(
906        &self,
907        instance: &NodeState,
908        height: u64,
909        view: ViewNumber,
910    ) -> anyhow::Result<BlocksFrontier> {
911        self.as_ref().get_frontier(instance, height, view).await
912    }
913
914    async fn get_chain_config(
915        &self,
916        commitment: Commitment<ChainConfig>,
917    ) -> anyhow::Result<ChainConfig> {
918        self.as_ref().get_chain_config(commitment).await
919    }
920    async fn get_leaf_chain(&self, height: u64) -> anyhow::Result<Vec<Leaf2>> {
921        self.as_ref().get_leaf_chain(height).await
922    }
923}
924
925#[async_trait]
926impl ChainConfigPersistence for Transaction<Write> {
927    async fn insert_chain_config(&mut self, chain_config: ChainConfig) -> anyhow::Result<()> {
928        let commitment = chain_config.commitment();
929        let data = bincode::serialize(&chain_config)?;
930        self.upsert(
931            "chain_config",
932            ["commitment", "data"],
933            ["commitment"],
934            [(commitment.to_string(), data)],
935        )
936        .await
937    }
938}
939
940impl super::data_source::DatabaseMetadataSource for SqlStorage {
941    async fn get_table_sizes(&self) -> anyhow::Result<Vec<super::data_source::TableSize>> {
942        let mut tx = self
943            .read()
944            .await
945            .context("opening transaction to fetch table sizes")?;
946
947        #[cfg(not(feature = "embedded-db"))]
948        {
949            let query = r#"
950                SELECT
951                    schemaname || '.' || relname AS table_name,
952                    n_live_tup AS row_count,
953                    pg_total_relation_size(relid) AS total_size_bytes
954                FROM pg_stat_user_tables
955                ORDER BY pg_total_relation_size(relid) DESC
956            "#;
957
958            let rows = sqlx::query(query)
959                .fetch_all(tx.as_mut())
960                .await
961                .context("failed to query table sizes")?;
962
963            let mut table_sizes = Vec::new();
964            for row in rows {
965                let table_name: String = row.try_get("table_name")?;
966                let row_count: i64 = row.try_get("row_count").unwrap_or(-1);
967                let total_size_bytes: Option<i64> = row.try_get("total_size_bytes").ok();
968
969                table_sizes.push(super::data_source::TableSize {
970                    table_name,
971                    row_count,
972                    total_size_bytes,
973                });
974            }
975
976            Ok(table_sizes)
977        }
978
979        #[cfg(feature = "embedded-db")]
980        {
981            // First, get all table names from sqlite_master
982            let table_names_query = r#"
983                SELECT name
984                FROM sqlite_master
985                WHERE type = 'table'
986                AND name NOT LIKE 'sqlite_%'
987                ORDER BY name
988            "#;
989
990            let table_rows = sqlx::query(table_names_query)
991                .fetch_all(tx.as_mut())
992                .await
993                .context("failed to query table names")?;
994
995            let mut table_sizes = Vec::new();
996
997            // For each table, get the row count
998            for row in table_rows {
999                let table_name: String = row.try_get("name")?;
1000
1001                // Run SELECT COUNT(*) for this specific table
1002                // Use format! here since we need to dynamically insert the table name
1003                let count_query = format!("SELECT COUNT(*) as count FROM \"{}\"", table_name);
1004                let count_row = sqlx::query(&count_query)
1005                    .fetch_one(tx.as_mut())
1006                    .await
1007                    .context(format!(
1008                        "failed to query row count for table {}",
1009                        table_name
1010                    ))?;
1011
1012                let row_count: i64 = count_row.try_get("count").unwrap_or(0);
1013
1014                table_sizes.push(super::data_source::TableSize {
1015                    table_name,
1016                    row_count,
1017                    total_size_bytes: None,
1018                });
1019            }
1020
1021            Ok(table_sizes)
1022        }
1023    }
1024}
1025
1026impl super::data_source::DatabaseMetadataSource for DataSource {
1027    async fn get_table_sizes(&self) -> anyhow::Result<Vec<super::data_source::TableSize>> {
1028        self.as_ref().get_table_sizes().await
1029    }
1030}
1031
1032async fn load_frontier<Mode: TransactionMode>(
1033    tx: &mut Transaction<Mode>,
1034    height: u64,
1035) -> anyhow::Result<BlocksFrontier> {
1036    tx.get_path(
1037        Snapshot::<SeqTypes, BlockMerkleTree, { BlockMerkleTree::ARITY }>::Index(height),
1038        height
1039            .checked_sub(1)
1040            .ok_or(anyhow::anyhow!("Subtract with overflow ({height})!"))?,
1041    )
1042    .await
1043    .context(format!("fetching frontier at height {height}"))
1044}
1045
1046async fn load_v1_reward_accounts(
1047    db: &SqlStorage,
1048    height: u64,
1049    accounts: &[RewardAccountV1],
1050) -> anyhow::Result<(RewardMerkleTreeV1, Leaf2)> {
1051    // Open a new read transaction to get the leaf
1052    let mut tx = db
1053        .read()
1054        .await
1055        .with_context(|| "failed to open read transaction")?;
1056
1057    // Get the leaf from the database
1058    let leaf = tx
1059        .get_leaf(LeafId::<SeqTypes>::from(height as usize))
1060        .await
1061        .context(format!("leaf {height} not available"))?;
1062    let header = leaf.header();
1063
1064    if header.version() < EPOCH_VERSION || header.version() >= DRB_AND_HEADER_UPGRADE_VERSION {
1065        return Ok((
1066            RewardMerkleTreeV1::new(REWARD_MERKLE_TREE_V1_HEIGHT),
1067            leaf.leaf().clone(),
1068        ));
1069    }
1070
1071    // Get the merkle root from the header and create a snapshot from it
1072    let merkle_root = header.reward_merkle_tree_root().unwrap_left();
1073    let mut snapshot = RewardMerkleTreeV1::from_commitment(merkle_root);
1074
1075    // Create a bounded join set with 10 concurrent tasks
1076    let mut join_set = BoundedJoinSet::new(10);
1077
1078    // Create a map from task ID to account
1079    let mut task_id_to_account = HashMap::new();
1080
1081    // Loop through each account, spawning a task to get the path for the account
1082    for account in accounts {
1083        // Clone things we will need in the closure
1084        let db_clone = db.clone();
1085        let account_clone = *account;
1086        let header_height = header.height();
1087
1088        // Create the closure that will get the path for the account
1089        let func = async move {
1090            // Open a new transaction
1091            let mut tx = db_clone
1092                .read()
1093                .await
1094                .with_context(|| "failed to open read transaction")?;
1095
1096            // Get the path for the account
1097            let proof = tx
1098                .get_path(
1099                    Snapshot::<SeqTypes, RewardMerkleTreeV1, { RewardMerkleTreeV1::ARITY }>::Index(
1100                        header_height,
1101                    ),
1102                    account_clone,
1103                )
1104                .await
1105                .with_context(|| {
1106                    format!(
1107                        "failed to get path for v1 reward account {account_clone:?}; height \
1108                         {height}"
1109                    )
1110                })?;
1111
1112            Ok::<_, anyhow::Error>(proof)
1113        };
1114
1115        // Spawn the task
1116        let id = join_set.spawn(func).id();
1117
1118        // Add the task ID to the account map
1119        task_id_to_account.insert(id, account);
1120    }
1121
1122    // Wait for each task to complete
1123    while let Some(result) = join_set.join_next_with_id().await {
1124        // Get the inner result (past the join error)
1125        let (id, result) = result.with_context(|| "failed to join task")?;
1126
1127        // Get the proof from the result
1128        let proof = result?;
1129
1130        // Get the account from the task ID to account map
1131        let account = task_id_to_account
1132            .remove(&id)
1133            .with_context(|| "task ID for spawned task not found")?;
1134
1135        match proof.proof.first().with_context(|| {
1136            format!("empty proof for v1 reward account {account:?}; height {height}")
1137        })? {
1138            MerkleNode::Leaf { pos, elem, .. } => {
1139                snapshot.remember(*pos, *elem, proof)?;
1140            },
1141            MerkleNode::Empty => {
1142                snapshot.non_membership_remember(*account, proof)?;
1143            },
1144            _ => {
1145                bail!("invalid proof for v1 reward account {account:?}; height {height}");
1146            },
1147        }
1148    }
1149
1150    Ok((snapshot, leaf.leaf().clone()))
1151}
1152
1153/// Loads reward accounts for new reward merkle tree (V4).
1154async fn load_reward_merkle_tree_v2(
1155    db: &SqlStorage,
1156    height: u64,
1157) -> anyhow::Result<(PermittedRewardMerkleTreeV2, Leaf2)> {
1158    // Open a new read transaction to get the leaf
1159    let mut tx = db
1160        .read()
1161        .await
1162        .with_context(|| "failed to open read transaction")?;
1163
1164    // Get the leaf from the database
1165    let leaf = tx
1166        .get_leaf(LeafId::<SeqTypes>::from(height as usize))
1167        .await
1168        .with_context(|| format!("leaf {height} not available"))?;
1169
1170    let snapshot = db.load_reward_merkle_tree_v2(height).await?;
1171
1172    Ok((snapshot, leaf.leaf().clone()))
1173}
1174
1175async fn load_accounts<Mode: TransactionMode>(
1176    tx: &mut Transaction<Mode>,
1177    height: u64,
1178    accounts: &[FeeAccount],
1179) -> anyhow::Result<(FeeMerkleTree, Leaf2)> {
1180    let leaf = tx
1181        .get_leaf(LeafId::<SeqTypes>::from(height as usize))
1182        .await
1183        .context(format!("leaf {height} not available"))?;
1184    let header = leaf.header();
1185
1186    let mut snapshot = FeeMerkleTree::from_commitment(header.fee_merkle_tree_root());
1187    for account in accounts {
1188        let proof = tx
1189            .get_path(
1190                Snapshot::<SeqTypes, FeeMerkleTree, { FeeMerkleTree::ARITY }>::Index(
1191                    header.height(),
1192                ),
1193                *account,
1194            )
1195            .await
1196            .context(format!(
1197                "fetching account {account}; height {}",
1198                header.height()
1199            ))?;
1200        match proof.proof.first().context(format!(
1201            "empty proof for account {account}; height {}",
1202            header.height()
1203        ))? {
1204            MerkleNode::Leaf { pos, elem, .. } => {
1205                snapshot.remember(*pos, *elem, proof)?;
1206            },
1207            MerkleNode::Empty => {
1208                snapshot.non_membership_remember(*account, proof)?;
1209            },
1210            _ => {
1211                bail!("Invalid proof");
1212            },
1213        }
1214    }
1215
1216    Ok((snapshot, leaf.leaf().clone()))
1217}
1218
1219async fn load_chain_config<Mode: TransactionMode>(
1220    tx: &mut Transaction<Mode>,
1221    commitment: Commitment<ChainConfig>,
1222) -> anyhow::Result<ChainConfig> {
1223    let (data,) = query_as::<(Vec<u8>,)>("SELECT data from chain_config where commitment = $1")
1224        .bind(commitment.to_string())
1225        .fetch_one(tx.as_mut())
1226        .await
1227        .unwrap();
1228
1229    bincode::deserialize(&data[..]).context("failed to deserialize")
1230}
1231
1232/// Reconstructs the `ValidatedState` from a specific block height up to a given view.
1233///
1234/// This loads all required fee and reward accounts into the Merkle tree before applying the
1235/// State Transition Function (STF), preventing recursive catchup during STF replay.
1236///
1237/// Note: Even if the primary goal is to catch up the block Merkle tree,
1238/// fee and reward header dependencies must still be present beforehand
1239/// This is because reconstructing the `ValidatedState` involves replaying the STF over a
1240/// range of leaves, and the STF requires all associated data to be present in the `ValidatedState`;
1241/// otherwise, it will attempt to trigger catchup itself.
1242#[tracing::instrument(skip(instance, db, tx))]
1243pub(crate) async fn reconstruct_state<Mode: TransactionMode>(
1244    instance: &NodeState,
1245    db: &SqlStorage,
1246    tx: &mut Transaction<Mode>,
1247    from_height: u64,
1248    to_view: ViewNumber,
1249    fee_accounts: &[FeeAccount],
1250    reward_accounts: &[RewardAccountV2],
1251) -> anyhow::Result<(ValidatedState, Leaf2)> {
1252    tracing::info!("attempting to reconstruct fee state");
1253    let from_leaf = tx
1254        .get_leaf((from_height as usize).into())
1255        .await
1256        .context(format!("leaf {from_height} not available"))?;
1257    let from_leaf: Leaf2 = from_leaf.leaf().clone();
1258    ensure!(
1259        from_leaf.view_number() < to_view,
1260        "state reconstruction: starting state {:?} must be before ending state {to_view:?}",
1261        from_leaf.view_number(),
1262    );
1263
1264    // Get the sequence of headers we will be applying to compute the latest state.
1265    let mut leaves = VecDeque::new();
1266    let to_leaf = get_leaf_from_proposal(tx, "view = $1", &(to_view.u64() as i64))
1267        .await
1268        .context(format!(
1269            "unable to reconstruct state because leaf {to_view:?} is not available"
1270        ))?;
1271    let mut parent = to_leaf.parent_commitment();
1272    tracing::debug!(?to_leaf, ?parent, view = ?to_view, "have required leaf");
1273    leaves.push_front(to_leaf.clone());
1274    while parent != Committable::commit(&from_leaf) {
1275        let leaf = get_leaf_from_proposal(tx, "leaf_hash = $1", &parent.to_string())
1276            .await
1277            .context(format!(
1278                "unable to reconstruct state because leaf {parent} is not available"
1279            ))?;
1280        parent = leaf.parent_commitment();
1281        tracing::debug!(?leaf, ?parent, "have required leaf");
1282        leaves.push_front(leaf);
1283    }
1284
1285    // Get the initial state.
1286    let mut parent = from_leaf;
1287    let mut state = ValidatedState::from_header(parent.block_header());
1288
1289    // Pre-load the state with the accounts we care about to ensure they will be present in the
1290    // final state.
1291    let mut catchup = NullStateCatchup::default();
1292
1293    let mut fee_accounts = fee_accounts.iter().copied().collect::<HashSet<_>>();
1294    // Add in all the accounts we will need to replay any of the headers, to ensure that we don't
1295    // need to do catchup recursively.
1296    tracing::info!(
1297        "reconstructing fee accounts state for from height {from_height} to view {to_view}"
1298    );
1299
1300    let dependencies =
1301        fee_header_dependencies(&mut catchup, tx, instance, &parent, &leaves).await?;
1302    fee_accounts.extend(dependencies);
1303    let fee_accounts = fee_accounts.into_iter().collect::<Vec<_>>();
1304    state.fee_merkle_tree = load_accounts(tx, from_height, &fee_accounts)
1305        .await
1306        .context("unable to reconstruct state because accounts are not available at origin")?
1307        .0;
1308    ensure!(
1309        state.fee_merkle_tree.commitment() == parent.block_header().fee_merkle_tree_root(),
1310        "loaded fee state does not match parent header"
1311    );
1312
1313    tracing::info!(
1314        "reconstructing reward accounts for from height {from_height} to view {to_view}"
1315    );
1316
1317    let mut reward_accounts = reward_accounts.iter().copied().collect::<HashSet<_>>();
1318
1319    // Collect all reward account dependencies needed for replaying the STF.
1320    // These accounts must be preloaded into the reward Merkle tree to prevent recursive catchups.
1321    let dependencies = reward_header_dependencies(instance, &leaves).await?;
1322    reward_accounts.extend(dependencies);
1323    let reward_accounts = reward_accounts.into_iter().collect::<Vec<_>>();
1324
1325    // Load all required reward accounts and update the reward Merkle tree.
1326    match parent.block_header().reward_merkle_tree_root() {
1327        either::Either::Left(expected_root) => {
1328            let accts = reward_accounts
1329                .into_iter()
1330                .map(RewardAccountV1::from)
1331                .collect::<Vec<_>>();
1332            state.reward_merkle_tree_v1 = load_v1_reward_accounts(db, from_height, &accts)
1333                .await
1334                .context(
1335                    "unable to reconstruct state because v1 reward accounts are not available at \
1336                     origin",
1337                )?
1338                .0;
1339            ensure!(
1340                state.reward_merkle_tree_v1.commitment() == expected_root,
1341                "loaded v1 reward state does not match parent header"
1342            );
1343        },
1344        either::Either::Right(expected_root) => {
1345            state.reward_merkle_tree_v2 = load_reward_merkle_tree_v2(db, from_height)
1346                .await
1347                .context(
1348                    "unable to reconstruct state because RewardMerkleTreeV2 not available at \
1349                     origin",
1350                )?
1351                .0
1352                .tree;
1353            ensure!(
1354                state.reward_merkle_tree_v2.commitment() == expected_root,
1355                "loaded reward state does not match parent header"
1356            );
1357        },
1358    }
1359
1360    // We need the blocks frontier as well, to apply the STF.
1361    let frontier = load_frontier(tx, from_height)
1362        .await
1363        .context("unable to reconstruct state because frontier is not available at origin")?;
1364    match frontier
1365        .proof
1366        .first()
1367        .context("empty proof for frontier at origin")?
1368    {
1369        MerkleNode::Leaf { pos, elem, .. } => state
1370            .block_merkle_tree
1371            .remember(*pos, *elem, frontier)
1372            .context("failed to remember frontier")?,
1373        _ => bail!("invalid frontier proof"),
1374    }
1375
1376    // Apply subsequent headers to compute the later state.
1377    for proposal in leaves {
1378        state = compute_state_update(&state, instance, &catchup, &parent, &proposal)
1379            .await
1380            .context(format!(
1381                "unable to reconstruct state because state update {} failed",
1382                proposal.height(),
1383            ))?
1384            .0;
1385        parent = proposal;
1386    }
1387
1388    tracing::info!(from_height, ?to_view, "successfully reconstructed state");
1389    Ok((state, to_leaf))
1390}
1391
1392/// Get the dependencies needed to apply the STF to the given list of headers.
1393///
1394/// Returns
1395/// * A state catchup implementation seeded with all the chain configs required to apply the headers
1396///   in `leaves`
1397/// * The set of accounts that must be preloaded to apply these headers
1398async fn fee_header_dependencies<Mode: TransactionMode>(
1399    catchup: &mut NullStateCatchup,
1400    tx: &mut Transaction<Mode>,
1401    instance: &NodeState,
1402    mut parent: &Leaf2,
1403    leaves: impl IntoIterator<Item = &Leaf2>,
1404) -> anyhow::Result<HashSet<FeeAccount>> {
1405    let mut accounts = HashSet::default();
1406
1407    for proposal in leaves {
1408        let header = proposal.block_header();
1409        let height = header.height();
1410        let view = proposal.view_number();
1411        tracing::debug!(height, ?view, "fetching dependencies for proposal");
1412
1413        let header_cf = header.chain_config();
1414        let chain_config = if header_cf.commit() == instance.chain_config.commit() {
1415            instance.chain_config
1416        } else {
1417            match header_cf.resolve() {
1418                Some(cf) => cf,
1419                None => {
1420                    tracing::info!(
1421                        height,
1422                        ?view,
1423                        commit = %header_cf.commit(),
1424                        "chain config not available, attempting to load from storage",
1425                    );
1426                    let cf = load_chain_config(tx, header_cf.commit())
1427                        .await
1428                        .context(format!(
1429                            "loading chain config {} for header {},{:?}",
1430                            header_cf.commit(),
1431                            header.height(),
1432                            proposal.view_number()
1433                        ))?;
1434
1435                    // If we had to fetch a chain config now, store it in the catchup implementation
1436                    // so the STF will be able to look it up later.
1437                    catchup.add_chain_config(cf);
1438                    cf
1439                },
1440            }
1441        };
1442
1443        accounts.insert(chain_config.fee_recipient);
1444        accounts.extend(
1445            get_l1_deposits(instance, header, parent, chain_config.fee_contract)
1446                .await
1447                .into_iter()
1448                .map(|fee| fee.account()),
1449        );
1450        accounts.extend(header.fee_info().accounts());
1451        parent = proposal;
1452    }
1453    Ok(accounts)
1454}
1455
1456/// Identifies all reward accounts required to replay the State Transition Function
1457/// for the given leaf proposals. These accounts should be present in the Merkle tree
1458/// *before* applying the STF to avoid recursive catchup (i.e., STF triggering another catchup).
1459async fn reward_header_dependencies(
1460    instance: &NodeState,
1461    leaves: impl IntoIterator<Item = &Leaf2>,
1462) -> anyhow::Result<HashSet<RewardAccountV2>> {
1463    let mut reward_accounts = HashSet::default();
1464    let epoch_height = instance.epoch_height;
1465
1466    let Some(epoch_height) = epoch_height else {
1467        tracing::info!("epoch height is not set. returning empty reward_header_dependencies");
1468        return Ok(HashSet::new());
1469    };
1470
1471    let coordinator = instance.coordinator.clone();
1472    let membership_lock = coordinator.membership().read().await;
1473    let first_epoch = membership_lock.first_epoch();
1474    drop(membership_lock);
1475    // add all the chain configs needed to apply STF to headers to the catchup
1476    for proposal in leaves {
1477        let header = proposal.block_header();
1478
1479        let height = header.height();
1480        let view = proposal.view_number();
1481        tracing::debug!(height, ?view, "fetching dependencies for proposal");
1482
1483        let version = header.version();
1484        // Skip if version is less than epoch version
1485        if version < EPOCH_VERSION || version >= EPOCH_REWARD_VERSION {
1486            continue;
1487        }
1488
1489        let first_epoch = first_epoch.context("first epoch not found")?;
1490
1491        let proposal_epoch = EpochNumber::new(epoch_from_block_number(height, epoch_height));
1492
1493        // reward distribution starts third epoch onwards
1494        if proposal_epoch <= first_epoch + 1 {
1495            continue;
1496        }
1497
1498        let epoch_membership = match coordinator.membership_for_epoch(Some(proposal_epoch)).await {
1499            Ok(e) => e,
1500            Err(err) => {
1501                tracing::info!(
1502                    "failed to get membership for epoch={proposal_epoch:?}. err={err:#}"
1503                );
1504
1505                coordinator
1506                    .wait_for_catchup(proposal_epoch)
1507                    .await
1508                    .context(format!("failed to catchup for epoch={proposal_epoch}"))?
1509            },
1510        };
1511
1512        let leader = epoch_membership.leader(proposal.view_number()).await?;
1513        let membership_lock = coordinator.membership().read().await;
1514        let validator = membership_lock.get_validator_config(&proposal_epoch, leader)?;
1515        drop(membership_lock);
1516
1517        reward_accounts.insert(RewardAccountV2(validator.account));
1518
1519        let delegators: Vec<RewardAccountV2> = validator
1520            .delegators
1521            .keys()
1522            .map(|d| RewardAccountV2(*d))
1523            .collect();
1524
1525        reward_accounts.extend(delegators);
1526    }
1527    Ok(reward_accounts)
1528}
1529
1530async fn get_leaf_from_proposal<Mode, P>(
1531    tx: &mut Transaction<Mode>,
1532    where_clause: &str,
1533    param: P,
1534) -> anyhow::Result<Leaf2>
1535where
1536    P: Type<Db> + for<'q> Encode<'q, Db>,
1537{
1538    let (data,) = query_as::<(Vec<u8>,)>(&format!(
1539        "SELECT data FROM quorum_proposals2 WHERE {where_clause} LIMIT 1",
1540    ))
1541    .bind(param)
1542    .fetch_one(tx.as_mut())
1543    .await?;
1544    let proposal: Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>> =
1545        bincode::deserialize(&data)?;
1546    Ok(Leaf2::from_quorum_proposal(&proposal.data))
1547}
1548
#[cfg(any(test, feature = "testing"))]
pub(crate) mod impl_testable_data_source {

    use hotshot_query_service::data_source::storage::sql::testing::TmpDb;

    use super::*;
    use crate::api::{self, data_source::testing::TestableSequencerDataSource};

    /// Builds persistence [`Options`] that point at the temporary database `db`.
    ///
    /// Without the `embedded-db` feature these are Postgres options using the database's host
    /// and port with fixed test credentials; with it, SQLite options using the database's path.
    pub fn tmp_options(db: &TmpDb) -> Options {
        #[cfg(not(feature = "embedded-db"))]
        {
            crate::persistence::sql::PostgresOptions {
                port: Some(db.port()),
                host: Some(db.host()),
                user: Some("postgres".into()),
                password: Some("password".into()),
                ..Default::default()
            }
            .into()
        }

        #[cfg(feature = "embedded-db")]
        {
            crate::persistence::sql::SqliteOptions { path: db.path() }.into()
        }
    }

    #[async_trait]
    impl TestableSequencerDataSource for DataSource {
        type Storage = TmpDb;

        /// Starts a fresh temporary database.
        async fn create_storage() -> Self::Storage {
            TmpDb::init().await
        }

        /// Persistence options for `storage`; see [`tmp_options`].
        fn persistence_options(storage: &Self::Storage) -> Self::Options {
            tmp_options(storage)
        }

        /// Like [`Self::options`], but with the `lightweight` flag set on the SQL options.
        fn leaf_only_ds_options(
            storage: &Self::Storage,
            opt: api::Options,
        ) -> anyhow::Result<api::Options> {
            let mut lightweight_opts = tmp_options(storage);
            lightweight_opts.lightweight = true;

            let configured = opt.query_sql(Default::default(), lightweight_opts);
            Ok(configured)
        }

        /// Adds a SQL query module backed by `storage` to `opt`, with default query options.
        fn options(storage: &Self::Storage, opt: api::Options) -> api::Options {
            let sql_opts = tmp_options(storage);
            opt.query_sql(Default::default(), sql_opts)
        }
    }
}
1604
1605#[cfg(test)]
1606mod tests {
1607    use alloy::primitives::Address;
1608    use espresso_types::{
1609        v0_3::RewardAmount,
1610        v0_4::{REWARD_MERKLE_TREE_V2_HEIGHT, RewardAccountV2, RewardMerkleTreeV2},
1611    };
1612    use hotshot_query_service::{
1613        data_source::{
1614            Transaction, VersionedDataSource,
1615            sql::Config,
1616            storage::{
1617                MerklizedStateStorage, UpdateAvailabilityStorage,
1618                sql::{
1619                    SqlStorage, StorageConnectionType, Transaction as SqlTransaction, Write,
1620                    testing::TmpDb,
1621                },
1622            },
1623        },
1624        merklized_state::{MerklizedState, Snapshot, UpdateStateData},
1625    };
1626    use jf_merkle_tree_compat::{
1627        LookupResult, MerkleTreeScheme, ToTraversalPath, UniversalMerkleTreeScheme,
1628    };
1629    use light_client::testing::{leaf_chain, leaf_chain_with_upgrade};
1630    use versions::{DRB_AND_HEADER_UPGRADE_VERSION, EPOCH_REWARD_VERSION, Upgrade};
1631
1632    use super::impl_testable_data_source::tmp_options;
1633    use crate::{SeqTypes, api::RewardMerkleTreeDataSource};
1634
1635    fn make_reward_account(i: usize) -> RewardAccountV2 {
1636        let mut addr_bytes = [0u8; 20];
1637        addr_bytes[16..20].copy_from_slice(&(i as u32).to_be_bytes());
1638        RewardAccountV2(Address::from(addr_bytes))
1639    }
1640
1641    async fn insert_test_header(
1642        tx: &mut SqlTransaction<Write>,
1643        block_height: u64,
1644        reward_tree: &RewardMerkleTreeV2,
1645    ) {
1646        let reward_commitment = serde_json::to_value(reward_tree.commitment()).unwrap();
1647        let test_data = serde_json::json!({
1648            "block_merkle_tree_root": format!("block_root_{}", block_height),
1649            "fee_merkle_tree_root": format!("fee_root_{}", block_height),
1650            "fields": {
1651                RewardMerkleTreeV2::header_state_commitment_field(): reward_commitment
1652            }
1653        });
1654        tx.upsert(
1655            "header",
1656            [
1657                "height",
1658                "hash",
1659                "payload_hash",
1660                "timestamp",
1661                "data",
1662                "ns_table",
1663            ],
1664            ["height"],
1665            [(
1666                block_height as i64,
1667                format!("hash_{}", block_height),
1668                format!("payload_{}", block_height),
1669                block_height as i64,
1670                test_data,
1671                "ns_table".to_string(),
1672            )],
1673        )
1674        .await
1675        .unwrap();
1676    }
1677
1678    async fn batch_insert_proofs(
1679        tx: &mut SqlTransaction<Write>,
1680        reward_tree: &RewardMerkleTreeV2,
1681        accounts: &[RewardAccountV2],
1682        block_height: u64,
1683    ) {
1684        let proofs_and_paths: Vec<_> = accounts
1685            .iter()
1686            .map(|account| {
1687                let proof = match reward_tree.universal_lookup(*account) {
1688                    LookupResult::Ok(_, proof) => proof,
1689                    LookupResult::NotInMemory => panic!("account not in memory"),
1690                    LookupResult::NotFound(proof) => proof,
1691                };
1692                let traversal_path = <RewardAccountV2 as ToTraversalPath<
1693                    { RewardMerkleTreeV2::ARITY },
1694                >>::to_traversal_path(
1695                    account, reward_tree.height()
1696                );
1697                (proof, traversal_path)
1698            })
1699            .collect();
1700
1701        UpdateStateData::<SeqTypes, RewardMerkleTreeV2, { RewardMerkleTreeV2::ARITY }>::insert_merkle_nodes_batch(
1702            tx,
1703            proofs_and_paths,
1704            block_height,
1705        )
1706        .await
1707        .expect("failed to batch insert proofs");
1708    }
1709
1710    #[test_log::test(tokio::test(flavor = "multi_thread"))]
1711    async fn test_reward_accounts_batch_insertion() {
1712        // Batch insertion of 1000 accounts at height 1
1713        // Balance updates for some accounts at height 2
1714        // New accounts added at height 2
1715        // More balance updates at height 3
1716        // Querying correct balances at each height snapshot
1717
1718        let db = TmpDb::init().await;
1719        let opt = tmp_options(&db);
1720        let cfg = Config::try_from(&opt).expect("failed to create config from options");
1721        let storage = SqlStorage::connect(cfg, StorageConnectionType::Query)
1722            .await
1723            .expect("failed to connect to storage");
1724
1725        let num_initial_accounts = 1000usize;
1726
1727        let initial_accounts: Vec<RewardAccountV2> =
1728            (0..num_initial_accounts).map(make_reward_account).collect();
1729
1730        tracing::info!(
1731            "Height 1: Inserting {} initial accounts",
1732            num_initial_accounts
1733        );
1734
1735        let mut reward_tree_h1 = RewardMerkleTreeV2::new(REWARD_MERKLE_TREE_V2_HEIGHT);
1736
1737        for (i, account) in initial_accounts.iter().enumerate() {
1738            let reward_amount = RewardAmount::from(((i + 1) * 1000) as u64);
1739            reward_tree_h1.update(*account, reward_amount).unwrap();
1740        }
1741
1742        let mut tx = storage.write().await.unwrap();
1743        insert_test_header(&mut tx, 1, &reward_tree_h1).await;
1744        batch_insert_proofs(&mut tx, &reward_tree_h1, &initial_accounts, 1).await;
1745
1746        UpdateStateData::<SeqTypes, RewardMerkleTreeV2, { RewardMerkleTreeV2::ARITY }>::set_last_state_height(&mut tx, 1)
1747            .await
1748            .unwrap();
1749        tx.commit().await.unwrap();
1750
1751        tracing::info!("Height 2: Updating balances and adding new accounts");
1752
1753        let mut reward_tree_h2 = reward_tree_h1.clone();
1754
1755        // Update balances for accounts 0-99
1756        let updated_accounts_h2: Vec<RewardAccountV2> = (0..100).map(make_reward_account).collect();
1757        for (i, account) in updated_accounts_h2.iter().enumerate() {
1758            let new_reward = RewardAmount::from(((i + 1) * 2000) as u64);
1759            reward_tree_h2.update(*account, new_reward).unwrap();
1760        }
1761
1762        // Add 100 new accounts (1000..1099)
1763        let new_accounts_h2: Vec<RewardAccountV2> = (1000..1100).map(make_reward_account).collect();
1764        for (i, account) in new_accounts_h2.iter().enumerate() {
1765            let reward_amount = RewardAmount::from(((i + 1001) * 500) as u64);
1766            reward_tree_h2.update(*account, reward_amount).unwrap();
1767        }
1768
1769        let mut changed_accounts_h2 = updated_accounts_h2.clone();
1770        changed_accounts_h2.extend(new_accounts_h2.clone());
1771
1772        let mut tx = storage.write().await.unwrap();
1773        insert_test_header(&mut tx, 2, &reward_tree_h2).await;
1774        batch_insert_proofs(&mut tx, &reward_tree_h2, &changed_accounts_h2, 2).await;
1775
1776        UpdateStateData::<SeqTypes, RewardMerkleTreeV2, { RewardMerkleTreeV2::ARITY }>::set_last_state_height(&mut tx, 2)
1777            .await
1778            .unwrap();
1779        tx.commit().await.unwrap();
1780
1781        tracing::info!("Height 3: More balance updates");
1782
1783        let mut reward_tree_h3 = reward_tree_h2.clone();
1784
1785        // Update balances for accounts 500-599
1786        let updated_accounts_h3: Vec<RewardAccountV2> =
1787            (500..600).map(make_reward_account).collect();
1788        for (i, account) in updated_accounts_h3.iter().enumerate() {
1789            let new_reward = RewardAmount::from(((500 + i + 1) * 3000) as u64);
1790            reward_tree_h3.update(*account, new_reward).unwrap();
1791        }
1792
1793        let mut tx = storage.write().await.unwrap();
1794        insert_test_header(&mut tx, 3, &reward_tree_h3).await;
1795        batch_insert_proofs(&mut tx, &reward_tree_h3, &updated_accounts_h3, 3).await;
1796
1797        UpdateStateData::<SeqTypes, RewardMerkleTreeV2, { RewardMerkleTreeV2::ARITY }>::set_last_state_height(&mut tx, 3)
1798            .await
1799            .unwrap();
1800        tx.commit().await.unwrap();
1801
1802        tracing::info!("Verifying all account proofs at each height");
1803
1804        // Verify height=1
1805        // All 1000 initial accounts
1806        let snapshot_h1 =
1807            Snapshot::<SeqTypes, RewardMerkleTreeV2, { RewardMerkleTreeV2::ARITY }>::Index(1);
1808        for i in 0..num_initial_accounts {
1809            let account = make_reward_account(i);
1810            let proof = storage
1811                .read()
1812                .await
1813                .unwrap()
1814                .get_path(snapshot_h1, account)
1815                .await
1816                .unwrap_or_else(|e| panic!("failed to get path for account {i} at h1: {e}"));
1817
1818            let expected_reward = RewardAmount::from(((i + 1) * 1000) as u64);
1819            let actual_reward = proof.elem().expect("account should exist");
1820            assert_eq!(*actual_reward, expected_reward,);
1821
1822            assert!(
1823                RewardMerkleTreeV2::verify(reward_tree_h1.commitment(), account, proof)
1824                    .unwrap()
1825                    .is_ok(),
1826            );
1827        }
1828        tracing::info!("Verified height=1 {num_initial_accounts} accounts with proofs",);
1829
1830        // Verify accounts 1000-1099 don't exist at height 1
1831        for i in 1000..1100 {
1832            let account = make_reward_account(i);
1833            let proof = storage
1834                .read()
1835                .await
1836                .unwrap()
1837                .get_path(snapshot_h1, account)
1838                .await
1839                .unwrap();
1840            assert!(proof.elem().is_none(),);
1841
1842            // Verify non-membership proof
1843            assert!(
1844                RewardMerkleTreeV2::non_membership_verify(
1845                    reward_tree_h1.commitment(),
1846                    account,
1847                    proof
1848                )
1849                .unwrap(),
1850            );
1851        }
1852        tracing::info!("Height 1: Verified 100 non-membership proofs");
1853
1854        // Verify height 2
1855        let snapshot_h2 =
1856            Snapshot::<SeqTypes, RewardMerkleTreeV2, { RewardMerkleTreeV2::ARITY }>::Index(2);
1857
1858        // Accounts 0-99
1859        for i in 0..100 {
1860            let account = make_reward_account(i);
1861            let proof = storage
1862                .read()
1863                .await
1864                .unwrap()
1865                .get_path(snapshot_h2, account)
1866                .await
1867                .unwrap_or_else(|e| panic!("failed to get path for account {i} at h2: {e}"));
1868
1869            let expected_reward = RewardAmount::from(((i + 1) * 2000) as u64);
1870            let actual_reward = proof.elem().expect("account should exist");
1871            assert_eq!(*actual_reward, expected_reward,);
1872
1873            assert!(
1874                RewardMerkleTreeV2::verify(reward_tree_h2.commitment(), account, proof)
1875                    .unwrap()
1876                    .is_ok(),
1877            );
1878        }
1879
1880        // Accounts 100-999: original rewards
1881        for i in 100..1000 {
1882            let account = make_reward_account(i);
1883            let proof = storage
1884                .read()
1885                .await
1886                .unwrap()
1887                .get_path(snapshot_h2, account)
1888                .await
1889                .unwrap_or_else(|e| panic!("failed to get path for account {i} at h2: {e}"));
1890
1891            let expected_reward = RewardAmount::from(((i + 1) * 1000) as u64);
1892            let actual_reward = proof.elem().expect("account should exist");
1893            assert_eq!(*actual_reward, expected_reward,);
1894
1895            assert!(
1896                RewardMerkleTreeV2::verify(reward_tree_h2.commitment(), account, proof)
1897                    .unwrap()
1898                    .is_ok(),
1899            );
1900        }
1901
1902        // Accounts 1000-1099
1903        // new accounts
1904        for i in 1000..1100 {
1905            let account = make_reward_account(i);
1906            let proof = storage
1907                .read()
1908                .await
1909                .unwrap()
1910                .get_path(snapshot_h2, account)
1911                .await
1912                .unwrap_or_else(|e| panic!("failed to get path for account {i} at h2: {e}"));
1913
1914            let expected_reward = RewardAmount::from(((i + 1) * 500) as u64);
1915            let actual_reward = proof.elem().expect("account should exist");
1916            assert_eq!(*actual_reward, expected_reward,);
1917
1918            assert!(
1919                RewardMerkleTreeV2::verify(reward_tree_h2.commitment(), account, proof)
1920                    .unwrap()
1921                    .is_ok(),
1922            );
1923        }
1924        tracing::info!("Height 2: Verified all 1100 accounts with proofs");
1925
1926        // Verify HEIGHT 3: All accounts
1927        let snapshot_h3 =
1928            Snapshot::<SeqTypes, RewardMerkleTreeV2, { RewardMerkleTreeV2::ARITY }>::Index(3);
1929
1930        // Accounts 0-99
1931        for i in 0..100 {
1932            let account = make_reward_account(i);
1933            let proof = storage
1934                .read()
1935                .await
1936                .unwrap()
1937                .get_path(snapshot_h3, account)
1938                .await
1939                .unwrap_or_else(|e| panic!("failed to get path for account {i} at h3: {e}"));
1940
1941            let expected_reward = RewardAmount::from(((i + 1) * 2000) as u64);
1942            let actual_reward = proof.elem().expect("account should exist");
1943            assert_eq!(*actual_reward, expected_reward,);
1944
1945            assert!(
1946                RewardMerkleTreeV2::verify(reward_tree_h3.commitment(), account, proof)
1947                    .unwrap()
1948                    .is_ok(),
1949            );
1950        }
1951
1952        for i in 100..500 {
1953            let account = make_reward_account(i);
1954            let proof = storage
1955                .read()
1956                .await
1957                .unwrap()
1958                .get_path(snapshot_h3, account)
1959                .await
1960                .unwrap_or_else(|e| panic!("failed to get path for account {i} at h3: {e}"));
1961
1962            let expected_reward = RewardAmount::from(((i + 1) * 1000) as u64);
1963            let actual_reward = proof.elem().expect("account should exist");
1964            assert_eq!(*actual_reward, expected_reward,);
1965
1966            assert!(
1967                RewardMerkleTreeV2::verify(reward_tree_h3.commitment(), account, proof)
1968                    .unwrap()
1969                    .is_ok(),
1970            );
1971        }
1972
1973        // Accounts 500-599
1974        for i in 500..600 {
1975            let account = make_reward_account(i);
1976            let proof = storage
1977                .read()
1978                .await
1979                .unwrap()
1980                .get_path(snapshot_h3, account)
1981                .await
1982                .unwrap_or_else(|e| panic!("failed to get path for account {i} at h3: {e}"));
1983
1984            let expected_reward = RewardAmount::from(((i + 1) * 3000) as u64);
1985            let actual_reward = proof.elem().expect("account should exist");
1986            assert_eq!(*actual_reward, expected_reward,);
1987
1988            assert!(
1989                RewardMerkleTreeV2::verify(reward_tree_h3.commitment(), account, proof)
1990                    .unwrap()
1991                    .is_ok(),
1992            );
1993        }
1994
1995        // Accounts 600-999
1996        for i in 600..1000 {
1997            let account = make_reward_account(i);
1998            let proof = storage
1999                .read()
2000                .await
2001                .unwrap()
2002                .get_path(snapshot_h3, account)
2003                .await
2004                .unwrap_or_else(|e| panic!("failed to get path for account {i} at h3: {e}"));
2005
2006            let expected_reward = RewardAmount::from(((i + 1) * 1000) as u64);
2007            let actual_reward = proof.elem().expect("account should exist");
2008            assert_eq!(*actual_reward, expected_reward,);
2009
2010            assert!(
2011                RewardMerkleTreeV2::verify(reward_tree_h3.commitment(), account, proof)
2012                    .unwrap()
2013                    .is_ok(),
2014            );
2015        }
2016
2017        // Accounts 1000-1099: new accounts (from h2)
2018        for i in 1000..1100 {
2019            let account = make_reward_account(i);
2020            let proof = storage
2021                .read()
2022                .await
2023                .unwrap()
2024                .get_path(snapshot_h3, account)
2025                .await
2026                .unwrap_or_else(|e| panic!("failed to get path for account {i} at h3: {e}"));
2027
2028            let expected_reward = RewardAmount::from(((i + 1) * 500) as u64);
2029            let actual_reward = proof.elem().expect("account should exist");
2030            assert_eq!(*actual_reward, expected_reward,);
2031
2032            assert!(
2033                RewardMerkleTreeV2::verify(reward_tree_h3.commitment(), account, proof)
2034                    .unwrap()
2035                    .is_ok(),
2036            );
2037        }
2038        tracing::info!("Height 3: Verified all 1100 accounts with proofs");
2039
2040        // Verify non-membership proofs for accounts that never existed
2041        for i in 1100..1110 {
2042            let account = make_reward_account(i);
2043            let proof = storage
2044                .read()
2045                .await
2046                .unwrap()
2047                .get_path(snapshot_h3, account)
2048                .await
2049                .unwrap();
2050
2051            assert!(
2052                proof.elem().is_none(),
2053                "Account {i} should not exist at height 3"
2054            );
2055
2056            assert!(
2057                RewardMerkleTreeV2::non_membership_verify(
2058                    reward_tree_h3.commitment(),
2059                    account,
2060                    proof
2061                )
2062                .unwrap(),
2063            );
2064        }
2065        tracing::info!("Height 3: Verified 10 non-membership proofs");
2066    }
2067
2068    #[tokio::test]
2069    #[test_log::test]
2070    async fn test_merkle_proof_gc() {
2071        let db = TmpDb::init().await;
2072        let opt = tmp_options(&db);
2073        let cfg = Config::try_from(&opt).expect("failed to create config from options");
2074        let storage = SqlStorage::connect(cfg, StorageConnectionType::Query)
2075            .await
2076            .expect("failed to connect to storage");
2077
2078        let account = vec![0; 32];
2079
2080        // Insert mock leaves at heights 0-2 so load_proof can check leaf version.
2081        let leaves = leaf_chain(0..=2, DRB_AND_HEADER_UPGRADE_VERSION).await;
2082        let mut tx = storage.write().await.unwrap();
2083        for leaf in &leaves {
2084            tx.insert_leaf(leaf).await.unwrap();
2085        }
2086        Transaction::commit(tx).await.unwrap();
2087
2088        // Insert some mock proofs at heights 0-2.
2089        for h in 0..=2 {
2090            storage
2091                .persist_proofs(h, [(account.clone(), h.to_le_bytes().to_vec())].into_iter())
2092                .await
2093                .unwrap();
2094        }
2095
2096        // Test garbage collection.
2097        storage.garbage_collect(2).await.unwrap();
2098
2099        // Make sure the proof at height 2 is still available.
2100        assert_eq!(
2101            storage.load_proof(2, account.clone(), 0).await.unwrap(),
2102            2u64.to_le_bytes()
2103        );
2104
2105        // Meanwhile, the proofs at heights 0-1 have been garbage collected.
2106        for h in 0..2 {
2107            let err = storage.load_proof(h, account.clone(), 0).await.unwrap_err();
2108            assert!(err.to_string().contains("Missing proof"), "{err:#}");
2109        }
2110
2111        // Garbage collect the remaining proof.
2112        storage.garbage_collect(3).await.unwrap();
2113        let err = storage.load_proof(2, account, 0).await.unwrap_err();
2114        assert!(err.to_string().contains("Missing proof"), "{err:#}");
2115    }
2116
2117    #[tokio::test]
2118    #[test_log::test]
2119    async fn test_load_proof_v5_epoch_boundary() {
2120        let db = TmpDb::init().await;
2121        let opt = tmp_options(&db);
2122        let cfg = Config::try_from(&opt).expect("failed to create config from options");
2123        let storage = SqlStorage::connect(cfg, StorageConnectionType::Query)
2124            .await
2125            .expect("failed to connect to storage");
2126
2127        let epoch_height = 10u64;
2128        let account = vec![0; 32];
2129
2130        // Create V5 leaves at heights 0..=15.
2131        let leaves = leaf_chain(0..=15, EPOCH_REWARD_VERSION).await;
2132        let mut tx = storage.write().await.unwrap();
2133        for leaf in &leaves {
2134            tx.insert_leaf(leaf).await.unwrap();
2135        }
2136        Transaction::commit(tx).await.unwrap();
2137
2138        // Store proofs only at epoch boundaries (height 10).
2139        let boundary_proof = b"proof_at_10".to_vec();
2140        {
2141            let mut tx = storage.write().await.unwrap();
2142            tx.upsert(
2143                "reward_merkle_tree_v2_proofs",
2144                ["height", "account", "proof"],
2145                ["height", "account"],
2146                [(10i64, account.clone(), boundary_proof.clone())],
2147            )
2148            .await
2149            .unwrap();
2150            Transaction::commit(tx).await.unwrap();
2151        }
2152
2153        // Querying at the epoch boundary itself should return the proof directly.
2154        assert_eq!(
2155            storage
2156                .load_proof(10, account.clone(), epoch_height)
2157                .await
2158                .unwrap(),
2159            boundary_proof,
2160        );
2161
2162        // Querying at a non-boundary V5 height (e.g. 15) should resolve to the
2163        // previous epoch boundary (10) and return its proof.
2164        assert_eq!(
2165            storage
2166                .load_proof(15, account.clone(), epoch_height)
2167                .await
2168                .unwrap(),
2169            boundary_proof,
2170        );
2171    }
2172
2173    #[tokio::test]
2174    #[test_log::test]
2175    async fn test_load_proof_v4_to_v5_upgrade_boundary() {
2176        let db = TmpDb::init().await;
2177        let opt = tmp_options(&db);
2178        let cfg = Config::try_from(&opt).expect("failed to create config from options");
2179        let storage = SqlStorage::connect(cfg, StorageConnectionType::Query)
2180            .await
2181            .expect("failed to connect to storage");
2182
2183        let epoch_height = 10u64;
2184        let account = vec![0; 32];
2185
2186        // V4 leaves at heights 0..=10, V5 from height 11 onward.
2187        // The upgrade happens at height 11.
2188        let upgrade = Upgrade::new(DRB_AND_HEADER_UPGRADE_VERSION, EPOCH_REWARD_VERSION);
2189        let leaves = leaf_chain_with_upgrade(0..=15, 11, upgrade).await;
2190        {
2191            let mut tx = storage.write().await.unwrap();
2192            for leaf in &leaves {
2193                tx.insert_leaf(leaf).await.unwrap();
2194            }
2195            Transaction::commit(tx).await.unwrap();
2196        }
2197
2198        // Querying at V5 height 15 resolves to prev epoch boundary
2199        // But leaf at height 10 is V4, so this should fail.
2200        storage
2201            .load_proof(15, account.clone(), epoch_height)
2202            .await
2203            .unwrap_err();
2204
2205        let v4_proof = b"v4_proof_at_5".to_vec();
2206        {
2207            let mut tx = storage.write().await.unwrap();
2208            tx.upsert(
2209                "reward_merkle_tree_v2_proofs",
2210                ["height", "account", "proof"],
2211                ["height", "account"],
2212                [(5i64, account.clone(), v4_proof.clone())],
2213            )
2214            .await
2215            .unwrap();
2216            Transaction::commit(tx).await.unwrap();
2217        }
2218        assert_eq!(
2219            storage.load_proof(5, account.clone(), 0).await.unwrap(),
2220            v4_proof,
2221        );
2222    }
2223
2224    #[test_log::test(tokio::test(flavor = "multi_thread"))]
2225    async fn test_get_table_sizes() {
2226        use super::super::data_source::DatabaseMetadataSource;
2227
2228        let db = TmpDb::init().await;
2229        let opt = tmp_options(&db);
2230        let cfg = Config::try_from(&opt).expect("failed to create config from options");
2231        let storage = SqlStorage::connect(cfg, StorageConnectionType::Query)
2232            .await
2233            .expect("failed to connect to storage");
2234
2235        // Insert some test data to ensure tables have rows
2236        let mut tx = storage.write().await.unwrap();
2237
2238        // Insert a test header
2239        let reward_tree = RewardMerkleTreeV2::new(REWARD_MERKLE_TREE_V2_HEIGHT);
2240        insert_test_header(&mut tx, 1, &reward_tree).await;
2241
2242        tx.commit().await.unwrap();
2243
2244        // Call get_table_sizes and verify it doesn't error
2245        let table_sizes = storage
2246            .get_table_sizes()
2247            .await
2248            .expect("get_table_sizes should succeed");
2249
2250        // Verify we got some tables back
2251        assert!(
2252            !table_sizes.is_empty(),
2253            "should have at least one table in the database"
2254        );
2255    }
2256}