1use std::collections::{HashMap, HashSet, VecDeque};
2
3use anyhow::{Context, bail, ensure};
4use async_trait::async_trait;
5use committable::{Commitment, Committable};
6use espresso_types::{
7 BlockMerkleTree, ChainConfig, FeeAccount, FeeMerkleTree, Leaf2, NodeState, ValidatedState,
8 get_l1_deposits,
9 v0_1::IterableFeeInfo,
10 v0_3::{
11 REWARD_MERKLE_TREE_V1_HEIGHT, RewardAccountProofV1, RewardAccountQueryDataV1,
12 RewardAccountV1, RewardMerkleTreeV1,
13 },
14 v0_4::{PermittedRewardMerkleTreeV2, RewardAccountV2, RewardMerkleTreeV2},
15 v0_6::RewardAccountQueryDataV2,
16};
17use futures::future::Future;
18use hotshot::traits::ValidatedState as _;
19use hotshot_query_service::{
20 Resolvable,
21 availability::LeafId,
22 data_source::{
23 VersionedDataSource,
24 sql::{Config, SqlDataSource, Transaction},
25 storage::{
26 AvailabilityStorage, MerklizedStateStorage, NodeStorage, SqlStorage,
27 pruning::PrunerConfig,
28 sql::{Db, TransactionMode, Write, query_as},
29 },
30 },
31 merklized_state::Snapshot,
32};
33use hotshot_types::{
34 data::{EpochNumber, QuorumProposalWrapper, ViewNumber},
35 message::Proposal,
36 traits::election::MembershipSnapshot,
37 utils::{epoch_from_block_number, is_last_block},
38 vote::HasViewNumber,
39};
40use jf_merkle_tree_compat::{
41 ForgetableMerkleTreeScheme, ForgetableUniversalMerkleTreeScheme, LookupResult,
42 MerkleTreeScheme, prelude::MerkleNode,
43};
44use sqlx::{Encode, Row, Type};
45use vbs::version::Version;
46use versions::{
47 DRB_AND_HEADER_UPGRADE_VERSION, EPOCH_REWARD_VERSION, EPOCH_VERSION, NEW_PROTOCOL_VERSION,
48};
49
50use super::{
51 BlocksFrontier,
52 data_source::{Provider, SequencerDataSource},
53};
54use crate::{
55 SeqTypes,
56 api::RewardMerkleTreeDataSource,
57 catchup::{CatchupStorage, NullStateCatchup},
58 persistence::{ChainConfigPersistence, sql::Options},
59 state::compute_state_update,
60 util::BoundedJoinSet,
61};
62
63pub type DataSource = SqlDataSource<SeqTypes, Provider>;
64
65#[async_trait]
66impl SequencerDataSource for DataSource {
67 type Options = Options;
68
69 async fn create(opt: Self::Options, provider: Provider, reset: bool) -> anyhow::Result<Self> {
70 let fetch_limit = opt.fetch_rate_limit;
71 let active_fetch_delay = opt.active_fetch_delay;
72 let chunk_fetch_delay = opt.chunk_fetch_delay;
73 let mut cfg = Config::try_from(&opt)?;
74
75 if reset {
76 cfg = cfg.reset_schema();
77 }
78
79 let mut builder = cfg.builder(provider).await?;
80
81 if let Some(limit) = fetch_limit {
82 builder = builder.with_rate_limit(limit);
83 }
84
85 if opt.lightweight {
86 tracing::warn!("enabling light weight mode..");
87 builder = builder.leaf_only();
88 }
89
90 if let Some(delay) = active_fetch_delay {
91 builder = builder.with_active_fetch_delay(delay);
92 }
93 if let Some(delay) = chunk_fetch_delay {
94 builder = builder.with_chunk_fetch_delay(delay);
95 }
96 if let Some(chunk_size) = opt.sync_status_chunk_size {
97 builder = builder.with_sync_status_chunk_size(chunk_size);
98 }
99 if let Some(ttl) = opt.sync_status_ttl {
100 builder = builder.with_sync_status_ttl(ttl);
101 }
102 if let Some(chunk_size) = opt.proactive_scan_chunk_size {
103 builder = builder.with_proactive_range_chunk_size(chunk_size);
104 }
105 if let Some(interval) = opt.proactive_scan_interval {
106 builder = builder.with_proactive_interval(interval);
107 }
108 if opt.disable_proactive_fetching {
109 builder = builder.disable_proactive_fetching();
110 }
111
112 builder.build().await
113 }
114}
115
116impl RewardMerkleTreeDataSource for SqlStorage {
117 async fn load_v1_reward_account_proof(
118 &self,
119 height: u64,
120 account: RewardAccountV1,
121 ) -> anyhow::Result<RewardAccountQueryDataV1> {
122 let mut tx = self.read().await.context(format!(
123 "opening transaction to fetch v1 reward account {account:?}; height {height}"
124 ))?;
125
126 let block_height = NodeStorage::<SeqTypes>::block_height(&mut tx)
127 .await
128 .context("getting block height")? as u64;
129 ensure!(
130 block_height > 0,
131 "cannot get accounts for height {height}: no blocks available"
132 );
133
134 if height < block_height {
137 let (tree, _) = load_v1_reward_accounts(self, height, &[account])
138 .await
139 .with_context(|| {
140 format!("failed to load v1 reward account {account:?} at height {height}")
141 })?;
142
143 let (proof, balance) = RewardAccountProofV1::prove(&tree, account.into())
144 .with_context(|| {
145 format!("reward account {account:?} not available at height {height}")
146 })?;
147
148 Ok(RewardAccountQueryDataV1 { balance, proof })
149 } else {
150 bail!(
151 "requested height {height} is not yet available (latest block height: \
152 {block_height})"
153 );
154 }
155 }
156
157 fn persist_tree(
158 &self,
159 height: u64,
160 merkle_tree: Vec<u8>,
161 ) -> impl Send + Future<Output = anyhow::Result<()>> {
162 async move {
163 let mut tx = self
164 .write()
165 .await
166 .context("opening transaction for reward merkle tree v2")?;
167
168 tx.upsert(
169 "reward_merkle_tree_v2_data",
170 ["height", "balances"],
171 ["height"],
172 [(height as i64, merkle_tree)],
173 )
174 .await?;
175
176 hotshot_query_service::data_source::Transaction::commit(tx)
177 .await
178 .context("Transaction to store reward merkle tree v2 failed.")
179 }
180 }
181
182 fn load_tree(&self, height: u64) -> impl Send + Future<Output = anyhow::Result<Vec<u8>>> {
183 async move {
184 let mut tx = self
185 .read()
186 .await
187 .context("opening transaction for state update")?;
188
189 let row = sqlx::query(
190 r#"
191 SELECT balances
192 FROM reward_merkle_tree_v2_data
193 WHERE height = $1
194 "#,
195 )
196 .bind(height as i64)
197 .fetch_optional(tx.as_mut())
198 .await?
199 .context(format!(
200 "No reward merkle tree for height {} in storage",
201 height
202 ))?;
203
204 row.try_get::<Vec<u8>, _>("balances")
205 .context("Missing field balances from row; this should never happen")
206 }
207 }
208
209 fn load_latest_tree(
210 &self,
211 height: u64,
212 ) -> impl Send + Future<Output = anyhow::Result<Vec<u8>>> {
213 async move {
214 let mut tx = self
215 .read()
216 .await
217 .context("opening transaction for load_latest_tree")?;
218
219 let row = sqlx::query(
220 r#"
221 SELECT balances
222 FROM reward_merkle_tree_v2_data
223 WHERE height <= $1
224 ORDER BY height DESC
225 LIMIT 1
226 "#,
227 )
228 .bind(height as i64)
229 .fetch_optional(tx.as_mut())
230 .await?
231 .context(format!(
232 "No reward merkle tree at or below height {} in storage",
233 height
234 ))?;
235
236 row.try_get::<Vec<u8>, _>("balances")
237 .context("Missing field balances from row; this should never happen")
238 }
239 }
240
241 fn persist_proofs(
242 &self,
243 height: u64,
244 proofs: impl Iterator<Item = (Vec<u8>, Vec<u8>)> + Send,
245 ) -> impl Send + Future<Output = anyhow::Result<()>> {
246 async move {
247 let mut iter = proofs.map(|(account, proof)| (height as i64, account, proof));
248
249 loop {
250 let mut chunk = Vec::with_capacity(20);
251
252 for _ in 0..20 {
253 let Some(row) = iter.next() else {
254 continue;
255 };
256 chunk.push(row);
257 }
258
259 if chunk.is_empty() {
260 break;
261 }
262
263 let mut tx = self
264 .write()
265 .await
266 .context("opening transaction for state update")?;
267
268 tokio::spawn(async move {
269 tx.upsert(
270 "reward_merkle_tree_v2_proofs",
271 ["height", "account", "proof"],
272 ["height", "account"],
273 chunk,
274 )
275 .await?;
276
277 hotshot_query_service::data_source::Transaction::commit(tx)
278 .await
279 .context("Transaction to store reward merkle tree failed.")?;
280
281 Ok::<_, anyhow::Error>(())
282 });
283 }
284 Ok(())
285 }
286 }
287
288 fn load_proof(
295 &self,
296 height: u64,
297 account: Vec<u8>,
298 epoch_height: u64,
299 ) -> impl Send + Future<Output = anyhow::Result<Vec<u8>>> {
300 async move {
301 let mut tx = self
302 .read()
303 .await
304 .context("opening transaction for load_proof")?;
305
306 let leaf = tx
307 .get_leaf(LeafId::<SeqTypes>::from(height as usize))
308 .await
309 .context(format!("leaf {height} not available"))?;
310
311 let proof_height = if leaf.header().version() < EPOCH_REWARD_VERSION
314 || is_last_block(height, epoch_height)
315 {
316 height
317 } else {
318 let prev_epoch_last_block =
319 epoch_from_block_number(height, epoch_height).saturating_sub(1) * epoch_height;
320 let boundary_leaf = tx
321 .get_leaf(LeafId::<SeqTypes>::from(prev_epoch_last_block as usize))
322 .await
323 .context(format!(
324 "leaf at epoch boundary {prev_epoch_last_block} not available"
325 ))?;
326 ensure!(
327 boundary_leaf.header().version() >= EPOCH_REWARD_VERSION,
328 "no epoch reward proofs available at boundary {prev_epoch_last_block}"
329 );
330 prev_epoch_last_block
331 };
332
333 sqlx::query_scalar(
334 "SELECT proof FROM reward_merkle_tree_v2_proofs WHERE height = $1 AND account = $2",
335 )
336 .bind(proof_height as i64)
337 .bind(account)
338 .fetch_optional(tx.as_mut())
339 .await?
340 .context(format!(
341 "Missing proofs at height {proof_height} (resolved from {height})"
342 ))
343 }
344 }
345
346 fn load_latest_proof(
347 &self,
348 account: Vec<u8>,
349 ) -> impl Send + Future<Output = anyhow::Result<Vec<u8>>> {
350 async move {
351 let mut tx = self
352 .read()
353 .await
354 .context("opening transaction for state update")?;
355
356 let row = sqlx::query(
357 r#"
358 SELECT proof
359 FROM reward_merkle_tree_v2_proofs
360 WHERE account = $1
361 ORDER BY height DESC
362 LIMIT 1
363 "#,
364 )
365 .bind(account)
366 .fetch_optional(tx.as_mut())
367 .await?
368 .context("Missing proofs")?;
369
370 row.try_get::<Vec<u8>, _>("proof")
371 .context("Missing field proof from row; this should never happen")
372 }
373 }
374
375 fn garbage_collect(&self, height: u64) -> impl Send + Future<Output = anyhow::Result<()>> {
376 async move {
377 let batch_size = self.get_pruning_config().unwrap_or_default().batch_size();
378
379 #[cfg(not(feature = "embedded-db"))]
383 let for_update = "FOR UPDATE";
384 #[cfg(feature = "embedded-db")]
385 let for_update = "";
386
387 loop {
389 let mut tx = self
390 .write()
391 .await
392 .context("opening transaction for state update")?;
393
394 let res = sqlx::query(&format!(
395 "
396 WITH delete_batch AS (
397 SELECT d.height FROM reward_merkle_tree_v2_data AS d
398 WHERE d.height < $1
399 ORDER BY d.height DESC
400 LIMIT $2
401 {for_update}
402 )
403 DELETE FROM reward_merkle_tree_v2_data AS del
404 WHERE del.height IN (SELECT * FROM delete_batch)
405 "
406 ))
407 .bind(height as i64)
408 .bind(batch_size as i64)
409 .execute(tx.as_mut())
410 .await?;
411
412 hotshot_query_service::data_source::Transaction::commit(tx)
413 .await
414 .context(
415 "Transaction to garbage collect reward merkle trees from storage failed.",
416 )?;
417
418 if res.rows_affected() == 0 {
419 break;
420 } else {
421 tracing::debug!(
422 "deleted {} rows from reward_merkle_tree_v2_data",
423 res.rows_affected()
424 );
425 }
426 }
427
428 loop {
430 let mut tx = self
431 .write()
432 .await
433 .context("opening transaction for state update")?;
434
435 let res = sqlx::query(&format!(
436 "
437 WITH delete_batch AS (
438 SELECT d.height, d.account FROM reward_merkle_tree_v2_proofs AS d
439 WHERE d.height < $1
440 ORDER BY d.height, d.account DESC
441 LIMIT $2
442 {for_update}
443 )
444 DELETE FROM reward_merkle_tree_v2_proofs AS del
445 WHERE (del.height, del.account) IN (SELECT * FROM delete_batch)
446 ",
447 ))
448 .bind(height as i64)
449 .bind(batch_size as i64)
450 .execute(tx.as_mut())
451 .await?;
452
453 hotshot_query_service::data_source::Transaction::commit(tx)
454 .await
455 .context("Transaction to garbage collect proofs from storage failed.")?;
456
457 if res.rows_affected() == 0 {
458 break;
459 } else {
460 tracing::debug!(
461 "deleted {} rows from reward_merkle_tree_v2_proofs",
462 res.rows_affected()
463 );
464 }
465 }
466
467 Ok(())
468 }
469 }
470
471 fn proof_exists(&self, height: u64) -> impl Send + Future<Output = bool> {
472 async move {
473 let Ok(mut tx) = self.write().await else {
474 return false;
475 };
476
477 sqlx::query_as(
478 r#"
479 SELECT EXISTS(
480 SELECT 1 FROM reward_merkle_tree_v2_proofs
481 WHERE height = $1
482 )
483 "#,
484 )
485 .bind(height as i64)
486 .fetch_one(tx.as_mut())
487 .await
488 .ok()
489 .unwrap_or((false,))
490 .0
491 }
492 }
493 fn persist_reward_proofs(
500 &self,
501 node_state: &NodeState,
502 height: u64,
503 version: Version,
504 ) -> impl Send + Future<Output = anyhow::Result<()>> {
505 async move {
506 if !cfg!(any(test, feature = "testing"))
512 && !(height + node_state.node_id).is_multiple_of(30)
513 {
514 return Ok(());
515 }
516
517 let finalized_hotshot_height = if cfg!(any(test, feature = "testing")) {
521 height
522 } else {
523 match node_state.finalized_hotshot_height().await {
524 Ok(h) => h,
525 Err(err) => {
526 tracing::warn!("failed to get finalized hotshot height: {err:#}");
527 return Ok(());
528 },
529 }
530 };
531
532 let mut tree_height = finalized_hotshot_height;
535 let mut proof_height = finalized_hotshot_height;
536 if version >= EPOCH_REWARD_VERSION {
537 let epoch_height = node_state
538 .epoch_height
539 .context("epoch_height not set in node state")?;
540 if !is_last_block(finalized_hotshot_height, epoch_height) {
541 tree_height = epoch_from_block_number(finalized_hotshot_height, epoch_height)
542 .saturating_sub(1)
543 * epoch_height;
544 if tree_height == 0 {
545 return Ok(());
546 }
547 let mut tx = self.read().await.context("opening read transaction")?;
549 let leaf = tx
550 .get_leaf(LeafId::<SeqTypes>::from(tree_height as usize))
551 .await
552 .context(format!(
553 "leaf at epoch boundary {tree_height} not available"
554 ))?;
555 if leaf.header().version() < EPOCH_REWARD_VERSION {
556 return Ok(());
557 }
558 }
559 proof_height = tree_height;
560 }
561
562 if self.proof_exists(proof_height).await {
563 return Ok(());
564 }
565
566 let permitted_tree = match self.load_reward_merkle_tree_v2(tree_height).await {
567 Ok(tree) => tree,
568 Err(err) => {
569 tracing::warn!(tree_height, "failed to load reward merkle tree: {err:#}");
570 return Ok(());
571 },
572 };
573
574 let tree = permitted_tree.tree;
575 let iter = tree
576 .iter()
577 .filter_map(|(account, balance): (&RewardAccountV2, _)| {
578 let proof = espresso_types::v0_6::RewardAccountProofV2::prove(
579 &tree,
580 (*account).into(),
581 )?;
582 let proof = RewardAccountQueryDataV2 {
583 balance: (*balance).into(),
584 proof: proof.0,
585 };
586 Some((
587 bincode::serialize(&account).ok()?,
588 bincode::serialize(&proof).ok()?,
589 ))
590 });
591
592 if let Err(err) = self.persist_proofs(proof_height, iter).await {
593 tracing::warn!(proof_height, "failed to persist proofs: {err:#}");
594 }
595
596 Ok(())
597 }
598 }
599}
600
601impl CatchupStorage for SqlStorage {
602 async fn get_reward_accounts_v1(
603 &self,
604 instance: &NodeState,
605 height: u64,
606 view: ViewNumber,
607 accounts: &[RewardAccountV1],
608 ) -> anyhow::Result<(RewardMerkleTreeV1, Leaf2)> {
609 let mut tx = self.read().await.context(format!(
610 "opening transaction to fetch v1 reward account {accounts:?}; height {height}"
611 ))?;
612
613 let block_height = NodeStorage::<SeqTypes>::block_height(&mut tx)
614 .await
615 .context("getting block height")? as u64;
616 ensure!(
617 block_height > 0,
618 "cannot get accounts for height {height}: no blocks available"
619 );
620
621 if height < block_height {
624 load_v1_reward_accounts(self, height, accounts).await
625 } else {
626 let accounts: Vec<_> = accounts
627 .iter()
628 .map(|acct| RewardAccountV2::from(*acct))
629 .collect();
630 let (state, leaf) = reconstruct_state(
633 instance,
634 self,
635 &mut tx,
636 block_height - 1,
637 view,
638 &[],
639 &accounts,
640 )
641 .await?;
642 Ok((state.reward_merkle_tree_v1, leaf))
643 }
644 }
645
646 async fn get_reward_accounts_v2(
647 &self,
648 instance: &NodeState,
649 height: u64,
650 view: ViewNumber,
651 accounts: &[RewardAccountV2],
652 ) -> anyhow::Result<(RewardMerkleTreeV2, Leaf2)> {
653 let mut tx = self.read().await.context(format!(
654 "opening transaction to fetch reward account {accounts:?}; height {height}"
655 ))?;
656
657 let block_height = NodeStorage::<SeqTypes>::block_height(&mut tx)
658 .await
659 .context("getting block height")? as u64;
660 ensure!(
661 block_height > 0,
662 "cannot get accounts for height {height}: no blocks available"
663 );
664
665 if height < block_height {
668 load_reward_merkle_tree_v2(self, height)
669 .await
670 .map(|(permitted_tree, leaf)| (permitted_tree.tree, leaf))
671 } else {
672 let (state, leaf) = reconstruct_state(
675 instance,
676 self,
677 &mut tx,
678 block_height - 1,
679 view,
680 &[],
681 accounts,
682 )
683 .await?;
684 Ok((state.reward_merkle_tree_v2, leaf))
685 }
686 }
687
688 async fn get_accounts(
689 &self,
690 instance: &NodeState,
691 height: u64,
692 view: ViewNumber,
693 accounts: &[FeeAccount],
694 ) -> anyhow::Result<(FeeMerkleTree, Leaf2)> {
695 let mut tx = self.read().await.context(format!(
696 "opening transaction to fetch account {accounts:?}; height {height}"
697 ))?;
698
699 let block_height = NodeStorage::<SeqTypes>::block_height(&mut tx)
700 .await
701 .context("getting block height")? as u64;
702 ensure!(
703 block_height > 0,
704 "cannot get accounts for height {height}: no blocks available"
705 );
706
707 if height < block_height {
710 load_accounts(&mut tx, height, accounts).await
711 } else {
712 let (state, leaf) = reconstruct_state(
715 instance,
716 self,
717 &mut tx,
718 block_height - 1,
719 view,
720 accounts,
721 &[],
722 )
723 .await?;
724 Ok((state.fee_merkle_tree, leaf))
725 }
726 }
727
728 async fn get_frontier(
729 &self,
730 instance: &NodeState,
731 height: u64,
732 view: ViewNumber,
733 ) -> anyhow::Result<BlocksFrontier> {
734 let mut tx = self.read().await.context(format!(
735 "opening transaction to fetch frontier at height {height}"
736 ))?;
737
738 let block_height = NodeStorage::<SeqTypes>::block_height(&mut tx)
739 .await
740 .context("getting block height")? as u64;
741 ensure!(
742 block_height > 0,
743 "cannot get frontier for height {height}: no blocks available"
744 );
745
746 if height < block_height {
749 load_frontier(&mut tx, height).await
750 } else {
751 let (state, _) =
754 reconstruct_state(instance, self, &mut tx, block_height - 1, view, &[], &[])
755 .await?;
756 match state.block_merkle_tree.lookup(height - 1) {
757 LookupResult::Ok(_, proof) => Ok(proof),
758 _ => {
759 bail!(
760 "state snapshot {view:?},{height} was found but does not contain frontier \
761 at height {}; this should not be possible",
762 height - 1
763 );
764 },
765 }
766 }
767 }
768
769 async fn get_chain_config(
770 &self,
771 commitment: Commitment<ChainConfig>,
772 ) -> anyhow::Result<ChainConfig> {
773 let mut tx = self.read().await.context(format!(
774 "opening transaction to fetch chain config {commitment}"
775 ))?;
776 load_chain_config(&mut tx, commitment).await
777 }
778
779 async fn get_leaf_chain(&self, height: u64) -> anyhow::Result<Vec<Leaf2>> {
780 let mut tx = self
781 .read()
782 .await
783 .context(format!("opening transaction to fetch leaf at {height}"))?;
784 let leaf: Leaf2 = tx
785 .get_leaf((height as usize).into())
786 .await
787 .context(format!("leaf {height} not available"))?
788 .leaf()
789 .clone();
790
791 if leaf.block_header().version() >= NEW_PROTOCOL_VERSION {
794 let cert2: espresso_types::Certificate2<SeqTypes> =
795 tx.load_earliest_cert2(height).await?.context(format!(
796 "no cert2 available for new-protocol leaf at {height}"
797 ))?;
798 let cert2_height = cert2.data.block_number;
799 let mut leaves = vec![leaf];
800 if height < cert2_height {
801 let descendants = tx
802 .get_leaf_range((height as usize + 1)..=cert2_height as usize)
803 .await?;
804 leaves.extend(descendants.into_iter().flatten().map(|l| l.leaf().clone()));
805 }
806 return Ok(leaves);
807 }
808
809 let mut last_leaf = leaf;
811 let mut chain = vec![last_leaf.clone()];
812 let mut h = height + 1;
813
814 loop {
815 let lqd = tx.get_leaf((h as usize).into()).await?;
816 let leaf = lqd.leaf();
817
818 if leaf.justify_qc().view_number() == last_leaf.view_number() {
819 chain.push(leaf.clone());
820 } else {
821 h += 1;
822 continue;
823 }
824
825 if leaf.view_number() == last_leaf.view_number() + 1 {
827 last_leaf = leaf.clone();
828 h += 1;
829 break;
830 }
831 h += 1;
832 last_leaf = leaf.clone();
833 }
834
835 loop {
836 let lqd = tx.get_leaf((h as usize).into()).await?;
837 let leaf = lqd.leaf();
838 if leaf.justify_qc().view_number() == last_leaf.view_number() {
839 chain.push(leaf.clone());
840 break;
841 }
842 h += 1;
843 }
844
845 Ok(chain)
846 }
847
848 async fn load_earliest_cert2(
849 &self,
850 height: u64,
851 ) -> anyhow::Result<Option<espresso_types::Certificate2<SeqTypes>>> {
852 let mut tx = self
853 .read()
854 .await
855 .context("opening transaction to fetch cert2")?;
856 Ok(tx.load_earliest_cert2(height).await?)
857 }
858
859 async fn get_leaf(&self, height: u64) -> anyhow::Result<Leaf2> {
860 let mut tx = self
861 .read()
862 .await
863 .context(format!("opening transaction to fetch leaf at {height}"))?;
864 let lqd = tx
865 .get_leaf((height as usize).into())
866 .await
867 .context(format!("leaf {height} not available"))?;
868 Ok(lqd.leaf().clone())
869 }
870}
871
872impl RewardMerkleTreeDataSource for DataSource {
873 async fn load_v1_reward_account_proof(
874 &self,
875 height: u64,
876 account: RewardAccountV1,
877 ) -> anyhow::Result<RewardAccountQueryDataV1> {
878 self.as_ref()
879 .load_v1_reward_account_proof(height, account)
880 .await
881 }
882
883 fn persist_tree(
884 &self,
885 height: u64,
886 merkle_tree: Vec<u8>,
887 ) -> impl Send + Future<Output = anyhow::Result<()>> {
888 async move { self.as_ref().persist_tree(height, merkle_tree).await }
889 }
890
891 fn load_tree(&self, height: u64) -> impl Send + Future<Output = anyhow::Result<Vec<u8>>> {
892 async move { self.as_ref().load_tree(height).await }
893 }
894
895 fn load_latest_tree(
896 &self,
897 height: u64,
898 ) -> impl Send + Future<Output = anyhow::Result<Vec<u8>>> {
899 async move { self.as_ref().load_latest_tree(height).await }
900 }
901
902 fn garbage_collect(&self, height: u64) -> impl Send + Future<Output = anyhow::Result<()>> {
903 async move { self.as_ref().garbage_collect(height).await }
904 }
905
906 fn persist_proofs(
907 &self,
908 height: u64,
909 proofs: impl Iterator<Item = (Vec<u8>, Vec<u8>)> + Send,
910 ) -> impl Send + Future<Output = anyhow::Result<()>> {
911 async move { self.as_ref().persist_proofs(height, proofs).await }
912 }
913
914 fn load_proof(
915 &self,
916 height: u64,
917 account: Vec<u8>,
918 epoch_height: u64,
919 ) -> impl Send + Future<Output = anyhow::Result<Vec<u8>>> {
920 async move {
921 self.as_ref()
922 .load_proof(height, account, epoch_height)
923 .await
924 }
925 }
926
927 fn load_latest_proof(
928 &self,
929 account: Vec<u8>,
930 ) -> impl Send + Future<Output = anyhow::Result<Vec<u8>>> {
931 async move { self.as_ref().load_latest_proof(account).await }
932 }
933
934 fn proof_exists(&self, height: u64) -> impl Send + Future<Output = bool> {
935 async move { self.as_ref().proof_exists(height).await }
936 }
937
938 fn persist_reward_proofs(
939 &self,
940 node_state: &NodeState,
941 height: u64,
942 version: Version,
943 ) -> impl Send + Future<Output = anyhow::Result<()>> {
944 async move {
945 self.as_ref()
946 .persist_reward_proofs(node_state, height, version)
947 .await
948 }
949 }
950}
951
952impl CatchupStorage for DataSource {
953 async fn get_accounts(
954 &self,
955 instance: &NodeState,
956 height: u64,
957 view: ViewNumber,
958 accounts: &[FeeAccount],
959 ) -> anyhow::Result<(FeeMerkleTree, Leaf2)> {
960 self.as_ref()
961 .get_accounts(instance, height, view, accounts)
962 .await
963 }
964
965 async fn get_reward_accounts_v2(
966 &self,
967 instance: &NodeState,
968 height: u64,
969 view: ViewNumber,
970 accounts: &[RewardAccountV2],
971 ) -> anyhow::Result<(RewardMerkleTreeV2, Leaf2)> {
972 self.as_ref()
973 .get_reward_accounts_v2(instance, height, view, accounts)
974 .await
975 }
976
977 async fn get_reward_accounts_v1(
978 &self,
979 instance: &NodeState,
980 height: u64,
981 view: ViewNumber,
982 accounts: &[RewardAccountV1],
983 ) -> anyhow::Result<(RewardMerkleTreeV1, Leaf2)> {
984 self.as_ref()
985 .get_reward_accounts_v1(instance, height, view, accounts)
986 .await
987 }
988
989 async fn get_frontier(
990 &self,
991 instance: &NodeState,
992 height: u64,
993 view: ViewNumber,
994 ) -> anyhow::Result<BlocksFrontier> {
995 self.as_ref().get_frontier(instance, height, view).await
996 }
997
998 async fn get_chain_config(
999 &self,
1000 commitment: Commitment<ChainConfig>,
1001 ) -> anyhow::Result<ChainConfig> {
1002 self.as_ref().get_chain_config(commitment).await
1003 }
1004 async fn get_leaf_chain(&self, height: u64) -> anyhow::Result<Vec<Leaf2>> {
1005 self.as_ref().get_leaf_chain(height).await
1006 }
1007
1008 async fn load_earliest_cert2(
1009 &self,
1010 height: u64,
1011 ) -> anyhow::Result<Option<espresso_types::Certificate2<SeqTypes>>> {
1012 self.as_ref().load_earliest_cert2(height).await
1013 }
1014
1015 async fn get_leaf(&self, height: u64) -> anyhow::Result<Leaf2> {
1016 self.as_ref().get_leaf(height).await
1017 }
1018}
1019
1020#[async_trait]
1021impl ChainConfigPersistence for Transaction<Write> {
1022 async fn insert_chain_config(&mut self, chain_config: ChainConfig) -> anyhow::Result<()> {
1023 let commitment = chain_config.commitment();
1024 let data = bincode::serialize(&chain_config)?;
1025 self.upsert(
1026 "chain_config",
1027 ["commitment", "data"],
1028 ["commitment"],
1029 [(commitment.to_string(), data)],
1030 )
1031 .await
1032 }
1033}
1034
1035impl super::data_source::PruningDataSource for SqlStorage {
1036 async fn get_oldest_block(
1037 &self,
1038 ) -> anyhow::Result<Option<hotshot_query_service::availability::BlockQueryData<SeqTypes>>> {
1039 let mut tx = self
1040 .read()
1041 .await
1042 .context("opening transaction to fetch oldest block")?;
1043 let row = sqlx::query("SELECT MIN(height) AS height FROM header")
1044 .fetch_one(tx.as_mut())
1045 .await
1046 .context("failed to query oldest block height")?;
1047 let height: Option<i64> = row.try_get("height")?;
1048 match height {
1049 None => Ok(None),
1050 Some(h) => {
1051 let h = usize::try_from(h).context("block height out of range")?;
1052 Ok(Some(
1053 tx.get_block(hotshot_query_service::availability::BlockId::<SeqTypes>::from(h))
1054 .await
1055 .context(format!("block {h} not available"))?,
1056 ))
1057 },
1058 }
1059 }
1060
1061 async fn get_oldest_leaf(
1062 &self,
1063 ) -> anyhow::Result<Option<hotshot_query_service::availability::LeafQueryData<SeqTypes>>> {
1064 let mut tx = self
1065 .read()
1066 .await
1067 .context("opening transaction to fetch oldest leaf")?;
1068 let row = sqlx::query("SELECT MIN(height) AS height FROM leaf2")
1069 .fetch_one(tx.as_mut())
1070 .await
1071 .context("failed to query oldest leaf height")?;
1072 let height: Option<i64> = row.try_get("height")?;
1073 match height {
1074 None => Ok(None),
1075 Some(h) => {
1076 let h = usize::try_from(h).context("leaf height out of range")?;
1077 Ok(Some(
1078 tx.get_leaf(LeafId::<SeqTypes>::from(h))
1079 .await
1080 .context(format!("leaf {h} not available"))?,
1081 ))
1082 },
1083 }
1084 }
1085}
1086
1087impl super::data_source::DatabaseMetadataSource for SqlStorage {
1088 async fn get_table_sizes(&self) -> anyhow::Result<Vec<super::data_source::TableSize>> {
1089 let mut tx = self
1090 .read()
1091 .await
1092 .context("opening transaction to fetch table sizes")?;
1093
1094 #[cfg(not(feature = "embedded-db"))]
1095 {
1096 let query = r#"
1097 SELECT
1098 schemaname || '.' || relname AS table_name,
1099 n_live_tup AS row_count,
1100 pg_total_relation_size(relid) AS total_size_bytes
1101 FROM pg_stat_user_tables
1102 ORDER BY pg_total_relation_size(relid) DESC
1103 "#;
1104
1105 let rows = sqlx::query(query)
1106 .fetch_all(tx.as_mut())
1107 .await
1108 .context("failed to query table sizes")?;
1109
1110 let mut table_sizes = Vec::new();
1111 for row in rows {
1112 let table_name: String = row.try_get("table_name")?;
1113 let row_count: i64 = row.try_get("row_count").unwrap_or(-1);
1114 let total_size_bytes: Option<i64> = row.try_get("total_size_bytes").ok();
1115
1116 table_sizes.push(super::data_source::TableSize {
1117 table_name,
1118 row_count,
1119 total_size_bytes,
1120 });
1121 }
1122
1123 Ok(table_sizes)
1124 }
1125
1126 #[cfg(feature = "embedded-db")]
1127 {
1128 let table_names_query = r#"
1130 SELECT name
1131 FROM sqlite_master
1132 WHERE type = 'table'
1133 AND name NOT LIKE 'sqlite_%'
1134 ORDER BY name
1135 "#;
1136
1137 let table_rows = sqlx::query(table_names_query)
1138 .fetch_all(tx.as_mut())
1139 .await
1140 .context("failed to query table names")?;
1141
1142 let mut table_sizes = Vec::new();
1143
1144 for row in table_rows {
1146 let table_name: String = row.try_get("name")?;
1147
1148 let count_query = format!("SELECT COUNT(*) as count FROM \"{}\"", table_name);
1151 let count_row = sqlx::query(&count_query)
1152 .fetch_one(tx.as_mut())
1153 .await
1154 .context(format!(
1155 "failed to query row count for table {}",
1156 table_name
1157 ))?;
1158
1159 let row_count: i64 = count_row.try_get("count").unwrap_or(0);
1160
1161 table_sizes.push(super::data_source::TableSize {
1162 table_name,
1163 row_count,
1164 total_size_bytes: None,
1165 });
1166 }
1167
1168 Ok(table_sizes)
1169 }
1170 }
1171}
1172
1173impl super::data_source::DatabaseMetadataSource for DataSource {
1174 async fn get_table_sizes(&self) -> anyhow::Result<Vec<super::data_source::TableSize>> {
1175 self.as_ref().get_table_sizes().await
1176 }
1177}
1178
1179impl super::data_source::PruningDataSource for DataSource {
1180 async fn get_oldest_block(
1181 &self,
1182 ) -> anyhow::Result<Option<hotshot_query_service::availability::BlockQueryData<SeqTypes>>> {
1183 self.as_ref().get_oldest_block().await
1184 }
1185
1186 async fn get_oldest_leaf(
1187 &self,
1188 ) -> anyhow::Result<Option<hotshot_query_service::availability::LeafQueryData<SeqTypes>>> {
1189 self.as_ref().get_oldest_leaf().await
1190 }
1191}
1192
1193async fn load_frontier<Mode: TransactionMode>(
1194 tx: &mut Transaction<Mode>,
1195 height: u64,
1196) -> anyhow::Result<BlocksFrontier> {
1197 tx.get_path(
1198 Snapshot::<SeqTypes, BlockMerkleTree, { BlockMerkleTree::ARITY }>::Index(height),
1199 height
1200 .checked_sub(1)
1201 .ok_or(anyhow::anyhow!("Subtract with overflow ({height})!"))?,
1202 )
1203 .await
1204 .context(format!("fetching frontier at height {height}"))
1205}
1206
1207async fn load_v1_reward_accounts(
1208 db: &SqlStorage,
1209 height: u64,
1210 accounts: &[RewardAccountV1],
1211) -> anyhow::Result<(RewardMerkleTreeV1, Leaf2)> {
1212 let mut tx = db
1214 .read()
1215 .await
1216 .with_context(|| "failed to open read transaction")?;
1217
1218 let leaf = tx
1220 .get_leaf(LeafId::<SeqTypes>::from(height as usize))
1221 .await
1222 .context(format!("leaf {height} not available"))?;
1223 let header = leaf.header();
1224
1225 if header.version() < EPOCH_VERSION || header.version() >= DRB_AND_HEADER_UPGRADE_VERSION {
1226 return Ok((
1227 RewardMerkleTreeV1::new(REWARD_MERKLE_TREE_V1_HEIGHT),
1228 leaf.leaf().clone(),
1229 ));
1230 }
1231
1232 let merkle_root = header.reward_merkle_tree_root().unwrap_left();
1234 let mut snapshot = RewardMerkleTreeV1::from_commitment(merkle_root);
1235
1236 let mut join_set = BoundedJoinSet::new(10);
1238
1239 let mut task_id_to_account = HashMap::new();
1241
1242 for account in accounts {
1244 let db_clone = db.clone();
1246 let account_clone = *account;
1247 let header_height = header.height();
1248
1249 let func = async move {
1251 let mut tx = db_clone
1253 .read()
1254 .await
1255 .with_context(|| "failed to open read transaction")?;
1256
1257 let proof = tx
1259 .get_path(
1260 Snapshot::<SeqTypes, RewardMerkleTreeV1, { RewardMerkleTreeV1::ARITY }>::Index(
1261 header_height,
1262 ),
1263 account_clone,
1264 )
1265 .await
1266 .with_context(|| {
1267 format!(
1268 "failed to get path for v1 reward account {account_clone:?}; height \
1269 {height}"
1270 )
1271 })?;
1272
1273 Ok::<_, anyhow::Error>(proof)
1274 };
1275
1276 let id = join_set.spawn(func).id();
1278
1279 task_id_to_account.insert(id, account);
1281 }
1282
1283 while let Some(result) = join_set.join_next_with_id().await {
1285 let (id, result) = result.with_context(|| "failed to join task")?;
1287
1288 let proof = result?;
1290
1291 let account = task_id_to_account
1293 .remove(&id)
1294 .with_context(|| "task ID for spawned task not found")?;
1295
1296 match proof.proof.first().with_context(|| {
1297 format!("empty proof for v1 reward account {account:?}; height {height}")
1298 })? {
1299 MerkleNode::Leaf { pos, elem, .. } => {
1300 snapshot.remember(*pos, *elem, proof)?;
1301 },
1302 MerkleNode::Empty => {
1303 snapshot.non_membership_remember(*account, proof)?;
1304 },
1305 _ => {
1306 bail!("invalid proof for v1 reward account {account:?}; height {height}");
1307 },
1308 }
1309 }
1310
1311 Ok((snapshot, leaf.leaf().clone()))
1312}
1313
1314async fn load_reward_merkle_tree_v2(
1316 db: &SqlStorage,
1317 height: u64,
1318) -> anyhow::Result<(PermittedRewardMerkleTreeV2, Leaf2)> {
1319 let mut tx = db
1321 .read()
1322 .await
1323 .with_context(|| "failed to open read transaction")?;
1324
1325 let leaf = tx
1327 .get_leaf(LeafId::<SeqTypes>::from(height as usize))
1328 .await
1329 .with_context(|| format!("leaf {height} not available"))?;
1330
1331 let snapshot = db.load_reward_merkle_tree_v2(height).await?;
1332
1333 Ok((snapshot, leaf.leaf().clone()))
1334}
1335
1336async fn load_accounts<Mode: TransactionMode>(
1337 tx: &mut Transaction<Mode>,
1338 height: u64,
1339 accounts: &[FeeAccount],
1340) -> anyhow::Result<(FeeMerkleTree, Leaf2)> {
1341 let leaf = tx
1342 .get_leaf(LeafId::<SeqTypes>::from(height as usize))
1343 .await
1344 .context(format!("leaf {height} not available"))?;
1345 let header = leaf.header();
1346
1347 let mut snapshot = FeeMerkleTree::from_commitment(header.fee_merkle_tree_root());
1348 for account in accounts {
1349 let proof = tx
1350 .get_path(
1351 Snapshot::<SeqTypes, FeeMerkleTree, { FeeMerkleTree::ARITY }>::Index(
1352 header.height(),
1353 ),
1354 *account,
1355 )
1356 .await
1357 .context(format!(
1358 "fetching account {account}; height {}",
1359 header.height()
1360 ))?;
1361 match proof.proof.first().context(format!(
1362 "empty proof for account {account}; height {}",
1363 header.height()
1364 ))? {
1365 MerkleNode::Leaf { pos, elem, .. } => {
1366 snapshot.remember(*pos, *elem, proof)?;
1367 },
1368 MerkleNode::Empty => {
1369 snapshot.non_membership_remember(*account, proof)?;
1370 },
1371 _ => {
1372 bail!("Invalid proof");
1373 },
1374 }
1375 }
1376
1377 Ok((snapshot, leaf.leaf().clone()))
1378}
1379
1380async fn load_chain_config<Mode: TransactionMode>(
1381 tx: &mut Transaction<Mode>,
1382 commitment: Commitment<ChainConfig>,
1383) -> anyhow::Result<ChainConfig> {
1384 let (data,) = query_as::<(Vec<u8>,)>("SELECT data from chain_config where commitment = $1")
1385 .bind(commitment.to_string())
1386 .fetch_one(tx.as_mut())
1387 .await
1388 .unwrap();
1389
1390 bincode::deserialize(&data[..]).context("failed to deserialize")
1391}
1392
1393#[tracing::instrument(skip(instance, db, tx))]
1404pub(crate) async fn reconstruct_state<Mode: TransactionMode>(
1405 instance: &NodeState,
1406 db: &SqlStorage,
1407 tx: &mut Transaction<Mode>,
1408 from_height: u64,
1409 to_view: ViewNumber,
1410 fee_accounts: &[FeeAccount],
1411 reward_accounts: &[RewardAccountV2],
1412) -> anyhow::Result<(ValidatedState, Leaf2)> {
1413 tracing::info!("attempting to reconstruct fee state");
1414 let from_leaf = tx
1415 .get_leaf((from_height as usize).into())
1416 .await
1417 .context(format!("leaf {from_height} not available"))?;
1418 let from_leaf: Leaf2 = from_leaf.leaf().clone();
1419 ensure!(
1420 from_leaf.view_number() < to_view,
1421 "state reconstruction: starting state {:?} must be before ending state {to_view:?}",
1422 from_leaf.view_number(),
1423 );
1424
1425 let mut leaves = VecDeque::new();
1427 let to_leaf = get_leaf_from_proposal(tx, "view = $1", &(to_view.u64() as i64))
1428 .await
1429 .context(format!(
1430 "unable to reconstruct state because leaf {to_view:?} is not available"
1431 ))?;
1432 let mut parent = to_leaf.parent_commitment();
1433 tracing::debug!(?to_leaf, ?parent, view = ?to_view, "have required leaf");
1434 leaves.push_front(to_leaf.clone());
1435 while parent != Committable::commit(&from_leaf) {
1436 let leaf = get_leaf_from_proposal(tx, "leaf_hash = $1", &parent.to_string())
1437 .await
1438 .context(format!(
1439 "unable to reconstruct state because leaf {parent} is not available"
1440 ))?;
1441 parent = leaf.parent_commitment();
1442 tracing::debug!(?leaf, ?parent, "have required leaf");
1443 leaves.push_front(leaf);
1444 }
1445
1446 let mut parent = from_leaf;
1448 let mut state = ValidatedState::from_header(parent.block_header());
1449
1450 let mut catchup = NullStateCatchup::default();
1453
1454 let mut fee_accounts = fee_accounts.iter().copied().collect::<HashSet<_>>();
1455 tracing::info!(
1458 "reconstructing fee accounts state for from height {from_height} to view {to_view}"
1459 );
1460
1461 let dependencies =
1462 fee_header_dependencies(&mut catchup, tx, instance, &parent, &leaves).await?;
1463 fee_accounts.extend(dependencies);
1464 let fee_accounts = fee_accounts.into_iter().collect::<Vec<_>>();
1465 state.fee_merkle_tree = load_accounts(tx, from_height, &fee_accounts)
1466 .await
1467 .context("unable to reconstruct state because accounts are not available at origin")?
1468 .0;
1469 ensure!(
1470 state.fee_merkle_tree.commitment() == parent.block_header().fee_merkle_tree_root(),
1471 "loaded fee state does not match parent header"
1472 );
1473
1474 tracing::info!(
1475 "reconstructing reward accounts for from height {from_height} to view {to_view}"
1476 );
1477
1478 let mut reward_accounts = reward_accounts.iter().copied().collect::<HashSet<_>>();
1479
1480 let dependencies = reward_header_dependencies(instance, &leaves).await?;
1483 reward_accounts.extend(dependencies);
1484 let reward_accounts = reward_accounts.into_iter().collect::<Vec<_>>();
1485
1486 match parent.block_header().reward_merkle_tree_root() {
1488 either::Either::Left(expected_root) => {
1489 let accts = reward_accounts
1490 .into_iter()
1491 .map(RewardAccountV1::from)
1492 .collect::<Vec<_>>();
1493 state.reward_merkle_tree_v1 = load_v1_reward_accounts(db, from_height, &accts)
1494 .await
1495 .context(
1496 "unable to reconstruct state because v1 reward accounts are not available at \
1497 origin",
1498 )?
1499 .0;
1500 ensure!(
1501 state.reward_merkle_tree_v1.commitment() == expected_root,
1502 "loaded v1 reward state does not match parent header"
1503 );
1504 },
1505 either::Either::Right(expected_root) => {
1506 let version = parent.block_header().version();
1507 let epoch_height = instance
1508 .epoch_height
1509 .context("epoch_height not set but parent has V2 reward tree")?;
1510
1511 if version >= EPOCH_REWARD_VERSION && !is_last_block(from_height, epoch_height) {
1518 let tree = db
1519 .load_latest_reward_merkle_tree_v2(from_height)
1520 .await
1521 .context("RewardMerkleTreeV2 not available at or below origin")?;
1522 state.reward_merkle_tree_v2 = tree.tree;
1523 } else {
1524 state.reward_merkle_tree_v2 = load_reward_merkle_tree_v2(db, from_height)
1525 .await
1526 .context("RewardMerkleTreeV2 not available at origin")?
1527 .0
1528 .tree;
1529 }
1530 ensure!(
1531 state.reward_merkle_tree_v2.commitment() == expected_root,
1532 "loaded reward state does not match parent header"
1533 );
1534 },
1535 }
1536
1537 let frontier = load_frontier(tx, from_height)
1539 .await
1540 .context("unable to reconstruct state because frontier is not available at origin")?;
1541 match frontier
1542 .proof
1543 .first()
1544 .context("empty proof for frontier at origin")?
1545 {
1546 MerkleNode::Leaf { pos, elem, .. } => state
1547 .block_merkle_tree
1548 .remember(*pos, *elem, frontier)
1549 .context("failed to remember frontier")?,
1550 _ => bail!("invalid frontier proof"),
1551 }
1552
1553 for proposal in leaves {
1555 state = compute_state_update(&state, instance, &catchup, &parent, &proposal)
1556 .await
1557 .context(format!(
1558 "unable to reconstruct state because state update {} failed",
1559 proposal.height(),
1560 ))?
1561 .0;
1562 parent = proposal;
1563 }
1564
1565 tracing::info!(from_height, ?to_view, "successfully reconstructed state");
1566 Ok((state, to_leaf))
1567}
1568
1569async fn fee_header_dependencies<Mode: TransactionMode>(
1576 catchup: &mut NullStateCatchup,
1577 tx: &mut Transaction<Mode>,
1578 instance: &NodeState,
1579 mut parent: &Leaf2,
1580 leaves: impl IntoIterator<Item = &Leaf2>,
1581) -> anyhow::Result<HashSet<FeeAccount>> {
1582 let mut accounts = HashSet::default();
1583
1584 for proposal in leaves {
1585 let header = proposal.block_header();
1586 let height = header.height();
1587 let view = proposal.view_number();
1588 tracing::debug!(height, ?view, "fetching dependencies for proposal");
1589
1590 let header_cf = header.chain_config();
1591 let chain_config = if header_cf.commit() == instance.chain_config.commit() {
1592 instance.chain_config
1593 } else {
1594 match header_cf.resolve() {
1595 Some(cf) => cf,
1596 None => {
1597 tracing::info!(
1598 height,
1599 ?view,
1600 commit = %header_cf.commit(),
1601 "chain config not available, attempting to load from storage",
1602 );
1603 let cf = load_chain_config(tx, header_cf.commit())
1604 .await
1605 .context(format!(
1606 "loading chain config {} for header {},{:?}",
1607 header_cf.commit(),
1608 header.height(),
1609 proposal.view_number()
1610 ))?;
1611
1612 catchup.add_chain_config(cf);
1615 cf
1616 },
1617 }
1618 };
1619
1620 accounts.insert(chain_config.fee_recipient);
1621 accounts.extend(
1622 get_l1_deposits(instance, header, parent, chain_config.fee_contract)
1623 .await
1624 .into_iter()
1625 .map(|fee| fee.account()),
1626 );
1627 accounts.extend(header.fee_info().accounts());
1628 parent = proposal;
1629 }
1630 Ok(accounts)
1631}
1632
1633async fn reward_header_dependencies(
1637 instance: &NodeState,
1638 leaves: impl IntoIterator<Item = &Leaf2>,
1639) -> anyhow::Result<HashSet<RewardAccountV2>> {
1640 let mut reward_accounts = HashSet::default();
1641 let epoch_height = instance.epoch_height;
1642
1643 let Some(epoch_height) = epoch_height else {
1644 tracing::info!("epoch height is not set. returning empty reward_header_dependencies");
1645 return Ok(HashSet::new());
1646 };
1647
1648 let coordinator = instance.coordinator.clone();
1649 let first_epoch = coordinator.membership().first_epoch();
1650 for proposal in leaves {
1652 let header = proposal.block_header();
1653
1654 let height = header.height();
1655 let view = proposal.view_number();
1656 tracing::debug!(height, ?view, "fetching dependencies for proposal");
1657
1658 let version = header.version();
1659 if version < EPOCH_VERSION || version >= EPOCH_REWARD_VERSION {
1661 continue;
1662 }
1663
1664 let first_epoch = first_epoch.context("first epoch not found")?;
1665
1666 let proposal_epoch = EpochNumber::new(epoch_from_block_number(height, epoch_height));
1667
1668 if proposal_epoch <= first_epoch + 1 {
1670 continue;
1671 }
1672
1673 let epoch_membership = match coordinator.membership_for_epoch(Some(proposal_epoch)) {
1674 Ok(e) => e,
1675 Err(err) => {
1676 tracing::info!(
1677 "failed to get membership for epoch={proposal_epoch:?}. err={err:#}"
1678 );
1679
1680 coordinator
1681 .wait_for_catchup(proposal_epoch)
1682 .await
1683 .context(format!("failed to catchup for epoch={proposal_epoch}"))?
1684 },
1685 };
1686
1687 let snapshot = epoch_membership
1688 .snapshot()
1689 .with_context(|| format!("no committee for epoch={proposal_epoch}"))?;
1690 let leader = snapshot.lookup_leader(proposal.view_number())?;
1691 let validator = snapshot.validator_config(&leader)?;
1692
1693 reward_accounts.insert(RewardAccountV2(validator.account));
1694
1695 let delegators: Vec<RewardAccountV2> = validator
1696 .delegators
1697 .keys()
1698 .map(|d| RewardAccountV2(*d))
1699 .collect();
1700
1701 reward_accounts.extend(delegators);
1702 }
1703 Ok(reward_accounts)
1704}
1705
1706async fn get_leaf_from_proposal<Mode, P>(
1707 tx: &mut Transaction<Mode>,
1708 where_clause: &str,
1709 param: P,
1710) -> anyhow::Result<Leaf2>
1711where
1712 P: Type<Db> + for<'q> Encode<'q, Db>,
1713{
1714 let (data,) = query_as::<(Vec<u8>,)>(&format!(
1715 "SELECT data FROM quorum_proposals2 WHERE {where_clause} LIMIT 1",
1716 ))
1717 .bind(param)
1718 .fetch_one(tx.as_mut())
1719 .await?;
1720 let proposal: Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>> =
1721 bincode::deserialize(&data)?;
1722 Ok(Leaf2::from_quorum_proposal(&proposal.data))
1723}
1724
1725#[cfg(any(test, feature = "testing"))]
1726pub(crate) mod impl_testable_data_source {
1727
1728 use hotshot_query_service::data_source::storage::sql::testing::TmpDb;
1729 use light_client::state::LightClientOptions;
1730
1731 use super::*;
1732 use crate::api::{self, data_source::testing::TestableSequencerDataSource, options::Query};
1733
1734 pub fn tmp_options(db: &TmpDb) -> Options {
1735 #[cfg(not(feature = "embedded-db"))]
1736 {
1737 let opt = crate::persistence::sql::PostgresOptions {
1738 port: Some(db.port()),
1739 host: Some(db.host()),
1740 user: Some("postgres".into()),
1741 password: Some("password".into()),
1742 ..Default::default()
1743 };
1744
1745 opt.into()
1746 }
1747
1748 #[cfg(feature = "embedded-db")]
1749 {
1750 let opt = crate::persistence::sql::SqliteOptions { path: db.path() };
1751 opt.into()
1752 }
1753 }
1754
1755 #[async_trait]
1756 impl TestableSequencerDataSource for DataSource {
1757 type Storage = TmpDb;
1758
1759 async fn create_storage() -> Self::Storage {
1760 TmpDb::init().await
1761 }
1762
1763 fn persistence_options(storage: &Self::Storage) -> Self::Options {
1764 tmp_options(storage)
1765 }
1766
1767 fn leaf_only_ds_options(
1768 storage: &Self::Storage,
1769 opt: api::Options,
1770 ) -> anyhow::Result<api::Options> {
1771 let mut ds_opts = tmp_options(storage);
1772 ds_opts.lightweight = true;
1773 Ok(opt.query_sql(Default::default(), ds_opts))
1774 }
1775
1776 fn options(storage: &Self::Storage, opt: api::Options) -> api::Options {
1777 opt.query_sql(
1778 Query {
1779 light_client: LightClientOptions {
1780 decaf: true,
1782 ..Default::default()
1783 },
1784 ..Default::default()
1785 },
1786 tmp_options(storage),
1787 )
1788 }
1789 }
1790}
1791
1792#[cfg(test)]
1793mod tests {
1794 use alloy::primitives::Address;
1795 use espresso_types::{
1796 v0_3::RewardAmount,
1797 v0_4::{REWARD_MERKLE_TREE_V2_HEIGHT, RewardAccountV2, RewardMerkleTreeV2},
1798 };
1799 use hotshot_query_service::{
1800 data_source::{
1801 Transaction, VersionedDataSource,
1802 sql::Config,
1803 storage::{
1804 MerklizedStateStorage, UpdateAvailabilityStorage,
1805 sql::{
1806 SqlStorage, StorageConnectionType, Transaction as SqlTransaction, Write,
1807 testing::TmpDb,
1808 },
1809 },
1810 },
1811 merklized_state::{MerklizedState, Snapshot, UpdateStateData},
1812 };
1813 use jf_merkle_tree_compat::{
1814 LookupResult, MerkleTreeScheme, ToTraversalPath, UniversalMerkleTreeScheme,
1815 };
1816 use light_client::testing::{leaf_chain, leaf_chain_with_upgrade};
1817 use versions::{DRB_AND_HEADER_UPGRADE_VERSION, EPOCH_REWARD_VERSION, Upgrade};
1818
1819 use super::impl_testable_data_source::tmp_options;
1820 use crate::{SeqTypes, api::RewardMerkleTreeDataSource};
1821
1822 fn make_reward_account(i: usize) -> RewardAccountV2 {
1823 let mut addr_bytes = [0u8; 20];
1824 addr_bytes[16..20].copy_from_slice(&(i as u32).to_be_bytes());
1825 RewardAccountV2(Address::from(addr_bytes))
1826 }
1827
1828 async fn insert_test_header(
1829 tx: &mut SqlTransaction<Write>,
1830 block_height: u64,
1831 reward_tree: &RewardMerkleTreeV2,
1832 ) {
1833 let reward_commitment = serde_json::to_value(reward_tree.commitment()).unwrap();
1834 let test_data = serde_json::json!({
1835 "block_merkle_tree_root": format!("block_root_{}", block_height),
1836 "fee_merkle_tree_root": format!("fee_root_{}", block_height),
1837 "fields": {
1838 RewardMerkleTreeV2::header_state_commitment_field(): reward_commitment
1839 }
1840 });
1841 tx.upsert(
1842 "header",
1843 [
1844 "height",
1845 "hash",
1846 "payload_hash",
1847 "timestamp",
1848 "data",
1849 "ns_table",
1850 ],
1851 ["height"],
1852 [(
1853 block_height as i64,
1854 format!("hash_{}", block_height),
1855 format!("payload_{}", block_height),
1856 block_height as i64,
1857 test_data,
1858 "ns_table".to_string(),
1859 )],
1860 )
1861 .await
1862 .unwrap();
1863 }
1864
1865 async fn batch_insert_proofs(
1866 tx: &mut SqlTransaction<Write>,
1867 reward_tree: &RewardMerkleTreeV2,
1868 accounts: &[RewardAccountV2],
1869 block_height: u64,
1870 ) {
1871 let proofs_and_paths: Vec<_> = accounts
1872 .iter()
1873 .map(|account| {
1874 let proof = match reward_tree.universal_lookup(*account) {
1875 LookupResult::Ok(_, proof) => proof,
1876 LookupResult::NotInMemory => panic!("account not in memory"),
1877 LookupResult::NotFound(proof) => proof,
1878 };
1879 let traversal_path = <RewardAccountV2 as ToTraversalPath<
1880 { RewardMerkleTreeV2::ARITY },
1881 >>::to_traversal_path(
1882 account, reward_tree.height()
1883 );
1884 (proof, traversal_path)
1885 })
1886 .collect();
1887
1888 UpdateStateData::<SeqTypes, RewardMerkleTreeV2, { RewardMerkleTreeV2::ARITY }>::insert_merkle_nodes_batch(
1889 tx,
1890 proofs_and_paths,
1891 block_height,
1892 )
1893 .await
1894 .expect("failed to batch insert proofs");
1895 }
1896
1897 #[test_log::test(tokio::test(flavor = "multi_thread"))]
1898 async fn test_reward_accounts_batch_insertion() {
1899 let db = TmpDb::init().await;
1906 let opt = tmp_options(&db);
1907 let cfg = Config::try_from(&opt).expect("failed to create config from options");
1908 let storage = SqlStorage::connect(cfg, StorageConnectionType::Query)
1909 .await
1910 .expect("failed to connect to storage");
1911
1912 let num_initial_accounts = 1000usize;
1913
1914 let initial_accounts: Vec<RewardAccountV2> =
1915 (0..num_initial_accounts).map(make_reward_account).collect();
1916
1917 tracing::info!(
1918 "Height 1: Inserting {} initial accounts",
1919 num_initial_accounts
1920 );
1921
1922 let mut reward_tree_h1 = RewardMerkleTreeV2::new(REWARD_MERKLE_TREE_V2_HEIGHT);
1923
1924 for (i, account) in initial_accounts.iter().enumerate() {
1925 let reward_amount = RewardAmount::from(((i + 1) * 1000) as u64);
1926 reward_tree_h1.update(*account, reward_amount).unwrap();
1927 }
1928
1929 let mut tx = storage.write().await.unwrap();
1930 insert_test_header(&mut tx, 1, &reward_tree_h1).await;
1931 batch_insert_proofs(&mut tx, &reward_tree_h1, &initial_accounts, 1).await;
1932
1933 UpdateStateData::<SeqTypes, RewardMerkleTreeV2, { RewardMerkleTreeV2::ARITY }>::set_last_state_height(&mut tx, 1)
1934 .await
1935 .unwrap();
1936 tx.commit().await.unwrap();
1937
1938 tracing::info!("Height 2: Updating balances and adding new accounts");
1939
1940 let mut reward_tree_h2 = reward_tree_h1.clone();
1941
1942 let updated_accounts_h2: Vec<RewardAccountV2> = (0..100).map(make_reward_account).collect();
1944 for (i, account) in updated_accounts_h2.iter().enumerate() {
1945 let new_reward = RewardAmount::from(((i + 1) * 2000) as u64);
1946 reward_tree_h2.update(*account, new_reward).unwrap();
1947 }
1948
1949 let new_accounts_h2: Vec<RewardAccountV2> = (1000..1100).map(make_reward_account).collect();
1951 for (i, account) in new_accounts_h2.iter().enumerate() {
1952 let reward_amount = RewardAmount::from(((i + 1001) * 500) as u64);
1953 reward_tree_h2.update(*account, reward_amount).unwrap();
1954 }
1955
1956 let mut changed_accounts_h2 = updated_accounts_h2.clone();
1957 changed_accounts_h2.extend(new_accounts_h2.clone());
1958
1959 let mut tx = storage.write().await.unwrap();
1960 insert_test_header(&mut tx, 2, &reward_tree_h2).await;
1961 batch_insert_proofs(&mut tx, &reward_tree_h2, &changed_accounts_h2, 2).await;
1962
1963 UpdateStateData::<SeqTypes, RewardMerkleTreeV2, { RewardMerkleTreeV2::ARITY }>::set_last_state_height(&mut tx, 2)
1964 .await
1965 .unwrap();
1966 tx.commit().await.unwrap();
1967
1968 tracing::info!("Height 3: More balance updates");
1969
1970 let mut reward_tree_h3 = reward_tree_h2.clone();
1971
1972 let updated_accounts_h3: Vec<RewardAccountV2> =
1974 (500..600).map(make_reward_account).collect();
1975 for (i, account) in updated_accounts_h3.iter().enumerate() {
1976 let new_reward = RewardAmount::from(((500 + i + 1) * 3000) as u64);
1977 reward_tree_h3.update(*account, new_reward).unwrap();
1978 }
1979
1980 let mut tx = storage.write().await.unwrap();
1981 insert_test_header(&mut tx, 3, &reward_tree_h3).await;
1982 batch_insert_proofs(&mut tx, &reward_tree_h3, &updated_accounts_h3, 3).await;
1983
1984 UpdateStateData::<SeqTypes, RewardMerkleTreeV2, { RewardMerkleTreeV2::ARITY }>::set_last_state_height(&mut tx, 3)
1985 .await
1986 .unwrap();
1987 tx.commit().await.unwrap();
1988
1989 tracing::info!("Verifying all account proofs at each height");
1990
1991 let snapshot_h1 =
1994 Snapshot::<SeqTypes, RewardMerkleTreeV2, { RewardMerkleTreeV2::ARITY }>::Index(1);
1995 for i in 0..num_initial_accounts {
1996 let account = make_reward_account(i);
1997 let proof = storage
1998 .read()
1999 .await
2000 .unwrap()
2001 .get_path(snapshot_h1, account)
2002 .await
2003 .unwrap_or_else(|e| panic!("failed to get path for account {i} at h1: {e}"));
2004
2005 let expected_reward = RewardAmount::from(((i + 1) * 1000) as u64);
2006 let actual_reward = proof.elem().expect("account should exist");
2007 assert_eq!(*actual_reward, expected_reward,);
2008
2009 assert!(
2010 RewardMerkleTreeV2::verify(reward_tree_h1.commitment(), account, proof)
2011 .unwrap()
2012 .is_ok(),
2013 );
2014 }
2015 tracing::info!("Verified height=1 {num_initial_accounts} accounts with proofs",);
2016
2017 for i in 1000..1100 {
2019 let account = make_reward_account(i);
2020 let proof = storage
2021 .read()
2022 .await
2023 .unwrap()
2024 .get_path(snapshot_h1, account)
2025 .await
2026 .unwrap();
2027 assert!(proof.elem().is_none(),);
2028
2029 assert!(
2031 RewardMerkleTreeV2::non_membership_verify(
2032 reward_tree_h1.commitment(),
2033 account,
2034 proof
2035 )
2036 .unwrap(),
2037 );
2038 }
2039 tracing::info!("Height 1: Verified 100 non-membership proofs");
2040
2041 let snapshot_h2 =
2043 Snapshot::<SeqTypes, RewardMerkleTreeV2, { RewardMerkleTreeV2::ARITY }>::Index(2);
2044
2045 for i in 0..100 {
2047 let account = make_reward_account(i);
2048 let proof = storage
2049 .read()
2050 .await
2051 .unwrap()
2052 .get_path(snapshot_h2, account)
2053 .await
2054 .unwrap_or_else(|e| panic!("failed to get path for account {i} at h2: {e}"));
2055
2056 let expected_reward = RewardAmount::from(((i + 1) * 2000) as u64);
2057 let actual_reward = proof.elem().expect("account should exist");
2058 assert_eq!(*actual_reward, expected_reward,);
2059
2060 assert!(
2061 RewardMerkleTreeV2::verify(reward_tree_h2.commitment(), account, proof)
2062 .unwrap()
2063 .is_ok(),
2064 );
2065 }
2066
2067 for i in 100..1000 {
2069 let account = make_reward_account(i);
2070 let proof = storage
2071 .read()
2072 .await
2073 .unwrap()
2074 .get_path(snapshot_h2, account)
2075 .await
2076 .unwrap_or_else(|e| panic!("failed to get path for account {i} at h2: {e}"));
2077
2078 let expected_reward = RewardAmount::from(((i + 1) * 1000) as u64);
2079 let actual_reward = proof.elem().expect("account should exist");
2080 assert_eq!(*actual_reward, expected_reward,);
2081
2082 assert!(
2083 RewardMerkleTreeV2::verify(reward_tree_h2.commitment(), account, proof)
2084 .unwrap()
2085 .is_ok(),
2086 );
2087 }
2088
2089 for i in 1000..1100 {
2092 let account = make_reward_account(i);
2093 let proof = storage
2094 .read()
2095 .await
2096 .unwrap()
2097 .get_path(snapshot_h2, account)
2098 .await
2099 .unwrap_or_else(|e| panic!("failed to get path for account {i} at h2: {e}"));
2100
2101 let expected_reward = RewardAmount::from(((i + 1) * 500) as u64);
2102 let actual_reward = proof.elem().expect("account should exist");
2103 assert_eq!(*actual_reward, expected_reward,);
2104
2105 assert!(
2106 RewardMerkleTreeV2::verify(reward_tree_h2.commitment(), account, proof)
2107 .unwrap()
2108 .is_ok(),
2109 );
2110 }
2111 tracing::info!("Height 2: Verified all 1100 accounts with proofs");
2112
2113 let snapshot_h3 =
2115 Snapshot::<SeqTypes, RewardMerkleTreeV2, { RewardMerkleTreeV2::ARITY }>::Index(3);
2116
2117 for i in 0..100 {
2119 let account = make_reward_account(i);
2120 let proof = storage
2121 .read()
2122 .await
2123 .unwrap()
2124 .get_path(snapshot_h3, account)
2125 .await
2126 .unwrap_or_else(|e| panic!("failed to get path for account {i} at h3: {e}"));
2127
2128 let expected_reward = RewardAmount::from(((i + 1) * 2000) as u64);
2129 let actual_reward = proof.elem().expect("account should exist");
2130 assert_eq!(*actual_reward, expected_reward,);
2131
2132 assert!(
2133 RewardMerkleTreeV2::verify(reward_tree_h3.commitment(), account, proof)
2134 .unwrap()
2135 .is_ok(),
2136 );
2137 }
2138
2139 for i in 100..500 {
2140 let account = make_reward_account(i);
2141 let proof = storage
2142 .read()
2143 .await
2144 .unwrap()
2145 .get_path(snapshot_h3, account)
2146 .await
2147 .unwrap_or_else(|e| panic!("failed to get path for account {i} at h3: {e}"));
2148
2149 let expected_reward = RewardAmount::from(((i + 1) * 1000) as u64);
2150 let actual_reward = proof.elem().expect("account should exist");
2151 assert_eq!(*actual_reward, expected_reward,);
2152
2153 assert!(
2154 RewardMerkleTreeV2::verify(reward_tree_h3.commitment(), account, proof)
2155 .unwrap()
2156 .is_ok(),
2157 );
2158 }
2159
2160 for i in 500..600 {
2162 let account = make_reward_account(i);
2163 let proof = storage
2164 .read()
2165 .await
2166 .unwrap()
2167 .get_path(snapshot_h3, account)
2168 .await
2169 .unwrap_or_else(|e| panic!("failed to get path for account {i} at h3: {e}"));
2170
2171 let expected_reward = RewardAmount::from(((i + 1) * 3000) as u64);
2172 let actual_reward = proof.elem().expect("account should exist");
2173 assert_eq!(*actual_reward, expected_reward,);
2174
2175 assert!(
2176 RewardMerkleTreeV2::verify(reward_tree_h3.commitment(), account, proof)
2177 .unwrap()
2178 .is_ok(),
2179 );
2180 }
2181
2182 for i in 600..1000 {
2184 let account = make_reward_account(i);
2185 let proof = storage
2186 .read()
2187 .await
2188 .unwrap()
2189 .get_path(snapshot_h3, account)
2190 .await
2191 .unwrap_or_else(|e| panic!("failed to get path for account {i} at h3: {e}"));
2192
2193 let expected_reward = RewardAmount::from(((i + 1) * 1000) as u64);
2194 let actual_reward = proof.elem().expect("account should exist");
2195 assert_eq!(*actual_reward, expected_reward,);
2196
2197 assert!(
2198 RewardMerkleTreeV2::verify(reward_tree_h3.commitment(), account, proof)
2199 .unwrap()
2200 .is_ok(),
2201 );
2202 }
2203
2204 for i in 1000..1100 {
2206 let account = make_reward_account(i);
2207 let proof = storage
2208 .read()
2209 .await
2210 .unwrap()
2211 .get_path(snapshot_h3, account)
2212 .await
2213 .unwrap_or_else(|e| panic!("failed to get path for account {i} at h3: {e}"));
2214
2215 let expected_reward = RewardAmount::from(((i + 1) * 500) as u64);
2216 let actual_reward = proof.elem().expect("account should exist");
2217 assert_eq!(*actual_reward, expected_reward,);
2218
2219 assert!(
2220 RewardMerkleTreeV2::verify(reward_tree_h3.commitment(), account, proof)
2221 .unwrap()
2222 .is_ok(),
2223 );
2224 }
2225 tracing::info!("Height 3: Verified all 1100 accounts with proofs");
2226
2227 for i in 1100..1110 {
2229 let account = make_reward_account(i);
2230 let proof = storage
2231 .read()
2232 .await
2233 .unwrap()
2234 .get_path(snapshot_h3, account)
2235 .await
2236 .unwrap();
2237
2238 assert!(
2239 proof.elem().is_none(),
2240 "Account {i} should not exist at height 3"
2241 );
2242
2243 assert!(
2244 RewardMerkleTreeV2::non_membership_verify(
2245 reward_tree_h3.commitment(),
2246 account,
2247 proof
2248 )
2249 .unwrap(),
2250 );
2251 }
2252 tracing::info!("Height 3: Verified 10 non-membership proofs");
2253 }
2254
2255 #[tokio::test]
2256 #[test_log::test]
2257 async fn test_merkle_proof_gc() {
2258 let db = TmpDb::init().await;
2259 let opt = tmp_options(&db);
2260 let cfg = Config::try_from(&opt).expect("failed to create config from options");
2261 let storage = SqlStorage::connect(cfg, StorageConnectionType::Query)
2262 .await
2263 .expect("failed to connect to storage");
2264
2265 let account = vec![0; 32];
2266
2267 let leaves = leaf_chain(0..=2, DRB_AND_HEADER_UPGRADE_VERSION).await;
2269 let mut tx = storage.write().await.unwrap();
2270 for leaf in &leaves {
2271 tx.insert_leaf(leaf).await.unwrap();
2272 }
2273 Transaction::commit(tx).await.unwrap();
2274
2275 for h in 0..=2 {
2277 storage
2278 .persist_proofs(h, [(account.clone(), h.to_le_bytes().to_vec())].into_iter())
2279 .await
2280 .unwrap();
2281 }
2282
2283 storage.garbage_collect(2).await.unwrap();
2285
2286 assert_eq!(
2288 storage.load_proof(2, account.clone(), 0).await.unwrap(),
2289 2u64.to_le_bytes()
2290 );
2291
2292 for h in 0..2 {
2294 let err = storage.load_proof(h, account.clone(), 0).await.unwrap_err();
2295 assert!(err.to_string().contains("Missing proof"), "{err:#}");
2296 }
2297
2298 storage.garbage_collect(3).await.unwrap();
2300 let err = storage.load_proof(2, account, 0).await.unwrap_err();
2301 assert!(err.to_string().contains("Missing proof"), "{err:#}");
2302 }
2303
2304 #[tokio::test]
2305 #[test_log::test]
2306 async fn test_load_proof_v5_epoch_boundary() {
2307 let db = TmpDb::init().await;
2308 let opt = tmp_options(&db);
2309 let cfg = Config::try_from(&opt).expect("failed to create config from options");
2310 let storage = SqlStorage::connect(cfg, StorageConnectionType::Query)
2311 .await
2312 .expect("failed to connect to storage");
2313
2314 let epoch_height = 10u64;
2315 let account = vec![0; 32];
2316
2317 let leaves = leaf_chain(0..=15, EPOCH_REWARD_VERSION).await;
2319 let mut tx = storage.write().await.unwrap();
2320 for leaf in &leaves {
2321 tx.insert_leaf(leaf).await.unwrap();
2322 }
2323 Transaction::commit(tx).await.unwrap();
2324
2325 let boundary_proof = b"proof_at_10".to_vec();
2327 {
2328 let mut tx = storage.write().await.unwrap();
2329 tx.upsert(
2330 "reward_merkle_tree_v2_proofs",
2331 ["height", "account", "proof"],
2332 ["height", "account"],
2333 [(10i64, account.clone(), boundary_proof.clone())],
2334 )
2335 .await
2336 .unwrap();
2337 Transaction::commit(tx).await.unwrap();
2338 }
2339
2340 assert_eq!(
2342 storage
2343 .load_proof(10, account.clone(), epoch_height)
2344 .await
2345 .unwrap(),
2346 boundary_proof,
2347 );
2348
2349 assert_eq!(
2352 storage
2353 .load_proof(15, account.clone(), epoch_height)
2354 .await
2355 .unwrap(),
2356 boundary_proof,
2357 );
2358 }
2359
2360 #[tokio::test]
2361 #[test_log::test]
2362 async fn test_load_proof_v4_to_v5_upgrade_boundary() {
2363 let db = TmpDb::init().await;
2364 let opt = tmp_options(&db);
2365 let cfg = Config::try_from(&opt).expect("failed to create config from options");
2366 let storage = SqlStorage::connect(cfg, StorageConnectionType::Query)
2367 .await
2368 .expect("failed to connect to storage");
2369
2370 let epoch_height = 10u64;
2371 let account = vec![0; 32];
2372
2373 let upgrade = Upgrade::new(DRB_AND_HEADER_UPGRADE_VERSION, EPOCH_REWARD_VERSION);
2376 let leaves = leaf_chain_with_upgrade(0..=15, 11, upgrade).await;
2377 {
2378 let mut tx = storage.write().await.unwrap();
2379 for leaf in &leaves {
2380 tx.insert_leaf(leaf).await.unwrap();
2381 }
2382 Transaction::commit(tx).await.unwrap();
2383 }
2384
2385 storage
2388 .load_proof(15, account.clone(), epoch_height)
2389 .await
2390 .unwrap_err();
2391
2392 let v4_proof = b"v4_proof_at_5".to_vec();
2393 {
2394 let mut tx = storage.write().await.unwrap();
2395 tx.upsert(
2396 "reward_merkle_tree_v2_proofs",
2397 ["height", "account", "proof"],
2398 ["height", "account"],
2399 [(5i64, account.clone(), v4_proof.clone())],
2400 )
2401 .await
2402 .unwrap();
2403 Transaction::commit(tx).await.unwrap();
2404 }
2405 assert_eq!(
2406 storage.load_proof(5, account.clone(), 0).await.unwrap(),
2407 v4_proof,
2408 );
2409 }
2410
2411 #[test_log::test(tokio::test(flavor = "multi_thread"))]
2412 async fn test_get_table_sizes() {
2413 use super::super::data_source::DatabaseMetadataSource;
2414
2415 let db = TmpDb::init().await;
2416 let opt = tmp_options(&db);
2417 let cfg = Config::try_from(&opt).expect("failed to create config from options");
2418 let storage = SqlStorage::connect(cfg, StorageConnectionType::Query)
2419 .await
2420 .expect("failed to connect to storage");
2421
2422 let mut tx = storage.write().await.unwrap();
2424
2425 let reward_tree = RewardMerkleTreeV2::new(REWARD_MERKLE_TREE_V2_HEIGHT);
2427 insert_test_header(&mut tx, 1, &reward_tree).await;
2428
2429 tx.commit().await.unwrap();
2430
2431 let table_sizes = storage
2433 .get_table_sizes()
2434 .await
2435 .expect("get_table_sizes should succeed");
2436
2437 assert!(
2439 !table_sizes.is_empty(),
2440 "should have at least one table in the database"
2441 );
2442 }
2443}