1use std::collections::{HashMap, HashSet, VecDeque};
2
3use anyhow::{Context, bail, ensure};
4use async_trait::async_trait;
5use committable::{Commitment, Committable};
6use espresso_types::{
7 BlockMerkleTree, ChainConfig, FeeAccount, FeeMerkleTree, Leaf2, NodeState, ValidatedState,
8 get_l1_deposits,
9 v0_1::IterableFeeInfo,
10 v0_3::{
11 REWARD_MERKLE_TREE_V1_HEIGHT, RewardAccountProofV1, RewardAccountQueryDataV1,
12 RewardAccountV1, RewardMerkleTreeV1,
13 },
14 v0_4::{PermittedRewardMerkleTreeV2, RewardAccountV2, RewardMerkleTreeV2},
15 v0_6::RewardAccountQueryDataV2,
16};
17use futures::future::Future;
18use hotshot::traits::ValidatedState as _;
19use hotshot_query_service::{
20 Resolvable,
21 availability::LeafId,
22 data_source::{
23 VersionedDataSource,
24 sql::{Config, SqlDataSource, Transaction},
25 storage::{
26 AvailabilityStorage, MerklizedStateStorage, NodeStorage, SqlStorage,
27 pruning::PrunerConfig,
28 sql::{Db, TransactionMode, Write, query_as},
29 },
30 },
31 merklized_state::Snapshot,
32};
33use hotshot_types::{
34 data::{EpochNumber, QuorumProposalWrapper, ViewNumber},
35 message::Proposal,
36 utils::{epoch_from_block_number, is_last_block},
37 vote::HasViewNumber,
38};
39use jf_merkle_tree_compat::{
40 ForgetableMerkleTreeScheme, ForgetableUniversalMerkleTreeScheme, LookupResult,
41 MerkleTreeScheme, prelude::MerkleNode,
42};
43use sqlx::{Encode, Row, Type};
44use vbs::version::Version;
45use versions::{DRB_AND_HEADER_UPGRADE_VERSION, EPOCH_REWARD_VERSION, EPOCH_VERSION};
46
47use super::{
48 BlocksFrontier,
49 data_source::{Provider, SequencerDataSource},
50};
51use crate::{
52 SeqTypes,
53 api::RewardMerkleTreeDataSource,
54 catchup::{CatchupStorage, NullStateCatchup},
55 persistence::{ChainConfigPersistence, sql::Options},
56 state::compute_state_update,
57 util::BoundedJoinSet,
58};
59
/// The SQL-backed data source used by the sequencer: a [`SqlDataSource`] over [`SeqTypes`]
/// parameterised with the [`Provider`] declared in the sibling `data_source` module.
pub type DataSource = SqlDataSource<SeqTypes, Provider>;
61
62#[async_trait]
63impl SequencerDataSource for DataSource {
64 type Options = Options;
65
66 async fn create(opt: Self::Options, provider: Provider, reset: bool) -> anyhow::Result<Self> {
67 let fetch_limit = opt.fetch_rate_limit;
68 let active_fetch_delay = opt.active_fetch_delay;
69 let chunk_fetch_delay = opt.chunk_fetch_delay;
70 let mut cfg = Config::try_from(&opt)?;
71
72 if reset {
73 cfg = cfg.reset_schema();
74 }
75
76 let mut builder = cfg.builder(provider).await?;
77
78 if let Some(limit) = fetch_limit {
79 builder = builder.with_rate_limit(limit);
80 }
81
82 if opt.lightweight {
83 tracing::warn!("enabling light weight mode..");
84 builder = builder.leaf_only();
85 }
86
87 if let Some(delay) = active_fetch_delay {
88 builder = builder.with_active_fetch_delay(delay);
89 }
90 if let Some(delay) = chunk_fetch_delay {
91 builder = builder.with_chunk_fetch_delay(delay);
92 }
93 if let Some(chunk_size) = opt.sync_status_chunk_size {
94 builder = builder.with_sync_status_chunk_size(chunk_size);
95 }
96 if let Some(ttl) = opt.sync_status_ttl {
97 builder = builder.with_sync_status_ttl(ttl);
98 }
99 if let Some(chunk_size) = opt.proactive_scan_chunk_size {
100 builder = builder.with_proactive_range_chunk_size(chunk_size);
101 }
102 if let Some(interval) = opt.proactive_scan_interval {
103 builder = builder.with_proactive_interval(interval);
104 }
105 if opt.disable_proactive_fetching {
106 builder = builder.disable_proactive_fetching();
107 }
108
109 builder.build().await
110 }
111}
112
113impl RewardMerkleTreeDataSource for SqlStorage {
114 async fn load_v1_reward_account_proof(
115 &self,
116 height: u64,
117 account: RewardAccountV1,
118 ) -> anyhow::Result<RewardAccountQueryDataV1> {
119 let mut tx = self.read().await.context(format!(
120 "opening transaction to fetch v1 reward account {account:?}; height {height}"
121 ))?;
122
123 let block_height = NodeStorage::<SeqTypes>::block_height(&mut tx)
124 .await
125 .context("getting block height")? as u64;
126 ensure!(
127 block_height > 0,
128 "cannot get accounts for height {height}: no blocks available"
129 );
130
131 if height < block_height {
134 let (tree, _) = load_v1_reward_accounts(self, height, &[account])
135 .await
136 .with_context(|| {
137 format!("failed to load v1 reward account {account:?} at height {height}")
138 })?;
139
140 let (proof, balance) = RewardAccountProofV1::prove(&tree, account.into())
141 .with_context(|| {
142 format!("reward account {account:?} not available at height {height}")
143 })?;
144
145 Ok(RewardAccountQueryDataV1 { balance, proof })
146 } else {
147 bail!(
148 "requested height {height} is not yet available (latest block height: \
149 {block_height})"
150 );
151 }
152 }
153
154 fn persist_tree(
155 &self,
156 height: u64,
157 merkle_tree: Vec<u8>,
158 ) -> impl Send + Future<Output = anyhow::Result<()>> {
159 async move {
160 let mut tx = self
161 .write()
162 .await
163 .context("opening transaction for reward merkle tree v2")?;
164
165 tx.upsert(
166 "reward_merkle_tree_v2_data",
167 ["height", "balances"],
168 ["height"],
169 [(height as i64, merkle_tree)],
170 )
171 .await?;
172
173 hotshot_query_service::data_source::Transaction::commit(tx)
174 .await
175 .context("Transaction to store reward merkle tree v2 failed.")
176 }
177 }
178
179 fn load_tree(&self, height: u64) -> impl Send + Future<Output = anyhow::Result<Vec<u8>>> {
180 async move {
181 let mut tx = self
182 .read()
183 .await
184 .context("opening transaction for state update")?;
185
186 let row = sqlx::query(
187 r#"
188 SELECT balances
189 FROM reward_merkle_tree_v2_data
190 WHERE height = $1
191 "#,
192 )
193 .bind(height as i64)
194 .fetch_optional(tx.as_mut())
195 .await?
196 .context(format!(
197 "No reward merkle tree for height {} in storage",
198 height
199 ))?;
200
201 row.try_get::<Vec<u8>, _>("balances")
202 .context("Missing field balances from row; this should never happen")
203 }
204 }
205
206 fn persist_proofs(
207 &self,
208 height: u64,
209 proofs: impl Iterator<Item = (Vec<u8>, Vec<u8>)> + Send,
210 ) -> impl Send + Future<Output = anyhow::Result<()>> {
211 async move {
212 let mut iter = proofs.map(|(account, proof)| (height as i64, account, proof));
213
214 loop {
215 let mut chunk = Vec::with_capacity(20);
216
217 for _ in 0..20 {
218 let Some(row) = iter.next() else {
219 continue;
220 };
221 chunk.push(row);
222 }
223
224 if chunk.is_empty() {
225 break;
226 }
227
228 let mut tx = self
229 .write()
230 .await
231 .context("opening transaction for state update")?;
232
233 tokio::spawn(async move {
234 tx.upsert(
235 "reward_merkle_tree_v2_proofs",
236 ["height", "account", "proof"],
237 ["height", "account"],
238 chunk,
239 )
240 .await?;
241
242 hotshot_query_service::data_source::Transaction::commit(tx)
243 .await
244 .context("Transaction to store reward merkle tree failed.")?;
245
246 Ok::<_, anyhow::Error>(())
247 });
248 }
249 Ok(())
250 }
251 }
252
253 fn load_proof(
260 &self,
261 height: u64,
262 account: Vec<u8>,
263 epoch_height: u64,
264 ) -> impl Send + Future<Output = anyhow::Result<Vec<u8>>> {
265 async move {
266 let mut tx = self
267 .read()
268 .await
269 .context("opening transaction for load_proof")?;
270
271 let leaf = tx
272 .get_leaf(LeafId::<SeqTypes>::from(height as usize))
273 .await
274 .context(format!("leaf {height} not available"))?;
275
276 let proof_height = if leaf.header().version() < EPOCH_REWARD_VERSION
279 || is_last_block(height, epoch_height)
280 {
281 height
282 } else {
283 let prev_epoch_last_block =
284 epoch_from_block_number(height, epoch_height).saturating_sub(1) * epoch_height;
285 let boundary_leaf = tx
286 .get_leaf(LeafId::<SeqTypes>::from(prev_epoch_last_block as usize))
287 .await
288 .context(format!(
289 "leaf at epoch boundary {prev_epoch_last_block} not available"
290 ))?;
291 ensure!(
292 boundary_leaf.header().version() >= EPOCH_REWARD_VERSION,
293 "no epoch reward proofs available at boundary {prev_epoch_last_block}"
294 );
295 prev_epoch_last_block
296 };
297
298 sqlx::query_scalar(
299 "SELECT proof FROM reward_merkle_tree_v2_proofs WHERE height = $1 AND account = $2",
300 )
301 .bind(proof_height as i64)
302 .bind(account)
303 .fetch_optional(tx.as_mut())
304 .await?
305 .context(format!(
306 "Missing proofs at height {proof_height} (resolved from {height})"
307 ))
308 }
309 }
310
311 fn load_latest_proof(
312 &self,
313 account: Vec<u8>,
314 ) -> impl Send + Future<Output = anyhow::Result<Vec<u8>>> {
315 async move {
316 let mut tx = self
317 .read()
318 .await
319 .context("opening transaction for state update")?;
320
321 let row = sqlx::query(
322 r#"
323 SELECT proof
324 FROM reward_merkle_tree_v2_proofs
325 WHERE account = $1
326 ORDER BY height DESC
327 LIMIT 1
328 "#,
329 )
330 .bind(account)
331 .fetch_optional(tx.as_mut())
332 .await?
333 .context("Missing proofs")?;
334
335 row.try_get::<Vec<u8>, _>("proof")
336 .context("Missing field proof from row; this should never happen")
337 }
338 }
339
340 fn garbage_collect(&self, height: u64) -> impl Send + Future<Output = anyhow::Result<()>> {
341 async move {
342 let batch_size = self.get_pruning_config().unwrap_or_default().batch_size();
343
344 #[cfg(not(feature = "embedded-db"))]
348 let for_update = "FOR UPDATE";
349 #[cfg(feature = "embedded-db")]
350 let for_update = "";
351
352 loop {
354 let mut tx = self
355 .write()
356 .await
357 .context("opening transaction for state update")?;
358
359 let res = sqlx::query(&format!(
360 "
361 WITH delete_batch AS (
362 SELECT d.height FROM reward_merkle_tree_v2_data AS d
363 WHERE d.height < $1
364 ORDER BY d.height DESC
365 LIMIT $2
366 {for_update}
367 )
368 DELETE FROM reward_merkle_tree_v2_data AS del
369 WHERE del.height IN (SELECT * FROM delete_batch)
370 "
371 ))
372 .bind(height as i64)
373 .bind(batch_size as i64)
374 .execute(tx.as_mut())
375 .await?;
376
377 hotshot_query_service::data_source::Transaction::commit(tx)
378 .await
379 .context(
380 "Transaction to garbage collect reward merkle trees from storage failed.",
381 )?;
382
383 if res.rows_affected() == 0 {
384 break;
385 } else {
386 tracing::debug!(
387 "deleted {} rows from reward_merkle_tree_v2_data",
388 res.rows_affected()
389 );
390 }
391 }
392
393 loop {
395 let mut tx = self
396 .write()
397 .await
398 .context("opening transaction for state update")?;
399
400 let res = sqlx::query(&format!(
401 "
402 WITH delete_batch AS (
403 SELECT d.height, d.account FROM reward_merkle_tree_v2_proofs AS d
404 WHERE d.height < $1
405 ORDER BY d.height, d.account DESC
406 LIMIT $2
407 {for_update}
408 )
409 DELETE FROM reward_merkle_tree_v2_proofs AS del
410 WHERE (del.height, del.account) IN (SELECT * FROM delete_batch)
411 ",
412 ))
413 .bind(height as i64)
414 .bind(batch_size as i64)
415 .execute(tx.as_mut())
416 .await?;
417
418 hotshot_query_service::data_source::Transaction::commit(tx)
419 .await
420 .context("Transaction to garbage collect proofs from storage failed.")?;
421
422 if res.rows_affected() == 0 {
423 break;
424 } else {
425 tracing::debug!(
426 "deleted {} rows from reward_merkle_tree_v2_proofs",
427 res.rows_affected()
428 );
429 }
430 }
431
432 Ok(())
433 }
434 }
435
436 fn proof_exists(&self, height: u64) -> impl Send + Future<Output = bool> {
437 async move {
438 let Ok(mut tx) = self.write().await else {
439 return false;
440 };
441
442 sqlx::query_as(
443 r#"
444 SELECT EXISTS(
445 SELECT 1 FROM reward_merkle_tree_v2_proofs
446 WHERE height = $1
447 )
448 "#,
449 )
450 .bind(height as i64)
451 .fetch_one(tx.as_mut())
452 .await
453 .ok()
454 .unwrap_or((false,))
455 .0
456 }
457 }
458 fn persist_reward_proofs(
465 &self,
466 node_state: &NodeState,
467 height: u64,
468 version: Version,
469 ) -> impl Send + Future<Output = anyhow::Result<()>> {
470 async move {
471 if !cfg!(any(test, feature = "testing"))
478 && version < EPOCH_REWARD_VERSION
479 && !(height + node_state.node_id).is_multiple_of(30)
480 {
481 return Ok(());
482 }
483
484 let finalized_hotshot_height = if cfg!(any(test, feature = "testing")) {
488 height
489 } else {
490 match node_state.finalized_hotshot_height().await {
491 Ok(h) => h,
492 Err(err) => {
493 tracing::warn!("failed to get finalized hotshot height: {err:#}");
494 return Ok(());
495 },
496 }
497 };
498
499 if self.proof_exists(finalized_hotshot_height).await {
500 return Ok(());
501 }
502
503 let mut tree_height = finalized_hotshot_height;
506 if version >= EPOCH_REWARD_VERSION {
507 let epoch_height = node_state
508 .epoch_height
509 .context("epoch_height not set in node state")?;
510 if !is_last_block(finalized_hotshot_height, epoch_height) {
511 tree_height = epoch_from_block_number(finalized_hotshot_height, epoch_height)
512 .saturating_sub(1)
513 * epoch_height;
514 if tree_height == 0 {
515 return Ok(());
516 }
517 let mut tx = self.read().await.context("opening read transaction")?;
519 let leaf = tx
520 .get_leaf(LeafId::<SeqTypes>::from(tree_height as usize))
521 .await
522 .context(format!(
523 "leaf at epoch boundary {tree_height} not available"
524 ))?;
525 if leaf.header().version() < EPOCH_REWARD_VERSION {
526 return Ok(());
527 }
528 }
529 }
530
531 let permitted_tree = match self.load_reward_merkle_tree_v2(tree_height).await {
532 Ok(tree) => tree,
533 Err(err) => {
534 tracing::warn!(tree_height, "failed to load reward merkle tree: {err:#}");
535 return Ok(());
536 },
537 };
538
539 let tree = permitted_tree.tree;
540 let iter = tree
541 .iter()
542 .filter_map(|(account, balance): (&RewardAccountV2, _)| {
543 let proof = espresso_types::v0_6::RewardAccountProofV2::prove(
544 &tree,
545 (*account).into(),
546 )?;
547 let proof = RewardAccountQueryDataV2 {
548 balance: (*balance).into(),
549 proof: proof.0,
550 };
551 Some((
552 bincode::serialize(&account).ok()?,
553 bincode::serialize(&proof).ok()?,
554 ))
555 });
556
557 if let Err(err) = self.persist_proofs(finalized_hotshot_height, iter).await {
558 tracing::warn!(
559 finalized_hotshot_height,
560 "failed to persist proofs: {err:#}"
561 );
562 }
563
564 Ok(())
565 }
566 }
567}
568
569impl CatchupStorage for SqlStorage {
570 async fn get_reward_accounts_v1(
571 &self,
572 instance: &NodeState,
573 height: u64,
574 view: ViewNumber,
575 accounts: &[RewardAccountV1],
576 ) -> anyhow::Result<(RewardMerkleTreeV1, Leaf2)> {
577 let mut tx = self.read().await.context(format!(
578 "opening transaction to fetch v1 reward account {accounts:?}; height {height}"
579 ))?;
580
581 let block_height = NodeStorage::<SeqTypes>::block_height(&mut tx)
582 .await
583 .context("getting block height")? as u64;
584 ensure!(
585 block_height > 0,
586 "cannot get accounts for height {height}: no blocks available"
587 );
588
589 if height < block_height {
592 load_v1_reward_accounts(self, height, accounts).await
593 } else {
594 let accounts: Vec<_> = accounts
595 .iter()
596 .map(|acct| RewardAccountV2::from(*acct))
597 .collect();
598 let (state, leaf) = reconstruct_state(
601 instance,
602 self,
603 &mut tx,
604 block_height - 1,
605 view,
606 &[],
607 &accounts,
608 )
609 .await?;
610 Ok((state.reward_merkle_tree_v1, leaf))
611 }
612 }
613
614 async fn get_reward_accounts_v2(
615 &self,
616 instance: &NodeState,
617 height: u64,
618 view: ViewNumber,
619 accounts: &[RewardAccountV2],
620 ) -> anyhow::Result<(RewardMerkleTreeV2, Leaf2)> {
621 let mut tx = self.read().await.context(format!(
622 "opening transaction to fetch reward account {accounts:?}; height {height}"
623 ))?;
624
625 let block_height = NodeStorage::<SeqTypes>::block_height(&mut tx)
626 .await
627 .context("getting block height")? as u64;
628 ensure!(
629 block_height > 0,
630 "cannot get accounts for height {height}: no blocks available"
631 );
632
633 if height < block_height {
636 load_reward_merkle_tree_v2(self, height)
637 .await
638 .map(|(permitted_tree, leaf)| (permitted_tree.tree, leaf))
639 } else {
640 let (state, leaf) = reconstruct_state(
643 instance,
644 self,
645 &mut tx,
646 block_height - 1,
647 view,
648 &[],
649 accounts,
650 )
651 .await?;
652 Ok((state.reward_merkle_tree_v2, leaf))
653 }
654 }
655
656 async fn get_accounts(
657 &self,
658 instance: &NodeState,
659 height: u64,
660 view: ViewNumber,
661 accounts: &[FeeAccount],
662 ) -> anyhow::Result<(FeeMerkleTree, Leaf2)> {
663 let mut tx = self.read().await.context(format!(
664 "opening transaction to fetch account {accounts:?}; height {height}"
665 ))?;
666
667 let block_height = NodeStorage::<SeqTypes>::block_height(&mut tx)
668 .await
669 .context("getting block height")? as u64;
670 ensure!(
671 block_height > 0,
672 "cannot get accounts for height {height}: no blocks available"
673 );
674
675 if height < block_height {
678 load_accounts(&mut tx, height, accounts).await
679 } else {
680 let (state, leaf) = reconstruct_state(
683 instance,
684 self,
685 &mut tx,
686 block_height - 1,
687 view,
688 accounts,
689 &[],
690 )
691 .await?;
692 Ok((state.fee_merkle_tree, leaf))
693 }
694 }
695
696 async fn get_frontier(
697 &self,
698 instance: &NodeState,
699 height: u64,
700 view: ViewNumber,
701 ) -> anyhow::Result<BlocksFrontier> {
702 let mut tx = self.read().await.context(format!(
703 "opening transaction to fetch frontier at height {height}"
704 ))?;
705
706 let block_height = NodeStorage::<SeqTypes>::block_height(&mut tx)
707 .await
708 .context("getting block height")? as u64;
709 ensure!(
710 block_height > 0,
711 "cannot get frontier for height {height}: no blocks available"
712 );
713
714 if height < block_height {
717 load_frontier(&mut tx, height).await
718 } else {
719 let (state, _) =
722 reconstruct_state(instance, self, &mut tx, block_height - 1, view, &[], &[])
723 .await?;
724 match state.block_merkle_tree.lookup(height - 1) {
725 LookupResult::Ok(_, proof) => Ok(proof),
726 _ => {
727 bail!(
728 "state snapshot {view:?},{height} was found but does not contain frontier \
729 at height {}; this should not be possible",
730 height - 1
731 );
732 },
733 }
734 }
735 }
736
737 async fn get_chain_config(
738 &self,
739 commitment: Commitment<ChainConfig>,
740 ) -> anyhow::Result<ChainConfig> {
741 let mut tx = self.read().await.context(format!(
742 "opening transaction to fetch chain config {commitment}"
743 ))?;
744 load_chain_config(&mut tx, commitment).await
745 }
746
747 async fn get_leaf_chain(&self, height: u64) -> anyhow::Result<Vec<Leaf2>> {
748 let mut tx = self
749 .read()
750 .await
751 .context(format!("opening transaction to fetch leaf at {height}"))?;
752 let leaf = tx
753 .get_leaf((height as usize).into())
754 .await
755 .context(format!("leaf {height} not available"))?;
756 let mut last_leaf: Leaf2 = leaf.leaf().clone();
757 let mut chain = vec![last_leaf.clone()];
758 let mut h = height + 1;
759
760 loop {
761 let lqd = tx.get_leaf((h as usize).into()).await?;
762 let leaf = lqd.leaf();
763
764 if leaf.justify_qc().view_number() == last_leaf.view_number() {
765 chain.push(leaf.clone());
766 } else {
767 h += 1;
768 continue;
769 }
770
771 if leaf.view_number() == last_leaf.view_number() + 1 {
773 last_leaf = leaf.clone();
774 h += 1;
775 break;
776 }
777 h += 1;
778 last_leaf = leaf.clone();
779 }
780
781 loop {
782 let lqd = tx.get_leaf((h as usize).into()).await?;
783 let leaf = lqd.leaf();
784 if leaf.justify_qc().view_number() == last_leaf.view_number() {
785 chain.push(leaf.clone());
786 break;
787 }
788 h += 1;
789 }
790
791 Ok(chain)
792 }
793}
794
795impl RewardMerkleTreeDataSource for DataSource {
796 async fn load_v1_reward_account_proof(
797 &self,
798 height: u64,
799 account: RewardAccountV1,
800 ) -> anyhow::Result<RewardAccountQueryDataV1> {
801 self.as_ref()
802 .load_v1_reward_account_proof(height, account)
803 .await
804 }
805
806 fn persist_tree(
807 &self,
808 height: u64,
809 merkle_tree: Vec<u8>,
810 ) -> impl Send + Future<Output = anyhow::Result<()>> {
811 async move { self.as_ref().persist_tree(height, merkle_tree).await }
812 }
813
814 fn load_tree(&self, height: u64) -> impl Send + Future<Output = anyhow::Result<Vec<u8>>> {
815 async move { self.as_ref().load_tree(height).await }
816 }
817
818 fn garbage_collect(&self, height: u64) -> impl Send + Future<Output = anyhow::Result<()>> {
819 async move { self.as_ref().garbage_collect(height).await }
820 }
821
822 fn persist_proofs(
823 &self,
824 height: u64,
825 proofs: impl Iterator<Item = (Vec<u8>, Vec<u8>)> + Send,
826 ) -> impl Send + Future<Output = anyhow::Result<()>> {
827 async move { self.as_ref().persist_proofs(height, proofs).await }
828 }
829
830 fn load_proof(
831 &self,
832 height: u64,
833 account: Vec<u8>,
834 epoch_height: u64,
835 ) -> impl Send + Future<Output = anyhow::Result<Vec<u8>>> {
836 async move {
837 self.as_ref()
838 .load_proof(height, account, epoch_height)
839 .await
840 }
841 }
842
843 fn load_latest_proof(
844 &self,
845 account: Vec<u8>,
846 ) -> impl Send + Future<Output = anyhow::Result<Vec<u8>>> {
847 async move { self.as_ref().load_latest_proof(account).await }
848 }
849
850 fn proof_exists(&self, height: u64) -> impl Send + Future<Output = bool> {
851 async move { self.as_ref().proof_exists(height).await }
852 }
853
854 fn persist_reward_proofs(
855 &self,
856 node_state: &NodeState,
857 height: u64,
858 version: Version,
859 ) -> impl Send + Future<Output = anyhow::Result<()>> {
860 async move {
861 self.as_ref()
862 .persist_reward_proofs(node_state, height, version)
863 .await
864 }
865 }
866}
867
868impl CatchupStorage for DataSource {
869 async fn get_accounts(
870 &self,
871 instance: &NodeState,
872 height: u64,
873 view: ViewNumber,
874 accounts: &[FeeAccount],
875 ) -> anyhow::Result<(FeeMerkleTree, Leaf2)> {
876 self.as_ref()
877 .get_accounts(instance, height, view, accounts)
878 .await
879 }
880
881 async fn get_reward_accounts_v2(
882 &self,
883 instance: &NodeState,
884 height: u64,
885 view: ViewNumber,
886 accounts: &[RewardAccountV2],
887 ) -> anyhow::Result<(RewardMerkleTreeV2, Leaf2)> {
888 self.as_ref()
889 .get_reward_accounts_v2(instance, height, view, accounts)
890 .await
891 }
892
893 async fn get_reward_accounts_v1(
894 &self,
895 instance: &NodeState,
896 height: u64,
897 view: ViewNumber,
898 accounts: &[RewardAccountV1],
899 ) -> anyhow::Result<(RewardMerkleTreeV1, Leaf2)> {
900 self.as_ref()
901 .get_reward_accounts_v1(instance, height, view, accounts)
902 .await
903 }
904
905 async fn get_frontier(
906 &self,
907 instance: &NodeState,
908 height: u64,
909 view: ViewNumber,
910 ) -> anyhow::Result<BlocksFrontier> {
911 self.as_ref().get_frontier(instance, height, view).await
912 }
913
914 async fn get_chain_config(
915 &self,
916 commitment: Commitment<ChainConfig>,
917 ) -> anyhow::Result<ChainConfig> {
918 self.as_ref().get_chain_config(commitment).await
919 }
920 async fn get_leaf_chain(&self, height: u64) -> anyhow::Result<Vec<Leaf2>> {
921 self.as_ref().get_leaf_chain(height).await
922 }
923}
924
925#[async_trait]
926impl ChainConfigPersistence for Transaction<Write> {
927 async fn insert_chain_config(&mut self, chain_config: ChainConfig) -> anyhow::Result<()> {
928 let commitment = chain_config.commitment();
929 let data = bincode::serialize(&chain_config)?;
930 self.upsert(
931 "chain_config",
932 ["commitment", "data"],
933 ["commitment"],
934 [(commitment.to_string(), data)],
935 )
936 .await
937 }
938}
939
940impl super::data_source::DatabaseMetadataSource for SqlStorage {
941 async fn get_table_sizes(&self) -> anyhow::Result<Vec<super::data_source::TableSize>> {
942 let mut tx = self
943 .read()
944 .await
945 .context("opening transaction to fetch table sizes")?;
946
947 #[cfg(not(feature = "embedded-db"))]
948 {
949 let query = r#"
950 SELECT
951 schemaname || '.' || relname AS table_name,
952 n_live_tup AS row_count,
953 pg_total_relation_size(relid) AS total_size_bytes
954 FROM pg_stat_user_tables
955 ORDER BY pg_total_relation_size(relid) DESC
956 "#;
957
958 let rows = sqlx::query(query)
959 .fetch_all(tx.as_mut())
960 .await
961 .context("failed to query table sizes")?;
962
963 let mut table_sizes = Vec::new();
964 for row in rows {
965 let table_name: String = row.try_get("table_name")?;
966 let row_count: i64 = row.try_get("row_count").unwrap_or(-1);
967 let total_size_bytes: Option<i64> = row.try_get("total_size_bytes").ok();
968
969 table_sizes.push(super::data_source::TableSize {
970 table_name,
971 row_count,
972 total_size_bytes,
973 });
974 }
975
976 Ok(table_sizes)
977 }
978
979 #[cfg(feature = "embedded-db")]
980 {
981 let table_names_query = r#"
983 SELECT name
984 FROM sqlite_master
985 WHERE type = 'table'
986 AND name NOT LIKE 'sqlite_%'
987 ORDER BY name
988 "#;
989
990 let table_rows = sqlx::query(table_names_query)
991 .fetch_all(tx.as_mut())
992 .await
993 .context("failed to query table names")?;
994
995 let mut table_sizes = Vec::new();
996
997 for row in table_rows {
999 let table_name: String = row.try_get("name")?;
1000
1001 let count_query = format!("SELECT COUNT(*) as count FROM \"{}\"", table_name);
1004 let count_row = sqlx::query(&count_query)
1005 .fetch_one(tx.as_mut())
1006 .await
1007 .context(format!(
1008 "failed to query row count for table {}",
1009 table_name
1010 ))?;
1011
1012 let row_count: i64 = count_row.try_get("count").unwrap_or(0);
1013
1014 table_sizes.push(super::data_source::TableSize {
1015 table_name,
1016 row_count,
1017 total_size_bytes: None,
1018 });
1019 }
1020
1021 Ok(table_sizes)
1022 }
1023 }
1024}
1025
1026impl super::data_source::DatabaseMetadataSource for DataSource {
1027 async fn get_table_sizes(&self) -> anyhow::Result<Vec<super::data_source::TableSize>> {
1028 self.as_ref().get_table_sizes().await
1029 }
1030}
1031
1032async fn load_frontier<Mode: TransactionMode>(
1033 tx: &mut Transaction<Mode>,
1034 height: u64,
1035) -> anyhow::Result<BlocksFrontier> {
1036 tx.get_path(
1037 Snapshot::<SeqTypes, BlockMerkleTree, { BlockMerkleTree::ARITY }>::Index(height),
1038 height
1039 .checked_sub(1)
1040 .ok_or(anyhow::anyhow!("Subtract with overflow ({height})!"))?,
1041 )
1042 .await
1043 .context(format!("fetching frontier at height {height}"))
1044}
1045
1046async fn load_v1_reward_accounts(
1047 db: &SqlStorage,
1048 height: u64,
1049 accounts: &[RewardAccountV1],
1050) -> anyhow::Result<(RewardMerkleTreeV1, Leaf2)> {
1051 let mut tx = db
1053 .read()
1054 .await
1055 .with_context(|| "failed to open read transaction")?;
1056
1057 let leaf = tx
1059 .get_leaf(LeafId::<SeqTypes>::from(height as usize))
1060 .await
1061 .context(format!("leaf {height} not available"))?;
1062 let header = leaf.header();
1063
1064 if header.version() < EPOCH_VERSION || header.version() >= DRB_AND_HEADER_UPGRADE_VERSION {
1065 return Ok((
1066 RewardMerkleTreeV1::new(REWARD_MERKLE_TREE_V1_HEIGHT),
1067 leaf.leaf().clone(),
1068 ));
1069 }
1070
1071 let merkle_root = header.reward_merkle_tree_root().unwrap_left();
1073 let mut snapshot = RewardMerkleTreeV1::from_commitment(merkle_root);
1074
1075 let mut join_set = BoundedJoinSet::new(10);
1077
1078 let mut task_id_to_account = HashMap::new();
1080
1081 for account in accounts {
1083 let db_clone = db.clone();
1085 let account_clone = *account;
1086 let header_height = header.height();
1087
1088 let func = async move {
1090 let mut tx = db_clone
1092 .read()
1093 .await
1094 .with_context(|| "failed to open read transaction")?;
1095
1096 let proof = tx
1098 .get_path(
1099 Snapshot::<SeqTypes, RewardMerkleTreeV1, { RewardMerkleTreeV1::ARITY }>::Index(
1100 header_height,
1101 ),
1102 account_clone,
1103 )
1104 .await
1105 .with_context(|| {
1106 format!(
1107 "failed to get path for v1 reward account {account_clone:?}; height \
1108 {height}"
1109 )
1110 })?;
1111
1112 Ok::<_, anyhow::Error>(proof)
1113 };
1114
1115 let id = join_set.spawn(func).id();
1117
1118 task_id_to_account.insert(id, account);
1120 }
1121
1122 while let Some(result) = join_set.join_next_with_id().await {
1124 let (id, result) = result.with_context(|| "failed to join task")?;
1126
1127 let proof = result?;
1129
1130 let account = task_id_to_account
1132 .remove(&id)
1133 .with_context(|| "task ID for spawned task not found")?;
1134
1135 match proof.proof.first().with_context(|| {
1136 format!("empty proof for v1 reward account {account:?}; height {height}")
1137 })? {
1138 MerkleNode::Leaf { pos, elem, .. } => {
1139 snapshot.remember(*pos, *elem, proof)?;
1140 },
1141 MerkleNode::Empty => {
1142 snapshot.non_membership_remember(*account, proof)?;
1143 },
1144 _ => {
1145 bail!("invalid proof for v1 reward account {account:?}; height {height}");
1146 },
1147 }
1148 }
1149
1150 Ok((snapshot, leaf.leaf().clone()))
1151}
1152
1153async fn load_reward_merkle_tree_v2(
1155 db: &SqlStorage,
1156 height: u64,
1157) -> anyhow::Result<(PermittedRewardMerkleTreeV2, Leaf2)> {
1158 let mut tx = db
1160 .read()
1161 .await
1162 .with_context(|| "failed to open read transaction")?;
1163
1164 let leaf = tx
1166 .get_leaf(LeafId::<SeqTypes>::from(height as usize))
1167 .await
1168 .with_context(|| format!("leaf {height} not available"))?;
1169
1170 let snapshot = db.load_reward_merkle_tree_v2(height).await?;
1171
1172 Ok((snapshot, leaf.leaf().clone()))
1173}
1174
1175async fn load_accounts<Mode: TransactionMode>(
1176 tx: &mut Transaction<Mode>,
1177 height: u64,
1178 accounts: &[FeeAccount],
1179) -> anyhow::Result<(FeeMerkleTree, Leaf2)> {
1180 let leaf = tx
1181 .get_leaf(LeafId::<SeqTypes>::from(height as usize))
1182 .await
1183 .context(format!("leaf {height} not available"))?;
1184 let header = leaf.header();
1185
1186 let mut snapshot = FeeMerkleTree::from_commitment(header.fee_merkle_tree_root());
1187 for account in accounts {
1188 let proof = tx
1189 .get_path(
1190 Snapshot::<SeqTypes, FeeMerkleTree, { FeeMerkleTree::ARITY }>::Index(
1191 header.height(),
1192 ),
1193 *account,
1194 )
1195 .await
1196 .context(format!(
1197 "fetching account {account}; height {}",
1198 header.height()
1199 ))?;
1200 match proof.proof.first().context(format!(
1201 "empty proof for account {account}; height {}",
1202 header.height()
1203 ))? {
1204 MerkleNode::Leaf { pos, elem, .. } => {
1205 snapshot.remember(*pos, *elem, proof)?;
1206 },
1207 MerkleNode::Empty => {
1208 snapshot.non_membership_remember(*account, proof)?;
1209 },
1210 _ => {
1211 bail!("Invalid proof");
1212 },
1213 }
1214 }
1215
1216 Ok((snapshot, leaf.leaf().clone()))
1217}
1218
1219async fn load_chain_config<Mode: TransactionMode>(
1220 tx: &mut Transaction<Mode>,
1221 commitment: Commitment<ChainConfig>,
1222) -> anyhow::Result<ChainConfig> {
1223 let (data,) = query_as::<(Vec<u8>,)>("SELECT data from chain_config where commitment = $1")
1224 .bind(commitment.to_string())
1225 .fetch_one(tx.as_mut())
1226 .await
1227 .unwrap();
1228
1229 bincode::deserialize(&data[..]).context("failed to deserialize")
1230}
1231
1232#[tracing::instrument(skip(instance, db, tx))]
1243pub(crate) async fn reconstruct_state<Mode: TransactionMode>(
1244 instance: &NodeState,
1245 db: &SqlStorage,
1246 tx: &mut Transaction<Mode>,
1247 from_height: u64,
1248 to_view: ViewNumber,
1249 fee_accounts: &[FeeAccount],
1250 reward_accounts: &[RewardAccountV2],
1251) -> anyhow::Result<(ValidatedState, Leaf2)> {
1252 tracing::info!("attempting to reconstruct fee state");
1253 let from_leaf = tx
1254 .get_leaf((from_height as usize).into())
1255 .await
1256 .context(format!("leaf {from_height} not available"))?;
1257 let from_leaf: Leaf2 = from_leaf.leaf().clone();
1258 ensure!(
1259 from_leaf.view_number() < to_view,
1260 "state reconstruction: starting state {:?} must be before ending state {to_view:?}",
1261 from_leaf.view_number(),
1262 );
1263
1264 let mut leaves = VecDeque::new();
1266 let to_leaf = get_leaf_from_proposal(tx, "view = $1", &(to_view.u64() as i64))
1267 .await
1268 .context(format!(
1269 "unable to reconstruct state because leaf {to_view:?} is not available"
1270 ))?;
1271 let mut parent = to_leaf.parent_commitment();
1272 tracing::debug!(?to_leaf, ?parent, view = ?to_view, "have required leaf");
1273 leaves.push_front(to_leaf.clone());
1274 while parent != Committable::commit(&from_leaf) {
1275 let leaf = get_leaf_from_proposal(tx, "leaf_hash = $1", &parent.to_string())
1276 .await
1277 .context(format!(
1278 "unable to reconstruct state because leaf {parent} is not available"
1279 ))?;
1280 parent = leaf.parent_commitment();
1281 tracing::debug!(?leaf, ?parent, "have required leaf");
1282 leaves.push_front(leaf);
1283 }
1284
1285 let mut parent = from_leaf;
1287 let mut state = ValidatedState::from_header(parent.block_header());
1288
1289 let mut catchup = NullStateCatchup::default();
1292
1293 let mut fee_accounts = fee_accounts.iter().copied().collect::<HashSet<_>>();
1294 tracing::info!(
1297 "reconstructing fee accounts state for from height {from_height} to view {to_view}"
1298 );
1299
1300 let dependencies =
1301 fee_header_dependencies(&mut catchup, tx, instance, &parent, &leaves).await?;
1302 fee_accounts.extend(dependencies);
1303 let fee_accounts = fee_accounts.into_iter().collect::<Vec<_>>();
1304 state.fee_merkle_tree = load_accounts(tx, from_height, &fee_accounts)
1305 .await
1306 .context("unable to reconstruct state because accounts are not available at origin")?
1307 .0;
1308 ensure!(
1309 state.fee_merkle_tree.commitment() == parent.block_header().fee_merkle_tree_root(),
1310 "loaded fee state does not match parent header"
1311 );
1312
1313 tracing::info!(
1314 "reconstructing reward accounts for from height {from_height} to view {to_view}"
1315 );
1316
1317 let mut reward_accounts = reward_accounts.iter().copied().collect::<HashSet<_>>();
1318
1319 let dependencies = reward_header_dependencies(instance, &leaves).await?;
1322 reward_accounts.extend(dependencies);
1323 let reward_accounts = reward_accounts.into_iter().collect::<Vec<_>>();
1324
1325 match parent.block_header().reward_merkle_tree_root() {
1327 either::Either::Left(expected_root) => {
1328 let accts = reward_accounts
1329 .into_iter()
1330 .map(RewardAccountV1::from)
1331 .collect::<Vec<_>>();
1332 state.reward_merkle_tree_v1 = load_v1_reward_accounts(db, from_height, &accts)
1333 .await
1334 .context(
1335 "unable to reconstruct state because v1 reward accounts are not available at \
1336 origin",
1337 )?
1338 .0;
1339 ensure!(
1340 state.reward_merkle_tree_v1.commitment() == expected_root,
1341 "loaded v1 reward state does not match parent header"
1342 );
1343 },
1344 either::Either::Right(expected_root) => {
1345 state.reward_merkle_tree_v2 = load_reward_merkle_tree_v2(db, from_height)
1346 .await
1347 .context(
1348 "unable to reconstruct state because RewardMerkleTreeV2 not available at \
1349 origin",
1350 )?
1351 .0
1352 .tree;
1353 ensure!(
1354 state.reward_merkle_tree_v2.commitment() == expected_root,
1355 "loaded reward state does not match parent header"
1356 );
1357 },
1358 }
1359
1360 let frontier = load_frontier(tx, from_height)
1362 .await
1363 .context("unable to reconstruct state because frontier is not available at origin")?;
1364 match frontier
1365 .proof
1366 .first()
1367 .context("empty proof for frontier at origin")?
1368 {
1369 MerkleNode::Leaf { pos, elem, .. } => state
1370 .block_merkle_tree
1371 .remember(*pos, *elem, frontier)
1372 .context("failed to remember frontier")?,
1373 _ => bail!("invalid frontier proof"),
1374 }
1375
1376 for proposal in leaves {
1378 state = compute_state_update(&state, instance, &catchup, &parent, &proposal)
1379 .await
1380 .context(format!(
1381 "unable to reconstruct state because state update {} failed",
1382 proposal.height(),
1383 ))?
1384 .0;
1385 parent = proposal;
1386 }
1387
1388 tracing::info!(from_height, ?to_view, "successfully reconstructed state");
1389 Ok((state, to_leaf))
1390}
1391
1392async fn fee_header_dependencies<Mode: TransactionMode>(
1399 catchup: &mut NullStateCatchup,
1400 tx: &mut Transaction<Mode>,
1401 instance: &NodeState,
1402 mut parent: &Leaf2,
1403 leaves: impl IntoIterator<Item = &Leaf2>,
1404) -> anyhow::Result<HashSet<FeeAccount>> {
1405 let mut accounts = HashSet::default();
1406
1407 for proposal in leaves {
1408 let header = proposal.block_header();
1409 let height = header.height();
1410 let view = proposal.view_number();
1411 tracing::debug!(height, ?view, "fetching dependencies for proposal");
1412
1413 let header_cf = header.chain_config();
1414 let chain_config = if header_cf.commit() == instance.chain_config.commit() {
1415 instance.chain_config
1416 } else {
1417 match header_cf.resolve() {
1418 Some(cf) => cf,
1419 None => {
1420 tracing::info!(
1421 height,
1422 ?view,
1423 commit = %header_cf.commit(),
1424 "chain config not available, attempting to load from storage",
1425 );
1426 let cf = load_chain_config(tx, header_cf.commit())
1427 .await
1428 .context(format!(
1429 "loading chain config {} for header {},{:?}",
1430 header_cf.commit(),
1431 header.height(),
1432 proposal.view_number()
1433 ))?;
1434
1435 catchup.add_chain_config(cf);
1438 cf
1439 },
1440 }
1441 };
1442
1443 accounts.insert(chain_config.fee_recipient);
1444 accounts.extend(
1445 get_l1_deposits(instance, header, parent, chain_config.fee_contract)
1446 .await
1447 .into_iter()
1448 .map(|fee| fee.account()),
1449 );
1450 accounts.extend(header.fee_info().accounts());
1451 parent = proposal;
1452 }
1453 Ok(accounts)
1454}
1455
1456async fn reward_header_dependencies(
1460 instance: &NodeState,
1461 leaves: impl IntoIterator<Item = &Leaf2>,
1462) -> anyhow::Result<HashSet<RewardAccountV2>> {
1463 let mut reward_accounts = HashSet::default();
1464 let epoch_height = instance.epoch_height;
1465
1466 let Some(epoch_height) = epoch_height else {
1467 tracing::info!("epoch height is not set. returning empty reward_header_dependencies");
1468 return Ok(HashSet::new());
1469 };
1470
1471 let coordinator = instance.coordinator.clone();
1472 let membership_lock = coordinator.membership().read().await;
1473 let first_epoch = membership_lock.first_epoch();
1474 drop(membership_lock);
1475 for proposal in leaves {
1477 let header = proposal.block_header();
1478
1479 let height = header.height();
1480 let view = proposal.view_number();
1481 tracing::debug!(height, ?view, "fetching dependencies for proposal");
1482
1483 let version = header.version();
1484 if version < EPOCH_VERSION || version >= EPOCH_REWARD_VERSION {
1486 continue;
1487 }
1488
1489 let first_epoch = first_epoch.context("first epoch not found")?;
1490
1491 let proposal_epoch = EpochNumber::new(epoch_from_block_number(height, epoch_height));
1492
1493 if proposal_epoch <= first_epoch + 1 {
1495 continue;
1496 }
1497
1498 let epoch_membership = match coordinator.membership_for_epoch(Some(proposal_epoch)).await {
1499 Ok(e) => e,
1500 Err(err) => {
1501 tracing::info!(
1502 "failed to get membership for epoch={proposal_epoch:?}. err={err:#}"
1503 );
1504
1505 coordinator
1506 .wait_for_catchup(proposal_epoch)
1507 .await
1508 .context(format!("failed to catchup for epoch={proposal_epoch}"))?
1509 },
1510 };
1511
1512 let leader = epoch_membership.leader(proposal.view_number()).await?;
1513 let membership_lock = coordinator.membership().read().await;
1514 let validator = membership_lock.get_validator_config(&proposal_epoch, leader)?;
1515 drop(membership_lock);
1516
1517 reward_accounts.insert(RewardAccountV2(validator.account));
1518
1519 let delegators: Vec<RewardAccountV2> = validator
1520 .delegators
1521 .keys()
1522 .map(|d| RewardAccountV2(*d))
1523 .collect();
1524
1525 reward_accounts.extend(delegators);
1526 }
1527 Ok(reward_accounts)
1528}
1529
1530async fn get_leaf_from_proposal<Mode, P>(
1531 tx: &mut Transaction<Mode>,
1532 where_clause: &str,
1533 param: P,
1534) -> anyhow::Result<Leaf2>
1535where
1536 P: Type<Db> + for<'q> Encode<'q, Db>,
1537{
1538 let (data,) = query_as::<(Vec<u8>,)>(&format!(
1539 "SELECT data FROM quorum_proposals2 WHERE {where_clause} LIMIT 1",
1540 ))
1541 .bind(param)
1542 .fetch_one(tx.as_mut())
1543 .await?;
1544 let proposal: Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>> =
1545 bincode::deserialize(&data)?;
1546 Ok(Leaf2::from_quorum_proposal(&proposal.data))
1547}
1548
#[cfg(any(test, feature = "testing"))]
pub(crate) mod impl_testable_data_source {
    //! Test-only glue that lets the SQL [`DataSource`] be used through
    //! `TestableSequencerDataSource`, backed by a temporary database.

    use hotshot_query_service::data_source::storage::sql::testing::TmpDb;

    use super::*;
    use crate::api::{self, data_source::testing::TestableSequencerDataSource};

    /// Builds persistence [`Options`] that point at the temporary database `db`.
    ///
    /// Without the `embedded-db` feature the options are derived from Postgres connection
    /// settings; with it they are derived from the SQLite file path.
    pub fn tmp_options(db: &TmpDb) -> Options {
        #[cfg(not(feature = "embedded-db"))]
        let options: Options = crate::persistence::sql::PostgresOptions {
            port: Some(db.port()),
            host: Some(db.host()),
            user: Some("postgres".into()),
            password: Some("password".into()),
            ..Default::default()
        }
        .into();

        #[cfg(feature = "embedded-db")]
        let options: Options = crate::persistence::sql::SqliteOptions { path: db.path() }.into();

        options
    }

    #[async_trait]
    impl TestableSequencerDataSource for DataSource {
        type Storage = TmpDb;

        /// Starts a fresh temporary database.
        async fn create_storage() -> Self::Storage {
            TmpDb::init().await
        }

        /// Persistence options targeting `storage`.
        fn persistence_options(storage: &Self::Storage) -> Self::Options {
            tmp_options(storage)
        }

        /// API options whose SQL query module uses `storage` with the `lightweight` flag set.
        fn leaf_only_ds_options(
            storage: &Self::Storage,
            opt: api::Options,
        ) -> anyhow::Result<api::Options> {
            let mut sql_options = tmp_options(storage);
            sql_options.lightweight = true;
            let configured = opt.query_sql(Default::default(), sql_options);
            Ok(configured)
        }

        /// API options whose SQL query module uses `storage`.
        fn options(storage: &Self::Storage, opt: api::Options) -> api::Options {
            let sql_options = tmp_options(storage);
            opt.query_sql(Default::default(), sql_options)
        }
    }
}
1604
1605#[cfg(test)]
1606mod tests {
1607 use alloy::primitives::Address;
1608 use espresso_types::{
1609 v0_3::RewardAmount,
1610 v0_4::{REWARD_MERKLE_TREE_V2_HEIGHT, RewardAccountV2, RewardMerkleTreeV2},
1611 };
1612 use hotshot_query_service::{
1613 data_source::{
1614 Transaction, VersionedDataSource,
1615 sql::Config,
1616 storage::{
1617 MerklizedStateStorage, UpdateAvailabilityStorage,
1618 sql::{
1619 SqlStorage, StorageConnectionType, Transaction as SqlTransaction, Write,
1620 testing::TmpDb,
1621 },
1622 },
1623 },
1624 merklized_state::{MerklizedState, Snapshot, UpdateStateData},
1625 };
1626 use jf_merkle_tree_compat::{
1627 LookupResult, MerkleTreeScheme, ToTraversalPath, UniversalMerkleTreeScheme,
1628 };
1629 use light_client::testing::{leaf_chain, leaf_chain_with_upgrade};
1630 use versions::{DRB_AND_HEADER_UPGRADE_VERSION, EPOCH_REWARD_VERSION, Upgrade};
1631
1632 use super::impl_testable_data_source::tmp_options;
1633 use crate::{SeqTypes, api::RewardMerkleTreeDataSource};
1634
1635 fn make_reward_account(i: usize) -> RewardAccountV2 {
1636 let mut addr_bytes = [0u8; 20];
1637 addr_bytes[16..20].copy_from_slice(&(i as u32).to_be_bytes());
1638 RewardAccountV2(Address::from(addr_bytes))
1639 }
1640
1641 async fn insert_test_header(
1642 tx: &mut SqlTransaction<Write>,
1643 block_height: u64,
1644 reward_tree: &RewardMerkleTreeV2,
1645 ) {
1646 let reward_commitment = serde_json::to_value(reward_tree.commitment()).unwrap();
1647 let test_data = serde_json::json!({
1648 "block_merkle_tree_root": format!("block_root_{}", block_height),
1649 "fee_merkle_tree_root": format!("fee_root_{}", block_height),
1650 "fields": {
1651 RewardMerkleTreeV2::header_state_commitment_field(): reward_commitment
1652 }
1653 });
1654 tx.upsert(
1655 "header",
1656 [
1657 "height",
1658 "hash",
1659 "payload_hash",
1660 "timestamp",
1661 "data",
1662 "ns_table",
1663 ],
1664 ["height"],
1665 [(
1666 block_height as i64,
1667 format!("hash_{}", block_height),
1668 format!("payload_{}", block_height),
1669 block_height as i64,
1670 test_data,
1671 "ns_table".to_string(),
1672 )],
1673 )
1674 .await
1675 .unwrap();
1676 }
1677
1678 async fn batch_insert_proofs(
1679 tx: &mut SqlTransaction<Write>,
1680 reward_tree: &RewardMerkleTreeV2,
1681 accounts: &[RewardAccountV2],
1682 block_height: u64,
1683 ) {
1684 let proofs_and_paths: Vec<_> = accounts
1685 .iter()
1686 .map(|account| {
1687 let proof = match reward_tree.universal_lookup(*account) {
1688 LookupResult::Ok(_, proof) => proof,
1689 LookupResult::NotInMemory => panic!("account not in memory"),
1690 LookupResult::NotFound(proof) => proof,
1691 };
1692 let traversal_path = <RewardAccountV2 as ToTraversalPath<
1693 { RewardMerkleTreeV2::ARITY },
1694 >>::to_traversal_path(
1695 account, reward_tree.height()
1696 );
1697 (proof, traversal_path)
1698 })
1699 .collect();
1700
1701 UpdateStateData::<SeqTypes, RewardMerkleTreeV2, { RewardMerkleTreeV2::ARITY }>::insert_merkle_nodes_batch(
1702 tx,
1703 proofs_and_paths,
1704 block_height,
1705 )
1706 .await
1707 .expect("failed to batch insert proofs");
1708 }
1709
1710 #[test_log::test(tokio::test(flavor = "multi_thread"))]
1711 async fn test_reward_accounts_batch_insertion() {
1712 let db = TmpDb::init().await;
1719 let opt = tmp_options(&db);
1720 let cfg = Config::try_from(&opt).expect("failed to create config from options");
1721 let storage = SqlStorage::connect(cfg, StorageConnectionType::Query)
1722 .await
1723 .expect("failed to connect to storage");
1724
1725 let num_initial_accounts = 1000usize;
1726
1727 let initial_accounts: Vec<RewardAccountV2> =
1728 (0..num_initial_accounts).map(make_reward_account).collect();
1729
1730 tracing::info!(
1731 "Height 1: Inserting {} initial accounts",
1732 num_initial_accounts
1733 );
1734
1735 let mut reward_tree_h1 = RewardMerkleTreeV2::new(REWARD_MERKLE_TREE_V2_HEIGHT);
1736
1737 for (i, account) in initial_accounts.iter().enumerate() {
1738 let reward_amount = RewardAmount::from(((i + 1) * 1000) as u64);
1739 reward_tree_h1.update(*account, reward_amount).unwrap();
1740 }
1741
1742 let mut tx = storage.write().await.unwrap();
1743 insert_test_header(&mut tx, 1, &reward_tree_h1).await;
1744 batch_insert_proofs(&mut tx, &reward_tree_h1, &initial_accounts, 1).await;
1745
1746 UpdateStateData::<SeqTypes, RewardMerkleTreeV2, { RewardMerkleTreeV2::ARITY }>::set_last_state_height(&mut tx, 1)
1747 .await
1748 .unwrap();
1749 tx.commit().await.unwrap();
1750
1751 tracing::info!("Height 2: Updating balances and adding new accounts");
1752
1753 let mut reward_tree_h2 = reward_tree_h1.clone();
1754
1755 let updated_accounts_h2: Vec<RewardAccountV2> = (0..100).map(make_reward_account).collect();
1757 for (i, account) in updated_accounts_h2.iter().enumerate() {
1758 let new_reward = RewardAmount::from(((i + 1) * 2000) as u64);
1759 reward_tree_h2.update(*account, new_reward).unwrap();
1760 }
1761
1762 let new_accounts_h2: Vec<RewardAccountV2> = (1000..1100).map(make_reward_account).collect();
1764 for (i, account) in new_accounts_h2.iter().enumerate() {
1765 let reward_amount = RewardAmount::from(((i + 1001) * 500) as u64);
1766 reward_tree_h2.update(*account, reward_amount).unwrap();
1767 }
1768
1769 let mut changed_accounts_h2 = updated_accounts_h2.clone();
1770 changed_accounts_h2.extend(new_accounts_h2.clone());
1771
1772 let mut tx = storage.write().await.unwrap();
1773 insert_test_header(&mut tx, 2, &reward_tree_h2).await;
1774 batch_insert_proofs(&mut tx, &reward_tree_h2, &changed_accounts_h2, 2).await;
1775
1776 UpdateStateData::<SeqTypes, RewardMerkleTreeV2, { RewardMerkleTreeV2::ARITY }>::set_last_state_height(&mut tx, 2)
1777 .await
1778 .unwrap();
1779 tx.commit().await.unwrap();
1780
1781 tracing::info!("Height 3: More balance updates");
1782
1783 let mut reward_tree_h3 = reward_tree_h2.clone();
1784
1785 let updated_accounts_h3: Vec<RewardAccountV2> =
1787 (500..600).map(make_reward_account).collect();
1788 for (i, account) in updated_accounts_h3.iter().enumerate() {
1789 let new_reward = RewardAmount::from(((500 + i + 1) * 3000) as u64);
1790 reward_tree_h3.update(*account, new_reward).unwrap();
1791 }
1792
1793 let mut tx = storage.write().await.unwrap();
1794 insert_test_header(&mut tx, 3, &reward_tree_h3).await;
1795 batch_insert_proofs(&mut tx, &reward_tree_h3, &updated_accounts_h3, 3).await;
1796
1797 UpdateStateData::<SeqTypes, RewardMerkleTreeV2, { RewardMerkleTreeV2::ARITY }>::set_last_state_height(&mut tx, 3)
1798 .await
1799 .unwrap();
1800 tx.commit().await.unwrap();
1801
1802 tracing::info!("Verifying all account proofs at each height");
1803
1804 let snapshot_h1 =
1807 Snapshot::<SeqTypes, RewardMerkleTreeV2, { RewardMerkleTreeV2::ARITY }>::Index(1);
1808 for i in 0..num_initial_accounts {
1809 let account = make_reward_account(i);
1810 let proof = storage
1811 .read()
1812 .await
1813 .unwrap()
1814 .get_path(snapshot_h1, account)
1815 .await
1816 .unwrap_or_else(|e| panic!("failed to get path for account {i} at h1: {e}"));
1817
1818 let expected_reward = RewardAmount::from(((i + 1) * 1000) as u64);
1819 let actual_reward = proof.elem().expect("account should exist");
1820 assert_eq!(*actual_reward, expected_reward,);
1821
1822 assert!(
1823 RewardMerkleTreeV2::verify(reward_tree_h1.commitment(), account, proof)
1824 .unwrap()
1825 .is_ok(),
1826 );
1827 }
1828 tracing::info!("Verified height=1 {num_initial_accounts} accounts with proofs",);
1829
1830 for i in 1000..1100 {
1832 let account = make_reward_account(i);
1833 let proof = storage
1834 .read()
1835 .await
1836 .unwrap()
1837 .get_path(snapshot_h1, account)
1838 .await
1839 .unwrap();
1840 assert!(proof.elem().is_none(),);
1841
1842 assert!(
1844 RewardMerkleTreeV2::non_membership_verify(
1845 reward_tree_h1.commitment(),
1846 account,
1847 proof
1848 )
1849 .unwrap(),
1850 );
1851 }
1852 tracing::info!("Height 1: Verified 100 non-membership proofs");
1853
1854 let snapshot_h2 =
1856 Snapshot::<SeqTypes, RewardMerkleTreeV2, { RewardMerkleTreeV2::ARITY }>::Index(2);
1857
1858 for i in 0..100 {
1860 let account = make_reward_account(i);
1861 let proof = storage
1862 .read()
1863 .await
1864 .unwrap()
1865 .get_path(snapshot_h2, account)
1866 .await
1867 .unwrap_or_else(|e| panic!("failed to get path for account {i} at h2: {e}"));
1868
1869 let expected_reward = RewardAmount::from(((i + 1) * 2000) as u64);
1870 let actual_reward = proof.elem().expect("account should exist");
1871 assert_eq!(*actual_reward, expected_reward,);
1872
1873 assert!(
1874 RewardMerkleTreeV2::verify(reward_tree_h2.commitment(), account, proof)
1875 .unwrap()
1876 .is_ok(),
1877 );
1878 }
1879
1880 for i in 100..1000 {
1882 let account = make_reward_account(i);
1883 let proof = storage
1884 .read()
1885 .await
1886 .unwrap()
1887 .get_path(snapshot_h2, account)
1888 .await
1889 .unwrap_or_else(|e| panic!("failed to get path for account {i} at h2: {e}"));
1890
1891 let expected_reward = RewardAmount::from(((i + 1) * 1000) as u64);
1892 let actual_reward = proof.elem().expect("account should exist");
1893 assert_eq!(*actual_reward, expected_reward,);
1894
1895 assert!(
1896 RewardMerkleTreeV2::verify(reward_tree_h2.commitment(), account, proof)
1897 .unwrap()
1898 .is_ok(),
1899 );
1900 }
1901
1902 for i in 1000..1100 {
1905 let account = make_reward_account(i);
1906 let proof = storage
1907 .read()
1908 .await
1909 .unwrap()
1910 .get_path(snapshot_h2, account)
1911 .await
1912 .unwrap_or_else(|e| panic!("failed to get path for account {i} at h2: {e}"));
1913
1914 let expected_reward = RewardAmount::from(((i + 1) * 500) as u64);
1915 let actual_reward = proof.elem().expect("account should exist");
1916 assert_eq!(*actual_reward, expected_reward,);
1917
1918 assert!(
1919 RewardMerkleTreeV2::verify(reward_tree_h2.commitment(), account, proof)
1920 .unwrap()
1921 .is_ok(),
1922 );
1923 }
1924 tracing::info!("Height 2: Verified all 1100 accounts with proofs");
1925
1926 let snapshot_h3 =
1928 Snapshot::<SeqTypes, RewardMerkleTreeV2, { RewardMerkleTreeV2::ARITY }>::Index(3);
1929
1930 for i in 0..100 {
1932 let account = make_reward_account(i);
1933 let proof = storage
1934 .read()
1935 .await
1936 .unwrap()
1937 .get_path(snapshot_h3, account)
1938 .await
1939 .unwrap_or_else(|e| panic!("failed to get path for account {i} at h3: {e}"));
1940
1941 let expected_reward = RewardAmount::from(((i + 1) * 2000) as u64);
1942 let actual_reward = proof.elem().expect("account should exist");
1943 assert_eq!(*actual_reward, expected_reward,);
1944
1945 assert!(
1946 RewardMerkleTreeV2::verify(reward_tree_h3.commitment(), account, proof)
1947 .unwrap()
1948 .is_ok(),
1949 );
1950 }
1951
1952 for i in 100..500 {
1953 let account = make_reward_account(i);
1954 let proof = storage
1955 .read()
1956 .await
1957 .unwrap()
1958 .get_path(snapshot_h3, account)
1959 .await
1960 .unwrap_or_else(|e| panic!("failed to get path for account {i} at h3: {e}"));
1961
1962 let expected_reward = RewardAmount::from(((i + 1) * 1000) as u64);
1963 let actual_reward = proof.elem().expect("account should exist");
1964 assert_eq!(*actual_reward, expected_reward,);
1965
1966 assert!(
1967 RewardMerkleTreeV2::verify(reward_tree_h3.commitment(), account, proof)
1968 .unwrap()
1969 .is_ok(),
1970 );
1971 }
1972
1973 for i in 500..600 {
1975 let account = make_reward_account(i);
1976 let proof = storage
1977 .read()
1978 .await
1979 .unwrap()
1980 .get_path(snapshot_h3, account)
1981 .await
1982 .unwrap_or_else(|e| panic!("failed to get path for account {i} at h3: {e}"));
1983
1984 let expected_reward = RewardAmount::from(((i + 1) * 3000) as u64);
1985 let actual_reward = proof.elem().expect("account should exist");
1986 assert_eq!(*actual_reward, expected_reward,);
1987
1988 assert!(
1989 RewardMerkleTreeV2::verify(reward_tree_h3.commitment(), account, proof)
1990 .unwrap()
1991 .is_ok(),
1992 );
1993 }
1994
1995 for i in 600..1000 {
1997 let account = make_reward_account(i);
1998 let proof = storage
1999 .read()
2000 .await
2001 .unwrap()
2002 .get_path(snapshot_h3, account)
2003 .await
2004 .unwrap_or_else(|e| panic!("failed to get path for account {i} at h3: {e}"));
2005
2006 let expected_reward = RewardAmount::from(((i + 1) * 1000) as u64);
2007 let actual_reward = proof.elem().expect("account should exist");
2008 assert_eq!(*actual_reward, expected_reward,);
2009
2010 assert!(
2011 RewardMerkleTreeV2::verify(reward_tree_h3.commitment(), account, proof)
2012 .unwrap()
2013 .is_ok(),
2014 );
2015 }
2016
2017 for i in 1000..1100 {
2019 let account = make_reward_account(i);
2020 let proof = storage
2021 .read()
2022 .await
2023 .unwrap()
2024 .get_path(snapshot_h3, account)
2025 .await
2026 .unwrap_or_else(|e| panic!("failed to get path for account {i} at h3: {e}"));
2027
2028 let expected_reward = RewardAmount::from(((i + 1) * 500) as u64);
2029 let actual_reward = proof.elem().expect("account should exist");
2030 assert_eq!(*actual_reward, expected_reward,);
2031
2032 assert!(
2033 RewardMerkleTreeV2::verify(reward_tree_h3.commitment(), account, proof)
2034 .unwrap()
2035 .is_ok(),
2036 );
2037 }
2038 tracing::info!("Height 3: Verified all 1100 accounts with proofs");
2039
2040 for i in 1100..1110 {
2042 let account = make_reward_account(i);
2043 let proof = storage
2044 .read()
2045 .await
2046 .unwrap()
2047 .get_path(snapshot_h3, account)
2048 .await
2049 .unwrap();
2050
2051 assert!(
2052 proof.elem().is_none(),
2053 "Account {i} should not exist at height 3"
2054 );
2055
2056 assert!(
2057 RewardMerkleTreeV2::non_membership_verify(
2058 reward_tree_h3.commitment(),
2059 account,
2060 proof
2061 )
2062 .unwrap(),
2063 );
2064 }
2065 tracing::info!("Height 3: Verified 10 non-membership proofs");
2066 }
2067
2068 #[tokio::test]
2069 #[test_log::test]
2070 async fn test_merkle_proof_gc() {
2071 let db = TmpDb::init().await;
2072 let opt = tmp_options(&db);
2073 let cfg = Config::try_from(&opt).expect("failed to create config from options");
2074 let storage = SqlStorage::connect(cfg, StorageConnectionType::Query)
2075 .await
2076 .expect("failed to connect to storage");
2077
2078 let account = vec![0; 32];
2079
2080 let leaves = leaf_chain(0..=2, DRB_AND_HEADER_UPGRADE_VERSION).await;
2082 let mut tx = storage.write().await.unwrap();
2083 for leaf in &leaves {
2084 tx.insert_leaf(leaf).await.unwrap();
2085 }
2086 Transaction::commit(tx).await.unwrap();
2087
2088 for h in 0..=2 {
2090 storage
2091 .persist_proofs(h, [(account.clone(), h.to_le_bytes().to_vec())].into_iter())
2092 .await
2093 .unwrap();
2094 }
2095
2096 storage.garbage_collect(2).await.unwrap();
2098
2099 assert_eq!(
2101 storage.load_proof(2, account.clone(), 0).await.unwrap(),
2102 2u64.to_le_bytes()
2103 );
2104
2105 for h in 0..2 {
2107 let err = storage.load_proof(h, account.clone(), 0).await.unwrap_err();
2108 assert!(err.to_string().contains("Missing proof"), "{err:#}");
2109 }
2110
2111 storage.garbage_collect(3).await.unwrap();
2113 let err = storage.load_proof(2, account, 0).await.unwrap_err();
2114 assert!(err.to_string().contains("Missing proof"), "{err:#}");
2115 }
2116
2117 #[tokio::test]
2118 #[test_log::test]
2119 async fn test_load_proof_v5_epoch_boundary() {
2120 let db = TmpDb::init().await;
2121 let opt = tmp_options(&db);
2122 let cfg = Config::try_from(&opt).expect("failed to create config from options");
2123 let storage = SqlStorage::connect(cfg, StorageConnectionType::Query)
2124 .await
2125 .expect("failed to connect to storage");
2126
2127 let epoch_height = 10u64;
2128 let account = vec![0; 32];
2129
2130 let leaves = leaf_chain(0..=15, EPOCH_REWARD_VERSION).await;
2132 let mut tx = storage.write().await.unwrap();
2133 for leaf in &leaves {
2134 tx.insert_leaf(leaf).await.unwrap();
2135 }
2136 Transaction::commit(tx).await.unwrap();
2137
2138 let boundary_proof = b"proof_at_10".to_vec();
2140 {
2141 let mut tx = storage.write().await.unwrap();
2142 tx.upsert(
2143 "reward_merkle_tree_v2_proofs",
2144 ["height", "account", "proof"],
2145 ["height", "account"],
2146 [(10i64, account.clone(), boundary_proof.clone())],
2147 )
2148 .await
2149 .unwrap();
2150 Transaction::commit(tx).await.unwrap();
2151 }
2152
2153 assert_eq!(
2155 storage
2156 .load_proof(10, account.clone(), epoch_height)
2157 .await
2158 .unwrap(),
2159 boundary_proof,
2160 );
2161
2162 assert_eq!(
2165 storage
2166 .load_proof(15, account.clone(), epoch_height)
2167 .await
2168 .unwrap(),
2169 boundary_proof,
2170 );
2171 }
2172
2173 #[tokio::test]
2174 #[test_log::test]
2175 async fn test_load_proof_v4_to_v5_upgrade_boundary() {
2176 let db = TmpDb::init().await;
2177 let opt = tmp_options(&db);
2178 let cfg = Config::try_from(&opt).expect("failed to create config from options");
2179 let storage = SqlStorage::connect(cfg, StorageConnectionType::Query)
2180 .await
2181 .expect("failed to connect to storage");
2182
2183 let epoch_height = 10u64;
2184 let account = vec![0; 32];
2185
2186 let upgrade = Upgrade::new(DRB_AND_HEADER_UPGRADE_VERSION, EPOCH_REWARD_VERSION);
2189 let leaves = leaf_chain_with_upgrade(0..=15, 11, upgrade).await;
2190 {
2191 let mut tx = storage.write().await.unwrap();
2192 for leaf in &leaves {
2193 tx.insert_leaf(leaf).await.unwrap();
2194 }
2195 Transaction::commit(tx).await.unwrap();
2196 }
2197
2198 storage
2201 .load_proof(15, account.clone(), epoch_height)
2202 .await
2203 .unwrap_err();
2204
2205 let v4_proof = b"v4_proof_at_5".to_vec();
2206 {
2207 let mut tx = storage.write().await.unwrap();
2208 tx.upsert(
2209 "reward_merkle_tree_v2_proofs",
2210 ["height", "account", "proof"],
2211 ["height", "account"],
2212 [(5i64, account.clone(), v4_proof.clone())],
2213 )
2214 .await
2215 .unwrap();
2216 Transaction::commit(tx).await.unwrap();
2217 }
2218 assert_eq!(
2219 storage.load_proof(5, account.clone(), 0).await.unwrap(),
2220 v4_proof,
2221 );
2222 }
2223
2224 #[test_log::test(tokio::test(flavor = "multi_thread"))]
2225 async fn test_get_table_sizes() {
2226 use super::super::data_source::DatabaseMetadataSource;
2227
2228 let db = TmpDb::init().await;
2229 let opt = tmp_options(&db);
2230 let cfg = Config::try_from(&opt).expect("failed to create config from options");
2231 let storage = SqlStorage::connect(cfg, StorageConnectionType::Query)
2232 .await
2233 .expect("failed to connect to storage");
2234
2235 let mut tx = storage.write().await.unwrap();
2237
2238 let reward_tree = RewardMerkleTreeV2::new(REWARD_MERKLE_TREE_V2_HEIGHT);
2240 insert_test_header(&mut tx, 1, &reward_tree).await;
2241
2242 tx.commit().await.unwrap();
2243
2244 let table_sizes = storage
2246 .get_table_sizes()
2247 .await
2248 .expect("get_table_sizes should succeed");
2249
2250 assert!(
2252 !table_sizes.is_empty(),
2253 "should have at least one table in the database"
2254 );
2255 }
2256}