1use std::{
2 collections::BTreeMap,
3 path::PathBuf,
4 str::FromStr,
5 sync::Arc,
6 time::{Duration, Instant},
7};
8
9use alloy::primitives::Address;
10use anyhow::{Context, bail, ensure};
11use async_trait::async_trait;
12use clap::Parser;
13use committable::Committable;
14use derivative::Derivative;
15use derive_more::derive::{From, Into};
16use either::Either;
17use espresso_types::{
18 AuthenticatedValidatorMap, BackoffParams, BlockMerkleTree, FeeMerkleTree, Leaf, Leaf2,
19 NetworkConfig, Payload, PubKey, Ratio, RegisteredValidatorMap, StakeTableHash, parse_duration,
20 parse_size,
21 traits::{EventsPersistenceRead, MembershipPersistence, StakeTuple},
22 v0::traits::{EventConsumer, PersistenceOptions, SequencerPersistence, StateCatchup},
23 v0_3::{
24 AuthenticatedValidator, EventKey, IndexedStake, RegisteredValidator, RewardAmount,
25 StakeTableEvent,
26 },
27 v0_4::{REWARD_MERKLE_TREE_V2_HEIGHT, RewardAccountV2, RewardMerkleTreeV2},
28};
29use futures::stream::StreamExt;
30use hotshot::InitializerEpochInfo;
31use hotshot_libp2p_networking::network::behaviours::dht::store::persistent::{
32 DhtPersistentStorage, SerializableRecord,
33};
34use hotshot_new_protocol::message::Certificate2;
35use hotshot_query_service::{
36 availability::BlockId,
37 data_source::{
38 Transaction as _, VersionedDataSource,
39 storage::{
40 AvailabilityStorage,
41 pruning::PrunerCfg,
42 sql::{
43 Config, Db, Read, SqlStorage, StorageConnectionType, Transaction, Write,
44 include_migrations, query_as, syntax_helpers::MAX_FN,
45 },
46 },
47 },
48 fetching::{
49 Provider,
50 request::{PayloadRequest, VidCommonRequest},
51 },
52 merklized_state::MerklizedState,
53};
54use hotshot_types::{
55 data::{
56 DaProposal, DaProposal2, EpochNumber, QuorumProposal, QuorumProposalWrapper,
57 QuorumProposalWrapperLegacy, VidCommitment, VidCommon, VidDisperseShare, VidDisperseShare0,
58 },
59 drb::{DrbInput, DrbResult},
60 event::{Event, EventType, HotShotAction, LeafInfo},
61 message::{Proposal, convert_proposal},
62 new_protocol::CoordinatorEvent,
63 simple_certificate::{
64 CertificatePair, LightClientStateUpdateCertificateV1, LightClientStateUpdateCertificateV2,
65 NextEpochQuorumCertificate2, QuorumCertificate, QuorumCertificate2, UpgradeCertificate,
66 },
67 traits::{
68 block_contents::{BlockHeader, BlockPayload},
69 metrics::Metrics,
70 },
71 vote::HasViewNumber,
72};
73use indexmap::IndexMap;
74use itertools::Itertools;
75use jf_merkle_tree_compat::MerkleTreeScheme;
76use sqlx::{Executor, QueryBuilder, Row, query};
77
78use crate::{
79 NodeType, RECENT_STAKE_TABLES_LIMIT, SeqTypes, ViewNumber,
80 api::RewardMerkleTreeV2Data,
81 catchup::SqlStateCatchup,
82 persistence::{migrate_network_config, persistence_metrics::PersistenceMetricsValue},
83};
84
// Connection settings for a Postgres database, read from command-line flags or the
// `ESPRESSO_NODE_POSTGRES_*` environment variables.
//
// NOTE: plain `//` comments are used on this type (instead of `///`) because clap's derive
// turns doc comments into `--help` text, which would change user-visible output.
#[derive(Parser, Clone, Derivative)]
#[derivative(Debug)]
pub struct PostgresOptions {
    // Database host; only applied to the `Config` when set.
    #[clap(long, env = "ESPRESSO_NODE_POSTGRES_HOST")]
    pub(crate) host: Option<String>,

    // Database port; only applied to the `Config` when set.
    #[clap(long, env = "ESPRESSO_NODE_POSTGRES_PORT")]
    pub(crate) port: Option<u16>,

    // Database name; only applied to the `Config` when set.
    #[clap(long, env = "ESPRESSO_NODE_POSTGRES_DATABASE")]
    pub(crate) database: Option<String>,

    // User name; only applied to the `Config` when set.
    #[clap(long, env = "ESPRESSO_NODE_POSTGRES_USER")]
    pub(crate) user: Option<String>,

    // Password; only applied to the `Config` when set. Excluded from the derived `Debug`
    // output so it does not end up in logs.
    #[clap(long, env = "ESPRESSO_NODE_POSTGRES_PASSWORD")]
    #[derivative(Debug = "ignore")]
    pub(crate) password: Option<String>,

    // When true, the `Config` conversions call `tls()` on the builder.
    #[clap(long, env = "ESPRESSO_NODE_POSTGRES_USE_TLS")]
    pub(crate) use_tls: bool,

    // Forwarded verbatim to the query service's `set_no_deferrable_on_read`; defaults to false.
    #[clap(
        long,
        env = "ESPRESSO_NODE_POSTGRES_NO_DEFERRABLE",
        default_value_t = false
    )]
    pub(crate) no_deferrable: bool,
}
128
129impl Default for PostgresOptions {
130 fn default() -> Self {
131 Self::parse_from(std::iter::empty::<String>())
132 }
133}
134
// Settings for the embedded SQLite database.
//
// NOTE: plain `//` comments are used on this type (instead of `///`) because clap's derive
// turns doc comments into `--help` text, which would change user-visible output.
//
// Unlike the other option structs in this file, `Default` is derived here, so the default
// `path` is an empty `PathBuf` rather than a value produced by the clap parser.
#[derive(Parser, Clone, Derivative, Default, From, Into)]
#[derivative(Debug)]
pub struct SqliteOptions {
    // Location of the database file. The command-line/env value is run through
    // `build_sqlite_path`, so the stored path is `<value>/sqlite/database`.
    #[clap(
        long,
        env = "ESPRESSO_NODE_STORAGE_PATH",
        value_parser = build_sqlite_path
    )]
    pub(crate) path: PathBuf,
}
147
148pub fn build_sqlite_path(path: &str) -> anyhow::Result<PathBuf> {
149 let sub_dir = PathBuf::from_str(path)?.join("sqlite");
150
151 if !sub_dir.exists() {
153 std::fs::create_dir_all(&sub_dir)
154 .with_context(|| format!("failed to create directory: {sub_dir:?}"))?;
155 }
156
157 Ok(sub_dir.join("database"))
158}
159
// Persistence options for the SQL-backed storage of this node.
//
// Exactly one of `postgres_options` / `sqlite_options` exists, selected at compile time by the
// `embedded-db` feature. `TryFrom<&Options> for Config` turns these options into a storage
// `Config`.
//
// NOTE: plain `//` comments are used on this type (instead of `///`) because clap's derive
// turns doc comments into `--help` text, which would change user-visible output.
// NOTE: the field order is part of the derived tuple `From`/`Into` conversions; do not reorder.
#[derive(Parser, Clone, Derivative, From, Into)]
#[derivative(Debug)]
pub struct Options {
    // Postgres connection settings (builds without the `embedded-db` feature).
    #[cfg(not(feature = "embedded-db"))]
    #[clap(flatten)]
    pub(crate) postgres_options: PostgresOptions,

    // SQLite settings (builds with the `embedded-db` feature).
    #[cfg(feature = "embedded-db")]
    #[clap(flatten)]
    pub(crate) sqlite_options: SqliteOptions,

    // Optional database URI. When set it is parsed into the base `Config`, and the individual
    // options are applied on top of it. Excluded from `Debug` output.
    // NOTE(review): no `#[clap(...)]` attribute is visible on this field, which would make it
    // a positional argument for clap — confirm this is intended.
    #[derivative(Debug = "ignore")]
    pub(crate) uri: Option<String>,

    // When true, a pruner configuration built from `pruning` is installed on the `Config`.
    #[clap(long, env = "ESPRESSO_NODE_DATABASE_PRUNE")]
    pub(crate) prune: bool,

    // Pruner parameters; only consulted when `prune` is set.
    #[clap(flatten)]
    pub(crate) pruning: PruningOptions,

    // Retention/usage targets for consensus storage; copied into `Persistence::gc_opt`.
    #[clap(flatten)]
    pub(crate) consensus_pruning: ConsensusPruningOptions,

    // The fetching/scanning options below are not consumed in the visible part of this file.
    #[clap(long, env = "ESPRESSO_NODE_FETCH_RATE_LIMIT")]
    pub(crate) fetch_rate_limit: Option<usize>,

    #[clap(long, env = "ESPRESSO_NODE_ACTIVE_FETCH_DELAY", value_parser = parse_duration)]
    pub(crate) active_fetch_delay: Option<Duration>,

    #[clap(long, env = "ESPRESSO_NODE_CHUNK_FETCH_DELAY", value_parser = parse_duration)]
    pub(crate) chunk_fetch_delay: Option<Duration>,

    #[clap(long, env = "ESPRESSO_NODE_SYNC_STATUS_CHUNK_SIZE")]
    pub(crate) sync_status_chunk_size: Option<usize>,

    #[clap(long, env = "ESPRESSO_NODE_SYNC_STATUS_TTL", value_parser = parse_duration)]
    pub(crate) sync_status_ttl: Option<Duration>,

    #[clap(long, env = "ESPRESSO_NODE_PROACTIVE_SCAN_CHUNK_SIZE")]
    pub(crate) proactive_scan_chunk_size: Option<usize>,

    #[clap(long, env = "ESPRESSO_NODE_PROACTIVE_SCAN_INTERVAL", value_parser = parse_duration)]
    pub(crate) proactive_scan_interval: Option<Duration>,

    #[clap(long, env = "ESPRESSO_NODE_DISABLE_PROACTIVE_FETCHING")]
    pub(crate) disable_proactive_fetching: bool,

    // When true, `archive()` is called on the `Config`. Mutually exclusive with `prune`.
    #[clap(long, env = "ESPRESSO_NODE_ARCHIVE", conflicts_with = "prune")]
    pub(crate) archive: bool,

    // Mutually exclusive with `archive`. Not consumed in the visible part of this file.
    #[clap(
        long,
        env = "ESPRESSO_NODE_LIGHTWEIGHT",
        default_value_t = false,
        conflicts_with = "archive"
    )]
    pub(crate) lightweight: bool,

    // Passed to `Config::idle_connection_timeout`.
    #[clap(long, env = "ESPRESSO_NODE_DATABASE_IDLE_CONNECTION_TIMEOUT", value_parser = parse_duration, default_value = "10m")]
    pub(crate) idle_connection_timeout: Duration,

    // Passed to `Config::connection_timeout`.
    #[clap(long, env = "ESPRESSO_NODE_DATABASE_CONNECTION_TIMEOUT", value_parser = parse_duration, default_value = "30m")]
    pub(crate) connection_timeout: Duration,

    // Passed to `Config::slow_statement_threshold`.
    #[clap(long, env = "ESPRESSO_NODE_DATABASE_SLOW_STATEMENT_THRESHOLD", value_parser = parse_duration, default_value = "1s")]
    pub(crate) slow_statement_threshold: Duration,

    // Passed to `Config::statement_timeout`.
    #[clap(long, env = "ESPRESSO_NODE_DATABASE_STATEMENT_TIMEOUT", value_parser = parse_duration, default_value = "10m")]
    pub(crate) statement_timeout: Duration,

    // Passed to `Config::min_connections`; also the fallback for `query_min_connections`.
    #[clap(
        long,
        env = "ESPRESSO_NODE_DATABASE_MIN_CONNECTIONS",
        default_value = "0"
    )]
    pub(crate) min_connections: u32,

    // Passed to `Config::query_min_connections`; falls back to `min_connections` when unset.
    #[cfg(not(feature = "embedded-db"))]
    #[clap(long, env = "ESPRESSO_NODE_DATABASE_QUERY_MIN_CONNECTIONS", default_value = None)]
    pub(crate) query_min_connections: Option<u32>,

    // Passed to `Config::max_connections`; also the fallback for `query_max_connections`.
    #[clap(
        long,
        env = "ESPRESSO_NODE_DATABASE_MAX_CONNECTIONS",
        default_value = "25"
    )]
    pub(crate) max_connections: u32,

    // Passed to `Config::query_max_connections`; falls back to `max_connections` when unset.
    #[cfg(not(feature = "embedded-db"))]
    #[clap(long, env = "ESPRESSO_NODE_DATABASE_QUERY_MAX_CONNECTIONS", default_value = None)]
    pub(crate) query_max_connections: Option<u32>,

    // Connection pool of an already-created `Persistence`. Never parsed from the command line;
    // `PersistenceOptions::create` stores the pool here, and `TryFrom<&Options> for Config`
    // hands it to the `Config` when present.
    #[clap(skip)]
    pub(crate) pool: Option<sqlx::Pool<Db>>,
}
328
329impl Default for Options {
330 fn default() -> Self {
331 Self::parse_from(std::iter::empty::<String>())
332 }
333}
334
335#[cfg(not(feature = "embedded-db"))]
336impl From<PostgresOptions> for Config {
337 fn from(opt: PostgresOptions) -> Self {
338 let mut cfg = Config::default();
339
340 if let Some(host) = opt.host {
341 cfg = cfg.host(host);
342 }
343
344 if let Some(port) = opt.port {
345 cfg = cfg.port(port);
346 }
347
348 if let Some(database) = &opt.database {
349 cfg = cfg.database(database);
350 }
351
352 if let Some(user) = &opt.user {
353 cfg = cfg.user(user);
354 }
355
356 if let Some(password) = &opt.password {
357 cfg = cfg.password(password);
358 }
359
360 if opt.use_tls {
361 cfg = cfg.tls();
362 }
363
364 cfg = cfg.max_connections(20);
365 cfg = cfg.idle_connection_timeout(Duration::from_secs(120));
366 cfg = cfg.connection_timeout(Duration::from_secs(10240));
367 cfg = cfg.slow_statement_threshold(Duration::from_secs(1));
368 cfg = cfg.statement_timeout(Duration::from_secs(600)); hotshot_query_service::data_source::storage::sql::set_no_deferrable_on_read(
371 opt.no_deferrable,
372 );
373
374 cfg
375 }
376}
377
#[cfg(feature = "embedded-db")]
impl From<SqliteOptions> for Config {
    /// Builds a storage `Config` that points at the SQLite file in `opt.path`, using fixed
    /// pool and timeout settings chosen here.
    fn from(opt: SqliteOptions) -> Self {
        Config::default()
            .db_path(opt.path)
            .max_connections(20)
            .idle_connection_timeout(Duration::from_secs(120))
            .connection_timeout(Duration::from_secs(10240))
            .slow_statement_threshold(Duration::from_secs(2))
            .statement_timeout(Duration::from_secs(600))
    }
}
393
394#[cfg(not(feature = "embedded-db"))]
395impl From<PostgresOptions> for Options {
396 fn from(opt: PostgresOptions) -> Self {
397 Options {
398 postgres_options: opt,
399 max_connections: 20,
400 idle_connection_timeout: Duration::from_secs(120),
401 connection_timeout: Duration::from_secs(10240),
402 slow_statement_threshold: Duration::from_secs(1),
403 statement_timeout: Duration::from_secs(600),
404 ..Default::default()
405 }
406 }
407}
408
#[cfg(feature = "embedded-db")]
impl From<SqliteOptions> for Options {
    /// Wraps SQLite options in a full `Options` value.
    ///
    /// Every field is spelled out explicitly (nothing is taken from `Options::default()`):
    /// pruning and archiving are off, all fetch tuning knobs are unset, and no pool is shared.
    fn from(opt: SqliteOptions) -> Self {
        Self {
            // Database location.
            sqlite_options: opt,
            uri: None,
            pool: None,

            // Connection pool and timeouts.
            min_connections: 0,
            max_connections: 5,
            idle_connection_timeout: Duration::from_secs(120),
            connection_timeout: Duration::from_secs(10240),
            slow_statement_threshold: Duration::from_secs(1),
            statement_timeout: Duration::from_secs(600),

            // Pruning and storage mode.
            prune: false,
            pruning: Default::default(),
            consensus_pruning: Default::default(),
            archive: false,
            lightweight: false,

            // Fetching and scanning.
            fetch_rate_limit: None,
            active_fetch_delay: None,
            chunk_fetch_delay: None,
            sync_status_chunk_size: None,
            sync_status_ttl: None,
            proactive_scan_chunk_size: None,
            proactive_scan_interval: None,
            disable_proactive_fetching: false,
        }
    }
}
438impl TryFrom<&Options> for Config {
439 type Error = anyhow::Error;
440
441 fn try_from(opt: &Options) -> Result<Self, Self::Error> {
442 let mut cfg = match &opt.uri {
443 Some(uri) => uri.parse()?,
444 None => Self::default(),
445 };
446
447 if let Some(pool) = &opt.pool {
448 cfg = cfg.pool(pool.clone());
449 }
450
451 cfg = cfg.max_connections(opt.max_connections);
452 cfg = cfg.idle_connection_timeout(opt.idle_connection_timeout);
453 cfg = cfg.min_connections(opt.min_connections);
454
455 #[cfg(not(feature = "embedded-db"))]
456 {
457 cfg =
458 cfg.query_max_connections(opt.query_max_connections.unwrap_or(opt.max_connections));
459 cfg =
460 cfg.query_min_connections(opt.query_min_connections.unwrap_or(opt.min_connections));
461
462 hotshot_query_service::data_source::storage::sql::set_no_deferrable_on_read(
463 opt.postgres_options.no_deferrable,
464 );
465 }
466
467 cfg = cfg.connection_timeout(opt.connection_timeout);
468 cfg = cfg.slow_statement_threshold(opt.slow_statement_threshold);
469 cfg = cfg.statement_timeout(opt.statement_timeout);
470
471 #[cfg(not(feature = "embedded-db"))]
472 {
473 cfg = cfg.migrations(include_migrations!(
474 "$CARGO_MANIFEST_DIR/api/migrations/postgres"
475 ));
476
477 let pg_options = &opt.postgres_options;
478
479 if let Some(host) = &pg_options.host {
480 cfg = cfg.host(host.clone());
481 }
482
483 if let Some(port) = pg_options.port {
484 cfg = cfg.port(port);
485 }
486
487 if let Some(database) = &pg_options.database {
488 cfg = cfg.database(database);
489 }
490
491 if let Some(user) = &pg_options.user {
492 cfg = cfg.user(user);
493 }
494
495 if let Some(password) = &pg_options.password {
496 cfg = cfg.password(password);
497 }
498
499 if pg_options.use_tls {
500 cfg = cfg.tls();
501 }
502 }
503
504 #[cfg(feature = "embedded-db")]
505 {
506 cfg = cfg.migrations(include_migrations!(
507 "$CARGO_MANIFEST_DIR/api/migrations/sqlite"
508 ));
509
510 cfg = cfg.db_path(opt.sqlite_options.path.clone());
511 }
512
513 if opt.prune {
514 cfg = cfg.pruner_cfg(PrunerCfg::from(opt.pruning))?;
515 }
516 if opt.archive {
517 cfg = cfg.archive();
518 }
519
520 Ok(cfg)
521 }
522}
523
// Optional overrides for the storage pruner, read from `ESPRESSO_NODE_PRUNER_*`.
//
// Each field that is set is forwarded to the matching `PrunerCfg` builder method by
// `From<PruningOptions> for PrunerCfg`; unset fields leave the `PrunerCfg::new()` value
// untouched.
//
// NOTE: plain `//` comments are used on this type (instead of `///`) because clap's derive
// turns doc comments into `--help` text, which would change user-visible output.
#[derive(Parser, Clone, Copy, Debug)]
pub struct PruningOptions {
    // Forwarded to `PrunerCfg::with_pruning_threshold`; parsed with `parse_size`.
    #[clap(long, env = "ESPRESSO_NODE_PRUNER_PRUNING_THRESHOLD", value_parser = parse_size)]
    pub(crate) pruning_threshold: Option<u64>,

    // Forwarded to `PrunerCfg::with_minimum_retention`.
    #[clap(
        long,
        env = "ESPRESSO_NODE_PRUNER_MINIMUM_RETENTION",
        value_parser = parse_duration,
    )]
    pub(crate) minimum_retention: Option<Duration>,

    // Forwarded to `PrunerCfg::with_target_retention`.
    #[clap(
        long,
        env = "ESPRESSO_NODE_PRUNER_TARGET_RETENTION",
        value_parser = parse_duration,
    )]
    pub(crate) target_retention: Option<Duration>,

    // Forwarded to `PrunerCfg::with_batch_size`.
    #[clap(long, env = "ESPRESSO_NODE_PRUNER_BATCH_SIZE")]
    pub(crate) batch_size: Option<u64>,

    // Forwarded to `PrunerCfg::with_max_usage`.
    #[clap(long, env = "ESPRESSO_NODE_PRUNER_MAX_USAGE")]
    pub(crate) max_usage: Option<u16>,

    // Forwarded to `PrunerCfg::with_interval`.
    #[clap(
        long,
        env = "ESPRESSO_NODE_PRUNER_INTERVAL",
        value_parser = parse_duration,
    )]
    pub(crate) interval: Option<Duration>,

    // Forwarded to `PrunerCfg::with_incremental_vacuum_pages`.
    #[clap(long, env = "ESPRESSO_NODE_PRUNER_INCREMENTAL_VACUUM_PAGES")]
    pub(crate) pages: Option<u64>,
}
578
579impl Default for PruningOptions {
580 fn default() -> Self {
581 Self::parse_from(std::iter::empty::<String>())
582 }
583}
584
585impl From<PruningOptions> for PrunerCfg {
586 fn from(opt: PruningOptions) -> Self {
587 let mut cfg = PrunerCfg::new();
588 if let Some(threshold) = opt.pruning_threshold {
589 cfg = cfg.with_pruning_threshold(threshold);
590 }
591 if let Some(min) = opt.minimum_retention {
592 cfg = cfg.with_minimum_retention(min);
593 }
594 if let Some(target) = opt.target_retention {
595 cfg = cfg.with_target_retention(target);
596 }
597 if let Some(batch) = opt.batch_size {
598 cfg = cfg.with_batch_size(batch);
599 }
600 if let Some(max) = opt.max_usage {
601 cfg = cfg.with_max_usage(max);
602 }
603 if let Some(interval) = opt.interval {
604 cfg = cfg.with_interval(interval);
605 }
606
607 if let Some(pages) = opt.pages {
608 cfg = cfg.with_incremental_vacuum_pages(pages)
609 }
610
611 cfg = cfg.with_state_tables(vec![
612 BlockMerkleTree::state_type().to_string(),
613 FeeMerkleTree::state_type().to_string(),
614 ]);
615
616 cfg
617 }
618}
619
// Garbage-collection targets for consensus storage, used by `Persistence::prune`.
//
// NOTE: plain `//` comments are used on this type (instead of `///`) because clap's derive
// turns doc comments into `--help` text, which would change user-visible output.
#[derive(Parser, Clone, Copy, Debug)]
pub struct ConsensusPruningOptions {
    // Number of views subtracted from the current view to get the regular pruning cutoff:
    // `prune` always deletes rows older than `cur_view - target_retention`.
    #[clap(
        name = "TARGET_RETENTION",
        long = "consensus-storage-target-retention",
        env = "ESPRESSO_NODE_CONSENSUS_STORAGE_TARGET_RETENTION",
        default_value = "302000"
    )]
    pub(crate) target_retention: u64,

    // Number of views subtracted from the current view to get the aggressive pruning cutoff,
    // which `prune` applies only when storage usage exceeds `target_usage`.
    #[clap(
        name = "MINIMUM_RETENTION",
        long = "consensus-storage-minimum-retention",
        env = "ESPRESSO_NODE_CONSENSUS_STORAGE_MINIMUM_RETENTION",
        default_value = "130000"
    )]
    pub(crate) minimum_retention: u64,

    // Usage threshold compared against the size the database reports for the pruned tables.
    // Presumably in bytes — confirm against the size queries in `Persistence::prune`.
    #[clap(
        name = "TARGET_USAGE",
        long = "consensus-storage-target-usage",
        env = "ESPRESSO_NODE_CONSENSUS_STORAGE_TARGET_USAGE",
        default_value = "1000000000"
    )]
    pub(crate) target_usage: u64,
}
676
677impl Default for ConsensusPruningOptions {
678 fn default() -> Self {
679 Self::parse_from(std::iter::empty::<String>())
680 }
681}
682
683#[async_trait]
684impl PersistenceOptions for Options {
685 type Persistence = Persistence;
686
687 fn set_view_retention(&mut self, view_retention: u64) {
688 self.consensus_pruning.target_retention = view_retention;
689 self.consensus_pruning.minimum_retention = view_retention;
690 }
691
692 async fn create(&mut self) -> anyhow::Result<Self::Persistence> {
693 let config = (&*self).try_into()?;
694 let persistence = Persistence {
695 db: SqlStorage::connect(config, StorageConnectionType::Sequencer).await?,
696 gc_opt: self.consensus_pruning,
697 internal_metrics: PersistenceMetricsValue::default(),
698 };
699 persistence.migrate_quorum_proposal_leaf_hashes().await?;
700 self.pool = Some(persistence.db.pool());
701 Ok(persistence)
702 }
703
704 async fn reset(self) -> anyhow::Result<()> {
705 SqlStorage::connect(
706 Config::try_from(&self)?.reset_schema(),
707 StorageConnectionType::Sequencer,
708 )
709 .await?;
710 Ok(())
711 }
712}
713
/// Identifies a data migration tracked by name.
#[derive(Debug, Clone, Copy)]
pub enum DataMigration {
    X25519Keys,
}

impl DataMigration {
    /// Returns the stable string name of this migration.
    pub fn as_str(&self) -> &'static str {
        // Matching on the value keeps this exhaustive: adding a variant without a name
        // becomes a compile error.
        match *self {
            DataMigration::X25519Keys => "x25519_keys",
        }
    }
}
726
/// SQL-backed persistence handle for this node.
#[derive(Clone, Debug)]
pub struct Persistence {
    /// Storage connection used for all read and write transactions.
    db: SqlStorage,
    /// Retention and usage targets consulted by `prune`.
    gc_opt: ConsensusPruningOptions,
    /// Metrics values owned by this persistence instance; not updated in the visible part of
    /// this file.
    internal_metrics: PersistenceMetricsValue,
}
735
/// Database error code that `is_serialization_error` treats as retryable. `40001` is the
/// SQLSTATE PostgreSQL reports for `serialization_failure`.
const PG_SERIALIZATION_FAILURE_CODE: &str = "40001";
739
/// A decided leaf together with the certificate pair stored alongside it.
#[derive(Debug)]
struct DecidedLeaf {
    /// Leaf plus the auxiliary data (VID share, state certificate, ...) reported in events.
    info: LeafInfo<SeqTypes>,
    /// Quorum certificate and optional next-epoch quorum certificate loaded with the leaf.
    cert: CertificatePair<SeqTypes>,
}
745
746fn decide_events_from_chain(
747 mut chain: Vec<DecidedLeaf>,
748 cert2: Option<Certificate2<SeqTypes>>,
749 deciding_qc: Option<Arc<CertificatePair<SeqTypes>>>,
750) -> Vec<CoordinatorEvent<SeqTypes>> {
751 let split_idx = chain
752 .iter()
753 .position(|leaf| leaf.info.leaf.block_header().version() < versions::NEW_PROTOCOL_VERSION)
754 .unwrap_or(chain.len());
755 let legacy_leaves = chain.split_off(split_idx);
756 let new_leaves = chain;
757
758 let mut events = Vec::with_capacity(2);
759 if !legacy_leaves.is_empty() {
760 let committing_qc = legacy_leaves[0].cert.clone();
761 let deciding_qc = new_leaves
762 .is_empty()
763 .then_some(deciding_qc)
764 .flatten()
765 .filter(|qc| qc.view_number() == committing_qc.view_number() + 1);
766 let view_number = legacy_leaves[0].info.leaf.view_number();
767 let leaf_chain = legacy_leaves
768 .into_iter()
769 .map(|leaf| leaf.info)
770 .collect::<Vec<_>>();
771
772 events.push(CoordinatorEvent::LegacyEvent(Event {
773 view_number,
774 event: EventType::Decide {
775 leaf_chain: Arc::new(leaf_chain),
776 committing_qc: Arc::new(committing_qc),
777 deciding_qc,
778 block_size: None,
779 },
780 }));
781 }
782
783 if new_leaves.is_empty() && cert2.is_some() {
784 tracing::warn!(
785 "decide_events_from_chain called with cert2 but no new-protocol leaves; cert2 will be \
786 dropped"
787 );
788 }
789
790 if !new_leaves.is_empty() {
791 let cert1 = new_leaves[0].cert.qc().clone();
797 let leaf_infos = new_leaves.into_iter().map(|leaf| leaf.info).collect();
798
799 events.push(CoordinatorEvent::NewDecide {
800 leaf_infos,
801 cert1,
802 cert2,
803 });
804 }
805
806 events
807}
808
impl Persistence {
    /// Populates the `leaf_hash` column for rows of `quorum_proposals` where it is NULL.
    ///
    /// For each such row the stored proposal is deserialized, the corresponding leaf is
    /// rebuilt, and its commitment is written back. The whole operation runs in one write
    /// transaction and is retried when the database reports a serialization failure.
    async fn migrate_quorum_proposal_leaf_hashes(&self) -> anyhow::Result<()> {
        WRITE_BACKOFF
            .retry_if(WRITE_RETRY_MAX, is_serialization_error, || async {
                let mut tx = self.db.write().await?;

                let mut proposals = tx.fetch("SELECT * FROM quorum_proposals");

                let mut updates = vec![];
                while let Some(row) = proposals.next().await {
                    let row = row?;

                    let hash: Option<String> = row.try_get("leaf_hash")?;
                    if hash.is_none() {
                        let view: i64 = row.try_get("view")?;
                        let data: Vec<u8> = row.try_get("data")?;
                        let proposal: Proposal<SeqTypes, QuorumProposal<SeqTypes>> =
                            bincode::deserialize(&data)?;
                        let leaf = Leaf::from_quorum_proposal(&proposal.data);
                        let leaf_hash = Committable::commit(&leaf);
                        tracing::info!(view, %leaf_hash, "populating quorum proposal leaf hash");
                        updates.push((view, leaf_hash.to_string()));
                    }
                }
                // The row stream borrows `tx`; release it before writing through `tx` again.
                drop(proposals);

                tx.upsert("quorum_proposals", ["view", "leaf_hash"], ["view"], updates)
                    .await?;

                tx.commit().await
            })
            .await
    }

    /// Returns the `completed` flag of the `data_migrations` row identified by `name` and
    /// `table_name`.
    ///
    /// # Errors
    ///
    /// Fails if the tracking row does not exist (or the query fails for another reason).
    async fn is_migration_complete(&self, name: &str, table_name: &str) -> anyhow::Result<bool> {
        let mut tx = self.db.read().await?;
        let (completed,): (bool,) =
            query_as("SELECT completed FROM data_migrations WHERE name = $1 AND table_name = $2")
                .bind(name)
                .bind(table_name)
                .fetch_one(tx.as_mut())
                .await
                .context("migration tracking row missing - schema may be out of sync")?;
        Ok(completed)
    }

    /// Marks the `data_migrations` row identified by `name` and `table_name` as completed and
    /// records how many rows were migrated.
    ///
    /// Runs inside the caller's transaction; the caller is responsible for committing it.
    async fn mark_migration_complete(
        tx: &mut Transaction<Write>,
        name: &str,
        table_name: &str,
        migrated_rows: usize,
    ) -> anyhow::Result<()> {
        tx.execute(
            query(
                "UPDATE data_migrations SET completed = true, migrated_rows = $1 WHERE name = $2 \
                 AND table_name = $3",
            )
            .bind(migrated_rows as i64)
            .bind(name)
            .bind(table_name),
        )
        .await?;
        Ok(())
    }

    /// Emits decide events for all stored leaves that have not been processed yet, then
    /// garbage collects the data belonging to the processed views.
    ///
    /// Works in rounds. Each round:
    /// 1. loads leaves from `anchor_leaf2` starting after the last processed view, stopping at
    ///    the first gap in block heights (or the first row that fails to load);
    /// 2. loads the VID shares, DA proposals, state certificates and the decided `cert2` for
    ///    that view range;
    /// 3. hands the resulting events to `consumer`;
    /// 4. in one retried write transaction, advances `last_processed_view`, stores the state
    ///    certificates as finalized, and deletes the processed rows.
    ///
    /// Returns `Ok(())` once a round finds no new leaves. If the consumer fails, the function
    /// returns before step 4, so the same views are picked up again on the next call.
    async fn generate_decide_events(
        &self,
        deciding_qc: Option<Arc<CertificatePair<SeqTypes>>>,
        consumer: &impl EventConsumer,
    ) -> anyhow::Result<()> {
        // `None` when the `event_stream` row does not exist yet.
        let mut last_processed_view: Option<i64> = self
            .db
            .read()
            .await?
            .fetch_optional("SELECT last_processed_view FROM event_stream WHERE id = 1 LIMIT 1")
            .await?
            .map(|row| row.get("last_processed_view"));
        loop {
            // All reads of one round share a single read transaction.
            let mut tx = self.db.read().await?;

            // Resume right after the last processed view, or from view 0 on first use.
            let from_view = match last_processed_view {
                Some(v) => v + 1,
                None => 0,
            };
            tracing::debug!(?from_view, "generate decide event");

            // Block height of the previously accepted leaf, used to detect gaps.
            let mut parent = None;
            let mut rows = query(
                "SELECT leaf, qc, next_epoch_qc FROM anchor_leaf2 WHERE view >= $1 ORDER BY view",
            )
            .bind(from_view)
            .fetch(tx.as_mut());
            let mut leaves: Vec<(Leaf2, CertificatePair<SeqTypes>)> = vec![];
            let mut final_qc = None;
            while let Some(row) = rows.next().await {
                let row = match row {
                    Ok(row) => row,
                    Err(err) => {
                        // Best effort: continue with the leaves loaded so far.
                        tracing::warn!("error loading row: {err:#}");
                        break;
                    },
                };

                let leaf_data: Vec<u8> = row.get("leaf");
                let leaf = bincode::deserialize::<Leaf2>(&leaf_data)?;
                let qc_data: Vec<u8> = row.get("qc");
                let qc = bincode::deserialize::<QuorumCertificate2<SeqTypes>>(&qc_data)?;
                let next_epoch_qc = match row.get::<Option<Vec<u8>>, _>("next_epoch_qc") {
                    Some(bytes) => {
                        Some(bincode::deserialize::<NextEpochQuorumCertificate2<SeqTypes>>(&bytes)?)
                    },
                    None => None,
                };
                let height = leaf.block_header().block_number();

                // Only a run of consecutive block heights is processed in one round.
                if let Some(parent) = parent
                    && height != parent + 1
                {
                    tracing::debug!(
                        height,
                        parent,
                        "ending decide event at non-consecutive leaf"
                    );
                    break;
                }
                parent = Some(height);
                let cert = CertificatePair::new(qc, next_epoch_qc);
                final_qc = Some(cert.clone());
                leaves.push((leaf, cert));
            }
            // The row stream borrows `tx`; release it before issuing further queries.
            drop(rows);

            // `final_qc` is set exactly when at least one leaf was accepted.
            let Some(final_qc) = final_qc else {
                tracing::debug!(from_view, "no new leaves at decide");
                return Ok(());
            };

            // Actual view range covered by the loaded leaves (non-empty, see above).
            let from_view = leaves[0].0.view_number();
            let to_view = leaves[leaves.len() - 1].0.view_number();

            // VID shares for the view range, keyed by view.
            let mut vid_shares = tx
                .fetch_all(
                    query("SELECT view, data FROM vid_share2 where view >= $1 AND view <= $2")
                        .bind(from_view.u64() as i64)
                        .bind(to_view.u64() as i64),
                )
                .await?
                .into_iter()
                .map(|row| {
                    let view: i64 = row.get("view");
                    let data: Vec<u8> = row.get("data");
                    let vid_proposal = bincode::deserialize::<
                        Proposal<SeqTypes, VidDisperseShare<SeqTypes>>,
                    >(&data)?;
                    Ok((view as u64, vid_proposal))
                })
                .collect::<anyhow::Result<BTreeMap<_, _>>>()?;

            // DA proposals for the view range, keyed by view.
            let mut da_proposals = tx
                .fetch_all(
                    query("SELECT view, data FROM da_proposal2 where view >= $1 AND view <= $2")
                        .bind(from_view.u64() as i64)
                        .bind(to_view.u64() as i64),
                )
                .await?
                .into_iter()
                .map(|row| {
                    let view: i64 = row.get("view");
                    let data: Vec<u8> = row.get("data");
                    let da_proposal =
                        bincode::deserialize::<Proposal<SeqTypes, DaProposal2<SeqTypes>>>(&data)?;
                    Ok((view as u64, da_proposal.data))
                })
                .collect::<anyhow::Result<BTreeMap<_, _>>>()?;

            let state_certs = Self::load_state_certs(&mut tx, from_view, to_view)
                .await
                .inspect_err(|err| {
                    tracing::error!(
                        ?from_view,
                        ?to_view,
                        "failed to load state certificates. error={err:#}"
                    );
                })?;

            // Only the `cert2` stored for the newest view of the range is used.
            let cert2 = tx
                .fetch_optional(
                    query("SELECT data FROM decided_cert2 WHERE view = $1")
                        .bind(to_view.u64() as i64),
                )
                .await?
                .map(|row| {
                    let bytes: Vec<u8> = row.get("data");
                    bincode::deserialize::<Certificate2<SeqTypes>>(&bytes)
                        .context("deserializing decided cert2")
                })
                .transpose()?;
            // Reads are done; release the read transaction before dispatching events.
            drop(tx);

            let chain = leaves
                .into_iter()
                // Newest leaf first.
                .rev()
                .map(|(mut leaf, cert)| {
                    let view = leaf.view_number();

                    let vid_proposal = vid_shares.remove(&view);
                    if vid_proposal.is_none() {
                        tracing::debug!(?view, "VID share not available at decide");
                    }
                    let vid_share = vid_proposal.as_ref().map(|proposal| proposal.data.clone());

                    // Attach the block payload when the DA proposal is available; the genesis
                    // view gets an empty payload instead.
                    if let Some(proposal) = da_proposals.remove(&view) {
                        let payload =
                            Payload::from_bytes(&proposal.encoded_transactions, &proposal.metadata);
                        leaf.fill_block_payload_unchecked(payload);
                    } else if view == ViewNumber::genesis() {
                        leaf.fill_block_payload_unchecked(Payload::empty().0);
                    } else {
                        tracing::debug!(?view, "DA proposal not available at decide");
                    }

                    // NOTE(review): `load_state_certs` keys its map by `cert.epoch.u64()`, while
                    // this lookup passes the leaf's view number — confirm the intended key.
                    let state_cert = state_certs.get(&view).cloned();

                    let info = LeafInfo {
                        leaf,
                        vid_share,
                        state_cert,
                        // State and delta are not stored, so defaults are reported.
                        state: Default::default(),
                        delta: Default::default(),
                    };
                    DecidedLeaf { info, cert }
                })
                .collect();

            tracing::debug!(
                ?from_view,
                ?to_view,
                ?final_qc,
                ?chain,
                "generating decide event"
            );

            // A consumer error aborts here, before any state is advanced or deleted.
            for event in decide_events_from_chain(chain, cert2, deciding_qc.clone()) {
                consumer.handle_event(&event).await?;
            }

            // Prepare owned, serialized inputs up front so the retried closure below only
            // borrows them.
            let from_view_i64 = from_view.u64() as i64;
            let to_view_i64 = to_view.u64() as i64;
            let serialized_state_certs = state_certs
                .into_iter()
                .map(|(epoch, cert)| Ok((epoch as i64, bincode::serialize(&cert)?)))
                .collect::<anyhow::Result<Vec<(i64, Vec<u8>)>>>()?;

            WRITE_BACKOFF
                .retry_if(WRITE_RETRY_MAX, is_serialization_error, || async {
                    let mut tx = self.db.write().await?;
                    // Record progress in the same transaction as the deletions below.
                    tx.upsert(
                        "event_stream",
                        ["id", "last_processed_view"],
                        ["id"],
                        [(1i32, to_view_i64)],
                    )
                    .await?;

                    // Keep the state certificates by moving them to `finalized_state_cert`.
                    for (epoch, state_cert_bytes) in &serialized_state_certs {
                        tx.upsert(
                            "finalized_state_cert",
                            ["epoch", "state_cert"],
                            ["epoch"],
                            [(*epoch, state_cert_bytes.clone())],
                        )
                        .await?;
                    }

                    // Delete the per-view data of the processed range.
                    tx.execute(
                        query("DELETE FROM vid_share2 where view >= $1 AND view <= $2")
                            .bind(from_view_i64)
                            .bind(to_view_i64),
                    )
                    .await?;
                    tx.execute(
                        query("DELETE FROM da_proposal2 where view >= $1 AND view <= $2")
                            .bind(from_view_i64)
                            .bind(to_view_i64),
                    )
                    .await?;
                    tx.execute(
                        query("DELETE FROM quorum_proposals2 where view >= $1 AND view <= $2")
                            .bind(from_view_i64)
                            .bind(to_view_i64),
                    )
                    .await?;
                    tx.execute(
                        query("DELETE FROM quorum_certificate2 where view >= $1 AND view <= $2")
                            .bind(from_view_i64)
                            .bind(to_view_i64),
                    )
                    .await?;
                    tx.execute(
                        query("DELETE FROM state_cert where view >= $1 AND view <= $2")
                            .bind(from_view_i64)
                            .bind(to_view_i64),
                    )
                    .await?;
                    tx.execute(
                        query("DELETE FROM decided_cert2 where view >= $1 AND view <= $2")
                            .bind(from_view_i64)
                            .bind(to_view_i64),
                    )
                    .await?;

                    // Unlike the tables above, the upper bound is exclusive here, so the
                    // leaf at `to_view` stays in `anchor_leaf2`.
                    tx.execute(
                        query("DELETE FROM anchor_leaf2 WHERE view >= $1 AND view < $2")
                            .bind(from_view_i64)
                            .bind(to_view_i64),
                    )
                    .await?;

                    tx.commit().await?;
                    Ok(())
                })
                .await?;
            // The next round starts right after the range that was just processed.
            last_processed_view = Some(to_view_i64);
        }
    }

    /// Loads the state certificates stored for views in `from_view..=to_view`.
    ///
    /// Each certificate is decoded as the V2 format first and, if that fails, as the V1
    /// format (converted to V2). The returned map is keyed by the certificate's epoch; if
    /// several rows carry the same epoch, the one read last wins.
    ///
    /// # Errors
    ///
    /// Fails if the query fails or a row can be decoded as neither V2 nor V1.
    async fn load_state_certs(
        tx: &mut Transaction<Read>,
        from_view: ViewNumber,
        to_view: ViewNumber,
    ) -> anyhow::Result<BTreeMap<u64, LightClientStateUpdateCertificateV2<SeqTypes>>> {
        let rows = tx
            .fetch_all(
                query("SELECT view, state_cert FROM state_cert WHERE view >= $1 AND view <= $2")
                    .bind(from_view.u64() as i64)
                    .bind(to_view.u64() as i64),
            )
            .await?;

        let mut result = BTreeMap::new();

        for row in rows {
            let data: Vec<u8> = row.get("state_cert");

            let cert: LightClientStateUpdateCertificateV2<SeqTypes> = bincode::deserialize(&data)
                .or_else(|err_v2| {
                    bincode::deserialize::<LightClientStateUpdateCertificateV1<SeqTypes>>(&data)
                        .map(Into::into)
                        .context(format!(
                            "Failed to deserialize LightClientStateUpdateCertificate: with v1 and v2. \
                             error: {err_v2}"
                        ))
                })?;

            result.insert(cert.epoch.u64(), cert);
        }

        Ok(result)
    }

    /// Garbage collects consensus storage relative to `cur_view`.
    ///
    /// Always deletes rows older than `cur_view - target_retention`. If the pruned tables
    /// still use more than `target_usage` afterwards, it additionally deletes rows older than
    /// `cur_view - minimum_retention`. Everything runs in one write transaction, retried when
    /// the database reports a serialization failure.
    #[tracing::instrument(skip(self))]
    async fn prune(&self, cur_view: ViewNumber) -> anyhow::Result<()> {
        WRITE_BACKOFF
            .retry_if(WRITE_RETRY_MAX, is_serialization_error, || async {
                let mut tx = self.db.write().await?;

                prune_to_view(
                    &mut tx,
                    cur_view.u64().saturating_sub(self.gc_opt.target_retention),
                )
                .await?;

                // SQLite build: sum the `pgsize` entries of `dbstat` for the pruned tables.
                #[cfg(feature = "embedded-db")]
                let usage_query = format!(
                    "SELECT sum(pgsize) FROM dbstat WHERE name IN ({})",
                    PRUNE_TABLES
                        .iter()
                        .map(|table| format!("'{table}'"))
                        .join(",")
                );

                // Postgres build: add up `pg_table_size` over the pruned tables.
                #[cfg(not(feature = "embedded-db"))]
                let usage_query = {
                    let table_sizes = PRUNE_TABLES
                        .iter()
                        .map(|table| format!("pg_table_size('{table}')"))
                        .join(" + ");
                    format!("SELECT {table_sizes}")
                };

                let (usage,): (i64,) = query_as(&usage_query).fetch_one(tx.as_mut()).await?;
                tracing::debug!(usage, "consensus storage usage after pruning");

                if (usage as u64) > self.gc_opt.target_usage {
                    tracing::warn!(
                        usage,
                        gc_opt = ?self.gc_opt,
                        "consensus storage is running out of space, pruning to minimum retention"
                    );
                    prune_to_view(
                        &mut tx,
                        cur_view.u64().saturating_sub(self.gc_opt.minimum_retention),
                    )
                    .await?;
                }

                tx.commit().await
            })
            .await
    }
}
1268
/// Retry limit passed to `WRITE_BACKOFF.retry_if` for write transactions in this file.
// NOTE(review): whether this counts total attempts or retries after the first attempt depends
// on `BackoffParams::retry_if` — confirm there.
const WRITE_RETRY_MAX: u32 = 5;
1271
/// Backoff schedule used when retrying write transactions.
// NOTE(review): the arguments are presumably (initial delay, maximum delay, growth factor,
// jitter ratio) — confirm against `BackoffParams::new`.
const WRITE_BACKOFF: BackoffParams = BackoffParams::new(
    Duration::from_millis(10),
    Duration::from_millis(500),
    2,
    Ratio {
        numerator: 5,
        denominator: 10,
    },
);
1282
1283fn is_serialization_error(err: &anyhow::Error) -> bool {
1284 err.chain()
1285 .filter_map(|e| e.downcast_ref::<sqlx::Error>())
1286 .filter_map(|e| e.as_database_error())
1287 .any(|e| e.code().as_deref() == Some(PG_SERIALIZATION_FAILURE_CODE))
1288}
1289
/// Tables that `prune_to_view` deletes old rows from and whose combined size
/// `Persistence::prune` compares against the usage target. The names are interpolated
/// directly into SQL text, so this list must only ever contain trusted identifiers.
const PRUNE_TABLES: &[&str] = &[
    "anchor_leaf2",
    "vid_share2",
    "da_proposal2",
    "quorum_proposals2",
    "quorum_certificate2",
];
1297
1298async fn prune_to_view(tx: &mut Transaction<Write>, view: u64) -> anyhow::Result<()> {
1299 if view == 0 {
1300 return Ok(());
1302 }
1303 tracing::debug!(view, "pruning consensus storage");
1304
1305 for table in PRUNE_TABLES {
1306 let res = query(&format!("DELETE FROM {table} WHERE view < $1"))
1307 .bind(view as i64)
1308 .execute(tx.as_mut())
1309 .await
1310 .context(format!("pruning {table}"))?;
1311 if res.rows_affected() > 0 {
1312 tracing::info!(
1313 "garbage collected {} rows from {table}",
1314 res.rows_affected()
1315 );
1316 }
1317 }
1318
1319 Ok(())
1320}
1321
1322#[async_trait]
1323impl SequencerPersistence for Persistence {
1324 async fn migrate_reward_merkle_tree_v2(&self) -> anyhow::Result<()> {
1325 let batch_size: i64 = 1000;
1326
1327 let result = {
1328 let mut tx = self.db.read().await?;
1329 query_as::<(bool, i64)>(
1330 "SELECT completed, migrated_rows FROM epoch_migration WHERE table_name = \
1331 'reward_merkle_tree_v2_data'",
1332 )
1333 .fetch_optional(tx.as_mut())
1334 .await?
1335 };
1336
1337 let (is_completed, mut offset) = result.unwrap_or((false, 0));
1338
1339 if is_completed {
1340 tracing::info!("reward_merkle_tree_v2 migration already done");
1341 return Ok(());
1342 }
1343
1344 let max_height: Option<i64> = {
1345 let mut tx = self.db.read().await?;
1346 query_as::<(Option<i64>,)>("SELECT MAX(created) FROM reward_merkle_tree_v2")
1347 .fetch_one(tx.as_mut())
1348 .await?
1349 .0
1350 };
1351
1352 let max_height = match max_height {
1353 Some(h) => h,
1354 None => {
1355 tracing::info!("no reward data found in reward_merkle_tree_v2, skipping migration");
1356 return Ok(());
1357 },
1358 };
1359
1360 tracing::warn!(
1361 "migrating reward_merkle_tree_v2 to reward_merkle_tree_v2_data at height \
1362 {max_height}..."
1363 );
1364
1365 let mut balances: Vec<(RewardAccountV2, RewardAmount)> = Vec::new();
1366
1367 loop {
1368 let mut tx = self.db.read().await?;
1369
1370 #[cfg(not(feature = "embedded-db"))]
1371 let rows = query_as::<(serde_json::Value, serde_json::Value)>(
1372 "SELECT DISTINCT ON (idx) idx, entry
1373 FROM reward_merkle_tree_v2
1374 WHERE idx IS NOT NULL AND entry IS NOT NULL
1375 ORDER BY idx, created DESC
1376 LIMIT $1 OFFSET $2",
1377 )
1378 .bind(batch_size)
1379 .bind(offset)
1380 .fetch_all(tx.as_mut())
1381 .await
1382 .context("loading reward accounts from reward_merkle_tree_v2")?;
1383
1384 #[cfg(feature = "embedded-db")]
1385 let rows = query_as::<(serde_json::Value, serde_json::Value)>(
1386 "SELECT idx, entry FROM (
1387 SELECT idx, entry, ROW_NUMBER() OVER (PARTITION BY idx ORDER BY created DESC) \
1388 as rn
1389 FROM reward_merkle_tree_v2
1390 WHERE idx IS NOT NULL AND entry IS NOT NULL
1391 ) sub
1392 WHERE rn = 1 ORDER BY idx
1393 LIMIT $1 OFFSET $2",
1394 )
1395 .bind(batch_size)
1396 .bind(offset)
1397 .fetch_all(tx.as_mut())
1398 .await
1399 .context("loading reward accounts from reward_merkle_tree_v2")?;
1400
1401 drop(tx);
1402
1403 if rows.is_empty() {
1404 break;
1405 }
1406
1407 let rows_count = rows.len();
1408
1409 for (idx, entry) in rows {
1410 let account: RewardAccountV2 =
1411 serde_json::from_value(idx).context("deserializing reward account")?;
1412 let balance: RewardAmount = serde_json::from_value(entry).context(format!(
1413 "deserializing reward balance for account {account}"
1414 ))?;
1415 balances.push((account, balance));
1416 }
1417
1418 offset += rows_count as i64;
1419 let mut tx = self.db.write().await?;
1420 tx.upsert(
1421 "epoch_migration",
1422 ["table_name", "completed", "migrated_rows"],
1423 ["table_name"],
1424 [("reward_merkle_tree_v2_data".to_string(), false, offset)],
1425 )
1426 .await?;
1427 tx.commit().await?;
1428
1429 tracing::info!(
1430 "reward_merkle_tree_v2 progress: rows={} offset={}",
1431 rows_count,
1432 offset
1433 );
1434
1435 if rows_count < batch_size as usize {
1436 break;
1437 }
1438 }
1439
1440 if balances.is_empty() {
1441 tracing::info!("no reward accounts found, skipping tree rebuild");
1442 return Ok(());
1443 }
1444
1445 tracing::info!(
1446 "rebuilding RewardMerkleTreeV2 from {} accounts",
1447 balances.len()
1448 );
1449
1450 let tree = RewardMerkleTreeV2::from_kv_set(REWARD_MERKLE_TREE_V2_HEIGHT, balances)
1451 .context("failed to rebuild RewardMerkleTreeV2 from balances")?;
1452
1453 let mut tx = self.db.read().await?;
1454 let header = tx
1455 .get_header(BlockId::<SeqTypes>::from(max_height as usize))
1456 .await
1457 .context(format!("header {max_height} not available"))?;
1458 drop(tx);
1459
1460 match header.reward_merkle_tree_root() {
1461 Either::Right(expected_root) => {
1462 ensure!(
1463 tree.commitment() == expected_root,
1464 "rebuilt RewardMerkleTreeV2 commitment {} does not match header commitment {} \
1465 at height {max_height}",
1466 tree.commitment(),
1467 expected_root,
1468 );
1469 },
1470 Either::Left(_) => {
1471 bail!(
1472 "header at height {max_height} has a v1 reward merkle tree root, expected v2"
1473 );
1474 },
1475 }
1476
1477 let tree_data: RewardMerkleTreeV2Data = (&tree)
1478 .try_into()
1479 .context("failed to convert RewardMerkleTreeV2 to RewardMerkleTreeV2Data")?;
1480 let serialized =
1481 bincode::serialize(&tree_data).context("failed to serialize RewardMerkleTreeV2Data")?;
1482
1483 let mut tx = self.db.write().await?;
1484 tx.upsert(
1485 "reward_merkle_tree_v2_data",
1486 ["height", "balances"],
1487 ["height"],
1488 [(max_height, serialized)],
1489 )
1490 .await?;
1491 tx.commit().await?;
1492
1493 let mut tx = self.db.write().await?;
1495 tx.upsert(
1496 "epoch_migration",
1497 ["table_name", "completed", "migrated_rows"],
1498 ["table_name"],
1499 [("reward_merkle_tree_v2_data".to_string(), true, offset)],
1500 )
1501 .await?;
1502 let truncate = if cfg!(feature = "embedded-db") {
1503 "DELETE FROM"
1504 } else {
1505 "TRUNCATE"
1506 };
1507 query(&format!("{truncate} reward_merkle_tree_v2"))
1508 .execute(tx.as_mut())
1509 .await?;
1510 query(&format!("{truncate} reward_merkle_tree"))
1511 .execute(tx.as_mut())
1512 .await?;
1513 tx.commit().await?;
1514
1515 tracing::warn!("migrated reward_merkle_tree_v2 at height {max_height}");
1516
1517 Ok(())
1518 }
1519
1520 fn into_catchup_provider(
1521 self,
1522 backoff: BackoffParams,
1523 ) -> anyhow::Result<Arc<dyn StateCatchup>> {
1524 Ok(Arc::new(SqlStateCatchup::new(Arc::new(self.db), backoff)))
1525 }
1526
1527 async fn load_config(&self) -> anyhow::Result<Option<NetworkConfig>> {
1528 tracing::info!("loading config from Postgres");
1529
1530 let Some(row) = self
1532 .db
1533 .read()
1534 .await?
1535 .fetch_optional("SELECT config FROM network_config ORDER BY id DESC LIMIT 1")
1536 .await?
1537 else {
1538 tracing::info!("config not found");
1539 return Ok(None);
1540 };
1541 let json = row.try_get("config")?;
1542
1543 let json = migrate_network_config(json).context("migration of network config failed")?;
1544 let config = serde_json::from_value(json).context("malformed config file")?;
1545
1546 Ok(Some(config))
1547 }
1548
1549 async fn save_config(&self, cfg: &NetworkConfig) -> anyhow::Result<()> {
1550 tracing::info!("saving config to database");
1551 let json = serde_json::to_value(cfg)?;
1552
1553 WRITE_BACKOFF
1554 .retry_if(WRITE_RETRY_MAX, is_serialization_error, || async {
1555 let mut tx = self.db.write().await?;
1556 tx.execute(
1557 query("INSERT INTO network_config (config) VALUES ($1)").bind(json.clone()),
1558 )
1559 .await?;
1560 tx.commit().await
1561 })
1562 .await
1563 }
1564
1565 async fn append_decided_leaves(
1566 &self,
1567 view: ViewNumber,
1568 leaf_chain: impl IntoIterator<Item = (&LeafInfo<SeqTypes>, CertificatePair<SeqTypes>)> + Send,
1569 deciding_qc: Option<Arc<CertificatePair<SeqTypes>>>,
1570 consumer: &(impl EventConsumer + 'static),
1571 ) -> anyhow::Result<()> {
1572 let values = leaf_chain
1573 .into_iter()
1574 .map(|(info, cert)| {
1575 let mut leaf = info.leaf.clone();
1580 leaf.unfill_block_payload();
1581
1582 let view = cert.view_number().u64() as i64;
1583 let leaf_bytes = bincode::serialize(&leaf)?;
1584 let qc_bytes = bincode::serialize(cert.qc())?;
1585 let next_epoch_qc_bytes = match cert.next_epoch_qc() {
1586 Some(qc) => Some(bincode::serialize(qc)?),
1587 None => None,
1588 };
1589 Ok((view, leaf_bytes, qc_bytes, next_epoch_qc_bytes))
1590 })
1591 .collect::<anyhow::Result<Vec<_>>>()?;
1592
1593 WRITE_BACKOFF
1596 .retry_if(WRITE_RETRY_MAX, is_serialization_error, || async {
1597 let mut tx = self.db.write().await?;
1598 tx.upsert(
1599 "anchor_leaf2",
1600 ["view", "leaf", "qc", "next_epoch_qc"],
1601 ["view"],
1602 values.clone(),
1603 )
1604 .await?;
1605 tx.commit().await
1606 })
1607 .await?;
1608
1609 if let Err(err) = self.generate_decide_events(deciding_qc, consumer).await {
1612 tracing::warn!(?view, "event processing failed: {err:#}");
1616 return Ok(());
1617 }
1618
1619 if let Err(err) = self.prune(view).await {
1622 tracing::warn!(?view, "pruning failed: {err:#}");
1623 }
1624
1625 Ok(())
1626 }
1627
1628 async fn load_latest_acted_view(&self) -> anyhow::Result<Option<ViewNumber>> {
1629 Ok(self
1630 .db
1631 .read()
1632 .await?
1633 .fetch_optional(query("SELECT view FROM highest_voted_view WHERE id = 0"))
1634 .await?
1635 .map(|row| {
1636 let view: i64 = row.get("view");
1637 ViewNumber::new(view as u64)
1638 }))
1639 }
1640
1641 async fn load_restart_view(&self) -> anyhow::Result<Option<ViewNumber>> {
1642 Ok(self
1643 .db
1644 .read()
1645 .await?
1646 .fetch_optional(query("SELECT view FROM restart_view WHERE id = 0"))
1647 .await?
1648 .map(|row| {
1649 let view: i64 = row.get("view");
1650 ViewNumber::new(view as u64)
1651 }))
1652 }
1653
1654 async fn load_anchor_leaf(
1655 &self,
1656 ) -> anyhow::Result<Option<(Leaf2, QuorumCertificate2<SeqTypes>)>> {
1657 let Some(row) = self
1658 .db
1659 .read()
1660 .await?
1661 .fetch_optional("SELECT leaf, qc FROM anchor_leaf2 ORDER BY view DESC LIMIT 1")
1662 .await?
1663 else {
1664 return Ok(None);
1665 };
1666
1667 let leaf_bytes: Vec<u8> = row.get("leaf");
1668 let leaf2: Leaf2 = bincode::deserialize(&leaf_bytes)?;
1669
1670 let qc_bytes: Vec<u8> = row.get("qc");
1671 let qc2: QuorumCertificate2<SeqTypes> = bincode::deserialize(&qc_bytes)?;
1672
1673 Ok(Some((leaf2, qc2)))
1674 }
1675
1676 async fn load_anchor_view(&self) -> anyhow::Result<ViewNumber> {
1677 let mut tx = self.db.read().await?;
1678 let (view,) = query_as::<(i64,)>("SELECT coalesce(max(view), 0) FROM anchor_leaf2")
1679 .fetch_one(tx.as_mut())
1680 .await?;
1681 Ok(ViewNumber::new(view as u64))
1682 }
1683
1684 async fn load_da_proposal(
1685 &self,
1686 view: ViewNumber,
1687 ) -> anyhow::Result<Option<Proposal<SeqTypes, DaProposal2<SeqTypes>>>> {
1688 let result = self
1689 .db
1690 .read()
1691 .await?
1692 .fetch_optional(
1693 query("SELECT data FROM da_proposal2 where view = $1").bind(view.u64() as i64),
1694 )
1695 .await?;
1696
1697 result
1698 .map(|row| {
1699 let bytes: Vec<u8> = row.get("data");
1700 anyhow::Result::<_>::Ok(bincode::deserialize(&bytes)?)
1701 })
1702 .transpose()
1703 }
1704
1705 async fn load_vid_share(
1706 &self,
1707 view: ViewNumber,
1708 ) -> anyhow::Result<Option<Proposal<SeqTypes, VidDisperseShare<SeqTypes>>>> {
1709 let result = self
1710 .db
1711 .read()
1712 .await?
1713 .fetch_optional(
1714 query("SELECT data FROM vid_share2 where view = $1").bind(view.u64() as i64),
1715 )
1716 .await?;
1717
1718 result
1719 .map(|row| {
1720 let bytes: Vec<u8> = row.get("data");
1721 anyhow::Result::<_>::Ok(bincode::deserialize(&bytes)?)
1722 })
1723 .transpose()
1724 }
1725
1726 async fn load_quorum_proposals(
1727 &self,
1728 ) -> anyhow::Result<BTreeMap<ViewNumber, Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>>>>
1729 {
1730 let rows = self
1731 .db
1732 .read()
1733 .await?
1734 .fetch_all("SELECT * FROM quorum_proposals2")
1735 .await?;
1736
1737 Ok(BTreeMap::from_iter(
1738 rows.into_iter()
1739 .map(|row| {
1740 let view: i64 = row.get("view");
1741 let view_number: ViewNumber = ViewNumber::new(view.try_into()?);
1742 let bytes: Vec<u8> = row.get("data");
1743 let proposal: Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>> =
1744 bincode::deserialize(&bytes).or_else(|error| {
1745 bincode::deserialize::<
1746 Proposal<SeqTypes, QuorumProposalWrapperLegacy<SeqTypes>>,
1747 >(&bytes)
1748 .map(convert_proposal)
1749 .inspect_err(|err_v3| {
1750 tracing::warn!(
1751 ?view_number,
1752 %error,
1753 %err_v3,
1754 "ignoring malformed quorum proposal DB row"
1755 );
1756 })
1757 })?;
1758 Ok((view_number, proposal))
1759 })
1760 .collect::<anyhow::Result<Vec<_>>>()?,
1761 ))
1762 }
1763
1764 async fn load_quorum_proposal(
1765 &self,
1766 view: ViewNumber,
1767 ) -> anyhow::Result<Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>>> {
1768 let mut tx = self.db.read().await?;
1769 let (data,) =
1770 query_as::<(Vec<u8>,)>("SELECT data FROM quorum_proposals2 WHERE view = $1 LIMIT 1")
1771 .bind(view.u64() as i64)
1772 .fetch_one(tx.as_mut())
1773 .await?;
1774 let proposal: Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>> =
1775 bincode::deserialize(&data).or_else(|error| {
1776 bincode::deserialize::<Proposal<SeqTypes, QuorumProposalWrapperLegacy<SeqTypes>>>(
1777 &data,
1778 )
1779 .map(convert_proposal)
1780 .context(format!(
1781 "Failed to deserialize quorum proposal for view {view}. error={error}"
1782 ))
1783 })?;
1784
1785 Ok(proposal)
1786 }
1787
1788 async fn append_vid(
1789 &self,
1790 proposal: &Proposal<SeqTypes, VidDisperseShare<SeqTypes>>,
1791 ) -> anyhow::Result<()> {
1792 let view = proposal.data.view_number().u64();
1793 let payload_hash = proposal.data.payload_commitment();
1794 let data_bytes = bincode::serialize(proposal).unwrap();
1795
1796 let now = Instant::now();
1797 let res = WRITE_BACKOFF
1798 .retry_if(WRITE_RETRY_MAX, is_serialization_error, || async {
1799 let mut tx = self.db.write().await?;
1800 tx.upsert(
1801 "vid_share2",
1802 ["view", "data", "payload_hash"],
1803 ["view"],
1804 [(view as i64, data_bytes.clone(), payload_hash.to_string())],
1805 )
1806 .await?;
1807 tx.commit().await
1808 })
1809 .await;
1810 self.internal_metrics
1811 .internal_append_vid_duration
1812 .add_point(now.elapsed().as_secs_f64());
1813 res
1814 }
1815
1816 async fn append_da(
1817 &self,
1818 proposal: &Proposal<SeqTypes, DaProposal<SeqTypes>>,
1819 vid_commit: VidCommitment,
1820 ) -> anyhow::Result<()> {
1821 let data = &proposal.data;
1822 let view = data.view_number().u64();
1823 let data_bytes = bincode::serialize(proposal).unwrap();
1824
1825 let now = Instant::now();
1826 let res = WRITE_BACKOFF
1827 .retry_if(WRITE_RETRY_MAX, is_serialization_error, || async {
1828 let mut tx = self.db.write().await?;
1829 tx.upsert(
1830 "da_proposal",
1831 ["view", "data", "payload_hash"],
1832 ["view"],
1833 [(view as i64, data_bytes.clone(), vid_commit.to_string())],
1834 )
1835 .await?;
1836 tx.commit().await
1837 })
1838 .await;
1839 self.internal_metrics
1840 .internal_append_da_duration
1841 .add_point(now.elapsed().as_secs_f64());
1842 res
1843 }
1844
1845 async fn record_action(
1846 &self,
1847 view: ViewNumber,
1848 _epoch: Option<EpochNumber>,
1849 action: HotShotAction,
1850 ) -> anyhow::Result<()> {
1851 if !matches!(action, HotShotAction::Propose | HotShotAction::Vote) {
1853 return Ok(());
1854 }
1855
1856 WRITE_BACKOFF
1857 .retry_if(WRITE_RETRY_MAX, is_serialization_error, || async {
1858 let stmt = format!(
1859 "INSERT INTO highest_voted_view (id, view) VALUES (0, $1)
1860 ON CONFLICT (id) DO UPDATE SET view = {MAX_FN}(highest_voted_view.view, \
1861 excluded.view)"
1862 );
1863
1864 let mut tx = self.db.write().await?;
1865 tx.execute(query(&stmt).bind(view.u64() as i64)).await?;
1866
1867 if matches!(action, HotShotAction::Vote) {
1868 let restart_view = view + 1;
1869 let stmt = format!(
1870 "INSERT INTO restart_view (id, view) VALUES (0, $1)
1871 ON CONFLICT (id) DO UPDATE SET view = {MAX_FN}(restart_view.view, \
1872 excluded.view)"
1873 );
1874 tx.execute(query(&stmt).bind(restart_view.u64() as i64))
1875 .await?;
1876 }
1877
1878 tx.commit().await
1879 })
1880 .await
1881 }
1882
1883 async fn append_quorum_proposal2(
1884 &self,
1885 proposal: &Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>>,
1886 ) -> anyhow::Result<()> {
1887 let view_number = proposal.data.view_number().u64();
1888
1889 let proposal_bytes = bincode::serialize(&proposal).context("serializing proposal")?;
1890 let leaf_hash = Committable::commit(&Leaf2::from_quorum_proposal(&proposal.data));
1891
1892 let justify_qc = proposal.data.justify_qc();
1894 let justify_qc_bytes = bincode::serialize(&justify_qc).context("serializing QC")?;
1895 let justify_qc_view = justify_qc.view_number.u64() as i64;
1896 let justify_qc_leaf_commit = justify_qc.data.leaf_commit.to_string();
1897
1898 let now = Instant::now();
1899 let res = WRITE_BACKOFF
1900 .retry_if(WRITE_RETRY_MAX, is_serialization_error, || async {
1901 let mut tx = self.db.write().await?;
1902 tx.upsert(
1903 "quorum_proposals2",
1904 ["view", "leaf_hash", "data"],
1905 ["view"],
1906 [(
1907 view_number as i64,
1908 leaf_hash.to_string(),
1909 proposal_bytes.clone(),
1910 )],
1911 )
1912 .await?;
1913 tx.upsert(
1914 "quorum_certificate2",
1915 ["view", "leaf_hash", "data"],
1916 ["view"],
1917 [(
1918 justify_qc_view,
1919 justify_qc_leaf_commit.clone(),
1920 justify_qc_bytes.clone(),
1921 )],
1922 )
1923 .await?;
1924 tx.commit().await
1925 })
1926 .await;
1927 self.internal_metrics
1928 .internal_append_quorum2_duration
1929 .add_point(now.elapsed().as_secs_f64());
1930 res
1931 }
1932
1933 async fn append_cert2(
1934 &self,
1935 view: ViewNumber,
1936 cert2: Certificate2<SeqTypes>,
1937 ) -> anyhow::Result<()> {
1938 let data = bincode::serialize(&cert2).context("serializing cert2")?;
1939 let view_i64 = view.u64() as i64;
1940 WRITE_BACKOFF
1941 .retry_if(WRITE_RETRY_MAX, is_serialization_error, || async {
1942 let mut tx = self.db.write().await?;
1943 tx.upsert(
1944 "decided_cert2",
1945 ["view", "data"],
1946 ["view"],
1947 [(view_i64, data.clone())],
1948 )
1949 .await?;
1950 tx.commit().await
1951 })
1952 .await
1953 }
1954
1955 async fn load_cert2(&self, view: ViewNumber) -> anyhow::Result<Option<Certificate2<SeqTypes>>> {
1956 let row = self
1957 .db
1958 .read()
1959 .await?
1960 .fetch_optional(
1961 query("SELECT data FROM decided_cert2 WHERE view = $1").bind(view.u64() as i64),
1962 )
1963 .await?;
1964 row.map(|row| {
1965 let bytes: Vec<u8> = row.get("data");
1966 bincode::deserialize::<Certificate2<SeqTypes>>(&bytes).context("deserializing cert2")
1967 })
1968 .transpose()
1969 }
1970
1971 async fn load_upgrade_certificate(
1972 &self,
1973 ) -> anyhow::Result<Option<UpgradeCertificate<SeqTypes>>> {
1974 let result = self
1975 .db
1976 .read()
1977 .await?
1978 .fetch_optional("SELECT * FROM upgrade_certificate where id = true")
1979 .await?;
1980
1981 result
1982 .map(|row| {
1983 let bytes: Vec<u8> = row.get("data");
1984 anyhow::Result::<_>::Ok(bincode::deserialize(&bytes)?)
1985 })
1986 .transpose()
1987 }
1988
1989 async fn store_upgrade_certificate(
1990 &self,
1991 decided_upgrade_certificate: Option<UpgradeCertificate<SeqTypes>>,
1992 ) -> anyhow::Result<()> {
1993 let certificate = match decided_upgrade_certificate {
1994 Some(cert) => cert,
1995 None => return Ok(()),
1996 };
1997 let upgrade_certificate_bytes =
1998 bincode::serialize(&certificate).context("serializing upgrade certificate")?;
1999 WRITE_BACKOFF
2000 .retry_if(WRITE_RETRY_MAX, is_serialization_error, || async {
2001 let mut tx = self.db.write().await?;
2002 tx.upsert(
2003 "upgrade_certificate",
2004 ["id", "data"],
2005 ["id"],
2006 [(true, upgrade_certificate_bytes.clone())],
2007 )
2008 .await?;
2009 tx.commit().await
2010 })
2011 .await
2012 }
2013
2014 async fn migrate_anchor_leaf(&self) -> anyhow::Result<()> {
2015 let batch_size: i64 = 10000;
2016 let mut tx = self.db.read().await?;
2017
2018 let (is_completed, mut offset) = query_as::<(bool, i64)>(
2023 "SELECT completed, migrated_rows from epoch_migration WHERE table_name = 'anchor_leaf'",
2024 )
2025 .fetch_one(tx.as_mut())
2026 .await?;
2027
2028 if is_completed {
2029 tracing::info!("decided leaves already migrated");
2030 return Ok(());
2031 }
2032
2033 tracing::warn!("migrating decided leaves..");
2034 loop {
2035 let mut tx = self.db.read().await?;
2036 let rows = query(
2037 "SELECT view, leaf, qc FROM anchor_leaf WHERE view >= $1 ORDER BY view LIMIT $2",
2038 )
2039 .bind(offset)
2040 .bind(batch_size)
2041 .fetch_all(tx.as_mut())
2042 .await?;
2043
2044 drop(tx);
2045 if rows.is_empty() {
2046 break;
2047 }
2048 let mut values = Vec::new();
2049
2050 for row in rows.iter() {
2051 let leaf: Vec<u8> = row.try_get("leaf")?;
2052 let qc: Vec<u8> = row.try_get("qc")?;
2053 let leaf1: Leaf = bincode::deserialize(&leaf)?;
2054 let qc1: QuorumCertificate<SeqTypes> = bincode::deserialize(&qc)?;
2055 let view: i64 = row.try_get("view")?;
2056
2057 let leaf2: Leaf2 = leaf1.into();
2058 let qc2: QuorumCertificate2<SeqTypes> = qc1.to_qc2();
2059
2060 let leaf2_bytes = bincode::serialize(&leaf2)?;
2061 let qc2_bytes = bincode::serialize(&qc2)?;
2062
2063 values.push((view, leaf2_bytes, qc2_bytes));
2064 }
2065
2066 let mut query_builder: sqlx::QueryBuilder<Db> =
2067 sqlx::QueryBuilder::new("INSERT INTO anchor_leaf2 (view, leaf, qc) ");
2068
2069 offset = values.last().context("last row")?.0;
2070
2071 query_builder.push_values(values, |mut b, (view, leaf, qc)| {
2072 b.push_bind(view).push_bind(leaf).push_bind(qc);
2073 });
2074
2075 query_builder.push(" ON CONFLICT DO NOTHING");
2078
2079 let query = query_builder.build();
2080
2081 let mut tx = self.db.write().await?;
2082 query.execute(tx.as_mut()).await?;
2083
2084 tx.upsert(
2085 "epoch_migration",
2086 ["table_name", "completed", "migrated_rows"],
2087 ["table_name"],
2088 [("anchor_leaf".to_string(), false, offset)],
2089 )
2090 .await?;
2091 tx.commit().await?;
2092
2093 tracing::info!(
2094 "anchor leaf migration progress: rows={} offset={}",
2095 rows.len(),
2096 offset
2097 );
2098
2099 if rows.len() < batch_size as usize {
2100 break;
2101 }
2102 }
2103
2104 tracing::warn!("migrated decided leaves");
2105
2106 let mut tx = self.db.write().await?;
2107 tx.upsert(
2108 "epoch_migration",
2109 ["table_name", "completed", "migrated_rows"],
2110 ["table_name"],
2111 [("anchor_leaf".to_string(), true, offset)],
2112 )
2113 .await?;
2114 tx.commit().await?;
2115
2116 tracing::info!("updated epoch_migration table for anchor_leaf");
2117
2118 Ok(())
2119 }
2120
2121 async fn migrate_da_proposals(&self) -> anyhow::Result<()> {
2122 let batch_size: i64 = 10000;
2123 let mut tx = self.db.read().await?;
2124
2125 let (is_completed, mut offset) = query_as::<(bool, i64)>(
2126 "SELECT completed, migrated_rows from epoch_migration WHERE table_name = 'da_proposal'",
2127 )
2128 .fetch_one(tx.as_mut())
2129 .await?;
2130
2131 if is_completed {
2132 tracing::info!("da proposals migration already done");
2133 return Ok(());
2134 }
2135
2136 tracing::warn!("migrating da proposals..");
2137
2138 loop {
2139 let mut tx = self.db.read().await?;
2140 let rows = query(
2141 "SELECT payload_hash, data FROM da_proposal WHERE view >= $1 ORDER BY view LIMIT \
2142 $2",
2143 )
2144 .bind(offset)
2145 .bind(batch_size)
2146 .fetch_all(tx.as_mut())
2147 .await?;
2148
2149 drop(tx);
2150 if rows.is_empty() {
2151 break;
2152 }
2153 let mut values = Vec::new();
2154
2155 for row in rows.iter() {
2156 let data: Vec<u8> = row.try_get("data")?;
2157 let payload_hash: String = row.try_get("payload_hash")?;
2158
2159 let da_proposal: Proposal<SeqTypes, DaProposal<SeqTypes>> =
2160 bincode::deserialize(&data)?;
2161 let da_proposal2: Proposal<SeqTypes, DaProposal2<SeqTypes>> =
2162 convert_proposal(da_proposal);
2163
2164 let view = da_proposal2.data.view_number.u64() as i64;
2165 let data = bincode::serialize(&da_proposal2)?;
2166
2167 values.push((view, payload_hash, data));
2168 }
2169
2170 let mut query_builder: sqlx::QueryBuilder<Db> =
2171 sqlx::QueryBuilder::new("INSERT INTO da_proposal2 (view, payload_hash, data) ");
2172
2173 offset = values.last().context("last row")?.0;
2174 query_builder.push_values(values, |mut b, (view, payload_hash, data)| {
2175 b.push_bind(view).push_bind(payload_hash).push_bind(data);
2176 });
2177 query_builder.push(" ON CONFLICT DO NOTHING");
2178 let query = query_builder.build();
2179
2180 let mut tx = self.db.write().await?;
2181 query.execute(tx.as_mut()).await?;
2182
2183 tx.upsert(
2184 "epoch_migration",
2185 ["table_name", "completed", "migrated_rows"],
2186 ["table_name"],
2187 [("da_proposal".to_string(), false, offset)],
2188 )
2189 .await?;
2190 tx.commit().await?;
2191
2192 tracing::info!(
2193 "DA proposals migration progress: rows={} offset={}",
2194 rows.len(),
2195 offset
2196 );
2197 if rows.len() < batch_size as usize {
2198 break;
2199 }
2200 }
2201
2202 tracing::warn!("migrated da proposals");
2203
2204 let mut tx = self.db.write().await?;
2205 tx.upsert(
2206 "epoch_migration",
2207 ["table_name", "completed", "migrated_rows"],
2208 ["table_name"],
2209 [("da_proposal".to_string(), true, offset)],
2210 )
2211 .await?;
2212 tx.commit().await?;
2213
2214 tracing::info!("updated epoch_migration table for da_proposal");
2215
2216 Ok(())
2217 }
2218
2219 async fn migrate_vid_shares(&self) -> anyhow::Result<()> {
2220 let batch_size: i64 = 10000;
2221
2222 let mut tx = self.db.read().await?;
2223
2224 let (is_completed, mut offset) = query_as::<(bool, i64)>(
2225 "SELECT completed, migrated_rows from epoch_migration WHERE table_name = 'vid_share'",
2226 )
2227 .fetch_one(tx.as_mut())
2228 .await?;
2229
2230 if is_completed {
2231 tracing::info!("vid_share migration already done");
2232 return Ok(());
2233 }
2234
2235 tracing::warn!("migrating vid shares..");
2236 loop {
2237 let mut tx = self.db.read().await?;
2238 let rows = query(
2239 "SELECT payload_hash, data FROM vid_share WHERE view >= $1 ORDER BY view LIMIT $2",
2240 )
2241 .bind(offset)
2242 .bind(batch_size)
2243 .fetch_all(tx.as_mut())
2244 .await?;
2245
2246 drop(tx);
2247 if rows.is_empty() {
2248 break;
2249 }
2250 let mut values = Vec::new();
2251
2252 for row in rows.iter() {
2253 let data: Vec<u8> = row.try_get("data")?;
2254 let payload_hash: String = row.try_get("payload_hash")?;
2255
2256 let vid_share: Proposal<SeqTypes, VidDisperseShare0<SeqTypes>> =
2257 bincode::deserialize(&data)?;
2258 let vid_share2: Proposal<SeqTypes, VidDisperseShare<SeqTypes>> =
2259 convert_proposal(vid_share);
2260
2261 let view = vid_share2.data.view_number().u64() as i64;
2262 let data = bincode::serialize(&vid_share2)?;
2263
2264 values.push((view, payload_hash, data));
2265 }
2266
2267 let mut query_builder: sqlx::QueryBuilder<Db> =
2268 sqlx::QueryBuilder::new("INSERT INTO vid_share2 (view, payload_hash, data) ");
2269
2270 offset = values.last().context("last row")?.0;
2271
2272 query_builder.push_values(values, |mut b, (view, payload_hash, data)| {
2273 b.push_bind(view).push_bind(payload_hash).push_bind(data);
2274 });
2275
2276 let query = query_builder.build();
2277
2278 let mut tx = self.db.write().await?;
2279 query.execute(tx.as_mut()).await?;
2280
2281 tx.upsert(
2282 "epoch_migration",
2283 ["table_name", "completed", "migrated_rows"],
2284 ["table_name"],
2285 [("vid_share".to_string(), false, offset)],
2286 )
2287 .await?;
2288 tx.commit().await?;
2289
2290 tracing::info!(
2291 "VID shares migration progress: rows={} offset={}",
2292 rows.len(),
2293 offset
2294 );
2295 if rows.len() < batch_size as usize {
2296 break;
2297 }
2298 }
2299
2300 tracing::warn!("migrated vid shares");
2301
2302 let mut tx = self.db.write().await?;
2303 tx.upsert(
2304 "epoch_migration",
2305 ["table_name", "completed", "migrated_rows"],
2306 ["table_name"],
2307 [("vid_share".to_string(), true, offset)],
2308 )
2309 .await?;
2310 tx.commit().await?;
2311
2312 tracing::info!("updated epoch_migration table for vid_share");
2313
2314 Ok(())
2315 }
2316
2317 async fn migrate_quorum_proposals(&self) -> anyhow::Result<()> {
2318 let batch_size: i64 = 10000;
2319 let mut tx = self.db.read().await?;
2320
2321 let (is_completed, mut offset) = query_as::<(bool, i64)>(
2322 "SELECT completed, migrated_rows from epoch_migration WHERE table_name = \
2323 'quorum_proposals'",
2324 )
2325 .fetch_one(tx.as_mut())
2326 .await?;
2327
2328 if is_completed {
2329 tracing::info!("quorum proposals migration already done");
2330 return Ok(());
2331 }
2332
2333 tracing::warn!("migrating quorum proposals..");
2334
2335 loop {
2336 let mut tx = self.db.read().await?;
2337 let rows = query(
2338 "SELECT view, leaf_hash, data FROM quorum_proposals WHERE view >= $1 ORDER BY \
2339 view LIMIT $2",
2340 )
2341 .bind(offset)
2342 .bind(batch_size)
2343 .fetch_all(tx.as_mut())
2344 .await?;
2345
2346 drop(tx);
2347
2348 if rows.is_empty() {
2349 break;
2350 }
2351
2352 let mut values = Vec::new();
2353
2354 for row in rows.iter() {
2355 let leaf_hash: String = row.try_get("leaf_hash")?;
2356 let data: Vec<u8> = row.try_get("data")?;
2357
2358 let quorum_proposal: Proposal<SeqTypes, QuorumProposal<SeqTypes>> =
2359 bincode::deserialize(&data)?;
2360 let quorum_proposal2: Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>> =
2361 convert_proposal(quorum_proposal);
2362
2363 let view = quorum_proposal2.data.view_number().u64() as i64;
2364 let data = bincode::serialize(&quorum_proposal2)?;
2365
2366 values.push((view, leaf_hash, data));
2367 }
2368
2369 let mut query_builder: sqlx::QueryBuilder<Db> =
2370 sqlx::QueryBuilder::new("INSERT INTO quorum_proposals2 (view, leaf_hash, data) ");
2371
2372 offset = values.last().context("last row")?.0;
2373 query_builder.push_values(values, |mut b, (view, leaf_hash, data)| {
2374 b.push_bind(view).push_bind(leaf_hash).push_bind(data);
2375 });
2376
2377 query_builder.push(" ON CONFLICT DO NOTHING");
2378
2379 let query = query_builder.build();
2380
2381 let mut tx = self.db.write().await?;
2382 query.execute(tx.as_mut()).await?;
2383
2384 tx.upsert(
2385 "epoch_migration",
2386 ["table_name", "completed", "migrated_rows"],
2387 ["table_name"],
2388 [("quorum_proposals".to_string(), false, offset)],
2389 )
2390 .await?;
2391 tx.commit().await?;
2392
2393 tracing::info!(
2394 "quorum proposals migration progress: rows={} offset={}",
2395 rows.len(),
2396 offset
2397 );
2398
2399 if rows.len() < batch_size as usize {
2400 break;
2401 }
2402 }
2403
2404 tracing::warn!("migrated quorum proposals");
2405
2406 let mut tx = self.db.write().await?;
2407 tx.upsert(
2408 "epoch_migration",
2409 ["table_name", "completed", "migrated_rows"],
2410 ["table_name"],
2411 [("quorum_proposals".to_string(), true, offset)],
2412 )
2413 .await?;
2414 tx.commit().await?;
2415
2416 tracing::info!("updated epoch_migration table for quorum_proposals");
2417
2418 Ok(())
2419 }
2420
2421 async fn migrate_quorum_certificates(&self) -> anyhow::Result<()> {
2422 let batch_size: i64 = 10000;
2423 let mut tx = self.db.read().await?;
2424
2425 let (is_completed, mut offset) = query_as::<(bool, i64)>(
2426 "SELECT completed, migrated_rows from epoch_migration WHERE table_name = \
2427 'quorum_certificate'",
2428 )
2429 .fetch_one(tx.as_mut())
2430 .await?;
2431
2432 if is_completed {
2433 tracing::info!("quorum certificates migration already done");
2434 return Ok(());
2435 }
2436
2437 tracing::warn!("migrating quorum certificates..");
2438 loop {
2439 let mut tx = self.db.read().await?;
2440 let rows = query(
2441 "SELECT view, leaf_hash, data FROM quorum_certificate WHERE view >= $1 ORDER BY \
2442 view LIMIT $2",
2443 )
2444 .bind(offset)
2445 .bind(batch_size)
2446 .fetch_all(tx.as_mut())
2447 .await?;
2448
2449 drop(tx);
2450 if rows.is_empty() {
2451 break;
2452 }
2453 let mut values = Vec::new();
2454
2455 for row in rows.iter() {
2456 let leaf_hash: String = row.try_get("leaf_hash")?;
2457 let data: Vec<u8> = row.try_get("data")?;
2458
2459 let qc: QuorumCertificate<SeqTypes> = bincode::deserialize(&data)?;
2460 let qc2: QuorumCertificate2<SeqTypes> = qc.to_qc2();
2461
2462 let view = qc2.view_number().u64() as i64;
2463 let data = bincode::serialize(&qc2)?;
2464
2465 values.push((view, leaf_hash, data));
2466 }
2467
2468 let mut query_builder: sqlx::QueryBuilder<Db> =
2469 sqlx::QueryBuilder::new("INSERT INTO quorum_certificate2 (view, leaf_hash, data) ");
2470
2471 offset = values.last().context("last row")?.0;
2472
2473 query_builder.push_values(values, |mut b, (view, leaf_hash, data)| {
2474 b.push_bind(view).push_bind(leaf_hash).push_bind(data);
2475 });
2476
2477 query_builder.push(" ON CONFLICT DO NOTHING");
2478 let query = query_builder.build();
2479
2480 let mut tx = self.db.write().await?;
2481 query.execute(tx.as_mut()).await?;
2482
2483 tx.upsert(
2484 "epoch_migration",
2485 ["table_name", "completed", "migrated_rows"],
2486 ["table_name"],
2487 [("quorum_certificate".to_string(), false, offset)],
2488 )
2489 .await?;
2490 tx.commit().await?;
2491
2492 tracing::info!(
2493 "Quorum certificates migration progress: rows={} offset={}",
2494 rows.len(),
2495 offset
2496 );
2497
2498 if rows.len() < batch_size as usize {
2499 break;
2500 }
2501 }
2502
2503 tracing::warn!("migrated quorum certificates");
2504
2505 let mut tx = self.db.write().await?;
2506 tx.upsert(
2507 "epoch_migration",
2508 ["table_name", "completed", "migrated_rows"],
2509 ["table_name"],
2510 [("quorum_certificate".to_string(), true, offset)],
2511 )
2512 .await?;
2513 tx.commit().await?;
2514 tracing::info!("updated epoch_migration table for quorum_certificate");
2515
2516 Ok(())
2517 }
2518
2519 async fn migrate_x25519_keys(&self) -> anyhow::Result<()> {
2524 use super::RegisteredValidatorNoX25519;
2525
2526 let name = DataMigration::X25519Keys.as_str();
2527
2528 if !self
2530 .is_migration_complete(name, "epoch_drb_and_root")
2531 .await?
2532 {
2533 let rows: Vec<(i64, Vec<u8>)> = {
2534 let mut tx = self.db.read().await?;
2535 query_as("SELECT epoch, stake FROM epoch_drb_and_root WHERE stake IS NOT NULL")
2536 .fetch_all(tx.as_mut())
2537 .await?
2538 };
2539
2540 let num_rows = rows.len();
2541 let mut tx = self.db.write().await?;
2542 for (epoch, stake_bytes) in rows {
2543 if bincode::deserialize::<AuthenticatedValidatorMap>(&stake_bytes).is_ok() {
2545 continue;
2546 }
2547
2548 let old_validators: IndexMap<Address, RegisteredValidatorNoX25519> =
2550 bincode::deserialize(&stake_bytes)
2551 .context("deserializing legacy stake table")?;
2552 let validators: AuthenticatedValidatorMap = old_validators
2553 .into_iter()
2554 .map(|(addr, v)| {
2555 let registered = v.migrate();
2556 (
2557 addr,
2558 AuthenticatedValidator::try_from(registered)
2559 .expect("stake tables only contain authenticated validators"),
2560 )
2561 })
2562 .collect();
2563
2564 let new_bytes =
2565 bincode::serialize(&validators).context("serializing stake table")?;
2566
2567 tracing::debug!(epoch, "migrating x25519 keys in stake table");
2568 tx.execute(
2569 query("UPDATE epoch_drb_and_root SET stake = $1 WHERE epoch = $2")
2570 .bind(&new_bytes)
2571 .bind(epoch),
2572 )
2573 .await?;
2574 }
2575 Self::mark_migration_complete(&mut tx, name, "epoch_drb_and_root", num_rows).await?;
2576 tx.commit().await?;
2577 tracing::info!(
2578 num_rows,
2579 "x25519_keys migration completed for epoch_drb_and_root"
2580 );
2581 }
2582
2583 if !self
2585 .is_migration_complete(name, "stake_table_validators")
2586 .await?
2587 {
2588 let rows: Vec<(i64, String, serde_json::Value)> = {
2589 let mut tx = self.db.read().await?;
2590 query_as("SELECT epoch, address, validator FROM stake_table_validators")
2591 .fetch_all(tx.as_mut())
2592 .await?
2593 };
2594
2595 let num_rows = rows.len();
2596 let mut tx = self.db.write().await?;
2597 for (epoch, address, validator_json) in rows {
2598 if validator_json
2601 .as_object()
2602 .is_some_and(|obj| obj.contains_key("x25519_key"))
2603 {
2604 continue;
2605 }
2606
2607 let validator: RegisteredValidator<PubKey> =
2610 serde_json::from_value(validator_json).context("deserializing validator")?;
2611
2612 let new_json = serde_json::to_value(&validator).context("serializing validator")?;
2613
2614 tracing::debug!(epoch, %address, "migrating x25519 keys for validator");
2615 tx.execute(
2616 query(
2617 "UPDATE stake_table_validators SET validator = $1 WHERE epoch = $2 AND \
2618 address = $3",
2619 )
2620 .bind(&new_json)
2621 .bind(epoch)
2622 .bind(&address),
2623 )
2624 .await?;
2625 }
2626 Self::mark_migration_complete(&mut tx, name, "stake_table_validators", num_rows)
2627 .await?;
2628 tx.commit().await?;
2629 tracing::info!(
2630 num_rows,
2631 "x25519_keys migration completed for stake_table_validators"
2632 );
2633 }
2634
2635 Ok(())
2636 }
2637
2638 async fn store_next_epoch_quorum_certificate(
2639 &self,
2640 high_qc: NextEpochQuorumCertificate2<SeqTypes>,
2641 ) -> anyhow::Result<()> {
2642 let qc2_bytes = bincode::serialize(&high_qc).context("serializing next epoch qc")?;
2643 WRITE_BACKOFF
2644 .retry_if(WRITE_RETRY_MAX, is_serialization_error, || async {
2645 let mut tx = self.db.write().await?;
2646 tx.upsert(
2647 "next_epoch_quorum_certificate",
2648 ["id", "data"],
2649 ["id"],
2650 [(true, qc2_bytes.clone())],
2651 )
2652 .await?;
2653 tx.commit().await
2654 })
2655 .await
2656 }
2657
2658 async fn load_next_epoch_quorum_certificate(
2659 &self,
2660 ) -> anyhow::Result<Option<NextEpochQuorumCertificate2<SeqTypes>>> {
2661 let result = self
2662 .db
2663 .read()
2664 .await?
2665 .fetch_optional("SELECT * FROM next_epoch_quorum_certificate where id = true")
2666 .await?;
2667
2668 result
2669 .map(|row| {
2670 let bytes: Vec<u8> = row.get("data");
2671 anyhow::Result::<_>::Ok(bincode::deserialize(&bytes)?)
2672 })
2673 .transpose()
2674 }
2675
2676 async fn store_eqc(
2677 &self,
2678 high_qc: QuorumCertificate2<SeqTypes>,
2679 next_epoch_high_qc: NextEpochQuorumCertificate2<SeqTypes>,
2680 ) -> anyhow::Result<()> {
2681 let eqc_bytes =
2682 bincode::serialize(&(high_qc, next_epoch_high_qc)).context("serializing eqc")?;
2683 WRITE_BACKOFF
2684 .retry_if(WRITE_RETRY_MAX, is_serialization_error, || async {
2685 let mut tx = self.db.write().await?;
2686 tx.upsert("eqc", ["id", "data"], ["id"], [(true, eqc_bytes.clone())])
2687 .await?;
2688 tx.commit().await
2689 })
2690 .await
2691 }
2692
2693 async fn load_eqc(
2694 &self,
2695 ) -> Option<(
2696 QuorumCertificate2<SeqTypes>,
2697 NextEpochQuorumCertificate2<SeqTypes>,
2698 )> {
2699 let result = self
2700 .db
2701 .read()
2702 .await
2703 .ok()?
2704 .fetch_optional("SELECT * FROM eqc where id = true")
2705 .await
2706 .ok()?;
2707
2708 result
2709 .map(|row| {
2710 let bytes: Vec<u8> = row.get("data");
2711 bincode::deserialize(&bytes)
2712 })
2713 .transpose()
2714 .ok()?
2715 }
2716
2717 async fn append_da2(
2718 &self,
2719 proposal: &Proposal<SeqTypes, DaProposal2<SeqTypes>>,
2720 vid_commit: VidCommitment,
2721 ) -> anyhow::Result<()> {
2722 let data = &proposal.data;
2723 let view = data.view_number().u64();
2724 let data_bytes = bincode::serialize(proposal).unwrap();
2725
2726 let now = Instant::now();
2727 let res = WRITE_BACKOFF
2728 .retry_if(WRITE_RETRY_MAX, is_serialization_error, || async {
2729 let mut tx = self.db.write().await?;
2730 tx.upsert(
2731 "da_proposal2",
2732 ["view", "data", "payload_hash"],
2733 ["view"],
2734 [(view as i64, data_bytes.clone(), vid_commit.to_string())],
2735 )
2736 .await?;
2737 tx.commit().await
2738 })
2739 .await;
2740 self.internal_metrics
2741 .internal_append_da2_duration
2742 .add_point(now.elapsed().as_secs_f64());
2743 res
2744 }
2745
2746 async fn store_drb_result(
2747 &self,
2748 epoch: EpochNumber,
2749 drb_result: DrbResult,
2750 ) -> anyhow::Result<()> {
2751 let epoch_i64 = epoch.u64() as i64;
2752 let drb_result_vec = Vec::from(drb_result);
2753 WRITE_BACKOFF
2754 .retry_if(WRITE_RETRY_MAX, is_serialization_error, || async {
2755 let mut tx = self.db.write().await?;
2756 tx.upsert(
2757 "epoch_drb_and_root",
2758 ["epoch", "drb_result"],
2759 ["epoch"],
2760 [(epoch_i64, drb_result_vec.clone())],
2761 )
2762 .await?;
2763 tx.commit().await
2764 })
2765 .await
2766 }
2767
2768 async fn store_epoch_root(
2769 &self,
2770 epoch: EpochNumber,
2771 block_header: <SeqTypes as NodeType>::BlockHeader,
2772 ) -> anyhow::Result<()> {
2773 let epoch_i64 = epoch.u64() as i64;
2774 let block_header_bytes =
2775 bincode::serialize(&block_header).context("serializing block header")?;
2776
2777 WRITE_BACKOFF
2778 .retry_if(WRITE_RETRY_MAX, is_serialization_error, || async {
2779 let mut tx = self.db.write().await?;
2780 tx.upsert(
2781 "epoch_drb_and_root",
2782 ["epoch", "block_header"],
2783 ["epoch"],
2784 [(epoch_i64, block_header_bytes.clone())],
2785 )
2786 .await?;
2787 tx.commit().await
2788 })
2789 .await
2790 }
2791
2792 async fn store_drb_input(&self, drb_input: DrbInput) -> anyhow::Result<()> {
2793 if let Ok(loaded_drb_input) = self.load_drb_input(drb_input.epoch).await {
2794 if loaded_drb_input.difficulty_level != drb_input.difficulty_level {
2795 tracing::error!("Overwriting {loaded_drb_input:?} in storage with {drb_input:?}");
2796 } else if loaded_drb_input.iteration >= drb_input.iteration {
2797 anyhow::bail!(
2798 "DrbInput in storage {:?} is more recent than {:?}, refusing to update",
2799 loaded_drb_input,
2800 drb_input
2801 )
2802 }
2803 }
2804
2805 let drb_epoch_i64 = drb_input.epoch as i64;
2806 let drb_input_bytes = bincode::serialize(&drb_input)
2807 .context("Failed to serialize DrbInput. This is not fatal, but should never happen.")?;
2808
2809 WRITE_BACKOFF
2810 .retry_if(WRITE_RETRY_MAX, is_serialization_error, || async {
2811 let mut tx = self.db.write().await?;
2812 tx.upsert(
2813 "drb",
2814 ["epoch", "drb_input"],
2815 ["epoch"],
2816 [(drb_epoch_i64, drb_input_bytes.clone())],
2817 )
2818 .await?;
2819 tx.commit().await
2820 })
2821 .await
2822 }
2823
2824 async fn load_drb_input(&self, epoch: u64) -> anyhow::Result<DrbInput> {
2825 let row = self
2826 .db
2827 .read()
2828 .await?
2829 .fetch_optional(query("SELECT drb_input FROM drb WHERE epoch = $1").bind(epoch as i64))
2830 .await?;
2831
2832 match row {
2833 None => anyhow::bail!("No DrbInput for epoch {} in storage", epoch),
2834 Some(row) => {
2835 let drb_input_bytes: Vec<u8> = row.try_get("drb_input")?;
2836 let drb_input = bincode::deserialize(&drb_input_bytes)
2837 .context("Failed to deserialize drb_input from storage")?;
2838
2839 Ok(drb_input)
2840 },
2841 }
2842 }
2843
2844 async fn add_state_cert(
2845 &self,
2846 state_cert: LightClientStateUpdateCertificateV2<SeqTypes>,
2847 ) -> anyhow::Result<()> {
2848 let view_number = state_cert.light_client_state.view_number as i64;
2849 let state_cert_bytes = bincode::serialize(&state_cert)
2850 .context("serializing light client state update certificate")?;
2851
2852 WRITE_BACKOFF
2853 .retry_if(WRITE_RETRY_MAX, is_serialization_error, || async {
2854 let mut tx = self.db.write().await?;
2855 tx.upsert(
2856 "state_cert",
2857 ["view", "state_cert"],
2858 ["view"],
2859 [(view_number, state_cert_bytes.clone())],
2860 )
2861 .await?;
2862 tx.commit().await
2863 })
2864 .await
2865 }
2866
2867 async fn load_state_cert(
2868 &self,
2869 ) -> anyhow::Result<Option<LightClientStateUpdateCertificateV2<SeqTypes>>> {
2870 let Some(row) = self
2871 .db
2872 .read()
2873 .await?
2874 .fetch_optional(
2875 "SELECT state_cert FROM finalized_state_cert ORDER BY epoch DESC LIMIT 1",
2876 )
2877 .await?
2878 else {
2879 return Ok(None);
2880 };
2881 let bytes: Vec<u8> = row.get("state_cert");
2882
2883 let cert = match bincode::deserialize(&bytes) {
2884 Ok(cert) => cert,
2885 Err(err) => {
2886 tracing::info!(
2887 error = %err,
2888 "Failed to deserialize state certificate with v2. attempting with v1"
2889 );
2890
2891 let v1_cert =
2892 bincode::deserialize::<LightClientStateUpdateCertificateV1<SeqTypes>>(&bytes)
2893 .with_context(|| {
2894 format!("Failed to deserialize using both v1 and v2. error: {err}")
2895 })?;
2896
2897 v1_cert.into()
2898 },
2899 };
2900
2901 Ok(Some(cert))
2902 }
2903
2904 async fn get_state_cert_by_epoch(
2905 &self,
2906 epoch: u64,
2907 ) -> anyhow::Result<Option<LightClientStateUpdateCertificateV2<SeqTypes>>> {
2908 let Some(row) = self
2909 .db
2910 .read()
2911 .await?
2912 .fetch_optional(
2913 query("SELECT state_cert FROM finalized_state_cert WHERE epoch = $1")
2914 .bind(epoch as i64),
2915 )
2916 .await?
2917 else {
2918 return Ok(None);
2919 };
2920 let bytes: Vec<u8> = row.get("state_cert");
2921
2922 let cert = match bincode::deserialize(&bytes) {
2923 Ok(cert) => cert,
2924 Err(err) => {
2925 tracing::info!(
2926 error = %err,
2927 "Failed to deserialize state certificate with v2. attempting with v1"
2928 );
2929
2930 let v1_cert =
2931 bincode::deserialize::<LightClientStateUpdateCertificateV1<SeqTypes>>(&bytes)
2932 .with_context(|| {
2933 format!("Failed to deserialize using both v1 and v2. error: {err}")
2934 })?;
2935
2936 v1_cert.into()
2937 },
2938 };
2939
2940 Ok(Some(cert))
2941 }
2942
2943 async fn insert_state_cert(
2944 &self,
2945 epoch: u64,
2946 cert: LightClientStateUpdateCertificateV2<SeqTypes>,
2947 ) -> anyhow::Result<()> {
2948 let epoch_i64 = epoch as i64;
2949 let bytes = bincode::serialize(&cert)
2950 .with_context(|| format!("Failed to serialize state cert for epoch {epoch}"))?;
2951
2952 WRITE_BACKOFF
2953 .retry_if(WRITE_RETRY_MAX, is_serialization_error, || async {
2954 let mut tx = self.db.write().await?;
2955 tx.upsert(
2956 "finalized_state_cert",
2957 ["epoch", "state_cert"],
2958 ["epoch"],
2959 [(epoch_i64, bytes.clone())],
2960 )
2961 .await?;
2962 tx.commit().await
2963 })
2964 .await
2965 }
2966
2967 async fn load_start_epoch_info(&self) -> anyhow::Result<Vec<InitializerEpochInfo<SeqTypes>>> {
2968 let rows = self
2969 .db
2970 .read()
2971 .await?
2972 .fetch_all(
2973 query("SELECT * from epoch_drb_and_root ORDER BY epoch DESC LIMIT $1")
2974 .bind(RECENT_STAKE_TABLES_LIMIT as i64),
2975 )
2976 .await?;
2977
2978 rows.into_iter()
2980 .rev()
2981 .map(|row| {
2982 let epoch: i64 = row.try_get("epoch")?;
2983 let drb_result: Option<Vec<u8>> = row.try_get("drb_result")?;
2984 let block_header: Option<Vec<u8>> = row.try_get("block_header")?;
2985 if let Some(drb_result) = drb_result {
2986 let drb_result_array = drb_result
2987 .try_into()
2988 .or_else(|_| bail!("invalid drb result"))?;
2989 let block_header: Option<<SeqTypes as NodeType>::BlockHeader> = block_header
2990 .map(|data| bincode::deserialize(&data))
2991 .transpose()?;
2992 Ok(Some(InitializerEpochInfo::<SeqTypes> {
2993 epoch: EpochNumber::new(epoch as u64),
2994 drb_result: drb_result_array,
2995 block_header,
2996 }))
2997 } else {
2998 Ok(None)
3001 }
3002 })
3003 .filter_map(|e| match e {
3004 Err(v) => Some(Err(v)),
3005 Ok(Some(v)) => Some(Ok(v)),
3006 Ok(None) => None,
3007 })
3008 .collect()
3009 }
3010
3011 fn enable_metrics(&mut self, metrics: &dyn Metrics) {
3012 self.internal_metrics = PersistenceMetricsValue::new(metrics);
3013 }
3014}
3015
3016#[async_trait]
3017impl MembershipPersistence for Persistence {
3018 async fn load_stake(&self, epoch: EpochNumber) -> anyhow::Result<Option<StakeTuple>> {
3019 let result = self
3020 .db
3021 .read()
3022 .await?
3023 .fetch_optional(
3024 query(
3025 "SELECT stake, block_reward, stake_table_hash FROM epoch_drb_and_root WHERE \
3026 epoch = $1",
3027 )
3028 .bind(epoch.u64() as i64),
3029 )
3030 .await?;
3031
3032 result
3033 .map(|row| {
3034 let stake_table_bytes: Vec<u8> = row.get("stake");
3035 let reward_bytes: Option<Vec<u8>> = row.get("block_reward");
3036 let stake_table_hash_bytes: Option<Vec<u8>> = row.get("stake_table_hash");
3037 let stake_table: AuthenticatedValidatorMap =
3038 bincode::deserialize(&stake_table_bytes)
3039 .context("deserializing stake table")?;
3040 let reward: Option<RewardAmount> = reward_bytes
3041 .map(|b| bincode::deserialize(&b).context("deserializing block_reward"))
3042 .transpose()?;
3043 let stake_table_hash: Option<StakeTableHash> = stake_table_hash_bytes
3044 .map(|b| bincode::deserialize(&b).context("deserializing stake table hash"))
3045 .transpose()?;
3046
3047 Ok((stake_table, reward, stake_table_hash))
3048 })
3049 .transpose()
3050 }
3051
3052 async fn load_latest_stake(&self, limit: u64) -> anyhow::Result<Option<Vec<IndexedStake>>> {
3053 let mut tx = self.db.read().await?;
3054
3055 let rows = match query_as::<(i64, Vec<u8>, Option<Vec<u8>>, Option<Vec<u8>>)>(
3056 "SELECT epoch, stake, block_reward, stake_table_hash FROM epoch_drb_and_root WHERE \
3057 stake is NOT NULL ORDER BY epoch DESC LIMIT $1",
3058 )
3059 .bind(limit as i64)
3060 .fetch_all(tx.as_mut())
3061 .await
3062 {
3063 Ok(bytes) => bytes,
3064 Err(err) => {
3065 tracing::error!("error loading stake tables: {err:#}");
3066 bail!("{err:#}");
3067 },
3068 };
3069
3070 let stakes: anyhow::Result<Vec<IndexedStake>> = rows
3071 .into_iter()
3072 .map(
3073 |(id, stake_bytes, reward_bytes_opt, stake_table_hash_bytes_opt)| {
3074 let stake_table: AuthenticatedValidatorMap =
3075 bincode::deserialize(&stake_bytes).context("deserializing stake table")?;
3076
3077 let block_reward: Option<RewardAmount> = reward_bytes_opt
3078 .map(|b| bincode::deserialize(&b).context("deserializing block_reward"))
3079 .transpose()?;
3080
3081 let stake_table_hash: Option<StakeTableHash> = stake_table_hash_bytes_opt
3082 .map(|b| bincode::deserialize(&b).context("deserializing stake table hash"))
3083 .transpose()?;
3084
3085 Ok((
3086 EpochNumber::new(id as u64),
3087 (stake_table, block_reward),
3088 stake_table_hash,
3089 ))
3090 },
3091 )
3092 .collect();
3093
3094 Ok(Some(stakes?))
3095 }
3096
3097 async fn store_stake(
3098 &self,
3099 epoch: EpochNumber,
3100 stake: AuthenticatedValidatorMap,
3101 block_reward: Option<RewardAmount>,
3102 stake_table_hash: Option<StakeTableHash>,
3103 ) -> anyhow::Result<()> {
3104 let epoch_i64 = epoch.u64() as i64;
3105 let stake_table_bytes = bincode::serialize(&stake).context("serializing stake table")?;
3106 let reward_bytes = block_reward
3107 .map(|r| bincode::serialize(&r).context("serializing block reward"))
3108 .transpose()?;
3109 let stake_table_hash_bytes = stake_table_hash
3110 .map(|h| bincode::serialize(&h).context("serializing stake table hash"))
3111 .transpose()?;
3112 WRITE_BACKOFF
3113 .retry_if(WRITE_RETRY_MAX, is_serialization_error, || async {
3114 let mut tx = self.db.write().await?;
3115 tx.upsert(
3116 "epoch_drb_and_root",
3117 ["epoch", "stake", "block_reward", "stake_table_hash"],
3118 ["epoch"],
3119 [(
3120 epoch_i64,
3121 stake_table_bytes.clone(),
3122 reward_bytes.clone(),
3123 stake_table_hash_bytes.clone(),
3124 )],
3125 )
3126 .await?;
3127 tx.commit().await
3128 })
3129 .await
3130 }
3131
3132 async fn store_events(
3133 &self,
3134 l1_finalized: u64,
3135 events: Vec<(EventKey, StakeTableEvent)>,
3136 ) -> anyhow::Result<()> {
3137 let l1_finalized_i64: i64 = l1_finalized.try_into()?;
3138 let serialized_events = events
3139 .into_iter()
3140 .map(|((block_number, index), event)| {
3141 Ok((
3142 i64::try_from(block_number)?,
3143 i64::try_from(index)?,
3144 serde_json::to_value(event).context("l1 event to value")?,
3145 ))
3146 })
3147 .collect::<anyhow::Result<Vec<_>>>()?;
3148
3149 WRITE_BACKOFF
3150 .retry_if(WRITE_RETRY_MAX, is_serialization_error, || async {
3151 let mut tx = self.db.write().await?;
3152
3153 let last_processed_l1_block = query_as::<(i64,)>(
3155 "SELECT last_l1_block FROM stake_table_events_l1_block where id = 0",
3156 )
3157 .fetch_optional(tx.as_mut())
3158 .await?
3159 .map(|(l1,)| l1);
3160
3161 tracing::debug!("last l1 finalizes in database = {last_processed_l1_block:?}");
3162
3163 let serialized_events_len = serialized_events.len();
3165 if last_processed_l1_block > Some(l1_finalized_i64) {
3166 tracing::debug!(
3167 ?last_processed_l1_block,
3168 l1_finalized,
3169 serialized_events_len,
3170 "last l1 finalized stored is already higher"
3171 );
3172 return Ok(());
3173 }
3174
3175 if !serialized_events.is_empty() {
3176 let mut query_builder: sqlx::QueryBuilder<Db> = sqlx::QueryBuilder::new(
3177 "INSERT INTO stake_table_events (l1_block, log_index, event) ",
3178 );
3179
3180 query_builder.push_values(
3181 serialized_events.iter().cloned(),
3182 |mut b, (l1_block, log_index, event)| {
3183 b.push_bind(l1_block).push_bind(log_index).push_bind(event);
3184 },
3185 );
3186
3187 query_builder.push(" ON CONFLICT DO NOTHING");
3188 let query = query_builder.build();
3189
3190 query.execute(tx.as_mut()).await?;
3191 }
3192
3193 tx.upsert(
3195 "stake_table_events_l1_block",
3196 ["id", "last_l1_block"],
3197 ["id"],
3198 [(0_i32, l1_finalized_i64)],
3199 )
3200 .await?;
3201
3202 tx.commit().await?;
3203
3204 Ok(())
3205 })
3206 .await
3207 }
3208
3209 async fn load_events(
3220 &self,
3221 from_l1_block: u64,
3222 to_l1_block: u64,
3223 ) -> anyhow::Result<(
3224 Option<EventsPersistenceRead>,
3225 Vec<(EventKey, StakeTableEvent)>,
3226 )> {
3227 let mut tx = self.db.read().await?;
3228
3229 let res = query_as::<(i64,)>(
3231 "SELECT last_l1_block FROM stake_table_events_l1_block where id = 0",
3232 )
3233 .fetch_optional(tx.as_mut())
3234 .await?;
3235
3236 let Some((last_processed_l1_block,)) = res else {
3237 return Ok((None, Vec::new()));
3239 };
3240
3241 let to_l1_block = to_l1_block.try_into()?;
3245 let query_l1_block = if last_processed_l1_block > to_l1_block {
3246 to_l1_block
3247 } else {
3248 last_processed_l1_block
3249 };
3250
3251 let rows = query(
3252 "SELECT l1_block, log_index, event FROM stake_table_events WHERE $1 <= l1_block AND \
3253 l1_block <= $2 ORDER BY l1_block ASC, log_index ASC",
3254 )
3255 .bind(i64::try_from(from_l1_block)?)
3256 .bind(query_l1_block)
3257 .fetch_all(tx.as_mut())
3258 .await?;
3259
3260 let events = rows
3261 .into_iter()
3262 .map(|row| {
3263 let l1_block: i64 = row.try_get("l1_block")?;
3264 let log_index: i64 = row.try_get("log_index")?;
3265 let event = serde_json::from_value(row.try_get("event")?)?;
3266
3267 Ok(((l1_block.try_into()?, log_index.try_into()?), event))
3268 })
3269 .collect::<anyhow::Result<Vec<_>>>()?;
3270
3271 if query_l1_block == to_l1_block {
3275 Ok((Some(EventsPersistenceRead::Complete), events))
3276 } else {
3277 Ok((
3278 Some(EventsPersistenceRead::UntilL1Block(
3279 query_l1_block.try_into()?,
3280 )),
3281 events,
3282 ))
3283 }
3284 }
3285
3286 async fn delete_stake_tables(&self) -> anyhow::Result<()> {
3287 WRITE_BACKOFF
3288 .retry_if(WRITE_RETRY_MAX, is_serialization_error, || async {
3289 let mut tx = self.db.write().await?;
3290 #[cfg(not(feature = "embedded-db"))]
3291 query(
3292 "TRUNCATE stake_table_events, stake_table_events_l1_block, \
3293 epoch_drb_and_root, stake_table_validators",
3294 )
3295 .execute(tx.as_mut())
3296 .await?;
3297 #[cfg(feature = "embedded-db")]
3298 {
3299 query("DELETE FROM stake_table_events")
3300 .execute(tx.as_mut())
3301 .await?;
3302 query("DELETE FROM stake_table_events_l1_block")
3303 .execute(tx.as_mut())
3304 .await?;
3305 query("DELETE FROM epoch_drb_and_root")
3306 .execute(tx.as_mut())
3307 .await?;
3308 query("DELETE FROM stake_table_validators")
3309 .execute(tx.as_mut())
3310 .await?;
3311 }
3312 tx.commit().await?;
3313 Ok(())
3314 })
3315 .await
3316 }
3317
3318 async fn store_all_validators(
3319 &self,
3320 epoch: EpochNumber,
3321 all_validators: RegisteredValidatorMap,
3322 ) -> anyhow::Result<()> {
3323 if all_validators.is_empty() {
3324 return Ok(());
3325 }
3326
3327 let epoch_i64 = epoch.u64() as i64;
3328 let serialized_validators = all_validators
3329 .into_iter()
3330 .map(|(address, validator)| {
3331 let validator_json =
3332 serde_json::to_value(&validator).context("serializing validator to json")?;
3333 Ok((address.to_string(), validator_json))
3334 })
3335 .collect::<anyhow::Result<Vec<_>>>()?;
3336
3337 WRITE_BACKOFF
3338 .retry_if(WRITE_RETRY_MAX, is_serialization_error, || async {
3339 let mut tx = self.db.write().await?;
3340
3341 let mut query_builder = QueryBuilder::new(
3342 "INSERT INTO stake_table_validators (epoch, address, validator) ",
3343 );
3344
3345 query_builder.push_values(
3346 serialized_validators.iter().cloned(),
3347 |mut b, (address, validator)| {
3348 b.push_bind(epoch_i64)
3349 .push_bind(address)
3350 .push_bind(validator);
3351 },
3352 );
3353
3354 query_builder.push(
3355 " ON CONFLICT (epoch, address) DO UPDATE SET validator = EXCLUDED.validator",
3356 );
3357
3358 let query = query_builder.build();
3359
3360 query.execute(tx.as_mut()).await?;
3361
3362 tx.commit().await?;
3363 Ok(())
3364 })
3365 .await
3366 }
3367
3368 async fn load_all_validators(
3369 &self,
3370 epoch: EpochNumber,
3371 offset: u64,
3372 limit: u64,
3373 ) -> anyhow::Result<Vec<RegisteredValidator<PubKey>>> {
3374 let mut tx = self.db.read().await?;
3375
3376 let rows = query(
3380 "SELECT address, validator
3381 FROM stake_table_validators
3382 WHERE epoch = $1
3383 ORDER BY LOWER(address) ASC
3384 LIMIT $2 OFFSET $3",
3385 )
3386 .bind(epoch.u64() as i64)
3387 .bind(limit as i64)
3388 .bind(offset as i64)
3389 .fetch_all(tx.as_mut())
3390 .await?;
3391 rows.into_iter()
3392 .map(|row| {
3393 let validator_json: serde_json::Value = row.try_get("validator")?;
3394 serde_json::from_value::<RegisteredValidator<PubKey>>(validator_json)
3395 .map_err(Into::into)
3396 })
3397 .collect()
3398 }
3399}
3400
3401#[async_trait]
3402impl DhtPersistentStorage for Persistence {
3403 async fn save(&self, records: Vec<SerializableRecord>) -> anyhow::Result<()> {
3409 let to_save =
3411 bincode::serialize(&records).with_context(|| "failed to serialize records")?;
3412
3413 let stmt = "INSERT INTO libp2p_dht (id, serialized_records) VALUES (0, $1) ON CONFLICT \
3415 (id) DO UPDATE SET serialized_records = $1";
3416
3417 WRITE_BACKOFF
3418 .retry_if(WRITE_RETRY_MAX, is_serialization_error, || async {
3419 let mut tx = self
3421 .db
3422 .write()
3423 .await
3424 .with_context(|| "failed to start an atomic DB transaction")?;
3425 tx.execute(query(stmt).bind(to_save.clone()))
3426 .await
3427 .with_context(|| "failed to execute DB query")?;
3428
3429 tx.commit().await.with_context(|| "failed to commit to DB")
3431 })
3432 .await
3433 }
3434
3435 async fn load(&self) -> anyhow::Result<Vec<SerializableRecord>> {
3441 let result = self
3443 .db
3444 .read()
3445 .await
3446 .with_context(|| "failed to start a DB read transaction")?
3447 .fetch_one("SELECT * FROM libp2p_dht where id = 0")
3448 .await
3449 .with_context(|| "failed to fetch from DB")?;
3450
3451 let serialied_records: Vec<u8> = result.get("serialized_records");
3453
3454 let records: Vec<SerializableRecord> = bincode::deserialize(&serialied_records)
3456 .with_context(|| "Failed to deserialize records")?;
3457
3458 Ok(records)
3459 }
3460}
3461
3462#[async_trait]
3463impl Provider<SeqTypes, VidCommonRequest> for Persistence {
3464 #[tracing::instrument(skip(self))]
3465 async fn fetch(&self, req: VidCommonRequest) -> Option<VidCommon> {
3466 let mut tx = match self.db.read().await {
3467 Ok(tx) => tx,
3468 Err(err) => {
3469 tracing::warn!("could not open transaction: {err:#}");
3470 return None;
3471 },
3472 };
3473
3474 let bytes = match query_as::<(Vec<u8>,)>(
3475 "SELECT data FROM vid_share2 WHERE payload_hash = $1 LIMIT 1",
3476 )
3477 .bind(req.0.to_string())
3478 .fetch_optional(tx.as_mut())
3479 .await
3480 {
3481 Ok(Some((bytes,))) => bytes,
3482 Ok(None) => return None,
3483 Err(err) => {
3484 tracing::error!("error loading VID share: {err:#}");
3485 return None;
3486 },
3487 };
3488
3489 let share: Proposal<SeqTypes, VidDisperseShare<SeqTypes>> =
3490 match bincode::deserialize(&bytes) {
3491 Ok(share) => share,
3492 Err(err) => {
3493 tracing::warn!("error decoding VID share: {err:#}");
3494 return None;
3495 },
3496 };
3497
3498 match share.data {
3499 VidDisperseShare::V0(vid) => Some(VidCommon::V0(vid.common)),
3500 VidDisperseShare::V1(vid) => Some(VidCommon::V1(vid.common)),
3501 VidDisperseShare::V2(vid) => Some(VidCommon::V2(vid.common)),
3502 }
3503 }
3504}
3505
3506#[async_trait]
3507impl Provider<SeqTypes, PayloadRequest> for Persistence {
3508 #[tracing::instrument(skip(self))]
3509 async fn fetch(&self, req: PayloadRequest) -> Option<Payload> {
3510 let mut tx = match self.db.read().await {
3511 Ok(tx) => tx,
3512 Err(err) => {
3513 tracing::warn!("could not open transaction: {err:#}");
3514 return None;
3515 },
3516 };
3517
3518 let bytes = match query_as::<(Vec<u8>,)>(
3519 "SELECT data FROM da_proposal2 WHERE payload_hash = $1 LIMIT 1",
3520 )
3521 .bind(req.0.to_string())
3522 .fetch_optional(tx.as_mut())
3523 .await
3524 {
3525 Ok(Some((bytes,))) => bytes,
3526 Ok(None) => return None,
3527 Err(err) => {
3528 tracing::warn!("error loading DA proposal: {err:#}");
3529 return None;
3530 },
3531 };
3532
3533 let proposal: Proposal<SeqTypes, DaProposal2<SeqTypes>> = match bincode::deserialize(&bytes)
3534 {
3535 Ok(proposal) => proposal,
3536 Err(err) => {
3537 tracing::error!("error decoding DA proposal: {err:#}");
3538 return None;
3539 },
3540 };
3541
3542 Some(Payload::from_bytes(
3543 &proposal.data.encoded_transactions,
3544 &proposal.data.metadata,
3545 ))
3546 }
3547}
3548
#[cfg(test)]
mod testing {
    use hotshot_query_service::data_source::storage::sql::testing::TmpDb;

    use super::*;
    use crate::persistence::tests::TestablePersistence;

    // Lets the shared persistence test suite run against this SQL backend,
    // using a temporary database as storage.
    #[async_trait]
    impl TestablePersistence for Persistence {
        // Shared handle to the temporary database backing a test.
        type Storage = Arc<TmpDb>;

        /// Creates a fresh temporary database.
        async fn tmp_storage() -> Self::Storage {
            Arc::new(TmpDb::init().await)
        }

        /// Builds persistence options that point at the temporary database.
        ///
        /// Exactly one of the two cfg-gated blocks below is compiled and
        /// becomes the function's return value.
        #[allow(refining_impl_trait)]
        fn options(db: &Self::Storage) -> Options {
            // Postgres build: connect to the temporary server by host and port
            // with fixed test credentials.
            #[cfg(not(feature = "embedded-db"))]
            {
                PostgresOptions {
                    port: Some(db.port()),
                    host: Some(db.host()),
                    user: Some("postgres".into()),
                    password: Some("password".into()),
                    ..Default::default()
                }
                .into()
            }

            // Embedded build: open the temporary database by file path.
            #[cfg(feature = "embedded-db")]
            {
                SqliteOptions { path: db.path() }.into()
            }
        }
    }
}
3585
3586#[cfg(test)]
3587mod test {
3588 use std::sync::atomic::{AtomicU32, Ordering};
3589
3590 use committable::{Commitment, CommitmentBoundsArkless};
3591 use espresso_types::{Header, Leaf, NodeState, ValidatedState, traits::NullEventConsumer};
3592 use futures::stream::TryStreamExt;
3593 use hotshot_example_types::node_types::TEST_VERSIONS;
3594 use hotshot_types::{
3595 data::{
3596 EpochNumber, QuorumProposal2, ns_table::parse_ns_table,
3597 vid_disperse::AvidMDisperseShare,
3598 },
3599 message::convert_proposal,
3600 simple_certificate::QuorumCertificate,
3601 simple_vote::QuorumData,
3602 traits::{
3603 EncodeBytes,
3604 block_contents::{BlockHeader, GENESIS_VID_NUM_STORAGE_NODES},
3605 signature_key::SignatureKey,
3606 },
3607 utils::EpochTransitionIndicator,
3608 vid::{
3609 advz::advz_scheme,
3610 avidm::{AvidMScheme, init_avidm_param},
3611 },
3612 };
3613 use jf_advz::VidScheme;
3614
3615 use super::*;
3616 use crate::{BLSPubKey, PubKey, persistence::tests::TestablePersistence as _};
3617
3618 #[test_log::test(tokio::test(flavor = "multi_thread"))]
3619 async fn test_quorum_proposals_leaf_hash_migration() {
3620 let leaf: Leaf2 = Leaf::genesis(
3622 &ValidatedState::default(),
3623 &NodeState::mock(),
3624 TEST_VERSIONS.test.base,
3625 )
3626 .await
3627 .into();
3628 let privkey = BLSPubKey::generated_from_seed_indexed([0; 32], 1).1;
3629 let signature = PubKey::sign(&privkey, &[]).unwrap();
3630 let mut quorum_proposal = Proposal {
3631 data: QuorumProposal2::<SeqTypes> {
3632 epoch: None,
3633 block_header: leaf.block_header().clone(),
3634 view_number: ViewNumber::genesis(),
3635 justify_qc: QuorumCertificate::genesis(
3636 &ValidatedState::default(),
3637 &NodeState::mock(),
3638 TEST_VERSIONS.test,
3639 )
3640 .await
3641 .to_qc2(),
3642 upgrade_certificate: None,
3643 view_change_evidence: None,
3644 next_drb_result: None,
3645 next_epoch_justify_qc: None,
3646 state_cert: None,
3647 },
3648 signature,
3649 _pd: Default::default(),
3650 };
3651
3652 let qp1: Proposal<SeqTypes, QuorumProposal<SeqTypes>> =
3653 convert_proposal(quorum_proposal.clone());
3654
3655 quorum_proposal.data.view_number = ViewNumber::new(1);
3656
3657 let qp2: Proposal<SeqTypes, QuorumProposal<SeqTypes>> =
3658 convert_proposal(quorum_proposal.clone());
3659 let qps = [qp1, qp2];
3660
3661 let db = Persistence::tmp_storage().await;
3663 let persistence = Persistence::connect(&db).await;
3664 let mut tx = persistence.db.write().await.unwrap();
3665 let params = qps
3666 .iter()
3667 .map(|qp| {
3668 (
3669 qp.data.view_number.u64() as i64,
3670 bincode::serialize(&qp).unwrap(),
3671 )
3672 })
3673 .collect::<Vec<_>>();
3674 tx.upsert("quorum_proposals", ["view", "data"], ["view"], params)
3675 .await
3676 .unwrap();
3677 tx.commit().await.unwrap();
3678
3679 let persistence = Persistence::connect(&db).await;
3681 let mut tx = persistence.db.read().await.unwrap();
3682 let rows = tx
3683 .fetch("SELECT * FROM quorum_proposals ORDER BY view ASC")
3684 .try_collect::<Vec<_>>()
3685 .await
3686 .unwrap();
3687 assert_eq!(rows.len(), qps.len());
3688 for (row, qp) in rows.into_iter().zip(qps) {
3689 assert_eq!(row.get::<i64, _>("view"), qp.data.view_number.u64() as i64);
3690 assert_eq!(
3691 row.get::<Vec<u8>, _>("data"),
3692 bincode::serialize(&qp).unwrap()
3693 );
3694 assert_eq!(
3695 row.get::<String, _>("leaf_hash"),
3696 Committable::commit(&Leaf::from_quorum_proposal(&qp.data)).to_string()
3697 );
3698 }
3699 }
3700
3701 #[test_log::test(tokio::test(flavor = "multi_thread"))]
3702 async fn test_x25519_keys_migration() {
3703 use std::collections::HashMap;
3704
3705 use crate::persistence::RegisteredValidatorNoX25519;
3706
3707 let mut validator = RegisteredValidator::mock();
3708 validator.delegators.clear();
3709 validator.stake = alloy::primitives::U256::from(1000u64);
3710
3711 let epoch = 1i64;
3712 let address = validator.account;
3713
3714 let legacy = RegisteredValidatorNoX25519 {
3716 account: validator.account,
3717 stake_table_key: validator.stake_table_key,
3718 state_ver_key: validator.state_ver_key.clone(),
3719 stake: validator.stake,
3720 commission: validator.commission,
3721 delegators: HashMap::new(),
3722 authenticated: true,
3723 };
3724
3725 let mut legacy_map: IndexMap<Address, RegisteredValidatorNoX25519> = IndexMap::new();
3727 legacy_map.insert(address, legacy);
3728 let stake_bytes = bincode::serialize(&legacy_map).unwrap();
3729
3730 let json_legacy = RegisteredValidatorNoX25519 {
3732 account: validator.account,
3733 stake_table_key: validator.stake_table_key,
3734 state_ver_key: validator.state_ver_key.clone(),
3735 stake: validator.stake,
3736 commission: validator.commission,
3737 delegators: HashMap::new(),
3738 authenticated: true,
3739 };
3740 let validator_json = serde_json::to_value(&json_legacy).unwrap();
3741
3742 let db = Persistence::tmp_storage().await;
3743 let persistence = Persistence::connect(&db).await;
3744 let mut tx = persistence.db.write().await.unwrap();
3745
3746 tx.execute(
3747 query(
3748 "INSERT INTO stake_table_validators (epoch, address, validator) VALUES ($1, $2, \
3749 $3)",
3750 )
3751 .bind(epoch)
3752 .bind(format!("{:?}", address))
3753 .bind(&validator_json),
3754 )
3755 .await
3756 .unwrap();
3757
3758 tx.execute(
3759 query("INSERT INTO epoch_drb_and_root (epoch, stake) VALUES ($1, $2)")
3760 .bind(epoch)
3761 .bind(&stake_bytes),
3762 )
3763 .await
3764 .unwrap();
3765
3766 tx.execute(query(
3768 "UPDATE data_migrations SET completed = false, migrated_rows = 0 WHERE name = \
3769 'x25519_keys'",
3770 ))
3771 .await
3772 .unwrap();
3773
3774 tx.commit().await.unwrap();
3775
3776 {
3778 let mut tx = persistence.db.read().await.unwrap();
3779 let row: (serde_json::Value,) =
3780 query_as("SELECT validator FROM stake_table_validators WHERE epoch = $1")
3781 .bind(epoch)
3782 .fetch_one(tx.as_mut())
3783 .await
3784 .unwrap();
3785 let json_obj = row.0.as_object().unwrap();
3786 assert!(!json_obj.contains_key("x25519_key"));
3787 }
3788
3789 let persistence = Persistence::connect(&db).await;
3791 persistence.migrate_storage().await.unwrap();
3792
3793 {
3795 let mut tx = persistence.db.read().await.unwrap();
3796 let row: (serde_json::Value,) =
3797 query_as("SELECT validator FROM stake_table_validators WHERE epoch = $1")
3798 .bind(epoch)
3799 .fetch_one(tx.as_mut())
3800 .await
3801 .unwrap();
3802 let json_obj = row.0.as_object().unwrap();
3803 assert!(json_obj.contains_key("x25519_key"));
3804 assert!(json_obj.get("x25519_key").unwrap().is_null());
3805 }
3806
3807 {
3809 let mut tx = persistence.db.read().await.unwrap();
3810 let row: (Vec<u8>,) = query_as("SELECT stake FROM epoch_drb_and_root WHERE epoch = $1")
3811 .bind(epoch)
3812 .fetch_one(tx.as_mut())
3813 .await
3814 .unwrap();
3815 let migrated_map: AuthenticatedValidatorMap = bincode::deserialize(&row.0).unwrap();
3816 assert!(migrated_map.contains_key(&address));
3817 let v = migrated_map.get(&address).unwrap();
3818 assert!(v.x25519_key.is_none());
3819 }
3820
3821 {
3823 let mut tx = persistence.db.read().await.unwrap();
3824 let row: (bool, i64) = query_as(
3825 "SELECT completed, migrated_rows FROM data_migrations WHERE name = 'x25519_keys' \
3826 AND table_name = 'epoch_drb_and_root'",
3827 )
3828 .fetch_one(tx.as_mut())
3829 .await
3830 .unwrap();
3831 assert!(row.0);
3832 assert_eq!(row.1, 1);
3833 }
3834 }
3835
3836 #[test_log::test(tokio::test(flavor = "multi_thread"))]
3837 async fn test_store_all_validators_authenticated_and_unauthenticated() {
3838 use std::collections::HashMap;
3839
3840 use alloy::primitives::{Address, U256};
3841 use hotshot_types::light_client::StateVerKey;
3842 use indexmap::IndexMap;
3843
3844 let tmp = Persistence::tmp_storage().await;
3845 let storage = Persistence::connect(&tmp).await;
3846
3847 let authenticated_validator = RegisteredValidator {
3849 account: Address::random(),
3850 stake_table_key: BLSPubKey::generated_from_seed_indexed([0u8; 32], 0).0,
3851 state_ver_key: StateVerKey::default(),
3852 stake: U256::from(1000),
3853 commission: 100,
3854 delegators: HashMap::new(),
3855 authenticated: true,
3856 x25519_key: None,
3857 p2p_addr: None,
3858 };
3859
3860 let unauthenticated_validator = RegisteredValidator {
3862 account: Address::random(),
3863 stake_table_key: BLSPubKey::generated_from_seed_indexed([0u8; 32], 1).0,
3864 state_ver_key: StateVerKey::default(),
3865 stake: U256::from(2000),
3866 commission: 200,
3867 delegators: HashMap::new(),
3868 authenticated: false,
3869 x25519_key: None,
3870 p2p_addr: None,
3871 };
3872
3873 let mut validators: IndexMap<Address, RegisteredValidator<BLSPubKey>> = IndexMap::new();
3874 validators.insert(
3875 authenticated_validator.account,
3876 authenticated_validator.clone(),
3877 );
3878 validators.insert(
3879 unauthenticated_validator.account,
3880 unauthenticated_validator.clone(),
3881 );
3882
3883 storage
3885 .store_all_validators(EpochNumber::new(1), validators)
3886 .await
3887 .unwrap();
3888
3889 let loaded = storage
3891 .load_all_validators(EpochNumber::new(1), 0, 100)
3892 .await
3893 .unwrap();
3894 assert_eq!(loaded.len(), 2);
3895
3896 let loaded_auth = loaded
3898 .iter()
3899 .find(|v| v.account == authenticated_validator.account)
3900 .unwrap();
3901 assert!(
3902 loaded_auth.authenticated,
3903 "authenticated validator should remain authenticated"
3904 );
3905
3906 let loaded_unauth = loaded
3907 .iter()
3908 .find(|v| v.account == unauthenticated_validator.account)
3909 .unwrap();
3910 assert!(
3911 !loaded_unauth.authenticated,
3912 "unauthenticated validator should remain unauthenticated"
3913 );
3914 }
3915
3916 #[test_log::test(tokio::test(flavor = "multi_thread"))]
3917 async fn test_fetching_providers() {
3918 let tmp = Persistence::tmp_storage().await;
3919 let storage = Persistence::connect(&tmp).await;
3920
3921 let leaf = Leaf2::genesis(
3923 &ValidatedState::default(),
3924 &NodeState::mock(),
3925 TEST_VERSIONS.test.base,
3926 )
3927 .await;
3928 let leaf_payload = leaf.block_payload().unwrap();
3929 let leaf_payload_bytes_arc = leaf_payload.encode();
3930
3931 let avidm_param = init_avidm_param(2).unwrap();
3932 let weights = vec![1u32; 2];
3933
3934 let ns_table = parse_ns_table(
3935 leaf_payload.byte_len().as_usize(),
3936 &leaf_payload.ns_table().encode(),
3937 );
3938 let (payload_commitment, shares) =
3939 AvidMScheme::ns_disperse(&avidm_param, &weights, &leaf_payload_bytes_arc, ns_table)
3940 .unwrap();
3941 let (pubkey, privkey) = BLSPubKey::generated_from_seed_indexed([0; 32], 1);
3942 let vid_share = convert_proposal(
3943 AvidMDisperseShare::<SeqTypes> {
3944 view_number: ViewNumber::new(0),
3945 payload_commitment,
3946 share: shares[0].clone(),
3947 recipient_key: pubkey,
3948 epoch: None,
3949 target_epoch: None,
3950 common: avidm_param.clone(),
3951 }
3952 .to_proposal(&privkey)
3953 .unwrap()
3954 .clone(),
3955 );
3956
3957 let quorum_proposal = QuorumProposalWrapper::<SeqTypes> {
3958 proposal: QuorumProposal2::<SeqTypes> {
3959 block_header: leaf.block_header().clone(),
3960 view_number: leaf.view_number(),
3961 justify_qc: leaf.justify_qc(),
3962 upgrade_certificate: None,
3963 view_change_evidence: None,
3964 next_drb_result: None,
3965 next_epoch_justify_qc: None,
3966 epoch: None,
3967 state_cert: None,
3968 },
3969 };
3970 let quorum_proposal_signature =
3971 BLSPubKey::sign(&privkey, &bincode::serialize(&quorum_proposal).unwrap())
3972 .expect("Failed to sign quorum proposal");
3973 let quorum_proposal = Proposal {
3974 data: quorum_proposal,
3975 signature: quorum_proposal_signature,
3976 _pd: Default::default(),
3977 };
3978
3979 let block_payload_signature = BLSPubKey::sign(&privkey, &leaf_payload_bytes_arc)
3980 .expect("Failed to sign block payload");
3981 let da_proposal = Proposal {
3982 data: DaProposal2::<SeqTypes> {
3983 encoded_transactions: leaf_payload_bytes_arc,
3984 metadata: leaf_payload.ns_table().clone(),
3985 view_number: ViewNumber::new(0),
3986 epoch: None,
3987 epoch_transition_indicator: EpochTransitionIndicator::NotInTransition,
3988 },
3989 signature: block_payload_signature,
3990 _pd: Default::default(),
3991 };
3992
3993 let mut next_quorum_proposal = quorum_proposal.clone();
3994 next_quorum_proposal.data.proposal.view_number += 1;
3995 next_quorum_proposal.data.proposal.justify_qc.view_number += 1;
3996 next_quorum_proposal
3997 .data
3998 .proposal
3999 .justify_qc
4000 .data
4001 .leaf_commit = Committable::commit(&leaf.clone());
4002
4003 storage
4005 .append_da2(&da_proposal, VidCommitment::V1(payload_commitment))
4006 .await
4007 .unwrap();
4008 storage.append_vid(&vid_share).await.unwrap();
4009 storage
4010 .append_quorum_proposal2(&quorum_proposal)
4011 .await
4012 .unwrap();
4013
4014 storage
4016 .append_quorum_proposal2(&next_quorum_proposal)
4017 .await
4018 .unwrap();
4019
4020 assert_eq!(
4022 Some(VidCommon::V1(avidm_param)),
4023 storage
4024 .fetch(VidCommonRequest(vid_share.data.payload_commitment()))
4025 .await
4026 );
4027 assert_eq!(
4028 leaf_payload,
4029 storage
4030 .fetch(PayloadRequest(vid_share.data.payload_commitment()))
4031 .await
4032 .unwrap()
4033 );
4034 }
4035
4036 async fn test_pruning_helper(pruning_opt: ConsensusPruningOptions) {
4044 let tmp = Persistence::tmp_storage().await;
4045 let mut opt = Persistence::options(&tmp);
4046 opt.consensus_pruning = pruning_opt;
4047 let storage = opt.create().await.unwrap();
4048
4049 let data_view = ViewNumber::new(1);
4050
4051 let leaf = Leaf2::genesis(
4053 &ValidatedState::default(),
4054 &NodeState::mock(),
4055 TEST_VERSIONS.test.base,
4056 )
4057 .await;
4058 let leaf_payload = leaf.block_payload().unwrap();
4059 let leaf_payload_bytes_arc = leaf_payload.encode();
4060
4061 let avidm_param = init_avidm_param(2).unwrap();
4062 let weights = vec![1u32; 2];
4063
4064 let ns_table = parse_ns_table(
4065 leaf_payload.byte_len().as_usize(),
4066 &leaf_payload.ns_table().encode(),
4067 );
4068 let (payload_commitment, shares) =
4069 AvidMScheme::ns_disperse(&avidm_param, &weights, &leaf_payload_bytes_arc, ns_table)
4070 .unwrap();
4071
4072 let (pubkey, privkey) = BLSPubKey::generated_from_seed_indexed([0; 32], 1);
4073 let vid = convert_proposal(
4074 AvidMDisperseShare::<SeqTypes> {
4075 view_number: data_view,
4076 payload_commitment,
4077 share: shares[0].clone(),
4078 recipient_key: pubkey,
4079 epoch: None,
4080 target_epoch: None,
4081 common: avidm_param,
4082 }
4083 .to_proposal(&privkey)
4084 .unwrap()
4085 .clone(),
4086 );
4087 let quorum_proposal = QuorumProposalWrapper::<SeqTypes> {
4088 proposal: QuorumProposal2::<SeqTypes> {
4089 epoch: None,
4090 block_header: leaf.block_header().clone(),
4091 view_number: data_view,
4092 justify_qc: QuorumCertificate2::genesis(
4093 &ValidatedState::default(),
4094 &NodeState::mock(),
4095 TEST_VERSIONS.test,
4096 )
4097 .await,
4098 upgrade_certificate: None,
4099 view_change_evidence: None,
4100 next_drb_result: None,
4101 next_epoch_justify_qc: None,
4102 state_cert: None,
4103 },
4104 };
4105 let quorum_proposal_signature =
4106 BLSPubKey::sign(&privkey, &bincode::serialize(&quorum_proposal).unwrap())
4107 .expect("Failed to sign quorum proposal");
4108 let quorum_proposal = Proposal {
4109 data: quorum_proposal,
4110 signature: quorum_proposal_signature,
4111 _pd: Default::default(),
4112 };
4113
4114 let block_payload_signature = BLSPubKey::sign(&privkey, &leaf_payload_bytes_arc)
4115 .expect("Failed to sign block payload");
4116 let da_proposal = Proposal {
4117 data: DaProposal2::<SeqTypes> {
4118 encoded_transactions: leaf_payload_bytes_arc.clone(),
4119 metadata: leaf_payload.ns_table().clone(),
4120 view_number: data_view,
4121 epoch: Some(EpochNumber::new(0)),
4122 epoch_transition_indicator: EpochTransitionIndicator::NotInTransition,
4123 },
4124 signature: block_payload_signature,
4125 _pd: Default::default(),
4126 };
4127
4128 tracing::info!(?vid, ?da_proposal, ?quorum_proposal, "append data");
4129 storage.append_vid(&vid).await.unwrap();
4130 storage
4131 .append_da2(&da_proposal, VidCommitment::V1(payload_commitment))
4132 .await
4133 .unwrap();
4134 storage
4135 .append_quorum_proposal2(&quorum_proposal)
4136 .await
4137 .unwrap();
4138
4139 tracing::info!("decide view 1");
4142 storage
4143 .append_decided_leaves(data_view + 1, [], None, &NullEventConsumer)
4144 .await
4145 .unwrap();
4146 assert_eq!(
4147 storage.load_vid_share(data_view).await.unwrap().unwrap(),
4148 vid
4149 );
4150 assert_eq!(
4151 storage.load_da_proposal(data_view).await.unwrap().unwrap(),
4152 da_proposal
4153 );
4154 assert_eq!(
4155 storage.load_quorum_proposal(data_view).await.unwrap(),
4156 quorum_proposal
4157 );
4158
4159 tracing::info!("decide view 2");
4162 storage
4163 .append_decided_leaves(data_view + 2, [], None, &NullEventConsumer)
4164 .await
4165 .unwrap();
4166 assert!(storage.load_vid_share(data_view).await.unwrap().is_none(),);
4167 assert!(storage.load_da_proposal(data_view).await.unwrap().is_none());
4168 storage.load_quorum_proposal(data_view).await.unwrap_err();
4169 }
4170
4171 #[test_log::test(tokio::test(flavor = "multi_thread"))]
4172 async fn test_pruning_minimum_retention() {
4173 test_pruning_helper(ConsensusPruningOptions {
4174 target_usage: 0,
4177 minimum_retention: 1,
4178 target_retention: u64::MAX,
4181 })
4182 .await
4183 }
4184
4185 #[test_log::test(tokio::test(flavor = "multi_thread"))]
4186 async fn test_pruning_target_retention() {
4187 test_pruning_helper(ConsensusPruningOptions {
4188 target_retention: 1,
4189 minimum_retention: 0,
4192 target_usage: u64::MAX,
4195 })
4196 .await
4197 }
4198
4199 #[test_log::test(tokio::test(flavor = "multi_thread"))]
4200 async fn test_consensus_migration() {
4201 let tmp = Persistence::tmp_storage().await;
4202 let mut opt = Persistence::options(&tmp);
4203
4204 let storage = opt.create().await.unwrap();
4205
4206 let rows = 300;
4207
4208 assert!(storage.load_state_cert().await.unwrap().is_none());
4209
4210 for i in 0..rows {
4211 let view = ViewNumber::new(i);
4212 let validated_state = ValidatedState::default();
4213 let instance_state = NodeState::default();
4214
4215 let (pubkey, privkey) = BLSPubKey::generated_from_seed_indexed([0; 32], i);
4216 let (payload, metadata) =
4217 Payload::from_transactions([], &validated_state, &instance_state)
4218 .await
4219 .unwrap();
4220
4221 let payload_bytes = payload.encode();
4222
4223 let block_header = Header::genesis(
4224 &instance_state,
4225 payload.clone(),
4226 &metadata,
4227 TEST_VERSIONS.test.base,
4228 );
4229
4230 let null_quorum_data = QuorumData {
4231 leaf_commit: Commitment::<Leaf>::default_commitment_no_preimage(),
4232 };
4233
4234 let justify_qc = QuorumCertificate::new(
4235 null_quorum_data.clone(),
4236 null_quorum_data.commit(),
4237 view,
4238 None,
4239 std::marker::PhantomData,
4240 );
4241
4242 let quorum_proposal = QuorumProposal {
4243 block_header,
4244 view_number: view,
4245 justify_qc: justify_qc.clone(),
4246 upgrade_certificate: None,
4247 proposal_certificate: None,
4248 };
4249
4250 let quorum_proposal_signature =
4251 BLSPubKey::sign(&privkey, &bincode::serialize(&quorum_proposal).unwrap())
4252 .expect("Failed to sign quorum proposal");
4253
4254 let proposal = Proposal {
4255 data: quorum_proposal.clone(),
4256 signature: quorum_proposal_signature,
4257 _pd: std::marker::PhantomData::<SeqTypes>,
4258 };
4259
4260 let proposal_bytes = bincode::serialize(&proposal)
4261 .context("serializing proposal")
4262 .unwrap();
4263
4264 let mut leaf = Leaf::from_quorum_proposal(&quorum_proposal);
4265 leaf.fill_block_payload(
4266 payload,
4267 GENESIS_VID_NUM_STORAGE_NODES,
4268 TEST_VERSIONS.test.base,
4269 )
4270 .unwrap();
4271
4272 let mut tx = storage.db.write().await.unwrap();
4273
4274 let qc_bytes = bincode::serialize(&justify_qc).unwrap();
4275 let leaf_bytes = bincode::serialize(&leaf).unwrap();
4276
4277 tx.upsert(
4278 "anchor_leaf",
4279 ["view", "leaf", "qc"],
4280 ["view"],
4281 [(i as i64, leaf_bytes, qc_bytes)],
4282 )
4283 .await
4284 .unwrap();
4285
4286 let state_cert = LightClientStateUpdateCertificateV2::<SeqTypes> {
4287 epoch: EpochNumber::new(i),
4288 light_client_state: Default::default(), next_stake_table_state: Default::default(), signatures: vec![], auth_root: Default::default(),
4292 };
4293 let state_cert_bytes = bincode::serialize(&state_cert).unwrap();
4295 tx.upsert(
4296 "finalized_state_cert",
4297 ["epoch", "state_cert"],
4298 ["epoch"],
4299 [(i as i64, state_cert_bytes)],
4300 )
4301 .await
4302 .unwrap();
4303
4304 tx.commit().await.unwrap();
4305
4306 let disperse = advz_scheme(GENESIS_VID_NUM_STORAGE_NODES)
4307 .disperse(payload_bytes.clone())
4308 .unwrap();
4309
4310 let vid = VidDisperseShare0::<SeqTypes> {
4311 view_number: ViewNumber::new(i),
4312 payload_commitment: Default::default(),
4313 share: disperse.shares[0].clone(),
4314 common: disperse.common,
4315 recipient_key: pubkey,
4316 };
4317
4318 let (payload, metadata) =
4319 Payload::from_transactions([], &ValidatedState::default(), &NodeState::default())
4320 .await
4321 .unwrap();
4322
4323 let da = DaProposal::<SeqTypes> {
4324 encoded_transactions: payload.encode(),
4325 metadata,
4326 view_number: ViewNumber::new(i),
4327 };
4328
4329 let block_payload_signature =
4330 BLSPubKey::sign(&privkey, &payload_bytes).expect("Failed to sign block payload");
4331
4332 let da_proposal = Proposal {
4333 data: da,
4334 signature: block_payload_signature,
4335 _pd: Default::default(),
4336 };
4337
4338 storage
4339 .append_vid(&convert_proposal(vid.to_proposal(&privkey).unwrap()))
4340 .await
4341 .unwrap();
4342 storage
4343 .append_da(&da_proposal, VidCommitment::V0(disperse.commit))
4344 .await
4345 .unwrap();
4346
4347 let leaf_hash = Committable::commit(&leaf);
4348 let mut tx = storage.db.write().await.expect("failed to start write tx");
4349 tx.upsert(
4350 "quorum_proposals",
4351 ["view", "leaf_hash", "data"],
4352 ["view"],
4353 [(i as i64, leaf_hash.to_string(), proposal_bytes)],
4354 )
4355 .await
4356 .expect("failed to upsert quorum proposal");
4357
4358 let justify_qc = &proposal.data.justify_qc;
4359 let justify_qc_bytes = bincode::serialize(&justify_qc)
4360 .context("serializing QC")
4361 .unwrap();
4362 tx.upsert(
4363 "quorum_certificate",
4364 ["view", "leaf_hash", "data"],
4365 ["view"],
4366 [(
4367 justify_qc.view_number.u64() as i64,
4368 justify_qc.data.leaf_commit.to_string(),
4369 &justify_qc_bytes,
4370 )],
4371 )
4372 .await
4373 .expect("failed to upsert qc");
4374
4375 tx.commit().await.expect("failed to commit");
4376 }
4377
4378 storage.migrate_storage().await.unwrap();
4379
4380 let mut tx = storage.db.read().await.unwrap();
4381 let (anchor_leaf2_count,) = query_as::<(i64,)>("SELECT COUNT(*) from anchor_leaf2")
4382 .fetch_one(tx.as_mut())
4383 .await
4384 .unwrap();
4385 assert_eq!(
4386 anchor_leaf2_count, rows as i64,
4387 "anchor leaf count does not match rows",
4388 );
4389
4390 let (da_proposal_count,) = query_as::<(i64,)>("SELECT COUNT(*) from da_proposal2")
4391 .fetch_one(tx.as_mut())
4392 .await
4393 .unwrap();
4394 assert_eq!(
4395 da_proposal_count, rows as i64,
4396 "da proposal count does not match rows",
4397 );
4398
4399 let (vid_share_count,) = query_as::<(i64,)>("SELECT COUNT(*) from vid_share2")
4400 .fetch_one(tx.as_mut())
4401 .await
4402 .unwrap();
4403 assert_eq!(
4404 vid_share_count, rows as i64,
4405 "vid share count does not match rows"
4406 );
4407
4408 let (quorum_proposals_count,) =
4409 query_as::<(i64,)>("SELECT COUNT(*) from quorum_proposals2")
4410 .fetch_one(tx.as_mut())
4411 .await
4412 .unwrap();
4413 assert_eq!(
4414 quorum_proposals_count, rows as i64,
4415 "quorum proposals count does not match rows",
4416 );
4417
4418 let (quorum_certificates_count,) =
4419 query_as::<(i64,)>("SELECT COUNT(*) from quorum_certificate2")
4420 .fetch_one(tx.as_mut())
4421 .await
4422 .unwrap();
4423 assert_eq!(
4424 quorum_certificates_count, rows as i64,
4425 "quorum certificates count does not match rows",
4426 );
4427
4428 let (state_cert_count,) = query_as::<(i64,)>("SELECT COUNT(*) from finalized_state_cert")
4429 .fetch_one(tx.as_mut())
4430 .await
4431 .unwrap();
4432 assert_eq!(
4433 state_cert_count, rows as i64,
4434 "Light client state update certificates count does not match rows",
4435 );
4436 assert_eq!(
4437 storage.load_state_cert().await.unwrap().unwrap(),
4438 LightClientStateUpdateCertificateV2::<SeqTypes> {
4439 epoch: EpochNumber::new(rows - 1),
4440 light_client_state: Default::default(),
4441 next_stake_table_state: Default::default(),
4442 signatures: vec![],
4443 auth_root: Default::default(),
4444 },
4445 "Wrong light client state update certificate in the storage",
4446 );
4447
4448 storage.migrate_storage().await.unwrap();
4449 }
4450
4451 #[test_log::test(tokio::test(flavor = "multi_thread"))]
4474 async fn test_store_events_empty() {
4475 let tmp = Persistence::tmp_storage().await;
4476 let mut opt = Persistence::options(&tmp);
4477 let storage = opt.create().await.unwrap();
4478
4479 assert_eq!(storage.load_events(0, 100).await.unwrap(), (None, vec![]));
4480
4481 for i in 1..=2 {
4483 tracing::info!(i, "update l1 height");
4484 storage.store_events(i, vec![]).await.unwrap();
4485 assert_eq!(
4486 storage.load_events(0, 100).await.unwrap(),
4487 (Some(EventsPersistenceRead::UntilL1Block(i)), vec![])
4488 );
4489 }
4490 }
4491
/// Test double for a database error that carries nothing but a static error-code string.
#[derive(Debug)]
struct MockDatabaseError {
    /// Error code reported by this mock.
    code: &'static str,
}

impl std::fmt::Display for MockDatabaseError {
    /// Renders the error as `mock db error (code <code>)`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "mock db error (code {code})", code = self.code)
    }
}

// No source error to report, so the default trait methods suffice.
impl std::error::Error for MockDatabaseError {}
4511
4512 impl sqlx::error::DatabaseError for MockDatabaseError {
4513 fn message(&self) -> &str {
4514 "mock db error"
4515 }
4516
4517 fn code(&self) -> Option<std::borrow::Cow<'_, str>> {
4518 Some(std::borrow::Cow::Borrowed(self.code))
4519 }
4520
4521 fn kind(&self) -> sqlx::error::ErrorKind {
4522 sqlx::error::ErrorKind::Other
4523 }
4524
4525 fn as_error(&self) -> &(dyn std::error::Error + Send + Sync + 'static) {
4526 self
4527 }
4528
4529 fn as_error_mut(&mut self) -> &mut (dyn std::error::Error + Send + Sync + 'static) {
4530 self
4531 }
4532
4533 fn into_error(self: Box<Self>) -> Box<dyn std::error::Error + Send + Sync + 'static> {
4534 self
4535 }
4536 }
4537
4538 fn mock_serialization_error() -> anyhow::Error {
4539 anyhow::Error::from(sqlx::Error::Database(Box::new(MockDatabaseError {
4540 code: "40001",
4541 })))
4542 }
4543
/// `is_serialization_error` must accept a database error with code `40001` and reject both a
/// database error with another code and a non-database error.
#[test]
fn test_is_serialization_error() {
    let serialization_failure = mock_serialization_error();
    assert!(is_serialization_error(&serialization_failure));

    // Same error shape, different code (`23505`).
    let unique_violation: anyhow::Error =
        sqlx::Error::Database(Box::new(MockDatabaseError { code: "23505" })).into();
    assert!(!is_serialization_error(&unique_violation));

    // An error that is not a database error at all.
    let plain = anyhow::anyhow!("plain error");
    assert!(!is_serialization_error(&plain));
}
4559
4560 #[test_log::test(tokio::test(flavor = "multi_thread"))]
4561 async fn test_retry_if_succeeds_immediately() {
4562 let calls = Arc::new(AtomicU32::new(0));
4563 let calls_clone = calls.clone();
4564
4565 let result = WRITE_BACKOFF
4566 .retry_if(WRITE_RETRY_MAX, is_serialization_error, || {
4567 let calls = calls_clone.clone();
4568 async move {
4569 calls.fetch_add(1, Ordering::SeqCst);
4570 Ok(())
4571 }
4572 })
4573 .await;
4574
4575 assert!(result.is_ok());
4576 assert_eq!(calls.load(Ordering::SeqCst), 1);
4577 }
4578
4579 #[test_log::test(tokio::test(flavor = "multi_thread"))]
4580 async fn test_retry_if_retries_on_serialization_error() {
4581 let calls = Arc::new(AtomicU32::new(0));
4582 let calls_clone = calls.clone();
4583
4584 let result = WRITE_BACKOFF
4586 .retry_if(WRITE_RETRY_MAX, is_serialization_error, || {
4587 let calls = calls_clone.clone();
4588 async move {
4589 let n = calls.fetch_add(1, Ordering::SeqCst);
4590 if n < 2 {
4591 Err(mock_serialization_error())
4592 } else {
4593 Ok(())
4594 }
4595 }
4596 })
4597 .await;
4598
4599 assert!(result.is_ok());
4600 assert_eq!(calls.load(Ordering::SeqCst), 3);
4601 }
4602
4603 #[test_log::test(tokio::test(flavor = "multi_thread"))]
4604 async fn test_retry_if_exhausts_retries() {
4605 let calls = Arc::new(AtomicU32::new(0));
4606 let calls_clone = calls.clone();
4607
4608 let result: anyhow::Result<()> = WRITE_BACKOFF
4610 .retry_if(WRITE_RETRY_MAX, is_serialization_error, || {
4611 let calls = calls_clone.clone();
4612 async move {
4613 calls.fetch_add(1, Ordering::SeqCst);
4614 Err(mock_serialization_error())
4615 }
4616 })
4617 .await;
4618
4619 assert!(result.is_err());
4620 assert_eq!(calls.load(Ordering::SeqCst), 6);
4622 }
4623
4624 #[test_log::test(tokio::test(flavor = "multi_thread"))]
4625 async fn test_retry_if_no_retry_on_other_errors() {
4626 let calls = Arc::new(AtomicU32::new(0));
4627 let calls_clone = calls.clone();
4628
4629 let result: anyhow::Result<()> = WRITE_BACKOFF
4631 .retry_if(WRITE_RETRY_MAX, is_serialization_error, || {
4632 let calls = calls_clone.clone();
4633 async move {
4634 calls.fetch_add(1, Ordering::SeqCst);
4635 Err(anyhow::anyhow!("unrelated error"))
4636 }
4637 })
4638 .await;
4639
4640 assert!(result.is_err());
4641 assert_eq!(calls.load(Ordering::SeqCst), 1);
4642 }
4643}
4644
/// Tests that are compiled only when the `embedded-db` feature is disabled.
#[cfg(test)]
#[cfg(not(feature = "embedded-db"))]
mod postgres_tests {
    use espresso_types::{FeeAccount, Header, Leaf, NodeState, Transaction as Tx};
    use hotshot_example_types::node_types::TEST_VERSIONS;
    use hotshot_query_service::{
        availability::{BlockQueryData, LeafQueryData},
        data_source::storage::UpdateAvailabilityStorage,
    };
    use hotshot_types::{
        data::vid_commitment,
        simple_certificate::QuorumCertificate,
        traits::{
            EncodeBytes,
            block_contents::{BlockHeader, BuilderFee, GENESIS_VID_NUM_STORAGE_NODES},
            election::Membership,
            signature_key::BuilderSignatureKey,
        },
    };

    use super::*;
    use crate::persistence::tests::TestablePersistence as _;

    /// Inserts one block holding three transactions and checks that the SQL expression
    /// `read_ns_id(get_ns_table(h.data), t.ns_index)` yields each transaction's namespace ID,
    /// matching the stored `ns_id` column.
    async fn test_postgres_read_ns_table(instance_state: NodeState) {
        instance_state
            .coordinator
            .membership()
            .set_first_epoch(EpochNumber::genesis(), Default::default());

        let tmp = Persistence::tmp_storage().await;
        let mut opt = Persistence::options(&tmp);
        let storage = opt.create().await.unwrap();

        // Two transactions share namespace 10001; the third uses 10009.
        let transactions = [
            Tx::new(10001u32.into(), vec![1, 2, 3]),
            Tx::new(10001u32.into(), vec![4, 5, 6]),
            Tx::new(10009u32.into(), vec![7, 8, 9]),
        ];

        let validated_state = Default::default();
        let justify_qc =
            QuorumCertificate::genesis(&validated_state, &instance_state, TEST_VERSIONS.test).await;
        let view_number: ViewNumber = justify_qc.view_number + 1;
        let parent_leaf = Leaf::genesis(&validated_state, &instance_state, TEST_VERSIONS.test.base)
            .await
            .into();

        let (payload, ns_table) =
            Payload::from_transactions(transactions.clone(), &validated_state, &instance_state)
                .await
                .unwrap();
        let payload_bytes = payload.encode();
        let payload_commitment = vid_commitment(
            &payload_bytes,
            &ns_table.encode(),
            GENESIS_VID_NUM_STORAGE_NODES,
            instance_state.current_version,
        );
        let builder_commitment = payload.builder_commitment(&ns_table);

        let (fee_account, fee_key) = FeeAccount::generated_from_seed_indexed([0; 32], 0);
        let fee_amount = 0;
        let fee_signature = FeeAccount::sign_fee(&fee_key, fee_amount, &ns_table).unwrap();
        let builder_fee = BuilderFee {
            fee_amount,
            fee_account,
            fee_signature,
        };

        let block_header = Header::new(
            &validated_state,
            &instance_state,
            &parent_leaf,
            payload_commitment,
            builder_commitment,
            ns_table,
            builder_fee,
            instance_state.current_version,
            view_number.u64(),
        )
        .await
        .unwrap();

        let proposal = QuorumProposal {
            block_header: block_header.clone(),
            view_number,
            justify_qc: justify_qc.clone(),
            upgrade_certificate: None,
            proposal_certificate: None,
        };
        let leaf: Leaf2 = Leaf::from_quorum_proposal(&proposal).into();

        // Repoint the genesis QC at the new leaf and view.
        let mut qc = justify_qc.to_qc2();
        qc.data.leaf_commit = leaf.commit();
        qc.view_number = view_number;

        let leaf_data = LeafQueryData::new(leaf, qc).unwrap();
        let block_data = BlockQueryData::<SeqTypes>::new(block_header, payload);

        let mut write_tx = storage.db.write().await.unwrap();
        write_tx.insert_leaf(&leaf_data).await.unwrap();
        write_tx.insert_block(&block_data).await.unwrap();
        write_tx.commit().await.unwrap();

        let mut read_tx = storage.db.read().await.unwrap();
        let rows = query(
            "
                SELECT ns_id, read_ns_id(get_ns_table(h.data), t.ns_index) AS read_ns_id
                  FROM header AS h
                  JOIN transactions AS t ON t.block_height = h.height
                  ORDER BY t.ns_index, t.position
            ",
        )
        .fetch_all(read_tx.as_mut())
        .await
        .unwrap();
        assert_eq!(rows.len(), transactions.len());

        // Rows are ordered by namespace index and position, pairing with the input order.
        for (row, transaction) in rows.into_iter().zip(&transactions) {
            let ns = u64::from(transaction.namespace()) as i64;
            assert_eq!(row.get::<i64, _>("ns_id"), ns);
            assert_eq!(row.get::<i64, _>("read_ns_id"), ns);
        }
    }

    /// Namespace-table read check against `NodeState::mock()`.
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_postgres_read_ns_table_v0_1() {
        let instance_state = NodeState::mock();
        test_postgres_read_ns_table(instance_state).await;
    }

    /// Namespace-table read check against `NodeState::mock_v2()`.
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_postgres_read_ns_table_v0_2() {
        let instance_state = NodeState::mock_v2();
        test_postgres_read_ns_table(instance_state).await;
    }

    /// Namespace-table read check against `NodeState::mock_v3()` with an epoch height of 0.
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_postgres_read_ns_table_v0_3() {
        let instance_state = NodeState::mock_v3().with_epoch_height(0);
        test_postgres_read_ns_table(instance_state).await;
    }

    /// Spawns 20 tasks that each record a vote for a distinct view and checks that every call
    /// succeeds and that the latest acted view ends up as the highest one (19).
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_record_action_concurrent() {
        let tmp = Persistence::tmp_storage().await;
        let storage = Arc::new(Persistence::connect(&tmp).await);

        let mut handles = Vec::new();
        for view in 0u64..20 {
            let storage = Arc::clone(&storage);
            handles.push(tokio::spawn(async move {
                storage
                    .record_action(ViewNumber::new(view), None, HotShotAction::Vote)
                    .await
            }));
        }

        // The first `unwrap` surfaces task panics, the second the `record_action` error.
        for handle in handles {
            handle.await.unwrap().unwrap();
        }

        let latest = storage.load_latest_acted_view().await.unwrap();
        assert_eq!(latest, Some(ViewNumber::new(19)));
    }
}