1use std::{
2 collections::BTreeMap,
3 path::PathBuf,
4 str::FromStr,
5 sync::Arc,
6 time::{Duration, Instant},
7};
8
9use alloy::primitives::Address;
10use anyhow::{Context, bail, ensure};
11use async_trait::async_trait;
12use clap::Parser;
13use committable::Committable;
14use derivative::Derivative;
15use derive_more::derive::{From, Into};
16use either::Either;
17use espresso_types::{
18 AuthenticatedValidatorMap, BackoffParams, BlockMerkleTree, FeeMerkleTree, Leaf, Leaf2,
19 NetworkConfig, Payload, PubKey, RegisteredValidatorMap, StakeTableHash, parse_duration,
20 parse_size,
21 traits::{EventsPersistenceRead, MembershipPersistence, StakeTuple},
22 v0::traits::{EventConsumer, PersistenceOptions, SequencerPersistence, StateCatchup},
23 v0_3::{
24 AuthenticatedValidator, EventKey, IndexedStake, RegisteredValidator, RewardAmount,
25 StakeTableEvent,
26 },
27 v0_4::{REWARD_MERKLE_TREE_V2_HEIGHT, RewardAccountV2, RewardMerkleTreeV2},
28};
29use futures::stream::StreamExt;
30use hotshot::InitializerEpochInfo;
31use hotshot_libp2p_networking::network::behaviours::dht::store::persistent::{
32 DhtPersistentStorage, SerializableRecord,
33};
34use hotshot_query_service::{
35 availability::{BlockId, LeafQueryData},
36 data_source::{
37 Transaction as _, VersionedDataSource,
38 storage::{
39 AvailabilityStorage,
40 pruning::PrunerCfg,
41 sql::{
42 Config, Db, Read, SqlStorage, StorageConnectionType, Transaction, TransactionMode,
43 Write, include_migrations, query_as, syntax_helpers::MAX_FN,
44 },
45 },
46 },
47 fetching::{
48 Provider,
49 request::{LeafRequest, PayloadRequest, VidCommonRequest},
50 },
51 merklized_state::MerklizedState,
52};
53use hotshot_types::{
54 data::{
55 DaProposal, DaProposal2, EpochNumber, QuorumProposal, QuorumProposalWrapper,
56 QuorumProposalWrapperLegacy, VidCommitment, VidCommon, VidDisperseShare, VidDisperseShare0,
57 },
58 drb::{DrbInput, DrbResult},
59 event::{Event, EventType, HotShotAction, LeafInfo},
60 message::{Proposal, convert_proposal},
61 simple_certificate::{
62 CertificatePair, LightClientStateUpdateCertificateV1, LightClientStateUpdateCertificateV2,
63 NextEpochQuorumCertificate2, QuorumCertificate, QuorumCertificate2, UpgradeCertificate,
64 },
65 traits::{
66 block_contents::{BlockHeader, BlockPayload},
67 metrics::Metrics,
68 },
69 vote::HasViewNumber,
70};
71use indexmap::IndexMap;
72use itertools::Itertools;
73use jf_merkle_tree_compat::MerkleTreeScheme;
74use sqlx::{Executor, QueryBuilder, Row, query};
75
76use crate::{
77 NodeType, RECENT_STAKE_TABLES_LIMIT, SeqTypes, ViewNumber,
78 api::RewardMerkleTreeV2Data,
79 catchup::SqlStateCatchup,
80 persistence::{migrate_network_config, persistence_metrics::PersistenceMetricsValue},
81};
82
/// Postgres connection options for the sequencer database.
///
/// Every field is optional; unset fields fall back to the sql layer's
/// defaults. All options may also be supplied via the listed
/// `ESPRESSO_SEQUENCER_POSTGRES_*` environment variables.
#[derive(Parser, Clone, Derivative)]
#[derivative(Debug)]
pub struct PostgresOptions {
    /// Hostname of the Postgres server.
    #[clap(long, env = "ESPRESSO_SEQUENCER_POSTGRES_HOST")]
    pub(crate) host: Option<String>,

    /// TCP port of the Postgres server.
    #[clap(long, env = "ESPRESSO_SEQUENCER_POSTGRES_PORT")]
    pub(crate) port: Option<u16>,

    /// Name of the database to connect to.
    #[clap(long, env = "ESPRESSO_SEQUENCER_POSTGRES_DATABASE")]
    pub(crate) database: Option<String>,

    /// User to authenticate as.
    #[clap(long, env = "ESPRESSO_SEQUENCER_POSTGRES_USER")]
    pub(crate) user: Option<String>,

    /// Password for authentication; excluded from `Debug` output so it is
    /// never logged.
    #[clap(long, env = "ESPRESSO_SEQUENCER_POSTGRES_PASSWORD")]
    #[derivative(Debug = "ignore")]
    pub(crate) password: Option<String>,

    /// Use TLS for the connection to the server.
    #[clap(long, env = "ESPRESSO_SEQUENCER_POSTGRES_USE_TLS")]
    pub(crate) use_tls: bool,
}
113
114impl Default for PostgresOptions {
115 fn default() -> Self {
116 Self::parse_from(std::iter::empty::<String>())
117 }
118}
119
/// Sqlite database options, used when the `embedded-db` feature is enabled.
#[derive(Parser, Clone, Derivative, Default, From, Into)]
#[derivative(Debug)]
pub struct SqliteOptions {
    /// Base storage directory; the database file lives under a `sqlite`
    /// sub-directory (see [`build_sqlite_path`]).
    #[clap(
        long,
        env = "ESPRESSO_SEQUENCER_STORAGE_PATH",
        value_parser = build_sqlite_path
    )]
    pub(crate) path: PathBuf,
}
132
133pub fn build_sqlite_path(path: &str) -> anyhow::Result<PathBuf> {
134 let sub_dir = PathBuf::from_str(path)?.join("sqlite");
135
136 if !sub_dir.exists() {
138 std::fs::create_dir_all(&sub_dir)
139 .with_context(|| format!("failed to create directory: {sub_dir:?}"))?;
140 }
141
142 Ok(sub_dir.join("database"))
143}
144
/// Database options for the sequencer, shared between the Postgres and
/// embedded sqlite backends (selected by the `embedded-db` feature).
#[derive(Parser, Clone, Derivative, From, Into)]
#[derivative(Debug)]
pub struct Options {
    /// Postgres connection options (non-embedded builds only).
    #[cfg(not(feature = "embedded-db"))]
    #[clap(flatten)]
    pub(crate) postgres_options: PostgresOptions,

    /// Sqlite options (embedded builds only).
    #[cfg(feature = "embedded-db")]
    #[clap(flatten)]
    pub(crate) sqlite_options: SqliteOptions,

    /// Full connection URI; when set it seeds the sql config before the
    /// individual options are applied. Hidden from `Debug` because it may
    /// embed credentials.
    #[derivative(Debug = "ignore")]
    pub(crate) uri: Option<String>,

    /// Enable pruning of historical data (mutually exclusive with
    /// `archive`).
    #[clap(long, env = "ESPRESSO_SEQUENCER_DATABASE_PRUNE")]
    pub(crate) prune: bool,

    /// Query-service pruning configuration (only used when `prune` is set).
    #[clap(flatten)]
    pub(crate) pruning: PruningOptions,

    /// Garbage-collection settings for consensus storage.
    #[clap(flatten)]
    pub(crate) consensus_pruning: ConsensusPruningOptions,

    /// Rate limit for fetching missing data; passed through to the query
    /// service when set.
    #[clap(long, env = "ESPRESSO_SEQUENCER_FETCH_RATE_LIMIT")]
    pub(crate) fetch_rate_limit: Option<usize>,

    /// Delay between active fetches.
    #[clap(long, env = "ESPRESSO_SEQUENCER_ACTIVE_FETCH_DELAY", value_parser = parse_duration)]
    pub(crate) active_fetch_delay: Option<Duration>,

    /// Delay between fetches of chunks.
    #[clap(long, env = "ESPRESSO_SEQUENCER_CHUNK_FETCH_DELAY", value_parser = parse_duration)]
    pub(crate) chunk_fetch_delay: Option<Duration>,

    /// Chunk size used when computing sync status.
    #[clap(long, env = "ESPRESSO_SEQUENCER_SYNC_STATUS_CHUNK_SIZE")]
    pub(crate) sync_status_chunk_size: Option<usize>,

    /// How long a cached sync status remains valid.
    #[clap(long, env = "ESPRESSO_SEQUENCER_SYNC_STATUS_TTL", value_parser = parse_duration)]
    pub(crate) sync_status_ttl: Option<Duration>,

    /// Chunk size for the proactive fetching scanner.
    #[clap(long, env = "ESPRESSO_SEQUENCER_PROACTIVE_SCAN_CHUNK_SIZE")]
    pub(crate) proactive_scan_chunk_size: Option<usize>,

    /// Interval between proactive scans.
    #[clap(long, env = "ESPRESSO_SEQUENCER_PROACTIVE_SCAN_INTERVAL", value_parser = parse_duration)]
    pub(crate) proactive_scan_interval: Option<Duration>,

    /// Disable proactive fetching entirely.
    #[clap(long, env = "ESPRESSO_SEQUENCER_DISABLE_PROACTIVE_FETCHING")]
    pub(crate) disable_proactive_fetching: bool,

    /// Keep full history forever (mutually exclusive with `prune`).
    #[clap(long, env = "ESPRESSO_SEQUENCER_ARCHIVE", conflicts_with = "prune")]
    pub(crate) archive: bool,

    /// Lightweight mode (mutually exclusive with `archive`).
    #[clap(
        long,
        env = "ESPRESSO_SEQUENCER_LIGHTWEIGHT",
        default_value_t = false,
        conflicts_with = "archive"
    )]
    pub(crate) lightweight: bool,

    /// How long an idle database connection is kept open (default 10m).
    #[clap(long, env = "ESPRESSO_SEQUENCER_DATABASE_IDLE_CONNECTION_TIMEOUT", value_parser = parse_duration, default_value = "10m")]
    pub(crate) idle_connection_timeout: Duration,

    /// Maximum lifetime of a database connection (default 30m).
    #[clap(long, env = "ESPRESSO_SEQUENCER_DATABASE_CONNECTION_TIMEOUT", value_parser = parse_duration, default_value = "30m")]
    pub(crate) connection_timeout: Duration,

    /// Statements slower than this are logged as slow (default 1s).
    #[clap(long, env = "ESPRESSO_SEQUENCER_DATABASE_SLOW_STATEMENT_THRESHOLD", value_parser = parse_duration, default_value = "1s")]
    pub(crate) slow_statement_threshold: Duration,

    /// Hard timeout for individual statements (default 10m).
    #[clap(long, env = "ESPRESSO_SEQUENCER_DATABASE_STATEMENT_TIMEOUT", value_parser = parse_duration, default_value = "10m")]
    pub(crate) statement_timeout: Duration,

    /// Minimum number of pooled connections to keep open.
    #[clap(
        long,
        env = "ESPRESSO_SEQUENCER_DATABASE_MIN_CONNECTIONS",
        default_value = "0"
    )]
    pub(crate) min_connections: u32,

    /// Minimum connections for the dedicated query pool (Postgres only);
    /// falls back to `min_connections` when unset.
    #[cfg(not(feature = "embedded-db"))]
    #[clap(long, env = "ESPRESSO_SEQUENCER_DATABASE_QUERY_MIN_CONNECTIONS", default_value = None)]
    pub(crate) query_min_connections: Option<u32>,

    /// Maximum number of pooled connections (default 25).
    #[clap(
        long,
        env = "ESPRESSO_SEQUENCER_DATABASE_MAX_CONNECTIONS",
        default_value = "25"
    )]
    pub(crate) max_connections: u32,

    /// Maximum connections for the dedicated query pool (Postgres only);
    /// falls back to `max_connections` when unset.
    #[cfg(not(feature = "embedded-db"))]
    #[clap(long, env = "ESPRESSO_SEQUENCER_DATABASE_QUERY_MAX_CONNECTIONS", default_value = None)]
    pub(crate) query_max_connections: Option<u32>,

    /// Cached connection pool, populated by `create` so later conversions
    /// can reuse the same pool. Never set from the CLI.
    #[clap(skip)]
    pub(crate) pool: Option<sqlx::Pool<Db>>,
}
313
314impl Default for Options {
315 fn default() -> Self {
316 Self::parse_from(std::iter::empty::<String>())
317 }
318}
319
320#[cfg(not(feature = "embedded-db"))]
321impl From<PostgresOptions> for Config {
322 fn from(opt: PostgresOptions) -> Self {
323 let mut cfg = Config::default();
324
325 if let Some(host) = opt.host {
326 cfg = cfg.host(host);
327 }
328
329 if let Some(port) = opt.port {
330 cfg = cfg.port(port);
331 }
332
333 if let Some(database) = &opt.database {
334 cfg = cfg.database(database);
335 }
336
337 if let Some(user) = &opt.user {
338 cfg = cfg.user(user);
339 }
340
341 if let Some(password) = &opt.password {
342 cfg = cfg.password(password);
343 }
344
345 if opt.use_tls {
346 cfg = cfg.tls();
347 }
348
349 cfg = cfg.max_connections(20);
350 cfg = cfg.idle_connection_timeout(Duration::from_secs(120));
351 cfg = cfg.connection_timeout(Duration::from_secs(10240));
352 cfg = cfg.slow_statement_threshold(Duration::from_secs(1));
353 cfg = cfg.statement_timeout(Duration::from_secs(600)); cfg
356 }
357}
358
#[cfg(feature = "embedded-db")]
impl From<SqliteOptions> for Config {
    /// Build a sql `Config` pointing at the sqlite database file, with
    /// fixed pool-size and timeout defaults.
    fn from(opt: SqliteOptions) -> Self {
        // NOTE(review): the 2s slow-statement threshold differs from the
        // 1s used by the Postgres conversion — confirm this is intended.
        Config::default()
            .db_path(opt.path)
            .max_connections(20)
            .idle_connection_timeout(Duration::from_secs(120))
            .connection_timeout(Duration::from_secs(10240))
            .slow_statement_threshold(Duration::from_secs(2))
            .statement_timeout(Duration::from_secs(600))
    }
}
374
375#[cfg(not(feature = "embedded-db"))]
376impl From<PostgresOptions> for Options {
377 fn from(opt: PostgresOptions) -> Self {
378 Options {
379 postgres_options: opt,
380 max_connections: 20,
381 idle_connection_timeout: Duration::from_secs(120),
382 connection_timeout: Duration::from_secs(10240),
383 slow_statement_threshold: Duration::from_secs(1),
384 statement_timeout: Duration::from_secs(600),
385 ..Default::default()
386 }
387 }
388}
389
#[cfg(feature = "embedded-db")]
impl From<SqliteOptions> for Options {
    /// Wrap sqlite options into full database `Options`.
    ///
    /// Fields are listed explicitly (rather than via `Options::default()`)
    /// so the top-level values here are exactly as written.
    fn from(opt: SqliteOptions) -> Self {
        Options {
            sqlite_options: opt,
            uri: None,
            pool: None,
            // Connection pool sizing and timeouts.
            min_connections: 0,
            max_connections: 5,
            idle_connection_timeout: Duration::from_secs(120),
            connection_timeout: Duration::from_secs(10240),
            slow_statement_threshold: Duration::from_secs(1),
            statement_timeout: Duration::from_secs(600),
            // Pruning disabled; nested options keep their own defaults.
            prune: false,
            pruning: Default::default(),
            consensus_pruning: Default::default(),
            // Fetching/scanning knobs left unset.
            fetch_rate_limit: None,
            active_fetch_delay: None,
            chunk_fetch_delay: None,
            sync_status_chunk_size: None,
            sync_status_ttl: None,
            proactive_scan_chunk_size: None,
            proactive_scan_interval: None,
            disable_proactive_fetching: false,
            archive: false,
            lightweight: false,
        }
    }
}
impl TryFrom<&Options> for Config {
    type Error = anyhow::Error;

    /// Assemble the complete sql `Config` from the CLI/env options.
    ///
    /// The config is seeded from `uri` when one was supplied (otherwise
    /// from defaults), then pool sizing, timeouts, backend-specific
    /// connection settings, migrations, and pruning/archive modes are
    /// applied on top.
    ///
    /// # Errors
    ///
    /// Fails if `uri` does not parse or if the pruning options are
    /// rejected by `PrunerCfg`.
    fn try_from(opt: &Options) -> Result<Self, Self::Error> {
        // A full connection URI, when given, is the base configuration.
        let mut cfg = match &opt.uri {
            Some(uri) => uri.parse()?,
            None => Self::default(),
        };

        // Reuse a previously-created connection pool if one was cached.
        if let Some(pool) = &opt.pool {
            cfg = cfg.pool(pool.clone());
        }

        cfg = cfg.max_connections(opt.max_connections);
        cfg = cfg.idle_connection_timeout(opt.idle_connection_timeout);
        cfg = cfg.min_connections(opt.min_connections);

        // Separate query-pool limits only exist for the Postgres backend;
        // they fall back to the main pool limits when unset.
        #[cfg(not(feature = "embedded-db"))]
        {
            cfg =
                cfg.query_max_connections(opt.query_max_connections.unwrap_or(opt.max_connections));
            cfg =
                cfg.query_min_connections(opt.query_min_connections.unwrap_or(opt.min_connections));
        }

        cfg = cfg.connection_timeout(opt.connection_timeout);
        cfg = cfg.slow_statement_threshold(opt.slow_statement_threshold);
        cfg = cfg.statement_timeout(opt.statement_timeout);

        // Backend-specific settings: schema migrations plus either the
        // Postgres connection parameters or the sqlite database path.
        #[cfg(not(feature = "embedded-db"))]
        {
            cfg = cfg.migrations(include_migrations!(
                "$CARGO_MANIFEST_DIR/api/migrations/postgres"
            ));

            let pg_options = &opt.postgres_options;

            if let Some(host) = &pg_options.host {
                cfg = cfg.host(host.clone());
            }

            if let Some(port) = pg_options.port {
                cfg = cfg.port(port);
            }

            if let Some(database) = &pg_options.database {
                cfg = cfg.database(database);
            }

            if let Some(user) = &pg_options.user {
                cfg = cfg.user(user);
            }

            if let Some(password) = &pg_options.password {
                cfg = cfg.password(password);
            }

            if pg_options.use_tls {
                cfg = cfg.tls();
            }
        }

        #[cfg(feature = "embedded-db")]
        {
            cfg = cfg.migrations(include_migrations!(
                "$CARGO_MANIFEST_DIR/api/migrations/sqlite"
            ));

            cfg = cfg.db_path(opt.sqlite_options.path.clone());
        }

        // `prune` and `archive` are mutually exclusive (enforced by clap).
        if opt.prune {
            cfg = cfg.pruner_cfg(PrunerCfg::from(opt.pruning))?;
        }
        if opt.archive {
            cfg = cfg.archive();
        }

        Ok(cfg)
    }
}
500
/// Options forwarded to the query service's pruner (`PrunerCfg`).
///
/// All fields are optional; unset fields keep the pruner's defaults.
#[derive(Parser, Clone, Copy, Debug)]
pub struct PruningOptions {
    /// Storage size threshold (parsed with `parse_size`) passed to
    /// `PrunerCfg::with_pruning_threshold` — presumably the usage level
    /// that triggers more aggressive pruning; see `PrunerCfg` docs.
    #[clap(long, env = "ESPRESSO_SEQUENCER_PRUNER_PRUNING_THRESHOLD", value_parser = parse_size)]
    pruning_threshold: Option<u64>,

    /// Minimum retention period for pruned data.
    #[clap(
        long,
        env = "ESPRESSO_SEQUENCER_PRUNER_MINIMUM_RETENTION",
        value_parser = parse_duration,
    )]
    minimum_retention: Option<Duration>,

    /// Target retention period for pruned data.
    #[clap(
        long,
        env = "ESPRESSO_SEQUENCER_PRUNER_TARGET_RETENTION",
        value_parser = parse_duration,
    )]
    target_retention: Option<Duration>,

    /// Number of rows deleted per pruner batch.
    #[clap(long, env = "ESPRESSO_SEQUENCER_PRUNER_BATCH_SIZE")]
    batch_size: Option<u64>,

    /// Maximum usage passed to `PrunerCfg::with_max_usage` — likely a
    /// percentage of `pruning_threshold`; confirm against `PrunerCfg`.
    #[clap(long, env = "ESPRESSO_SEQUENCER_PRUNER_MAX_USAGE")]
    max_usage: Option<u16>,

    /// Interval between pruner runs.
    #[clap(
        long,
        env = "ESPRESSO_SEQUENCER_PRUNER_INTERVAL",
        value_parser = parse_duration,
    )]
    interval: Option<Duration>,

    /// Pages per incremental vacuum pass (sqlite).
    #[clap(long, env = "ESPRESSO_SEQUENCER_PRUNER_INCREMENTAL_VACUUM_PAGES")]
    pages: Option<u64>,
}
555
556impl Default for PruningOptions {
557 fn default() -> Self {
558 Self::parse_from(std::iter::empty::<String>())
559 }
560}
561
562impl From<PruningOptions> for PrunerCfg {
563 fn from(opt: PruningOptions) -> Self {
564 let mut cfg = PrunerCfg::new();
565 if let Some(threshold) = opt.pruning_threshold {
566 cfg = cfg.with_pruning_threshold(threshold);
567 }
568 if let Some(min) = opt.minimum_retention {
569 cfg = cfg.with_minimum_retention(min);
570 }
571 if let Some(target) = opt.target_retention {
572 cfg = cfg.with_target_retention(target);
573 }
574 if let Some(batch) = opt.batch_size {
575 cfg = cfg.with_batch_size(batch);
576 }
577 if let Some(max) = opt.max_usage {
578 cfg = cfg.with_max_usage(max);
579 }
580 if let Some(interval) = opt.interval {
581 cfg = cfg.with_interval(interval);
582 }
583
584 if let Some(pages) = opt.pages {
585 cfg = cfg.with_incremental_vacuum_pages(pages)
586 }
587
588 cfg = cfg.with_state_tables(vec![
589 BlockMerkleTree::state_type().to_string(),
590 FeeMerkleTree::state_type().to_string(),
591 ]);
592
593 cfg
594 }
595}
596
/// Options controlling garbage collection of consensus storage
/// (see [`Persistence::prune`] usage of these fields).
#[derive(Parser, Clone, Copy, Debug)]
pub struct ConsensusPruningOptions {
    /// Number of views of consensus data to retain under normal operation.
    #[clap(
        name = "TARGET_RETENTION",
        long = "consensus-storage-target-retention",
        env = "ESPRESSO_SEQUENCER_CONSENSUS_STORAGE_TARGET_RETENTION",
        default_value = "302000"
    )]
    target_retention: u64,

    /// Minimum number of views retained even when storage usage exceeds
    /// `target_usage`.
    #[clap(
        name = "MINIMUM_RETENTION",
        long = "consensus-storage-minimum-retention",
        env = "ESPRESSO_SEQUENCER_CONSENSUS_STORAGE_MINIMUM_RETENTION",
        default_value = "130000"
    )]
    minimum_retention: u64,

    /// Storage usage (bytes) above which pruning falls back from the
    /// target retention to the minimum retention (default ~1 GB).
    #[clap(
        name = "TARGET_USAGE",
        long = "consensus-storage-target-usage",
        env = "ESPRESSO_SEQUENCER_CONSENSUS_STORAGE_TARGET_USAGE",
        default_value = "1000000000"
    )]
    target_usage: u64,
}
653
654impl Default for ConsensusPruningOptions {
655 fn default() -> Self {
656 Self::parse_from(std::iter::empty::<String>())
657 }
658}
659
#[async_trait]
impl PersistenceOptions for Options {
    type Persistence = Persistence;

    /// Override both consensus retention bounds with a single view count.
    fn set_view_retention(&mut self, view_retention: u64) {
        self.consensus_pruning.target_retention = view_retention;
        self.consensus_pruning.minimum_retention = view_retention;
    }

    /// Connect to the database and build the [`Persistence`] handle.
    ///
    /// Runs the quorum-proposal leaf-hash backfill before returning, and
    /// caches the connection pool on `self` so subsequent conversions
    /// reuse it.
    async fn create(&mut self) -> anyhow::Result<Self::Persistence> {
        let config = (&*self).try_into()?;
        let persistence = Persistence {
            db: SqlStorage::connect(config, StorageConnectionType::Sequencer).await?,
            gc_opt: self.consensus_pruning,
            internal_metrics: PersistenceMetricsValue::default(),
        };
        persistence.migrate_quorum_proposal_leaf_hashes().await?;
        self.pool = Some(persistence.db.pool());
        Ok(persistence)
    }

    /// Drop and recreate the database schema.
    async fn reset(self) -> anyhow::Result<()> {
        SqlStorage::connect(
            Config::try_from(&self)?.reset_schema(),
            StorageConnectionType::Sequencer,
        )
        .await?;
        Ok(())
    }
}
690
/// Identifies a one-off data migration, as tracked in the
/// `data_migrations` table (see [`DataMigration::as_str`]).
#[derive(Debug, Clone, Copy)]
pub enum DataMigration {
    /// Migration of x25519 key material (stored as "x25519_keys").
    X25519Keys,
}
695
696impl DataMigration {
697 pub fn as_str(&self) -> &'static str {
698 match self {
699 Self::X25519Keys => "x25519_keys",
700 }
701 }
702}
703
/// Sequencer persistence backed by the SQL storage layer (Postgres or
/// embedded sqlite, depending on build features).
#[derive(Clone, Debug)]
pub struct Persistence {
    /// Underlying SQL storage handle.
    db: SqlStorage,
    /// Consensus-storage garbage-collection settings.
    gc_opt: ConsensusPruningOptions,
    /// Internal metrics for persistence operations.
    internal_metrics: PersistenceMetricsValue,
}
712
impl Persistence {
    /// Backfill the `leaf_hash` column of `quorum_proposals`.
    ///
    /// Scans every row; for rows whose `leaf_hash` is NULL, recomputes the
    /// leaf commitment from the stored proposal bytes and upserts it, all
    /// within one transaction.
    async fn migrate_quorum_proposal_leaf_hashes(&self) -> anyhow::Result<()> {
        let mut tx = self.db.write().await?;

        let mut proposals = tx.fetch("SELECT * FROM quorum_proposals");

        let mut updates = vec![];
        while let Some(row) = proposals.next().await {
            let row = row?;

            let hash: Option<String> = row.try_get("leaf_hash")?;
            if hash.is_none() {
                // Recompute the hash from the serialized proposal.
                let view: i64 = row.try_get("view")?;
                let data: Vec<u8> = row.try_get("data")?;
                let proposal: Proposal<SeqTypes, QuorumProposal<SeqTypes>> =
                    bincode::deserialize(&data)?;
                let leaf = Leaf::from_quorum_proposal(&proposal.data);
                let leaf_hash = Committable::commit(&leaf);
                tracing::info!(view, %leaf_hash, "populating quorum proposal leaf hash");
                updates.push((view, leaf_hash.to_string()));
            }
        }
        // Release the row stream's borrow of `tx` before writing.
        drop(proposals);

        tx.upsert("quorum_proposals", ["view", "leaf_hash"], ["view"], updates)
            .await?;

        tx.commit().await
    }

    /// Whether migration `name` for `table_name` has completed, per the
    /// `data_migrations` tracking table.
    ///
    /// # Errors
    ///
    /// Fails if the tracking row is missing (schema out of sync) or on a
    /// database error.
    async fn is_migration_complete(&self, name: &str, table_name: &str) -> anyhow::Result<bool> {
        let mut tx = self.db.read().await?;
        let (completed,): (bool,) =
            query_as("SELECT completed FROM data_migrations WHERE name = $1 AND table_name = $2")
                .bind(name)
                .bind(table_name)
                .fetch_one(tx.as_mut())
                .await
                .context("migration tracking row missing - schema may be out of sync")?;
        Ok(completed)
    }

    /// Mark migration `name` for `table_name` complete, recording the
    /// number of migrated rows, inside the caller's write transaction.
    async fn mark_migration_complete(
        tx: &mut Transaction<Write>,
        name: &str,
        table_name: &str,
        migrated_rows: usize,
    ) -> anyhow::Result<()> {
        tx.execute(
            query(
                "UPDATE data_migrations SET completed = true, migrated_rows = $1 WHERE name = $2 \
                 AND table_name = $3",
            )
            .bind(migrated_rows as i64)
            .bind(name)
            .bind(table_name),
        )
        .await?;
        Ok(())
    }

    /// Replay undelivered decides from `anchor_leaf2` as `Decide` events
    /// and garbage-collect the consensus data they cover.
    ///
    /// Each loop iteration reads the chain of consecutive leaves after the
    /// last processed view, attaches VID shares / DA payloads / state
    /// certs, emits one `Decide` event to `consumer`, then records
    /// progress and deletes the consumed rows. Returns once no new leaves
    /// remain.
    async fn generate_decide_events(
        &self,
        deciding_qc: Option<Arc<CertificatePair<SeqTypes>>>,
        consumer: &impl EventConsumer,
    ) -> anyhow::Result<()> {
        let mut last_processed_view: Option<i64> = self
            .db
            .read()
            .await?
            .fetch_optional("SELECT last_processed_view FROM event_stream WHERE id = 1 LIMIT 1")
            .await?
            .map(|row| row.get("last_processed_view"));
        loop {
            let mut tx = self.db.read().await?;

            // Resume just past the last view we already emitted.
            let from_view = match last_processed_view {
                Some(v) => v + 1,
                None => 0,
            };
            tracing::debug!(?from_view, "generate decide event");

            let mut parent = None;
            let mut rows = query(
                "SELECT leaf, qc, next_epoch_qc FROM anchor_leaf2 WHERE view >= $1 ORDER BY view",
            )
            .bind(from_view)
            .fetch(tx.as_mut());
            let mut leaves = vec![];
            let mut final_qc = None;
            while let Some(row) = rows.next().await {
                let row = match row {
                    Ok(row) => row,
                    Err(err) => {
                        // Stop at the first unreadable row; emit what we
                        // have so far rather than failing the whole pass.
                        tracing::warn!("error loading row: {err:#}");
                        break;
                    },
                };

                let leaf_data: Vec<u8> = row.get("leaf");
                let leaf = bincode::deserialize::<Leaf2>(&leaf_data)?;
                let qc_data: Vec<u8> = row.get("qc");
                let qc = bincode::deserialize::<QuorumCertificate2<SeqTypes>>(&qc_data)?;
                let next_epoch_qc = match row.get::<Option<Vec<u8>>, _>("next_epoch_qc") {
                    Some(bytes) => {
                        Some(bincode::deserialize::<NextEpochQuorumCertificate2<SeqTypes>>(&bytes)?)
                    },
                    None => None,
                };
                let height = leaf.block_header().block_number();

                // Only emit a chain of consecutive block heights; stop at
                // the first gap and leave the rest for the next iteration.
                if let Some(parent) = parent
                    && height != parent + 1
                {
                    tracing::debug!(
                        height,
                        parent,
                        "ending decide event at non-consecutive leaf"
                    );
                    break;
                }
                parent = Some(height);
                leaves.push(leaf);
                final_qc = Some(CertificatePair::new(qc, next_epoch_qc));
            }
            drop(rows);

            // No leaves means nothing left to decide: we are done.
            let Some(final_qc) = final_qc else {
                tracing::debug!(from_view, "no new leaves at decide");
                return Ok(());
            };

            // Shadow `from_view` with the actual view range of the chain.
            let from_view = leaves[0].view_number();
            let to_view = leaves[leaves.len() - 1].view_number();

            // Collect the VID shares for this view range, keyed by view.
            let mut vid_shares = tx
                .fetch_all(
                    query("SELECT view, data FROM vid_share2 where view >= $1 AND view <= $2")
                        .bind(from_view.u64() as i64)
                        .bind(to_view.u64() as i64),
                )
                .await?
                .into_iter()
                .map(|row| {
                    let view: i64 = row.get("view");
                    let data: Vec<u8> = row.get("data");
                    let vid_proposal = bincode::deserialize::<
                        Proposal<SeqTypes, VidDisperseShare<SeqTypes>>,
                    >(&data)?;
                    Ok((view as u64, vid_proposal.data))
                })
                .collect::<anyhow::Result<BTreeMap<_, _>>>()?;

            // Collect the DA proposals (block payloads), keyed by view.
            let mut da_proposals = tx
                .fetch_all(
                    query("SELECT view, data FROM da_proposal2 where view >= $1 AND view <= $2")
                        .bind(from_view.u64() as i64)
                        .bind(to_view.u64() as i64),
                )
                .await?
                .into_iter()
                .map(|row| {
                    let view: i64 = row.get("view");
                    let data: Vec<u8> = row.get("data");
                    let da_proposal =
                        bincode::deserialize::<Proposal<SeqTypes, DaProposal2<SeqTypes>>>(&data)?;
                    Ok((view as u64, da_proposal.data))
                })
                .collect::<anyhow::Result<BTreeMap<_, _>>>()?;

            let state_certs = Self::load_state_certs(&mut tx, from_view, to_view)
                .await
                .inspect_err(|err| {
                    tracing::error!(
                        ?from_view,
                        ?to_view,
                        "failed to load state certificates. error={err:#}"
                    );
                })?;
            drop(tx);

            // Assemble the leaf chain (reversed, newest first) with the
            // per-view artifacts attached.
            let leaf_chain = leaves
                .into_iter()
                .rev()
                .map(|mut leaf| {
                    let view = leaf.view_number();

                    let vid_share = vid_shares.remove(&view);
                    if vid_share.is_none() {
                        tracing::debug!(?view, "VID share not available at decide");
                    }

                    if let Some(proposal) = da_proposals.remove(&view) {
                        let payload =
                            Payload::from_bytes(&proposal.encoded_transactions, &proposal.metadata);
                        leaf.fill_block_payload_unchecked(payload);
                    } else if view == ViewNumber::genesis() {
                        // Genesis has no DA proposal; use the empty payload.
                        leaf.fill_block_payload_unchecked(Payload::empty().0);
                    } else {
                        tracing::debug!(?view, "DA proposal not available at decide");
                    }

                    // NOTE(review): `state_certs` is keyed by epoch (see
                    // `load_state_certs`) but looked up by view here —
                    // confirm this is the intended lookup.
                    let state_cert = state_certs
                        .get(&view)
                        .cloned();

                    LeafInfo {
                        leaf,
                        vid_share,
                        state_cert,
                        state: Default::default(),
                        delta: Default::default(),
                    }
                })
                .collect();

            {
                tracing::debug!(
                    ?from_view,
                    ?to_view,
                    ?final_qc,
                    ?leaf_chain,
                    "generating decide event"
                );
                // Only pass the deciding QC along if it directly extends
                // the committing QC.
                let deciding_qc = if let Some(deciding_qc) = &deciding_qc {
                    (deciding_qc.view_number() == final_qc.view_number() + 1)
                        .then_some(deciding_qc.clone())
                } else {
                    None
                };
                consumer
                    .handle_event(&Event {
                        view_number: to_view,
                        event: EventType::Decide {
                            leaf_chain: Arc::new(leaf_chain),
                            committing_qc: Arc::new(final_qc),
                            deciding_qc,
                            block_size: None,
                        },
                    })
                    .await?;
            }

            let mut tx = self.db.write().await?;

            // Record progress so a restart resumes after `to_view`.
            tx.upsert(
                "event_stream",
                ["id", "last_processed_view"],
                ["id"],
                [(1i32, to_view.u64() as i64)],
            )
            .await?;

            // Persist the state certs that were finalized by this decide.
            for (epoch, state_cert) in state_certs {
                let state_cert_bytes = bincode::serialize(&state_cert)?;
                tx.upsert(
                    "finalized_state_cert",
                    ["epoch", "state_cert"],
                    ["epoch"],
                    [(epoch as i64, state_cert_bytes)],
                )
                .await?;
            }

            // Garbage-collect the consensus data covered by this decide.
            tx.execute(
                query("DELETE FROM vid_share2 where view >= $1 AND view <= $2")
                    .bind(from_view.u64() as i64)
                    .bind(to_view.u64() as i64),
            )
            .await?;
            tx.execute(
                query("DELETE FROM da_proposal2 where view >= $1 AND view <= $2")
                    .bind(from_view.u64() as i64)
                    .bind(to_view.u64() as i64),
            )
            .await?;
            tx.execute(
                query("DELETE FROM quorum_proposals2 where view >= $1 AND view <= $2")
                    .bind(from_view.u64() as i64)
                    .bind(to_view.u64() as i64),
            )
            .await?;
            tx.execute(
                query("DELETE FROM quorum_certificate2 where view >= $1 AND view <= $2")
                    .bind(from_view.u64() as i64)
                    .bind(to_view.u64() as i64),
            )
            .await?;
            tx.execute(
                query("DELETE FROM state_cert where view >= $1 AND view <= $2")
                    .bind(from_view.u64() as i64)
                    .bind(to_view.u64() as i64),
            )
            .await?;

            // Strict `< $2` keeps the newest anchor leaf around.
            tx.execute(
                query("DELETE FROM anchor_leaf2 WHERE view >= $1 AND view < $2")
                    .bind(from_view.u64() as i64)
                    .bind(to_view.u64() as i64),
            )
            .await?;

            tx.commit().await?;
            last_processed_view = Some(to_view.u64() as i64);
        }
    }

    /// Load light-client state certificates for `[from_view, to_view]`,
    /// keyed by epoch. Accepts both the v2 and (converted) v1 encodings.
    async fn load_state_certs(
        tx: &mut Transaction<Read>,
        from_view: ViewNumber,
        to_view: ViewNumber,
    ) -> anyhow::Result<BTreeMap<u64, LightClientStateUpdateCertificateV2<SeqTypes>>> {
        let rows = tx
            .fetch_all(
                query("SELECT view, state_cert FROM state_cert WHERE view >= $1 AND view <= $2")
                    .bind(from_view.u64() as i64)
                    .bind(to_view.u64() as i64),
            )
            .await?;

        let mut result = BTreeMap::new();

        for row in rows {
            let data: Vec<u8> = row.get("state_cert");

            // Try the v2 encoding first, then fall back to v1.
            let cert: LightClientStateUpdateCertificateV2<SeqTypes> = bincode::deserialize(&data)
                .or_else(|err_v2| {
                    bincode::deserialize::<LightClientStateUpdateCertificateV1<SeqTypes>>(&data)
                        .map(Into::into)
                        .context(format!(
                            "Failed to deserialize LightClientStateUpdateCertificate: with v1 and v2. \
                             error: {err_v2}"
                        ))
                })?;

            result.insert(cert.epoch.u64(), cert);
        }

        Ok(result)
    }

    /// Garbage-collect consensus storage relative to `cur_view`.
    ///
    /// Prunes to the target retention first; if the tables still exceed
    /// `target_usage` bytes, prunes again down to the minimum retention.
    #[tracing::instrument(skip(self))]
    async fn prune(&self, cur_view: ViewNumber) -> anyhow::Result<()> {
        let mut tx = self.db.write().await?;

        prune_to_view(
            &mut tx,
            cur_view.u64().saturating_sub(self.gc_opt.target_retention),
        )
        .await?;

        // Backend-specific query for the total size of the prune tables.
        #[cfg(feature = "embedded-db")]
        let usage_query = format!(
            "SELECT sum(pgsize) FROM dbstat WHERE name IN ({})",
            PRUNE_TABLES
                .iter()
                .map(|table| format!("'{table}'"))
                .join(",")
        );

        #[cfg(not(feature = "embedded-db"))]
        let usage_query = {
            let table_sizes = PRUNE_TABLES
                .iter()
                .map(|table| format!("pg_table_size('{table}')"))
                .join(" + ");
            format!("SELECT {table_sizes}")
        };

        let (usage,): (i64,) = query_as(&usage_query).fetch_one(tx.as_mut()).await?;
        tracing::debug!(usage, "consensus storage usage after pruning");

        if (usage as u64) > self.gc_opt.target_usage {
            tracing::warn!(
                usage,
                gc_opt = ?self.gc_opt,
                "consensus storage is running out of space, pruning to minimum retention"
            );
            prune_to_view(
                &mut tx,
                cur_view.u64().saturating_sub(self.gc_opt.minimum_retention),
            )
            .await?;
        }

        tx.commit().await
    }
}
1152
/// Consensus tables subject to view-based garbage collection
/// (see [`prune_to_view`]).
const PRUNE_TABLES: &[&str] = &[
    "anchor_leaf2",
    "vid_share2",
    "da_proposal2",
    "quorum_proposals2",
    "quorum_certificate2",
];
1160
1161async fn prune_to_view(tx: &mut Transaction<Write>, view: u64) -> anyhow::Result<()> {
1162 if view == 0 {
1163 return Ok(());
1165 }
1166 tracing::debug!(view, "pruning consensus storage");
1167
1168 for table in PRUNE_TABLES {
1169 let res = query(&format!("DELETE FROM {table} WHERE view < $1"))
1170 .bind(view as i64)
1171 .execute(tx.as_mut())
1172 .await
1173 .context(format!("pruning {table}"))?;
1174 if res.rows_affected() > 0 {
1175 tracing::info!(
1176 "garbage collected {} rows from {table}",
1177 res.rows_affected()
1178 );
1179 }
1180 }
1181
1182 Ok(())
1183}
1184
1185#[async_trait]
1186impl SequencerPersistence for Persistence {
    /// One-off migration: rebuild the v2 reward merkle tree from the raw
    /// `reward_merkle_tree_v2` rows and store it, serialized, in
    /// `reward_merkle_tree_v2_data`.
    ///
    /// Progress is tracked in `epoch_migration` so the batched scan can
    /// resume from `migrated_rows` after a restart. Before anything is
    /// written, the rebuilt tree's commitment is verified against the
    /// header at the migration height.
    async fn migrate_reward_merkle_tree_v2(&self) -> anyhow::Result<()> {
        let batch_size: i64 = 1000;

        // Read migration progress (completed flag + resume offset).
        let result = {
            let mut tx = self.db.read().await?;
            query_as::<(bool, i64)>(
                "SELECT completed, migrated_rows FROM epoch_migration WHERE table_name = \
                 'reward_merkle_tree_v2_data'",
            )
            .fetch_optional(tx.as_mut())
            .await?
        };

        let (is_completed, mut offset) = result.unwrap_or((false, 0));

        if is_completed {
            tracing::info!("reward_merkle_tree_v2 migration already done");
            return Ok(());
        }

        // Height of the newest reward data; the tree is rebuilt as of
        // this height.
        let max_height: Option<i64> = {
            let mut tx = self.db.read().await?;
            query_as::<(Option<i64>,)>("SELECT MAX(created) FROM reward_merkle_tree_v2")
                .fetch_one(tx.as_mut())
                .await?
                .0
        };

        let max_height = match max_height {
            Some(h) => h,
            None => {
                tracing::info!("no reward data found in reward_merkle_tree_v2, skipping migration");
                return Ok(());
            },
        };

        tracing::warn!(
            "migrating reward_merkle_tree_v2 to reward_merkle_tree_v2_data at height \
             {max_height}..."
        );

        let mut balances: Vec<(RewardAccountV2, RewardAmount)> = Vec::new();

        // Scan the latest (account, balance) entry per index in batches,
        // recording the offset after each batch so a restart can resume.
        loop {
            let mut tx = self.db.read().await?;

            // Postgres: DISTINCT ON picks the newest row per idx.
            #[cfg(not(feature = "embedded-db"))]
            let rows = query_as::<(serde_json::Value, serde_json::Value)>(
                "SELECT DISTINCT ON (idx) idx, entry
                FROM reward_merkle_tree_v2
                WHERE idx IS NOT NULL AND entry IS NOT NULL
                ORDER BY idx, created DESC
                LIMIT $1 OFFSET $2",
            )
            .bind(batch_size)
            .bind(offset)
            .fetch_all(tx.as_mut())
            .await
            .context("loading reward accounts from reward_merkle_tree_v2")?;

            // Sqlite: ROW_NUMBER window function emulates DISTINCT ON.
            #[cfg(feature = "embedded-db")]
            let rows = query_as::<(serde_json::Value, serde_json::Value)>(
                "SELECT idx, entry FROM (
                SELECT idx, entry, ROW_NUMBER() OVER (PARTITION BY idx ORDER BY created DESC) \
                 as rn
                FROM reward_merkle_tree_v2
                WHERE idx IS NOT NULL AND entry IS NOT NULL
                ) sub
                WHERE rn = 1 ORDER BY idx
                LIMIT $1 OFFSET $2",
            )
            .bind(batch_size)
            .bind(offset)
            .fetch_all(tx.as_mut())
            .await
            .context("loading reward accounts from reward_merkle_tree_v2")?;

            drop(tx);

            if rows.is_empty() {
                break;
            }

            let rows_count = rows.len();

            for (idx, entry) in rows {
                let account: RewardAccountV2 =
                    serde_json::from_value(idx).context("deserializing reward account")?;
                let balance: RewardAmount = serde_json::from_value(entry).context(format!(
                    "deserializing reward balance for account {account}"
                ))?;
                balances.push((account, balance));
            }

            // Persist scan progress (still marked incomplete).
            offset += rows_count as i64;
            let mut tx = self.db.write().await?;
            tx.upsert(
                "epoch_migration",
                ["table_name", "completed", "migrated_rows"],
                ["table_name"],
                [("reward_merkle_tree_v2_data".to_string(), false, offset)],
            )
            .await?;
            tx.commit().await?;

            tracing::info!(
                "reward_merkle_tree_v2 progress: rows={} offset={}",
                rows_count,
                offset
            );

            // A short batch means we reached the end of the table.
            if rows_count < batch_size as usize {
                break;
            }
        }

        if balances.is_empty() {
            tracing::info!("no reward accounts found, skipping tree rebuild");
            return Ok(());
        }

        tracing::info!(
            "rebuilding RewardMerkleTreeV2 from {} accounts",
            balances.len()
        );

        let tree = RewardMerkleTreeV2::from_kv_set(REWARD_MERKLE_TREE_V2_HEIGHT, balances)
            .context("failed to rebuild RewardMerkleTreeV2 from balances")?;

        // Verify the rebuilt tree against the header at the migration
        // height before committing anything.
        let mut tx = self.db.read().await?;
        let header = tx
            .get_header(BlockId::<SeqTypes>::from(max_height as usize))
            .await
            .context(format!("header {max_height} not available"))?;
        drop(tx);

        match header.reward_merkle_tree_root() {
            Either::Right(expected_root) => {
                ensure!(
                    tree.commitment() == expected_root,
                    "rebuilt RewardMerkleTreeV2 commitment {} does not match header commitment {} \
                     at height {max_height}",
                    tree.commitment(),
                    expected_root,
                );
            },
            Either::Left(_) => {
                bail!(
                    "header at height {max_height} has a v1 reward merkle tree root, expected v2"
                );
            },
        }

        let tree_data: RewardMerkleTreeV2Data = (&tree)
            .try_into()
            .context("failed to convert RewardMerkleTreeV2 to RewardMerkleTreeV2Data")?;
        let serialized =
            bincode::serialize(&tree_data).context("failed to serialize RewardMerkleTreeV2Data")?;

        // Store the serialized tree...
        let mut tx = self.db.write().await?;
        tx.upsert(
            "reward_merkle_tree_v2_data",
            ["height", "balances"],
            ["height"],
            [(max_height, serialized)],
        )
        .await?;
        tx.commit().await?;

        // ...and only then mark the migration complete.
        let mut tx = self.db.write().await?;
        tx.upsert(
            "epoch_migration",
            ["table_name", "completed", "migrated_rows"],
            ["table_name"],
            [("reward_merkle_tree_v2_data".to_string(), true, offset)],
        )
        .await?;
        tx.commit().await?;

        tracing::warn!("migrated reward_merkle_tree_v2 at height {max_height}");

        Ok(())
    }
1371
1372 fn into_catchup_provider(
1373 self,
1374 backoff: BackoffParams,
1375 ) -> anyhow::Result<Arc<dyn StateCatchup>> {
1376 Ok(Arc::new(SqlStateCatchup::new(Arc::new(self.db), backoff)))
1377 }
1378
1379 async fn load_config(&self) -> anyhow::Result<Option<NetworkConfig>> {
1380 tracing::info!("loading config from Postgres");
1381
1382 let Some(row) = self
1384 .db
1385 .read()
1386 .await?
1387 .fetch_optional("SELECT config FROM network_config ORDER BY id DESC LIMIT 1")
1388 .await?
1389 else {
1390 tracing::info!("config not found");
1391 return Ok(None);
1392 };
1393 let json = row.try_get("config")?;
1394
1395 let json = migrate_network_config(json).context("migration of network config failed")?;
1396 let config = serde_json::from_value(json).context("malformed config file")?;
1397
1398 Ok(Some(config))
1399 }
1400
1401 async fn save_config(&self, cfg: &NetworkConfig) -> anyhow::Result<()> {
1402 tracing::info!("saving config to database");
1403 let json = serde_json::to_value(cfg)?;
1404
1405 let mut tx = self.db.write().await?;
1406 tx.execute(query("INSERT INTO network_config (config) VALUES ($1)").bind(json))
1407 .await?;
1408 tx.commit().await
1409 }
1410
    /// Durably store a chain of newly decided leaves, then emit decide events
    /// and garbage-collect consensus data up to `view`.
    ///
    /// The leaves are committed to `anchor_leaf2` first; event generation and
    /// pruning failures are logged but deliberately do not fail the call, so a
    /// consumer error cannot roll back an already-persisted decide.
    async fn append_decided_leaves(
        &self,
        view: ViewNumber,
        leaf_chain: impl IntoIterator<Item = (&LeafInfo<SeqTypes>, CertificatePair<SeqTypes>)> + Send,
        deciding_qc: Option<Arc<CertificatePair<SeqTypes>>>,
        consumer: &(impl EventConsumer + 'static),
    ) -> anyhow::Result<()> {
        let values = leaf_chain
            .into_iter()
            .map(|(info, cert)| {
                let mut leaf = info.leaf.clone();
                // Strip the payload before persisting; only the leaf metadata
                // is stored here.
                leaf.unfill_block_payload();

                let view = cert.view_number().u64() as i64;
                let leaf_bytes = bincode::serialize(&leaf)?;
                let qc_bytes = bincode::serialize(cert.qc())?;
                // The next-epoch QC only exists for epoch-boundary decides.
                let next_epoch_qc_bytes = match cert.next_epoch_qc() {
                    Some(qc) => Some(bincode::serialize(qc)?),
                    None => None,
                };
                Ok((view, leaf_bytes, qc_bytes, next_epoch_qc_bytes))
            })
            .collect::<anyhow::Result<Vec<_>>>()?;

        let mut tx = self.db.write().await?;

        tx.upsert(
            "anchor_leaf2",
            ["view", "leaf", "qc", "next_epoch_qc"],
            ["view"],
            values,
        )
        .await?;
        tx.commit().await?;

        // Event processing failures are swallowed (the decide is already
        // durable); we also skip pruning in that case so the data needed to
        // retry event generation is retained.
        if let Err(err) = self.generate_decide_events(deciding_qc, consumer).await {
            tracing::warn!(?view, "event processing failed: {err:#}");
            return Ok(());
        }

        // Best-effort cleanup of per-view consensus data up to `view`.
        if let Err(err) = self.prune(view).await {
            tracing::warn!(?view, "pruning failed: {err:#}");
        }

        Ok(())
    }
1470
1471 async fn load_latest_acted_view(&self) -> anyhow::Result<Option<ViewNumber>> {
1472 Ok(self
1473 .db
1474 .read()
1475 .await?
1476 .fetch_optional(query("SELECT view FROM highest_voted_view WHERE id = 0"))
1477 .await?
1478 .map(|row| {
1479 let view: i64 = row.get("view");
1480 ViewNumber::new(view as u64)
1481 }))
1482 }
1483
1484 async fn load_restart_view(&self) -> anyhow::Result<Option<ViewNumber>> {
1485 Ok(self
1486 .db
1487 .read()
1488 .await?
1489 .fetch_optional(query("SELECT view FROM restart_view WHERE id = 0"))
1490 .await?
1491 .map(|row| {
1492 let view: i64 = row.get("view");
1493 ViewNumber::new(view as u64)
1494 }))
1495 }
1496
1497 async fn load_anchor_leaf(
1498 &self,
1499 ) -> anyhow::Result<Option<(Leaf2, QuorumCertificate2<SeqTypes>)>> {
1500 let Some(row) = self
1501 .db
1502 .read()
1503 .await?
1504 .fetch_optional("SELECT leaf, qc FROM anchor_leaf2 ORDER BY view DESC LIMIT 1")
1505 .await?
1506 else {
1507 return Ok(None);
1508 };
1509
1510 let leaf_bytes: Vec<u8> = row.get("leaf");
1511 let leaf2: Leaf2 = bincode::deserialize(&leaf_bytes)?;
1512
1513 let qc_bytes: Vec<u8> = row.get("qc");
1514 let qc2: QuorumCertificate2<SeqTypes> = bincode::deserialize(&qc_bytes)?;
1515
1516 Ok(Some((leaf2, qc2)))
1517 }
1518
1519 async fn load_anchor_view(&self) -> anyhow::Result<ViewNumber> {
1520 let mut tx = self.db.read().await?;
1521 let (view,) = query_as::<(i64,)>("SELECT coalesce(max(view), 0) FROM anchor_leaf2")
1522 .fetch_one(tx.as_mut())
1523 .await?;
1524 Ok(ViewNumber::new(view as u64))
1525 }
1526
1527 async fn load_da_proposal(
1528 &self,
1529 view: ViewNumber,
1530 ) -> anyhow::Result<Option<Proposal<SeqTypes, DaProposal2<SeqTypes>>>> {
1531 let result = self
1532 .db
1533 .read()
1534 .await?
1535 .fetch_optional(
1536 query("SELECT data FROM da_proposal2 where view = $1").bind(view.u64() as i64),
1537 )
1538 .await?;
1539
1540 result
1541 .map(|row| {
1542 let bytes: Vec<u8> = row.get("data");
1543 anyhow::Result::<_>::Ok(bincode::deserialize(&bytes)?)
1544 })
1545 .transpose()
1546 }
1547
1548 async fn load_vid_share(
1549 &self,
1550 view: ViewNumber,
1551 ) -> anyhow::Result<Option<Proposal<SeqTypes, VidDisperseShare<SeqTypes>>>> {
1552 let result = self
1553 .db
1554 .read()
1555 .await?
1556 .fetch_optional(
1557 query("SELECT data FROM vid_share2 where view = $1").bind(view.u64() as i64),
1558 )
1559 .await?;
1560
1561 result
1562 .map(|row| {
1563 let bytes: Vec<u8> = row.get("data");
1564 anyhow::Result::<_>::Ok(bincode::deserialize(&bytes)?)
1565 })
1566 .transpose()
1567 }
1568
    /// Load every stored quorum proposal, keyed by view number.
    ///
    /// Each row is decoded with the current wire format first, falling back to
    /// the legacy wrapper format (converted via `convert_proposal`).
    async fn load_quorum_proposals(
        &self,
    ) -> anyhow::Result<BTreeMap<ViewNumber, Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>>>>
    {
        let rows = self
            .db
            .read()
            .await?
            .fetch_all("SELECT * FROM quorum_proposals2")
            .await?;

        Ok(BTreeMap::from_iter(
            rows.into_iter()
                .map(|row| {
                    let view: i64 = row.get("view");
                    let view_number: ViewNumber = ViewNumber::new(view.try_into()?);
                    let bytes: Vec<u8> = row.get("data");
                    let proposal: Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>> =
                        bincode::deserialize(&bytes).or_else(|error| {
                            // Fall back to the legacy encoding and upgrade it.
                            bincode::deserialize::<
                                Proposal<SeqTypes, QuorumProposalWrapperLegacy<SeqTypes>>,
                            >(&bytes)
                            .map(convert_proposal)
                            .inspect_err(|err_v3| {
                                // NOTE(review): despite this message, a row that
                                // fails both decodings is NOT skipped — the `?`
                                // below propagates the error and aborts the
                                // entire load. Confirm which behavior is intended.
                                tracing::warn!(
                                    ?view_number,
                                    %error,
                                    %err_v3,
                                    "ignoring malformed quorum proposal DB row"
                                );
                            })
                        })?;
                    Ok((view_number, proposal))
                })
                .collect::<anyhow::Result<Vec<_>>>()?,
        ))
    }
1606
1607 async fn load_quorum_proposal(
1608 &self,
1609 view: ViewNumber,
1610 ) -> anyhow::Result<Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>>> {
1611 let mut tx = self.db.read().await?;
1612 let (data,) =
1613 query_as::<(Vec<u8>,)>("SELECT data FROM quorum_proposals2 WHERE view = $1 LIMIT 1")
1614 .bind(view.u64() as i64)
1615 .fetch_one(tx.as_mut())
1616 .await?;
1617 let proposal: Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>> =
1618 bincode::deserialize(&data).or_else(|error| {
1619 bincode::deserialize::<Proposal<SeqTypes, QuorumProposalWrapperLegacy<SeqTypes>>>(
1620 &data,
1621 )
1622 .map(convert_proposal)
1623 .context(format!(
1624 "Failed to deserialize quorum proposal for view {view}. error={error}"
1625 ))
1626 })?;
1627
1628 Ok(proposal)
1629 }
1630
1631 async fn append_vid(
1632 &self,
1633 proposal: &Proposal<SeqTypes, VidDisperseShare<SeqTypes>>,
1634 ) -> anyhow::Result<()> {
1635 let view = proposal.data.view_number().u64();
1636 let payload_hash = proposal.data.payload_commitment();
1637 let data_bytes = bincode::serialize(proposal).unwrap();
1638
1639 let now = Instant::now();
1640 let mut tx = self.db.write().await?;
1641 tx.upsert(
1642 "vid_share2",
1643 ["view", "data", "payload_hash"],
1644 ["view"],
1645 [(view as i64, data_bytes, payload_hash.to_string())],
1646 )
1647 .await?;
1648 let res = tx.commit().await;
1649 self.internal_metrics
1650 .internal_append_vid_duration
1651 .add_point(now.elapsed().as_secs_f64());
1652 res
1653 }
1654
1655 async fn append_da(
1656 &self,
1657 proposal: &Proposal<SeqTypes, DaProposal<SeqTypes>>,
1658 vid_commit: VidCommitment,
1659 ) -> anyhow::Result<()> {
1660 let data = &proposal.data;
1661 let view = data.view_number().u64();
1662 let data_bytes = bincode::serialize(proposal).unwrap();
1663
1664 let now = Instant::now();
1665 let mut tx = self.db.write().await?;
1666 tx.upsert(
1667 "da_proposal",
1668 ["view", "data", "payload_hash"],
1669 ["view"],
1670 [(view as i64, data_bytes, vid_commit.to_string())],
1671 )
1672 .await?;
1673 let res = tx.commit().await;
1674 self.internal_metrics
1675 .internal_append_da_duration
1676 .add_point(now.elapsed().as_secs_f64());
1677 res
1678 }
1679
1680 async fn record_action(
1681 &self,
1682 view: ViewNumber,
1683 _epoch: Option<EpochNumber>,
1684 action: HotShotAction,
1685 ) -> anyhow::Result<()> {
1686 if !matches!(action, HotShotAction::Propose | HotShotAction::Vote) {
1688 return Ok(());
1689 }
1690
1691 let stmt = format!(
1692 "INSERT INTO highest_voted_view (id, view) VALUES (0, $1)
1693 ON CONFLICT (id) DO UPDATE SET view = {MAX_FN}(highest_voted_view.view, excluded.view)"
1694 );
1695
1696 let mut tx = self.db.write().await?;
1697 tx.execute(query(&stmt).bind(view.u64() as i64)).await?;
1698
1699 if matches!(action, HotShotAction::Vote) {
1700 let restart_view = view + 1;
1701 let stmt = format!(
1702 "INSERT INTO restart_view (id, view) VALUES (0, $1)
1703 ON CONFLICT (id) DO UPDATE SET view = {MAX_FN}(restart_view.view, excluded.view)"
1704 );
1705 tx.execute(query(&stmt).bind(restart_view.u64() as i64))
1706 .await?;
1707 }
1708
1709 tx.commit().await
1710 }
1711
    /// Persist a quorum proposal and its justify QC in a single transaction,
    /// recording the write latency in the internal metrics.
    async fn append_quorum_proposal2(
        &self,
        proposal: &Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>>,
    ) -> anyhow::Result<()> {
        let view_number = proposal.data.view_number().u64();

        let proposal_bytes = bincode::serialize(&proposal).context("serializing proposal")?;
        // Commitment of the leaf this proposal produces, stored alongside the
        // proposal for lookup by leaf hash.
        let leaf_hash = Committable::commit(&Leaf2::from_quorum_proposal(&proposal.data));

        let now = Instant::now();
        let mut tx = self.db.write().await?;
        tx.upsert(
            "quorum_proposals2",
            ["view", "leaf_hash", "data"],
            ["view"],
            [(view_number as i64, leaf_hash.to_string(), proposal_bytes)],
        )
        .await?;

        // Also persist the proposal's justify QC, keyed by the QC's own view
        // number and the leaf commitment it certifies.
        let justify_qc = proposal.data.justify_qc();
        let justify_qc_bytes = bincode::serialize(&justify_qc).context("serializing QC")?;
        tx.upsert(
            "quorum_certificate2",
            ["view", "leaf_hash", "data"],
            ["view"],
            [(
                justify_qc.view_number.u64() as i64,
                justify_qc.data.leaf_commit.to_string(),
                &justify_qc_bytes,
            )],
        )
        .await?;
        let res = tx.commit().await;
        // Latency includes both upserts and the commit.
        self.internal_metrics
            .internal_append_quorum2_duration
            .add_point(now.elapsed().as_secs_f64());
        res
    }
1751
1752 async fn load_upgrade_certificate(
1753 &self,
1754 ) -> anyhow::Result<Option<UpgradeCertificate<SeqTypes>>> {
1755 let result = self
1756 .db
1757 .read()
1758 .await?
1759 .fetch_optional("SELECT * FROM upgrade_certificate where id = true")
1760 .await?;
1761
1762 result
1763 .map(|row| {
1764 let bytes: Vec<u8> = row.get("data");
1765 anyhow::Result::<_>::Ok(bincode::deserialize(&bytes)?)
1766 })
1767 .transpose()
1768 }
1769
1770 async fn store_upgrade_certificate(
1771 &self,
1772 decided_upgrade_certificate: Option<UpgradeCertificate<SeqTypes>>,
1773 ) -> anyhow::Result<()> {
1774 let certificate = match decided_upgrade_certificate {
1775 Some(cert) => cert,
1776 None => return Ok(()),
1777 };
1778 let upgrade_certificate_bytes =
1779 bincode::serialize(&certificate).context("serializing upgrade certificate")?;
1780 let mut tx = self.db.write().await?;
1781 tx.upsert(
1782 "upgrade_certificate",
1783 ["id", "data"],
1784 ["id"],
1785 [(true, upgrade_certificate_bytes)],
1786 )
1787 .await?;
1788 tx.commit().await
1789 }
1790
    /// One-shot migration of decided leaves from `anchor_leaf` (v1 `Leaf`/QC)
    /// to `anchor_leaf2` (`Leaf2`/QC2), batched and resumable via the
    /// `epoch_migration` bookkeeping table.
    async fn migrate_anchor_leaf(&self) -> anyhow::Result<()> {
        let batch_size: i64 = 10000;
        let mut tx = self.db.read().await?;

        // `migrated_rows` holds the last migrated view number (not a row
        // count); it doubles as the resume cursor below.
        let (is_completed, mut offset) = query_as::<(bool, i64)>(
            "SELECT completed, migrated_rows from epoch_migration WHERE table_name = 'anchor_leaf'",
        )
        .fetch_one(tx.as_mut())
        .await?;

        if is_completed {
            tracing::info!("decided leaves already migrated");
            return Ok(());
        }

        tracing::warn!("migrating decided leaves..");
        loop {
            // Read a batch outside of any write transaction.
            let mut tx = self.db.read().await?;
            let rows = query(
                "SELECT view, leaf, qc FROM anchor_leaf WHERE view >= $1 ORDER BY view LIMIT $2",
            )
            .bind(offset)
            .bind(batch_size)
            .fetch_all(tx.as_mut())
            .await?;

            drop(tx);
            if rows.is_empty() {
                break;
            }
            let mut values = Vec::new();

            for row in rows.iter() {
                let leaf: Vec<u8> = row.try_get("leaf")?;
                let qc: Vec<u8> = row.try_get("qc")?;
                let leaf1: Leaf = bincode::deserialize(&leaf)?;
                let qc1: QuorumCertificate<SeqTypes> = bincode::deserialize(&qc)?;
                let view: i64 = row.try_get("view")?;

                // Upgrade the legacy leaf and QC to their current forms.
                let leaf2: Leaf2 = leaf1.into();
                let qc2: QuorumCertificate2<SeqTypes> = qc1.to_qc2();

                let leaf2_bytes = bincode::serialize(&leaf2)?;
                let qc2_bytes = bincode::serialize(&qc2)?;

                values.push((view, leaf2_bytes, qc2_bytes));
            }

            let mut query_builder: sqlx::QueryBuilder<Db> =
                sqlx::QueryBuilder::new("INSERT INTO anchor_leaf2 (view, leaf, qc) ");

            // NOTE: the cursor is the last migrated view and the next batch
            // selects `view >= offset`, so the boundary row is re-read once
            // per batch; ON CONFLICT DO NOTHING below keeps that harmless.
            offset = values.last().context("last row")?.0;

            query_builder.push_values(values, |mut b, (view, leaf, qc)| {
                b.push_bind(view).push_bind(leaf).push_bind(qc);
            });

            query_builder.push(" ON CONFLICT DO NOTHING");

            let query = query_builder.build();

            let mut tx = self.db.write().await?;
            query.execute(tx.as_mut()).await?;

            // Record progress so an interrupted migration resumes from here.
            tx.upsert(
                "epoch_migration",
                ["table_name", "completed", "migrated_rows"],
                ["table_name"],
                [("anchor_leaf".to_string(), false, offset)],
            )
            .await?;
            tx.commit().await?;

            tracing::info!(
                "anchor leaf migration progress: rows={} offset={}",
                rows.len(),
                offset
            );

            // A short batch means we reached the end of the source table.
            if rows.len() < batch_size as usize {
                break;
            }
        }

        tracing::warn!("migrated decided leaves");

        let mut tx = self.db.write().await?;
        tx.upsert(
            "epoch_migration",
            ["table_name", "completed", "migrated_rows"],
            ["table_name"],
            [("anchor_leaf".to_string(), true, offset)],
        )
        .await?;
        tx.commit().await?;

        tracing::info!("updated epoch_migration table for anchor_leaf");

        Ok(())
    }
1897
    /// One-shot migration of DA proposals from `da_proposal` (`DaProposal`)
    /// to `da_proposal2` (`DaProposal2`), batched and resumable via the
    /// `epoch_migration` bookkeeping table.
    async fn migrate_da_proposals(&self) -> anyhow::Result<()> {
        let batch_size: i64 = 10000;
        let mut tx = self.db.read().await?;

        // `migrated_rows` holds the last migrated view number, used as the
        // resume cursor below.
        let (is_completed, mut offset) = query_as::<(bool, i64)>(
            "SELECT completed, migrated_rows from epoch_migration WHERE table_name = 'da_proposal'",
        )
        .fetch_one(tx.as_mut())
        .await?;

        if is_completed {
            tracing::info!("da proposals migration already done");
            return Ok(());
        }

        tracing::warn!("migrating da proposals..");

        loop {
            // Read a batch outside of any write transaction.
            let mut tx = self.db.read().await?;
            let rows = query(
                "SELECT payload_hash, data FROM da_proposal WHERE view >= $1 ORDER BY view LIMIT \
                 $2",
            )
            .bind(offset)
            .bind(batch_size)
            .fetch_all(tx.as_mut())
            .await?;

            drop(tx);
            if rows.is_empty() {
                break;
            }
            let mut values = Vec::new();

            for row in rows.iter() {
                let data: Vec<u8> = row.try_get("data")?;
                let payload_hash: String = row.try_get("payload_hash")?;

                // Decode the legacy proposal and convert it to the v2 format.
                let da_proposal: Proposal<SeqTypes, DaProposal<SeqTypes>> =
                    bincode::deserialize(&data)?;
                let da_proposal2: Proposal<SeqTypes, DaProposal2<SeqTypes>> =
                    convert_proposal(da_proposal);

                let view = da_proposal2.data.view_number.u64() as i64;
                let data = bincode::serialize(&da_proposal2)?;

                values.push((view, payload_hash, data));
            }

            let mut query_builder: sqlx::QueryBuilder<Db> =
                sqlx::QueryBuilder::new("INSERT INTO da_proposal2 (view, payload_hash, data) ");

            // The next batch re-reads the boundary row (`view >= offset`);
            // ON CONFLICT DO NOTHING below keeps that harmless.
            offset = values.last().context("last row")?.0;
            query_builder.push_values(values, |mut b, (view, payload_hash, data)| {
                b.push_bind(view).push_bind(payload_hash).push_bind(data);
            });
            query_builder.push(" ON CONFLICT DO NOTHING");
            let query = query_builder.build();

            let mut tx = self.db.write().await?;
            query.execute(tx.as_mut()).await?;

            // Record progress so an interrupted migration resumes from here.
            tx.upsert(
                "epoch_migration",
                ["table_name", "completed", "migrated_rows"],
                ["table_name"],
                [("da_proposal".to_string(), false, offset)],
            )
            .await?;
            tx.commit().await?;

            tracing::info!(
                "DA proposals migration progress: rows={} offset={}",
                rows.len(),
                offset
            );
            if rows.len() < batch_size as usize {
                break;
            }
        }

        tracing::warn!("migrated da proposals");

        let mut tx = self.db.write().await?;
        tx.upsert(
            "epoch_migration",
            ["table_name", "completed", "migrated_rows"],
            ["table_name"],
            [("da_proposal".to_string(), true, offset)],
        )
        .await?;
        tx.commit().await?;

        tracing::info!("updated epoch_migration table for da_proposal");

        Ok(())
    }
1995
1996 async fn migrate_vid_shares(&self) -> anyhow::Result<()> {
1997 let batch_size: i64 = 10000;
1998
1999 let mut tx = self.db.read().await?;
2000
2001 let (is_completed, mut offset) = query_as::<(bool, i64)>(
2002 "SELECT completed, migrated_rows from epoch_migration WHERE table_name = 'vid_share'",
2003 )
2004 .fetch_one(tx.as_mut())
2005 .await?;
2006
2007 if is_completed {
2008 tracing::info!("vid_share migration already done");
2009 return Ok(());
2010 }
2011
2012 tracing::warn!("migrating vid shares..");
2013 loop {
2014 let mut tx = self.db.read().await?;
2015 let rows = query(
2016 "SELECT payload_hash, data FROM vid_share WHERE view >= $1 ORDER BY view LIMIT $2",
2017 )
2018 .bind(offset)
2019 .bind(batch_size)
2020 .fetch_all(tx.as_mut())
2021 .await?;
2022
2023 drop(tx);
2024 if rows.is_empty() {
2025 break;
2026 }
2027 let mut values = Vec::new();
2028
2029 for row in rows.iter() {
2030 let data: Vec<u8> = row.try_get("data")?;
2031 let payload_hash: String = row.try_get("payload_hash")?;
2032
2033 let vid_share: Proposal<SeqTypes, VidDisperseShare0<SeqTypes>> =
2034 bincode::deserialize(&data)?;
2035 let vid_share2: Proposal<SeqTypes, VidDisperseShare<SeqTypes>> =
2036 convert_proposal(vid_share);
2037
2038 let view = vid_share2.data.view_number().u64() as i64;
2039 let data = bincode::serialize(&vid_share2)?;
2040
2041 values.push((view, payload_hash, data));
2042 }
2043
2044 let mut query_builder: sqlx::QueryBuilder<Db> =
2045 sqlx::QueryBuilder::new("INSERT INTO vid_share2 (view, payload_hash, data) ");
2046
2047 offset = values.last().context("last row")?.0;
2048
2049 query_builder.push_values(values, |mut b, (view, payload_hash, data)| {
2050 b.push_bind(view).push_bind(payload_hash).push_bind(data);
2051 });
2052
2053 let query = query_builder.build();
2054
2055 let mut tx = self.db.write().await?;
2056 query.execute(tx.as_mut()).await?;
2057
2058 tx.upsert(
2059 "epoch_migration",
2060 ["table_name", "completed", "migrated_rows"],
2061 ["table_name"],
2062 [("vid_share".to_string(), false, offset)],
2063 )
2064 .await?;
2065 tx.commit().await?;
2066
2067 tracing::info!(
2068 "VID shares migration progress: rows={} offset={}",
2069 rows.len(),
2070 offset
2071 );
2072 if rows.len() < batch_size as usize {
2073 break;
2074 }
2075 }
2076
2077 tracing::warn!("migrated vid shares");
2078
2079 let mut tx = self.db.write().await?;
2080 tx.upsert(
2081 "epoch_migration",
2082 ["table_name", "completed", "migrated_rows"],
2083 ["table_name"],
2084 [("vid_share".to_string(), true, offset)],
2085 )
2086 .await?;
2087 tx.commit().await?;
2088
2089 tracing::info!("updated epoch_migration table for vid_share");
2090
2091 Ok(())
2092 }
2093
    /// One-shot migration of quorum proposals from `quorum_proposals`
    /// (`QuorumProposal`) to `quorum_proposals2` (`QuorumProposalWrapper`),
    /// batched and resumable via the `epoch_migration` bookkeeping table.
    async fn migrate_quorum_proposals(&self) -> anyhow::Result<()> {
        let batch_size: i64 = 10000;
        let mut tx = self.db.read().await?;

        // `migrated_rows` holds the last migrated view number, used as the
        // resume cursor below.
        let (is_completed, mut offset) = query_as::<(bool, i64)>(
            "SELECT completed, migrated_rows from epoch_migration WHERE table_name = \
             'quorum_proposals'",
        )
        .fetch_one(tx.as_mut())
        .await?;

        if is_completed {
            tracing::info!("quorum proposals migration already done");
            return Ok(());
        }

        tracing::warn!("migrating quorum proposals..");

        loop {
            // Read a batch outside of any write transaction.
            let mut tx = self.db.read().await?;
            let rows = query(
                "SELECT view, leaf_hash, data FROM quorum_proposals WHERE view >= $1 ORDER BY \
                 view LIMIT $2",
            )
            .bind(offset)
            .bind(batch_size)
            .fetch_all(tx.as_mut())
            .await?;

            drop(tx);

            if rows.is_empty() {
                break;
            }

            let mut values = Vec::new();

            for row in rows.iter() {
                let leaf_hash: String = row.try_get("leaf_hash")?;
                let data: Vec<u8> = row.try_get("data")?;

                // Decode the legacy proposal and wrap it in the current form.
                let quorum_proposal: Proposal<SeqTypes, QuorumProposal<SeqTypes>> =
                    bincode::deserialize(&data)?;
                let quorum_proposal2: Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>> =
                    convert_proposal(quorum_proposal);

                let view = quorum_proposal2.data.view_number().u64() as i64;
                let data = bincode::serialize(&quorum_proposal2)?;

                values.push((view, leaf_hash, data));
            }

            let mut query_builder: sqlx::QueryBuilder<Db> =
                sqlx::QueryBuilder::new("INSERT INTO quorum_proposals2 (view, leaf_hash, data) ");

            // The next batch re-reads the boundary row (`view >= offset`);
            // ON CONFLICT DO NOTHING below keeps that harmless.
            offset = values.last().context("last row")?.0;
            query_builder.push_values(values, |mut b, (view, leaf_hash, data)| {
                b.push_bind(view).push_bind(leaf_hash).push_bind(data);
            });

            query_builder.push(" ON CONFLICT DO NOTHING");

            let query = query_builder.build();

            let mut tx = self.db.write().await?;
            query.execute(tx.as_mut()).await?;

            // Record progress so an interrupted migration resumes from here.
            tx.upsert(
                "epoch_migration",
                ["table_name", "completed", "migrated_rows"],
                ["table_name"],
                [("quorum_proposals".to_string(), false, offset)],
            )
            .await?;
            tx.commit().await?;

            tracing::info!(
                "quorum proposals migration progress: rows={} offset={}",
                rows.len(),
                offset
            );

            if rows.len() < batch_size as usize {
                break;
            }
        }

        tracing::warn!("migrated quorum proposals");

        let mut tx = self.db.write().await?;
        tx.upsert(
            "epoch_migration",
            ["table_name", "completed", "migrated_rows"],
            ["table_name"],
            [("quorum_proposals".to_string(), true, offset)],
        )
        .await?;
        tx.commit().await?;

        tracing::info!("updated epoch_migration table for quorum_proposals");

        Ok(())
    }
2197
    /// One-shot migration of quorum certificates from `quorum_certificate`
    /// (`QuorumCertificate`) to `quorum_certificate2` (QC2), batched and
    /// resumable via the `epoch_migration` bookkeeping table.
    async fn migrate_quorum_certificates(&self) -> anyhow::Result<()> {
        let batch_size: i64 = 10000;
        let mut tx = self.db.read().await?;

        // `migrated_rows` holds the last migrated view number, used as the
        // resume cursor below.
        let (is_completed, mut offset) = query_as::<(bool, i64)>(
            "SELECT completed, migrated_rows from epoch_migration WHERE table_name = \
             'quorum_certificate'",
        )
        .fetch_one(tx.as_mut())
        .await?;

        if is_completed {
            tracing::info!("quorum certificates migration already done");
            return Ok(());
        }

        tracing::warn!("migrating quorum certificates..");
        loop {
            // Read a batch outside of any write transaction.
            let mut tx = self.db.read().await?;
            let rows = query(
                "SELECT view, leaf_hash, data FROM quorum_certificate WHERE view >= $1 ORDER BY \
                 view LIMIT $2",
            )
            .bind(offset)
            .bind(batch_size)
            .fetch_all(tx.as_mut())
            .await?;

            drop(tx);
            if rows.is_empty() {
                break;
            }
            let mut values = Vec::new();

            for row in rows.iter() {
                let leaf_hash: String = row.try_get("leaf_hash")?;
                let data: Vec<u8> = row.try_get("data")?;

                // Decode the legacy QC and upgrade it to the v2 form.
                let qc: QuorumCertificate<SeqTypes> = bincode::deserialize(&data)?;
                let qc2: QuorumCertificate2<SeqTypes> = qc.to_qc2();

                let view = qc2.view_number().u64() as i64;
                let data = bincode::serialize(&qc2)?;

                values.push((view, leaf_hash, data));
            }

            let mut query_builder: sqlx::QueryBuilder<Db> =
                sqlx::QueryBuilder::new("INSERT INTO quorum_certificate2 (view, leaf_hash, data) ");

            // The next batch re-reads the boundary row (`view >= offset`);
            // ON CONFLICT DO NOTHING below keeps that harmless.
            offset = values.last().context("last row")?.0;

            query_builder.push_values(values, |mut b, (view, leaf_hash, data)| {
                b.push_bind(view).push_bind(leaf_hash).push_bind(data);
            });

            query_builder.push(" ON CONFLICT DO NOTHING");
            let query = query_builder.build();

            let mut tx = self.db.write().await?;
            query.execute(tx.as_mut()).await?;

            // Record progress so an interrupted migration resumes from here.
            tx.upsert(
                "epoch_migration",
                ["table_name", "completed", "migrated_rows"],
                ["table_name"],
                [("quorum_certificate".to_string(), false, offset)],
            )
            .await?;
            tx.commit().await?;

            tracing::info!(
                "Quorum certificates migration progress: rows={} offset={}",
                rows.len(),
                offset
            );

            if rows.len() < batch_size as usize {
                break;
            }
        }

        tracing::warn!("migrated quorum certificates");

        let mut tx = self.db.write().await?;
        tx.upsert(
            "epoch_migration",
            ["table_name", "completed", "migrated_rows"],
            ["table_name"],
            [("quorum_certificate".to_string(), true, offset)],
        )
        .await?;
        tx.commit().await?;
        tracing::info!("updated epoch_migration table for quorum_certificate");

        Ok(())
    }
2295
    /// Backfill x25519 keys in persisted stake tables and validator records.
    ///
    /// Runs two independent sub-migrations — over `epoch_drb_and_root` and
    /// over `stake_table_validators` — each skipped once it has been marked
    /// complete in the migration bookkeeping.
    async fn migrate_x25519_keys(&self) -> anyhow::Result<()> {
        use super::RegisteredValidatorNoX25519;

        let name = DataMigration::X25519Keys.as_str();

        if !self
            .is_migration_complete(name, "epoch_drb_and_root")
            .await?
        {
            // Load all serialized stake tables up front, then rewrite the
            // legacy-format ones inside a single write transaction.
            let rows: Vec<(i64, Vec<u8>)> = {
                let mut tx = self.db.read().await?;
                query_as("SELECT epoch, stake FROM epoch_drb_and_root WHERE stake IS NOT NULL")
                    .fetch_all(tx.as_mut())
                    .await?
            };

            let num_rows = rows.len();
            let mut tx = self.db.write().await?;
            for (epoch, stake_bytes) in rows {
                // Rows already decodable as the current map type need no work.
                if bincode::deserialize::<AuthenticatedValidatorMap>(&stake_bytes).is_ok() {
                    continue;
                }

                let old_validators: IndexMap<Address, RegisteredValidatorNoX25519> =
                    bincode::deserialize(&stake_bytes)
                        .context("deserializing legacy stake table")?;
                let validators: AuthenticatedValidatorMap = old_validators
                    .into_iter()
                    .map(|(addr, v)| {
                        let registered = v.migrate();
                        (
                            addr,
                            AuthenticatedValidator::try_from(registered)
                                .expect("stake tables only contain authenticated validators"),
                        )
                    })
                    .collect();

                let new_bytes =
                    bincode::serialize(&validators).context("serializing stake table")?;

                tracing::debug!(epoch, "migrating x25519 keys in stake table");
                tx.execute(
                    query("UPDATE epoch_drb_and_root SET stake = $1 WHERE epoch = $2")
                        .bind(&new_bytes)
                        .bind(epoch),
                )
                .await?;
            }
            Self::mark_migration_complete(&mut tx, name, "epoch_drb_and_root", num_rows).await?;
            tx.commit().await?;
            tracing::info!(
                num_rows,
                "x25519_keys migration completed for epoch_drb_and_root"
            );
        }

        if !self
            .is_migration_complete(name, "stake_table_validators")
            .await?
        {
            let rows: Vec<(i64, String, serde_json::Value)> = {
                let mut tx = self.db.read().await?;
                query_as("SELECT epoch, address, validator FROM stake_table_validators")
                    .fetch_all(tx.as_mut())
                    .await?
            };

            let num_rows = rows.len();
            let mut tx = self.db.write().await?;
            for (epoch, address, validator_json) in rows {
                // Rows that already carry an x25519_key field are up to date.
                if validator_json
                    .as_object()
                    .is_some_and(|obj| obj.contains_key("x25519_key"))
                {
                    continue;
                }

                // Round-trip through the current type; presumably this adds a
                // default x25519_key field on re-serialization — TODO confirm
                // against RegisteredValidator's serde definition.
                let validator: RegisteredValidator<PubKey> =
                    serde_json::from_value(validator_json).context("deserializing validator")?;

                let new_json = serde_json::to_value(&validator).context("serializing validator")?;

                tracing::debug!(epoch, %address, "migrating x25519 keys for validator");
                tx.execute(
                    query(
                        "UPDATE stake_table_validators SET validator = $1 WHERE epoch = $2 AND \
                         address = $3",
                    )
                    .bind(&new_json)
                    .bind(epoch)
                    .bind(&address),
                )
                .await?;
            }
            Self::mark_migration_complete(&mut tx, name, "stake_table_validators", num_rows)
                .await?;
            tx.commit().await?;
            tracing::info!(
                num_rows,
                "x25519_keys migration completed for stake_table_validators"
            );
        }

        Ok(())
    }
2414
2415 async fn store_next_epoch_quorum_certificate(
2416 &self,
2417 high_qc: NextEpochQuorumCertificate2<SeqTypes>,
2418 ) -> anyhow::Result<()> {
2419 let qc2_bytes = bincode::serialize(&high_qc).context("serializing next epoch qc")?;
2420 let mut tx = self.db.write().await?;
2421 tx.upsert(
2422 "next_epoch_quorum_certificate",
2423 ["id", "data"],
2424 ["id"],
2425 [(true, qc2_bytes)],
2426 )
2427 .await?;
2428 tx.commit().await
2429 }
2430
2431 async fn load_next_epoch_quorum_certificate(
2432 &self,
2433 ) -> anyhow::Result<Option<NextEpochQuorumCertificate2<SeqTypes>>> {
2434 let result = self
2435 .db
2436 .read()
2437 .await?
2438 .fetch_optional("SELECT * FROM next_epoch_quorum_certificate where id = true")
2439 .await?;
2440
2441 result
2442 .map(|row| {
2443 let bytes: Vec<u8> = row.get("data");
2444 anyhow::Result::<_>::Ok(bincode::deserialize(&bytes)?)
2445 })
2446 .transpose()
2447 }
2448
2449 async fn store_eqc(
2450 &self,
2451 high_qc: QuorumCertificate2<SeqTypes>,
2452 next_epoch_high_qc: NextEpochQuorumCertificate2<SeqTypes>,
2453 ) -> anyhow::Result<()> {
2454 let eqc_bytes =
2455 bincode::serialize(&(high_qc, next_epoch_high_qc)).context("serializing eqc")?;
2456 let mut tx = self.db.write().await?;
2457 tx.upsert("eqc", ["id", "data"], ["id"], [(true, eqc_bytes)])
2458 .await?;
2459 tx.commit().await
2460 }
2461
2462 async fn load_eqc(
2463 &self,
2464 ) -> Option<(
2465 QuorumCertificate2<SeqTypes>,
2466 NextEpochQuorumCertificate2<SeqTypes>,
2467 )> {
2468 let result = self
2469 .db
2470 .read()
2471 .await
2472 .ok()?
2473 .fetch_optional("SELECT * FROM eqc where id = true")
2474 .await
2475 .ok()?;
2476
2477 result
2478 .map(|row| {
2479 let bytes: Vec<u8> = row.get("data");
2480 bincode::deserialize(&bytes)
2481 })
2482 .transpose()
2483 .ok()?
2484 }
2485
2486 async fn append_da2(
2487 &self,
2488 proposal: &Proposal<SeqTypes, DaProposal2<SeqTypes>>,
2489 vid_commit: VidCommitment,
2490 ) -> anyhow::Result<()> {
2491 let data = &proposal.data;
2492 let view = data.view_number().u64();
2493 let data_bytes = bincode::serialize(proposal).unwrap();
2494
2495 let now = Instant::now();
2496 let mut tx = self.db.write().await?;
2497 tx.upsert(
2498 "da_proposal2",
2499 ["view", "data", "payload_hash"],
2500 ["view"],
2501 [(view as i64, data_bytes, vid_commit.to_string())],
2502 )
2503 .await?;
2504 let res = tx.commit().await;
2505 self.internal_metrics
2506 .internal_append_da2_duration
2507 .add_point(now.elapsed().as_secs_f64());
2508 res
2509 }
2510
2511 async fn store_drb_result(
2512 &self,
2513 epoch: EpochNumber,
2514 drb_result: DrbResult,
2515 ) -> anyhow::Result<()> {
2516 let drb_result_vec = Vec::from(drb_result);
2517 let mut tx = self.db.write().await?;
2518 tx.upsert(
2519 "epoch_drb_and_root",
2520 ["epoch", "drb_result"],
2521 ["epoch"],
2522 [(epoch.u64() as i64, drb_result_vec)],
2523 )
2524 .await?;
2525 tx.commit().await
2526 }
2527
2528 async fn store_epoch_root(
2529 &self,
2530 epoch: EpochNumber,
2531 block_header: <SeqTypes as NodeType>::BlockHeader,
2532 ) -> anyhow::Result<()> {
2533 let block_header_bytes =
2534 bincode::serialize(&block_header).context("serializing block header")?;
2535
2536 let mut tx = self.db.write().await?;
2537 tx.upsert(
2538 "epoch_drb_and_root",
2539 ["epoch", "block_header"],
2540 ["epoch"],
2541 [(epoch.u64() as i64, block_header_bytes)],
2542 )
2543 .await?;
2544 tx.commit().await
2545 }
2546
    /// Persist `drb_input`, guarding against regressing stored progress.
    ///
    /// If a `DrbInput` is already stored for the same epoch:
    /// - a differing difficulty level overwrites the stored value (logged
    ///   loudly, since this should only happen on reconfiguration), and
    /// - an equal-or-newer stored iteration causes the update to be refused,
    ///   so DRB computation progress is never rolled back.
    async fn store_drb_input(&self, drb_input: DrbInput) -> anyhow::Result<()> {
        if let Ok(loaded_drb_input) = self.load_drb_input(drb_input.epoch).await {
            if loaded_drb_input.difficulty_level != drb_input.difficulty_level {
                tracing::error!("Overwriting {loaded_drb_input:?} in storage with {drb_input:?}");
            } else if loaded_drb_input.iteration >= drb_input.iteration {
                anyhow::bail!(
                    "DrbInput in storage {:?} is more recent than {:?}, refusing to update",
                    loaded_drb_input,
                    drb_input
                )
            }
        }

        let drb_input_bytes = bincode::serialize(&drb_input)
            .context("Failed to serialize DrbInput. This is not fatal, but should never happen.")?;

        let mut tx = self.db.write().await?;

        tx.upsert(
            "drb",
            ["epoch", "drb_input"],
            ["epoch"],
            [(drb_input.epoch as i64, drb_input_bytes)],
        )
        .await?;
        tx.commit().await
    }
2574
2575 async fn load_drb_input(&self, epoch: u64) -> anyhow::Result<DrbInput> {
2576 let row = self
2577 .db
2578 .read()
2579 .await?
2580 .fetch_optional(query("SELECT drb_input FROM drb WHERE epoch = $1").bind(epoch as i64))
2581 .await?;
2582
2583 match row {
2584 None => anyhow::bail!("No DrbInput for epoch {} in storage", epoch),
2585 Some(row) => {
2586 let drb_input_bytes: Vec<u8> = row.try_get("drb_input")?;
2587 let drb_input = bincode::deserialize(&drb_input_bytes)
2588 .context("Failed to deserialize drb_input from storage")?;
2589
2590 Ok(drb_input)
2591 },
2592 }
2593 }
2594
2595 async fn add_state_cert(
2596 &self,
2597 state_cert: LightClientStateUpdateCertificateV2<SeqTypes>,
2598 ) -> anyhow::Result<()> {
2599 let state_cert_bytes = bincode::serialize(&state_cert)
2600 .context("serializing light client state update certificate")?;
2601
2602 let mut tx = self.db.write().await?;
2603 tx.upsert(
2604 "state_cert",
2605 ["view", "state_cert"],
2606 ["view"],
2607 [(
2608 state_cert.light_client_state.view_number as i64,
2609 state_cert_bytes,
2610 )],
2611 )
2612 .await?;
2613 tx.commit().await
2614 }
2615
    /// Load the most recently finalized light client state update
    /// certificate, or `None` when nothing has been stored yet.
    ///
    /// Stored bytes are first decoded in the v2 certificate format; on
    /// failure we fall back to v1 and upgrade via `Into`, so certificates
    /// persisted before the v2 migration remain readable.
    async fn load_state_cert(
        &self,
    ) -> anyhow::Result<Option<LightClientStateUpdateCertificateV2<SeqTypes>>> {
        let Some(row) = self
            .db
            .read()
            .await?
            .fetch_optional(
                // Highest epoch = most recently finalized certificate.
                "SELECT state_cert FROM finalized_state_cert ORDER BY epoch DESC LIMIT 1",
            )
            .await?
        else {
            return Ok(None);
        };
        let bytes: Vec<u8> = row.get("state_cert");

        let cert = match bincode::deserialize(&bytes) {
            Ok(cert) => cert,
            Err(err) => {
                tracing::info!(
                    error = %err,
                    "Failed to deserialize state certificate with v2. attempting with v1"
                );

                // Legacy fallback: rows written before the v2 format existed.
                let v1_cert =
                    bincode::deserialize::<LightClientStateUpdateCertificateV1<SeqTypes>>(&bytes)
                        .with_context(|| {
                            format!("Failed to deserialize using both v1 and v2. error: {err}")
                        })?;

                v1_cert.into()
            },
        };

        Ok(Some(cert))
    }
2652
    /// Load the finalized light client state update certificate for a
    /// specific `epoch`, or `None` if that epoch has no stored certificate.
    ///
    /// Uses the same v2-then-v1 decode fallback as `load_state_cert` so
    /// pre-migration rows remain readable.
    async fn get_state_cert_by_epoch(
        &self,
        epoch: u64,
    ) -> anyhow::Result<Option<LightClientStateUpdateCertificateV2<SeqTypes>>> {
        let Some(row) = self
            .db
            .read()
            .await?
            .fetch_optional(
                query("SELECT state_cert FROM finalized_state_cert WHERE epoch = $1")
                    .bind(epoch as i64),
            )
            .await?
        else {
            return Ok(None);
        };
        let bytes: Vec<u8> = row.get("state_cert");

        let cert = match bincode::deserialize(&bytes) {
            Ok(cert) => cert,
            Err(err) => {
                tracing::info!(
                    error = %err,
                    "Failed to deserialize state certificate with v2. attempting with v1"
                );

                // Legacy fallback: rows written before the v2 format existed.
                let v1_cert =
                    bincode::deserialize::<LightClientStateUpdateCertificateV1<SeqTypes>>(&bytes)
                        .with_context(|| {
                            format!("Failed to deserialize using both v1 and v2. error: {err}")
                        })?;

                v1_cert.into()
            },
        };

        Ok(Some(cert))
    }
2691
2692 async fn insert_state_cert(
2693 &self,
2694 epoch: u64,
2695 cert: LightClientStateUpdateCertificateV2<SeqTypes>,
2696 ) -> anyhow::Result<()> {
2697 let bytes = bincode::serialize(&cert)
2698 .with_context(|| format!("Failed to serialize state cert for epoch {epoch}"))?;
2699
2700 let mut tx = self.db.write().await?;
2701
2702 tx.upsert(
2703 "finalized_state_cert",
2704 ["epoch", "state_cert"],
2705 ["epoch"],
2706 [(epoch as i64, bytes)],
2707 )
2708 .await
2709 }
2710
    /// Load epoch-startup info (epoch number, DRB result, optional epoch-root
    /// block header) for the most recent epochs, returned oldest-first.
    ///
    /// Rows without a DRB result are silently skipped: an epoch cannot be
    /// initialized without its DRB output.
    async fn load_start_epoch_info(&self) -> anyhow::Result<Vec<InitializerEpochInfo<SeqTypes>>> {
        let rows = self
            .db
            .read()
            .await?
            .fetch_all(
                // Newest epochs first, capped at the configured limit...
                query("SELECT * from epoch_drb_and_root ORDER BY epoch DESC LIMIT $1")
                    .bind(RECENT_STAKE_TABLES_LIMIT as i64),
            )
            .await?;

        rows.into_iter()
            // ...then reversed so the caller sees ascending epoch order.
            .rev()
            .map(|row| {
                let epoch: i64 = row.try_get("epoch")?;
                let drb_result: Option<Vec<u8>> = row.try_get("drb_result")?;
                let block_header: Option<Vec<u8>> = row.try_get("block_header")?;
                if let Some(drb_result) = drb_result {
                    // DrbResult is fixed-size; reject blobs of the wrong length.
                    let drb_result_array = drb_result
                        .try_into()
                        .or_else(|_| bail!("invalid drb result"))?;
                    let block_header: Option<<SeqTypes as NodeType>::BlockHeader> = block_header
                        .map(|data| bincode::deserialize(&data))
                        .transpose()?;
                    Ok(Some(InitializerEpochInfo::<SeqTypes> {
                        epoch: EpochNumber::new(epoch as u64),
                        drb_result: drb_result_array,
                        block_header,
                    }))
                } else {
                    // No DRB result yet for this epoch; omit the row.
                    Ok(None)
                }
            })
            // Drop skipped rows while still propagating errors.
            .filter_map(|e| match e {
                Err(v) => Some(Err(v)),
                Ok(Some(v)) => Some(Ok(v)),
                Ok(None) => None,
            })
            .collect()
    }
2754
    /// Replace the internal metrics recorder with one registered against the
    /// provided `Metrics` registry, enabling metrics collection for this
    /// persistence instance.
    fn enable_metrics(&mut self, metrics: &dyn Metrics) {
        self.internal_metrics = PersistenceMetricsValue::new(metrics);
    }
2758}
2759
/// Stake-table / validator-set persistence backed by the SQL database.
#[async_trait]
impl MembershipPersistence for Persistence {
    /// Load the stake table (plus optional block reward and stake-table hash)
    /// stored for a single epoch, or `None` if no row exists for that epoch.
    async fn load_stake(&self, epoch: EpochNumber) -> anyhow::Result<Option<StakeTuple>> {
        let result = self
            .db
            .read()
            .await?
            .fetch_optional(
                query(
                    "SELECT stake, block_reward, stake_table_hash FROM epoch_drb_and_root WHERE \
                     epoch = $1",
                )
                .bind(epoch.u64() as i64),
            )
            .await?;

        result
            .map(|row| {
                let stake_table_bytes: Vec<u8> = row.get("stake");
                // block_reward and stake_table_hash are nullable: older rows
                // may predate these columns.
                let reward_bytes: Option<Vec<u8>> = row.get("block_reward");
                let stake_table_hash_bytes: Option<Vec<u8>> = row.get("stake_table_hash");
                let stake_table: AuthenticatedValidatorMap =
                    bincode::deserialize(&stake_table_bytes)
                        .context("deserializing stake table")?;
                let reward: Option<RewardAmount> = reward_bytes
                    .map(|b| bincode::deserialize(&b).context("deserializing block_reward"))
                    .transpose()?;
                let stake_table_hash: Option<StakeTableHash> = stake_table_hash_bytes
                    .map(|b| bincode::deserialize(&b).context("deserializing stake table hash"))
                    .transpose()?;

                Ok((stake_table, reward, stake_table_hash))
            })
            .transpose()
    }

    /// Load up to `limit` of the most recent stored stake tables, newest
    /// first, keyed by epoch. Rows whose `stake` column is NULL are excluded.
    async fn load_latest_stake(&self, limit: u64) -> anyhow::Result<Option<Vec<IndexedStake>>> {
        let mut tx = self.db.read().await?;

        let rows = match query_as::<(i64, Vec<u8>, Option<Vec<u8>>, Option<Vec<u8>>)>(
            "SELECT epoch, stake, block_reward, stake_table_hash FROM epoch_drb_and_root WHERE \
             stake is NOT NULL ORDER BY epoch DESC LIMIT $1",
        )
        .bind(limit as i64)
        .fetch_all(tx.as_mut())
        .await
        {
            Ok(bytes) => bytes,
            Err(err) => {
                tracing::error!("error loading stake tables: {err:#}");
                bail!("{err:#}");
            },
        };

        let stakes: anyhow::Result<Vec<IndexedStake>> = rows
            .into_iter()
            .map(
                |(id, stake_bytes, reward_bytes_opt, stake_table_hash_bytes_opt)| {
                    let stake_table: AuthenticatedValidatorMap =
                        bincode::deserialize(&stake_bytes).context("deserializing stake table")?;

                    let block_reward: Option<RewardAmount> = reward_bytes_opt
                        .map(|b| bincode::deserialize(&b).context("deserializing block_reward"))
                        .transpose()?;

                    let stake_table_hash: Option<StakeTableHash> = stake_table_hash_bytes_opt
                        .map(|b| bincode::deserialize(&b).context("deserializing stake table hash"))
                        .transpose()?;

                    Ok((
                        EpochNumber::new(id as u64),
                        (stake_table, block_reward),
                        stake_table_hash,
                    ))
                },
            )
            .collect();

        Ok(Some(stakes?))
    }

    /// Upsert the stake table (and optional reward / hash) for `epoch`.
    async fn store_stake(
        &self,
        epoch: EpochNumber,
        stake: AuthenticatedValidatorMap,
        block_reward: Option<RewardAmount>,
        stake_table_hash: Option<StakeTableHash>,
    ) -> anyhow::Result<()> {
        let mut tx = self.db.write().await?;

        let stake_table_bytes = bincode::serialize(&stake).context("serializing stake table")?;
        let reward_bytes = block_reward
            .map(|r| bincode::serialize(&r).context("serializing block reward"))
            .transpose()?;
        let stake_table_hash_bytes = stake_table_hash
            .map(|h| bincode::serialize(&h).context("serializing stake table hash"))
            .transpose()?;
        tx.upsert(
            "epoch_drb_and_root",
            ["epoch", "stake", "block_reward", "stake_table_hash"],
            ["epoch"],
            [(
                epoch.u64() as i64,
                stake_table_bytes,
                reward_bytes,
                stake_table_hash_bytes,
            )],
        )
        .await?;
        tx.commit().await
    }

    /// Append L1 stake-table events and advance the "last processed L1
    /// block" high-water mark (singleton row id = 0).
    ///
    /// If the stored mark is already past `l1_finalized`, the call is a
    /// no-op: a previous, further-along run already persisted these events.
    async fn store_events(
        &self,
        l1_finalized: u64,
        events: Vec<(EventKey, StakeTableEvent)>,
    ) -> anyhow::Result<()> {
        let mut tx = self.db.write().await?;

        let last_processed_l1_block = query_as::<(i64,)>(
            "SELECT last_l1_block FROM stake_table_events_l1_block where id = 0",
        )
        .fetch_optional(tx.as_mut())
        .await?
        .map(|(l1,)| l1);

        tracing::debug!("last l1 finalizes in database = {last_processed_l1_block:?}");

        // `None < Some(_)`, so a missing marker never suppresses the write.
        if last_processed_l1_block > Some(l1_finalized.try_into()?) {
            tracing::debug!(
                ?last_processed_l1_block,
                ?l1_finalized,
                ?events,
                "last l1 finalized stored is already higher"
            );
            return Ok(());
        }

        if !events.is_empty() {
            let mut query_builder: sqlx::QueryBuilder<Db> = sqlx::QueryBuilder::new(
                "INSERT INTO stake_table_events (l1_block, log_index, event) ",
            );

            let events = events
                .into_iter()
                .map(|((block_number, index), event)| {
                    Ok((
                        i64::try_from(block_number)?,
                        i64::try_from(index)?,
                        serde_json::to_value(event).context("l1 event to value")?,
                    ))
                })
                .collect::<anyhow::Result<Vec<_>>>()?;

            query_builder.push_values(events, |mut b, (l1_block, log_index, event)| {
                b.push_bind(l1_block).push_bind(log_index).push_bind(event);
            });

            // Events are immutable once written; re-inserts are ignored.
            query_builder.push(" ON CONFLICT DO NOTHING");
            let query = query_builder.build();

            query.execute(tx.as_mut()).await?;
        }

        tx.upsert(
            "stake_table_events_l1_block",
            ["id", "last_l1_block"],
            ["id"],
            [(0_i32, l1_finalized as i64)],
        )
        .await?;

        tx.commit().await?;

        Ok(())
    }

    /// Load stake-table events in `[from_l1_block, to_l1_block]`, clamped to
    /// the last L1 block that was actually ingested.
    ///
    /// The returned marker is `Complete` when the full requested range was
    /// available, `UntilL1Block(n)` when only events up to `n` could be
    /// served, or `None` when nothing has ever been ingested.
    async fn load_events(
        &self,
        from_l1_block: u64,
        to_l1_block: u64,
    ) -> anyhow::Result<(
        Option<EventsPersistenceRead>,
        Vec<(EventKey, StakeTableEvent)>,
    )> {
        let mut tx = self.db.read().await?;

        let res = query_as::<(i64,)>(
            "SELECT last_l1_block FROM stake_table_events_l1_block where id = 0",
        )
        .fetch_optional(tx.as_mut())
        .await?;

        let Some((last_processed_l1_block,)) = res else {
            return Ok((None, Vec::new()));
        };

        let to_l1_block = to_l1_block.try_into()?;
        // Never report events past what has been durably processed.
        let query_l1_block = if last_processed_l1_block > to_l1_block {
            to_l1_block
        } else {
            last_processed_l1_block
        };

        let rows = query(
            "SELECT l1_block, log_index, event FROM stake_table_events WHERE $1 <= l1_block AND \
             l1_block <= $2 ORDER BY l1_block ASC, log_index ASC",
        )
        .bind(i64::try_from(from_l1_block)?)
        .bind(query_l1_block)
        .fetch_all(tx.as_mut())
        .await?;

        let events = rows
            .into_iter()
            .map(|row| {
                let l1_block: i64 = row.try_get("l1_block")?;
                let log_index: i64 = row.try_get("log_index")?;
                let event = serde_json::from_value(row.try_get("event")?)?;

                Ok(((l1_block.try_into()?, log_index.try_into()?), event))
            })
            .collect::<anyhow::Result<Vec<_>>>()?;

        if query_l1_block == to_l1_block {
            Ok((Some(EventsPersistenceRead::Complete), events))
        } else {
            Ok((
                Some(EventsPersistenceRead::UntilL1Block(
                    query_l1_block.try_into()?,
                )),
                events,
            ))
        }
    }

    /// Drop all stake-table related state (events, ingestion marker,
    /// per-epoch stake rows, validator snapshots), forcing a re-sync from L1.
    async fn delete_stake_tables(&self) -> anyhow::Result<()> {
        let mut tx = self.db.write().await?;
        // Postgres: one TRUNCATE covers all tables atomically.
        #[cfg(not(feature = "embedded-db"))]
        query(
            "TRUNCATE stake_table_events, stake_table_events_l1_block, epoch_drb_and_root, \
             stake_table_validators",
        )
        .execute(tx.as_mut())
        .await?;
        // SQLite has no TRUNCATE; delete table-by-table in one transaction.
        #[cfg(feature = "embedded-db")]
        {
            query("DELETE FROM stake_table_events")
                .execute(tx.as_mut())
                .await?;
            query("DELETE FROM stake_table_events_l1_block")
                .execute(tx.as_mut())
                .await?;
            query("DELETE FROM epoch_drb_and_root")
                .execute(tx.as_mut())
                .await?;
            query("DELETE FROM stake_table_validators")
                .execute(tx.as_mut())
                .await?;
        }
        tx.commit().await?;
        Ok(())
    }

    /// Bulk-upsert the full validator set for `epoch` as JSON rows.
    /// An empty map is a no-op.
    async fn store_all_validators(
        &self,
        epoch: EpochNumber,
        all_validators: RegisteredValidatorMap,
    ) -> anyhow::Result<()> {
        let mut tx = self.db.write().await?;

        if all_validators.is_empty() {
            // Dropping `tx` uncommitted just discards the empty transaction.
            return Ok(());
        }

        let mut query_builder =
            QueryBuilder::new("INSERT INTO stake_table_validators (epoch, address, validator) ");

        query_builder.push_values(all_validators, |mut b, (address, validator)| {
            let validator_json =
                serde_json::to_value(&validator).expect("cannot serialize validator to json");
            b.push_bind(epoch.u64() as i64)
                .push_bind(address.to_string())
                .push_bind(validator_json);
        });

        // Re-storing an epoch's validators replaces the stored JSON in place.
        query_builder
            .push(" ON CONFLICT (epoch, address) DO UPDATE SET validator = EXCLUDED.validator");

        let query = query_builder.build();

        query.execute(tx.as_mut()).await?;

        tx.commit().await?;
        Ok(())
    }

    /// Page through the validators stored for `epoch`, ordered by
    /// lower-cased address for a stable, case-insensitive ordering.
    async fn load_all_validators(
        &self,
        epoch: EpochNumber,
        offset: u64,
        limit: u64,
    ) -> anyhow::Result<Vec<RegisteredValidator<PubKey>>> {
        let mut tx = self.db.read().await?;

        let rows = query(
            "SELECT address, validator
             FROM stake_table_validators
             WHERE epoch = $1
             ORDER BY LOWER(address) ASC
             LIMIT $2 OFFSET $3",
        )
        .bind(epoch.u64() as i64)
        .bind(limit as i64)
        .bind(offset as i64)
        .fetch_all(tx.as_mut())
        .await?;
        rows.into_iter()
            .map(|row| {
                let validator_json: serde_json::Value = row.try_get("validator")?;
                serde_json::from_value::<RegisteredValidator<PubKey>>(validator_json)
                    .map_err(Into::into)
            })
            .collect()
    }
}
3110
3111#[async_trait]
3112impl DhtPersistentStorage for Persistence {
3113 async fn save(&self, records: Vec<SerializableRecord>) -> anyhow::Result<()> {
3119 let to_save =
3121 bincode::serialize(&records).with_context(|| "failed to serialize records")?;
3122
3123 let stmt = "INSERT INTO libp2p_dht (id, serialized_records) VALUES (0, $1) ON CONFLICT \
3125 (id) DO UPDATE SET serialized_records = $1";
3126
3127 let mut tx = self
3129 .db
3130 .write()
3131 .await
3132 .with_context(|| "failed to start an atomic DB transaction")?;
3133 tx.execute(query(stmt).bind(to_save))
3134 .await
3135 .with_context(|| "failed to execute DB query")?;
3136
3137 tx.commit().await.with_context(|| "failed to commit to DB")
3139 }
3140
3141 async fn load(&self) -> anyhow::Result<Vec<SerializableRecord>> {
3147 let result = self
3149 .db
3150 .read()
3151 .await
3152 .with_context(|| "failed to start a DB read transaction")?
3153 .fetch_one("SELECT * FROM libp2p_dht where id = 0")
3154 .await
3155 .with_context(|| "failed to fetch from DB")?;
3156
3157 let serialied_records: Vec<u8> = result.get("serialized_records");
3159
3160 let records: Vec<SerializableRecord> = bincode::deserialize(&serialied_records)
3162 .with_context(|| "Failed to deserialize records")?;
3163
3164 Ok(records)
3165 }
3166}
3167
/// Serve VID common data from locally stored VID shares, looked up by
/// payload commitment.
#[async_trait]
impl Provider<SeqTypes, VidCommonRequest> for Persistence {
    /// Returns `None` on any failure (missing row, DB error, decode error)
    /// so the query service can fall back to other providers.
    #[tracing::instrument(skip(self))]
    async fn fetch(&self, req: VidCommonRequest) -> Option<VidCommon> {
        let mut tx = match self.db.read().await {
            Ok(tx) => tx,
            Err(err) => {
                tracing::warn!("could not open transaction: {err:#}");
                return None;
            },
        };

        let bytes = match query_as::<(Vec<u8>,)>(
            "SELECT data FROM vid_share2 WHERE payload_hash = $1 LIMIT 1",
        )
        .bind(req.0.to_string())
        .fetch_optional(tx.as_mut())
        .await
        {
            Ok(Some((bytes,))) => bytes,
            Ok(None) => return None,
            Err(err) => {
                tracing::error!("error loading VID share: {err:#}");
                return None;
            },
        };

        let share: Proposal<SeqTypes, VidDisperseShare<SeqTypes>> =
            match bincode::deserialize(&bytes) {
                Ok(share) => share,
                Err(err) => {
                    tracing::warn!("error decoding VID share: {err:#}");
                    return None;
                },
            };

        // Only the `common` part is requested; extract it from whichever
        // disperse-share version was stored.
        match share.data {
            VidDisperseShare::V0(vid) => Some(VidCommon::V0(vid.common)),
            VidDisperseShare::V1(vid) => Some(VidCommon::V1(vid.common)),
            VidDisperseShare::V2(vid) => Some(VidCommon::V2(vid.common)),
        }
    }
}
3211
3212#[async_trait]
3213impl Provider<SeqTypes, PayloadRequest> for Persistence {
3214 #[tracing::instrument(skip(self))]
3215 async fn fetch(&self, req: PayloadRequest) -> Option<Payload> {
3216 let mut tx = match self.db.read().await {
3217 Ok(tx) => tx,
3218 Err(err) => {
3219 tracing::warn!("could not open transaction: {err:#}");
3220 return None;
3221 },
3222 };
3223
3224 let bytes = match query_as::<(Vec<u8>,)>(
3225 "SELECT data FROM da_proposal2 WHERE payload_hash = $1 LIMIT 1",
3226 )
3227 .bind(req.0.to_string())
3228 .fetch_optional(tx.as_mut())
3229 .await
3230 {
3231 Ok(Some((bytes,))) => bytes,
3232 Ok(None) => return None,
3233 Err(err) => {
3234 tracing::warn!("error loading DA proposal: {err:#}");
3235 return None;
3236 },
3237 };
3238
3239 let proposal: Proposal<SeqTypes, DaProposal2<SeqTypes>> = match bincode::deserialize(&bytes)
3240 {
3241 Ok(proposal) => proposal,
3242 Err(err) => {
3243 tracing::error!("error decoding DA proposal: {err:#}");
3244 return None;
3245 },
3246 };
3247
3248 Some(Payload::from_bytes(
3249 &proposal.data.encoded_transactions,
3250 &proposal.data.metadata,
3251 ))
3252 }
3253}
3254
/// Serve leaves reconstructed from undecided quorum proposals and their
/// certificates stored by consensus.
#[async_trait]
impl Provider<SeqTypes, LeafRequest<SeqTypes>> for Persistence {
    /// Returns `None` when the leaf is unavailable or fails validation, so
    /// the query service can fall back to other providers.
    #[tracing::instrument(skip(self))]
    async fn fetch(&self, req: LeafRequest<SeqTypes>) -> Option<LeafQueryData<SeqTypes>> {
        let mut tx = match self.db.read().await {
            Ok(tx) => tx,
            Err(err) => {
                tracing::warn!("could not open transaction: {err:#}");
                return None;
            },
        };

        let (leaf, qc) = match fetch_leaf_from_proposals(&mut tx, req).await {
            // `Ok(None)` (not found) propagates as `None` via `?`.
            Ok(res) => res?,
            Err(err) => {
                tracing::info!("requested leaf not found in undecided proposals: {err:#}");
                return None;
            },
        };

        // Validate the (leaf, qc) pair before handing it out.
        match LeafQueryData::new(leaf, qc) {
            Ok(leaf) => Some(leaf),
            Err(err) => {
                tracing::warn!("fetched invalid leaf: {err:#}");
                None
            },
        }
    }
}
3284
3285async fn fetch_leaf_from_proposals<Mode: TransactionMode>(
3286 tx: &mut Transaction<Mode>,
3287 req: LeafRequest<SeqTypes>,
3288) -> anyhow::Result<Option<(Leaf2, QuorumCertificate2<SeqTypes>)>> {
3289 let Some((proposal_bytes,)) =
3291 query_as::<(Vec<u8>,)>("SELECT data FROM quorum_proposals2 WHERE leaf_hash = $1 LIMIT 1")
3292 .bind(req.expected_leaf.to_string())
3293 .fetch_optional(tx.as_mut())
3294 .await
3295 .context("fetching proposal")?
3296 else {
3297 return Ok(None);
3298 };
3299
3300 let Some((qc_bytes,)) =
3302 query_as::<(Vec<u8>,)>("SELECT data FROM quorum_certificate2 WHERE leaf_hash = $1 LIMIT 1")
3303 .bind(req.expected_leaf.to_string())
3304 .fetch_optional(tx.as_mut())
3305 .await
3306 .context("fetching QC")?
3307 else {
3308 return Ok(None);
3309 };
3310
3311 let proposal: Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>> =
3312 bincode::deserialize(&proposal_bytes).context("deserializing quorum proposal")?;
3313 let qc: QuorumCertificate2<SeqTypes> =
3314 bincode::deserialize(&qc_bytes).context("deserializing quorum certificate")?;
3315
3316 let leaf = Leaf2::from_quorum_proposal(&proposal.data);
3317 Ok(Some((leaf, qc)))
3318}
3319
#[cfg(test)]
mod testing {
    use hotshot_query_service::data_source::storage::sql::testing::TmpDb;

    use super::*;
    use crate::persistence::tests::TestablePersistence;

    /// Test wiring: back `Persistence` with an ephemeral database so the
    /// shared persistence test suite can run against the SQL implementation.
    #[async_trait]
    impl TestablePersistence for Persistence {
        // Reference-counted so multiple `Persistence` instances can share
        // one temporary database.
        type Storage = Arc<TmpDb>;

        async fn tmp_storage() -> Self::Storage {
            Arc::new(TmpDb::init().await)
        }

        #[allow(refining_impl_trait)]
        fn options(db: &Self::Storage) -> Options {
            // Postgres-backed build: connect to the spawned test database.
            #[cfg(not(feature = "embedded-db"))]
            {
                PostgresOptions {
                    port: Some(db.port()),
                    host: Some(db.host()),
                    user: Some("postgres".into()),
                    password: Some("password".into()),
                    ..Default::default()
                }
                .into()
            }

            // SQLite-backed build: point at the temporary file path.
            #[cfg(feature = "embedded-db")]
            {
                SqliteOptions { path: db.path() }.into()
            }
        }
    }
}
3356
3357#[cfg(test)]
3358mod test {
3359 use committable::{Commitment, CommitmentBoundsArkless};
3360 use espresso_types::{Header, Leaf, NodeState, ValidatedState, traits::NullEventConsumer};
3361 use futures::stream::TryStreamExt;
3362 use hotshot_example_types::node_types::TEST_VERSIONS;
3363 use hotshot_types::{
3364 data::{
3365 EpochNumber, QuorumProposal2, ns_table::parse_ns_table,
3366 vid_disperse::AvidMDisperseShare,
3367 },
3368 message::convert_proposal,
3369 simple_certificate::QuorumCertificate,
3370 simple_vote::QuorumData,
3371 traits::{
3372 EncodeBytes,
3373 block_contents::{BlockHeader, GENESIS_VID_NUM_STORAGE_NODES},
3374 signature_key::SignatureKey,
3375 },
3376 utils::EpochTransitionIndicator,
3377 vid::{
3378 advz::advz_scheme,
3379 avidm::{AvidMScheme, init_avidm_param},
3380 },
3381 };
3382 use jf_advz::VidScheme;
3383
3384 use super::*;
3385 use crate::{BLSPubKey, PubKey, persistence::tests::TestablePersistence as _};
3386
    /// Reconnecting to storage should run the migration that back-fills the
    /// `leaf_hash` column of legacy `quorum_proposals` rows from the stored
    /// proposal bytes.
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_quorum_proposals_leaf_hash_migration() {
        // Build a genesis-based proposal we can clone at different views.
        let leaf: Leaf2 = Leaf::genesis(
            &ValidatedState::default(),
            &NodeState::mock(),
            TEST_VERSIONS.test.base,
        )
        .await
        .into();
        let privkey = BLSPubKey::generated_from_seed_indexed([0; 32], 1).1;
        let signature = PubKey::sign(&privkey, &[]).unwrap();
        let mut quorum_proposal = Proposal {
            data: QuorumProposal2::<SeqTypes> {
                epoch: None,
                block_header: leaf.block_header().clone(),
                view_number: ViewNumber::genesis(),
                justify_qc: QuorumCertificate::genesis(
                    &ValidatedState::default(),
                    &NodeState::mock(),
                    TEST_VERSIONS.test,
                )
                .await
                .to_qc2(),
                upgrade_certificate: None,
                view_change_evidence: None,
                next_drb_result: None,
                next_epoch_justify_qc: None,
                state_cert: None,
            },
            signature,
            _pd: Default::default(),
        };

        // Two proposals at distinct views, converted to the legacy type.
        let qp1: Proposal<SeqTypes, QuorumProposal<SeqTypes>> =
            convert_proposal(quorum_proposal.clone());

        quorum_proposal.data.view_number = ViewNumber::new(1);

        let qp2: Proposal<SeqTypes, QuorumProposal<SeqTypes>> =
            convert_proposal(quorum_proposal.clone());
        let qps = [qp1, qp2];

        // Insert the legacy rows directly, without a `leaf_hash` value.
        let db = Persistence::tmp_storage().await;
        let persistence = Persistence::connect(&db).await;
        let mut tx = persistence.db.write().await.unwrap();
        let params = qps
            .iter()
            .map(|qp| {
                (
                    qp.data.view_number.u64() as i64,
                    bincode::serialize(&qp).unwrap(),
                )
            })
            .collect::<Vec<_>>();
        tx.upsert("quorum_proposals", ["view", "data"], ["view"], params)
            .await
            .unwrap();
        tx.commit().await.unwrap();

        // Reconnecting runs migrations; `leaf_hash` should now be populated
        // and the proposal bytes should be unchanged.
        let persistence = Persistence::connect(&db).await;
        let mut tx = persistence.db.read().await.unwrap();
        let rows = tx
            .fetch("SELECT * FROM quorum_proposals ORDER BY view ASC")
            .try_collect::<Vec<_>>()
            .await
            .unwrap();
        assert_eq!(rows.len(), qps.len());
        for (row, qp) in rows.into_iter().zip(qps) {
            assert_eq!(row.get::<i64, _>("view"), qp.data.view_number.u64() as i64);
            assert_eq!(
                row.get::<Vec<u8>, _>("data"),
                bincode::serialize(&qp).unwrap()
            );
            assert_eq!(
                row.get::<String, _>("leaf_hash"),
                Committable::commit(&Leaf::from_quorum_proposal(&qp.data)).to_string()
            );
        }
    }
3469
    /// The x25519-keys migration should rewrite both the per-validator JSON
    /// rows (`stake_table_validators`) and the bincode stake blobs
    /// (`epoch_drb_and_root`), adding a null `x25519_key` field to legacy
    /// entries, and mark the migration complete with the row count.
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_x25519_keys_migration() {
        use std::collections::HashMap;

        use crate::persistence::RegisteredValidatorNoX25519;

        let mut validator = RegisteredValidator::mock();
        validator.delegators.clear();
        validator.stake = alloy::primitives::U256::from(1000u64);

        let epoch = 1i64;
        let address = validator.account;

        // Legacy (pre-x25519) shape used for the bincode stake blob.
        let legacy = RegisteredValidatorNoX25519 {
            account: validator.account,
            stake_table_key: validator.stake_table_key,
            state_ver_key: validator.state_ver_key.clone(),
            stake: validator.stake,
            commission: validator.commission,
            delegators: HashMap::new(),
            authenticated: true,
        };

        let mut legacy_map: IndexMap<Address, RegisteredValidatorNoX25519> = IndexMap::new();
        legacy_map.insert(address, legacy);
        let stake_bytes = bincode::serialize(&legacy_map).unwrap();

        // Same legacy shape, serialized as JSON for the validators table.
        let json_legacy = RegisteredValidatorNoX25519 {
            account: validator.account,
            stake_table_key: validator.stake_table_key,
            state_ver_key: validator.state_ver_key.clone(),
            stake: validator.stake,
            commission: validator.commission,
            delegators: HashMap::new(),
            authenticated: true,
        };
        let validator_json = serde_json::to_value(&json_legacy).unwrap();

        let db = Persistence::tmp_storage().await;
        let persistence = Persistence::connect(&db).await;
        let mut tx = persistence.db.write().await.unwrap();

        tx.execute(
            query(
                "INSERT INTO stake_table_validators (epoch, address, validator) VALUES ($1, $2, \
                 $3)",
            )
            .bind(epoch)
            .bind(format!("{:?}", address))
            .bind(&validator_json),
        )
        .await
        .unwrap();

        tx.execute(
            query("INSERT INTO epoch_drb_and_root (epoch, stake) VALUES ($1, $2)")
                .bind(epoch)
                .bind(&stake_bytes),
        )
        .await
        .unwrap();

        // Reset the migration marker so `migrate_storage` will re-run it.
        tx.execute(query(
            "UPDATE data_migrations SET completed = false, migrated_rows = 0 WHERE name = \
             'x25519_keys'",
        ))
        .await
        .unwrap();

        tx.commit().await.unwrap();

        // Sanity check: before migration the JSON has no `x25519_key` field.
        {
            let mut tx = persistence.db.read().await.unwrap();
            let row: (serde_json::Value,) =
                query_as("SELECT validator FROM stake_table_validators WHERE epoch = $1")
                    .bind(epoch)
                    .fetch_one(tx.as_mut())
                    .await
                    .unwrap();
            let json_obj = row.0.as_object().unwrap();
            assert!(!json_obj.contains_key("x25519_key"));
        }

        let persistence = Persistence::connect(&db).await;
        persistence.migrate_storage().await.unwrap();

        // After migration: JSON row gains a null `x25519_key`.
        {
            let mut tx = persistence.db.read().await.unwrap();
            let row: (serde_json::Value,) =
                query_as("SELECT validator FROM stake_table_validators WHERE epoch = $1")
                    .bind(epoch)
                    .fetch_one(tx.as_mut())
                    .await
                    .unwrap();
            let json_obj = row.0.as_object().unwrap();
            assert!(json_obj.contains_key("x25519_key"));
            assert!(json_obj.get("x25519_key").unwrap().is_null());
        }

        // After migration: bincode blob round-trips through the new type
        // with `x25519_key == None`.
        {
            let mut tx = persistence.db.read().await.unwrap();
            let row: (Vec<u8>,) = query_as("SELECT stake FROM epoch_drb_and_root WHERE epoch = $1")
                .bind(epoch)
                .fetch_one(tx.as_mut())
                .await
                .unwrap();
            let migrated_map: AuthenticatedValidatorMap = bincode::deserialize(&row.0).unwrap();
            assert!(migrated_map.contains_key(&address));
            let v = migrated_map.get(&address).unwrap();
            assert!(v.x25519_key.is_none());
        }

        // The migration bookkeeping records completion and the row count.
        {
            let mut tx = persistence.db.read().await.unwrap();
            let row: (bool, i64) = query_as(
                "SELECT completed, migrated_rows FROM data_migrations WHERE name = 'x25519_keys' \
                 AND table_name = 'epoch_drb_and_root'",
            )
            .fetch_one(tx.as_mut())
            .await
            .unwrap();
            assert!(row.0);
            assert_eq!(row.1, 1);
        }
    }
3604
    /// Storing and reloading a validator set must preserve each validator's
    /// `authenticated` flag exactly — neither promoting unauthenticated
    /// validators nor demoting authenticated ones.
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_store_all_validators_authenticated_and_unauthenticated() {
        use std::collections::HashMap;

        use alloy::primitives::{Address, U256};
        use hotshot_types::light_client::StateVerKey;
        use indexmap::IndexMap;

        let tmp = Persistence::tmp_storage().await;
        let storage = Persistence::connect(&tmp).await;

        let authenticated_validator = RegisteredValidator {
            account: Address::random(),
            stake_table_key: BLSPubKey::generated_from_seed_indexed([0u8; 32], 0).0,
            state_ver_key: StateVerKey::default(),
            stake: U256::from(1000),
            commission: 100,
            delegators: HashMap::new(),
            authenticated: true,
            x25519_key: None,
            p2p_addr: None,
        };

        let unauthenticated_validator = RegisteredValidator {
            account: Address::random(),
            stake_table_key: BLSPubKey::generated_from_seed_indexed([0u8; 32], 1).0,
            state_ver_key: StateVerKey::default(),
            stake: U256::from(2000),
            commission: 200,
            delegators: HashMap::new(),
            authenticated: false,
            x25519_key: None,
            p2p_addr: None,
        };

        let mut validators: IndexMap<Address, RegisteredValidator<BLSPubKey>> = IndexMap::new();
        validators.insert(
            authenticated_validator.account,
            authenticated_validator.clone(),
        );
        validators.insert(
            unauthenticated_validator.account,
            unauthenticated_validator.clone(),
        );

        // Round-trip through storage for epoch 1.
        storage
            .store_all_validators(EpochNumber::new(1), validators)
            .await
            .unwrap();

        let loaded = storage
            .load_all_validators(EpochNumber::new(1), 0, 100)
            .await
            .unwrap();
        assert_eq!(loaded.len(), 2);

        let loaded_auth = loaded
            .iter()
            .find(|v| v.account == authenticated_validator.account)
            .unwrap();
        assert!(
            loaded_auth.authenticated,
            "authenticated validator should remain authenticated"
        );

        let loaded_unauth = loaded
            .iter()
            .find(|v| v.account == unauthenticated_validator.account)
            .unwrap();
        assert!(
            !loaded_unauth.authenticated,
            "unauthenticated validator should remain unauthenticated"
        );
    }
3684
    // Verifies that `Persistence` can serve as a fetching `Provider`: data
    // appended through the consensus-side APIs (VID share, DA proposal, quorum
    // proposals) can be fetched back via `VidCommonRequest`, `PayloadRequest`,
    // and `LeafRequest`.
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_fetching_providers() {
        let tmp = Persistence::tmp_storage().await;
        let storage = Persistence::connect(&tmp).await;

        // Build a genesis leaf whose payload backs all the test data below.
        let leaf = Leaf2::genesis(
            &ValidatedState::default(),
            &NodeState::mock(),
            TEST_VERSIONS.test.base,
        )
        .await;
        let leaf_payload = leaf.block_payload().unwrap();
        let leaf_payload_bytes_arc = leaf_payload.encode();

        // Disperse the payload with a 2-node AvidM scheme to obtain shares and
        // the payload commitment.
        let avidm_param = init_avidm_param(2).unwrap();
        let weights = vec![1u32; 2];

        let ns_table = parse_ns_table(
            leaf_payload.byte_len().as_usize(),
            &leaf_payload.ns_table().encode(),
        );
        let (payload_commitment, shares) =
            AvidMScheme::ns_disperse(&avidm_param, &weights, &leaf_payload_bytes_arc, ns_table)
                .unwrap();
        let (pubkey, privkey) = BLSPubKey::generated_from_seed_indexed([0; 32], 1);
        // Wrap the first share in a signed VID disperse proposal for view 0.
        let vid_share = convert_proposal(
            AvidMDisperseShare::<SeqTypes> {
                view_number: ViewNumber::new(0),
                payload_commitment,
                share: shares[0].clone(),
                recipient_key: pubkey,
                epoch: None,
                target_epoch: None,
                common: avidm_param.clone(),
            }
            .to_proposal(&privkey)
            .unwrap()
            .clone(),
        );

        // Quorum proposal for the genesis leaf, signed with the same key.
        let quorum_proposal = QuorumProposalWrapper::<SeqTypes> {
            proposal: QuorumProposal2::<SeqTypes> {
                block_header: leaf.block_header().clone(),
                view_number: leaf.view_number(),
                justify_qc: leaf.justify_qc(),
                upgrade_certificate: None,
                view_change_evidence: None,
                next_drb_result: None,
                next_epoch_justify_qc: None,
                epoch: None,
                state_cert: None,
            },
        };
        let quorum_proposal_signature =
            BLSPubKey::sign(&privkey, &bincode::serialize(&quorum_proposal).unwrap())
                .expect("Failed to sign quorum proposal");
        let quorum_proposal = Proposal {
            data: quorum_proposal,
            signature: quorum_proposal_signature,
            _pd: Default::default(),
        };

        // DA proposal carrying the raw payload bytes for view 0.
        let block_payload_signature = BLSPubKey::sign(&privkey, &leaf_payload_bytes_arc)
            .expect("Failed to sign block payload");
        let da_proposal = Proposal {
            data: DaProposal2::<SeqTypes> {
                encoded_transactions: leaf_payload_bytes_arc,
                metadata: leaf_payload.ns_table().clone(),
                view_number: ViewNumber::new(0),
                epoch: None,
                epoch_transition_indicator: EpochTransitionIndicator::NotInTransition,
            },
            signature: block_payload_signature,
            _pd: Default::default(),
        };

        // Derive a successor proposal whose justify QC commits to the genesis
        // leaf; its QC is what `LeafRequest` is matched against below.
        let mut next_quorum_proposal = quorum_proposal.clone();
        next_quorum_proposal.data.proposal.view_number += 1;
        next_quorum_proposal.data.proposal.justify_qc.view_number += 1;
        next_quorum_proposal
            .data
            .proposal
            .justify_qc
            .data
            .leaf_commit = Committable::commit(&leaf.clone());
        let qc = next_quorum_proposal.data.justify_qc();

        // Store everything through the consensus-side append APIs.
        storage
            .append_da2(&da_proposal, VidCommitment::V1(payload_commitment))
            .await
            .unwrap();
        storage.append_vid(&vid_share).await.unwrap();
        storage
            .append_quorum_proposal2(&quorum_proposal)
            .await
            .unwrap();

        storage
            .append_quorum_proposal2(&next_quorum_proposal)
            .await
            .unwrap();

        // Fetch the data back through the `Provider` interface: VID common
        // data, the payload, and the leaf must all round-trip.
        assert_eq!(
            Some(VidCommon::V1(avidm_param)),
            storage
                .fetch(VidCommonRequest(vid_share.data.payload_commitment()))
                .await
        );
        assert_eq!(
            leaf_payload,
            storage
                .fetch(PayloadRequest(vid_share.data.payload_commitment()))
                .await
                .unwrap()
        );
        assert_eq!(
            LeafQueryData::new(leaf.clone(), qc.clone()).unwrap(),
            storage
                .fetch(LeafRequest::new(
                    leaf.block_header().block_number(),
                    Committable::commit(&leaf),
                    qc.clone().commit()
                ))
                .await
                .unwrap()
        );
    }
3816
    /// Shared body for the pruning tests.
    ///
    /// Seeds one view (`data_view`) of consensus data — a VID share, a DA
    /// proposal, and a quorum proposal — then advances the decided view and
    /// checks when garbage collection removes that data under `pruning_opt`.
    /// NOTE(review): both callers configure a retention window of one view
    /// (via `minimum_retention` or `target_retention`), so data is expected to
    /// survive one decide past `data_view` and be gone after two.
    async fn test_pruning_helper(pruning_opt: ConsensusPruningOptions) {
        let tmp = Persistence::tmp_storage().await;
        let mut opt = Persistence::options(&tmp);
        opt.consensus_pruning = pruning_opt;
        let storage = opt.create().await.unwrap();

        let data_view = ViewNumber::new(1);

        // Build a genesis leaf and its encoded payload to derive the data from.
        let leaf = Leaf2::genesis(
            &ValidatedState::default(),
            &NodeState::mock(),
            TEST_VERSIONS.test.base,
        )
        .await;
        let leaf_payload = leaf.block_payload().unwrap();
        let leaf_payload_bytes_arc = leaf_payload.encode();

        let avidm_param = init_avidm_param(2).unwrap();
        let weights = vec![1u32; 2];

        let ns_table = parse_ns_table(
            leaf_payload.byte_len().as_usize(),
            &leaf_payload.ns_table().encode(),
        );
        let (payload_commitment, shares) =
            AvidMScheme::ns_disperse(&avidm_param, &weights, &leaf_payload_bytes_arc, ns_table)
                .unwrap();

        let (pubkey, privkey) = BLSPubKey::generated_from_seed_indexed([0; 32], 1);
        // Signed VID disperse share for `data_view`.
        let vid = convert_proposal(
            AvidMDisperseShare::<SeqTypes> {
                view_number: data_view,
                payload_commitment,
                share: shares[0].clone(),
                recipient_key: pubkey,
                epoch: None,
                target_epoch: None,
                common: avidm_param,
            }
            .to_proposal(&privkey)
            .unwrap()
            .clone(),
        );
        // Quorum proposal for `data_view`, anchored to the genesis QC.
        let quorum_proposal = QuorumProposalWrapper::<SeqTypes> {
            proposal: QuorumProposal2::<SeqTypes> {
                epoch: None,
                block_header: leaf.block_header().clone(),
                view_number: data_view,
                justify_qc: QuorumCertificate2::genesis(
                    &ValidatedState::default(),
                    &NodeState::mock(),
                    TEST_VERSIONS.test,
                )
                .await,
                upgrade_certificate: None,
                view_change_evidence: None,
                next_drb_result: None,
                next_epoch_justify_qc: None,
                state_cert: None,
            },
        };
        let quorum_proposal_signature =
            BLSPubKey::sign(&privkey, &bincode::serialize(&quorum_proposal).unwrap())
                .expect("Failed to sign quorum proposal");
        let quorum_proposal = Proposal {
            data: quorum_proposal,
            signature: quorum_proposal_signature,
            _pd: Default::default(),
        };

        // DA proposal carrying the payload bytes for `data_view`.
        let block_payload_signature = BLSPubKey::sign(&privkey, &leaf_payload_bytes_arc)
            .expect("Failed to sign block payload");
        let da_proposal = Proposal {
            data: DaProposal2::<SeqTypes> {
                encoded_transactions: leaf_payload_bytes_arc.clone(),
                metadata: leaf_payload.ns_table().clone(),
                view_number: data_view,
                epoch: Some(EpochNumber::new(0)),
                epoch_transition_indicator: EpochTransitionIndicator::NotInTransition,
            },
            signature: block_payload_signature,
            _pd: Default::default(),
        };

        tracing::info!(?vid, ?da_proposal, ?quorum_proposal, "append data");
        storage.append_vid(&vid).await.unwrap();
        storage
            .append_da2(&da_proposal, VidCommitment::V1(payload_commitment))
            .await
            .unwrap();
        storage
            .append_quorum_proposal2(&quorum_proposal)
            .await
            .unwrap();

        // Decide one view past the data: with a one-view retention window the
        // data must still be loadable afterwards.
        tracing::info!("decide view 1");
        storage
            .append_decided_leaves(data_view + 1, [], None, &NullEventConsumer)
            .await
            .unwrap();
        assert_eq!(
            storage.load_vid_share(data_view).await.unwrap().unwrap(),
            vid
        );
        assert_eq!(
            storage.load_da_proposal(data_view).await.unwrap().unwrap(),
            da_proposal
        );
        assert_eq!(
            storage.load_quorum_proposal(data_view).await.unwrap(),
            quorum_proposal
        );

        // Decide a second view: the data now falls outside the retention
        // window and must have been pruned.
        tracing::info!("decide view 2");
        storage
            .append_decided_leaves(data_view + 2, [], None, &NullEventConsumer)
            .await
            .unwrap();
        assert!(storage.load_vid_share(data_view).await.unwrap().is_none(),);
        assert!(storage.load_da_proposal(data_view).await.unwrap().is_none());
        storage.load_quorum_proposal(data_view).await.unwrap_err();
    }
3951
3952 #[test_log::test(tokio::test(flavor = "multi_thread"))]
3953 async fn test_pruning_minimum_retention() {
3954 test_pruning_helper(ConsensusPruningOptions {
3955 target_usage: 0,
3958 minimum_retention: 1,
3959 target_retention: u64::MAX,
3962 })
3963 .await
3964 }
3965
3966 #[test_log::test(tokio::test(flavor = "multi_thread"))]
3967 async fn test_pruning_target_retention() {
3968 test_pruning_helper(ConsensusPruningOptions {
3969 target_retention: 1,
3970 minimum_retention: 0,
3973 target_usage: u64::MAX,
3976 })
3977 .await
3978 }
3979
    // Seeds legacy (pre-migration) consensus data — v1 anchor leaves, ADVZ VID
    // shares, v1 DA proposals, v1 quorum proposals/certificates, and finalized
    // light client state certs — then runs `migrate_storage` and verifies the
    // row counts in every v2 table, plus idempotency of a second migration run.
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_consensus_migration() {
        let tmp = Persistence::tmp_storage().await;
        let mut opt = Persistence::options(&tmp);

        let storage = opt.create().await.unwrap();

        // Number of views/epochs worth of legacy data to seed before migrating.
        let rows = 300;

        // No finalized state cert exists before any data is written.
        assert!(storage.load_state_cert().await.unwrap().is_none());

        for i in 0..rows {
            let view = ViewNumber::new(i);
            let validated_state = ValidatedState::default();
            let instance_state = NodeState::default();

            let (pubkey, privkey) = BLSPubKey::generated_from_seed_indexed([0; 32], i);
            let (payload, metadata) =
                Payload::from_transactions([], &validated_state, &instance_state)
                    .await
                    .unwrap();

            let payload_bytes = payload.encode();

            let block_header = Header::genesis(
                &instance_state,
                payload.clone(),
                &metadata,
                TEST_VERSIONS.test.base,
            );

            // A placeholder QC: only the serialized bytes matter for the
            // migration, not a real leaf commitment.
            let null_quorum_data = QuorumData {
                leaf_commit: Commitment::<Leaf>::default_commitment_no_preimage(),
            };

            let justify_qc = QuorumCertificate::new(
                null_quorum_data.clone(),
                null_quorum_data.commit(),
                view,
                None,
                std::marker::PhantomData,
            );

            let quorum_proposal = QuorumProposal {
                block_header,
                view_number: view,
                justify_qc: justify_qc.clone(),
                upgrade_certificate: None,
                proposal_certificate: None,
            };

            let quorum_proposal_signature =
                BLSPubKey::sign(&privkey, &bincode::serialize(&quorum_proposal).unwrap())
                    .expect("Failed to sign quorum proposal");

            let proposal = Proposal {
                data: quorum_proposal.clone(),
                signature: quorum_proposal_signature,
                _pd: std::marker::PhantomData::<SeqTypes>,
            };

            let proposal_bytes = bincode::serialize(&proposal)
                .context("serializing proposal")
                .unwrap();

            let mut leaf = Leaf::from_quorum_proposal(&quorum_proposal);
            leaf.fill_block_payload(
                payload,
                GENESIS_VID_NUM_STORAGE_NODES,
                TEST_VERSIONS.test.base,
            )
            .unwrap();

            // Write the legacy anchor leaf row directly into the v1 table,
            // bypassing the current-version API, to simulate pre-migration data.
            let mut tx = storage.db.write().await.unwrap();

            let qc_bytes = bincode::serialize(&justify_qc).unwrap();
            let leaf_bytes = bincode::serialize(&leaf).unwrap();

            tx.upsert(
                "anchor_leaf",
                ["view", "leaf", "qc"],
                ["view"],
                [(i as i64, leaf_bytes, qc_bytes)],
            )
            .await
            .unwrap();

            // Seed one finalized light client state cert per epoch.
            let state_cert = LightClientStateUpdateCertificateV2::<SeqTypes> {
                epoch: EpochNumber::new(i),
                light_client_state: Default::default(), next_stake_table_state: Default::default(), signatures: vec![], auth_root: Default::default(),
            };
            let state_cert_bytes = bincode::serialize(&state_cert).unwrap();
            tx.upsert(
                "finalized_state_cert",
                ["epoch", "state_cert"],
                ["epoch"],
                [(i as i64, state_cert_bytes)],
            )
            .await
            .unwrap();

            tx.commit().await.unwrap();

            // Legacy ADVZ dispersal producing a v0 VID share for this view.
            let disperse = advz_scheme(GENESIS_VID_NUM_STORAGE_NODES)
                .disperse(payload_bytes.clone())
                .unwrap();

            let vid = VidDisperseShare0::<SeqTypes> {
                view_number: ViewNumber::new(i),
                payload_commitment: Default::default(),
                share: disperse.shares[0].clone(),
                common: disperse.common,
                recipient_key: pubkey,
            };

            let (payload, metadata) =
                Payload::from_transactions([], &ValidatedState::default(), &NodeState::default())
                    .await
                    .unwrap();

            let da = DaProposal::<SeqTypes> {
                encoded_transactions: payload.encode(),
                metadata,
                view_number: ViewNumber::new(i),
            };

            let block_payload_signature =
                BLSPubKey::sign(&privkey, &payload_bytes).expect("Failed to sign block payload");

            let da_proposal = Proposal {
                data: da,
                signature: block_payload_signature,
                _pd: Default::default(),
            };

            // Legacy VID/DA rows go in through the v1 append APIs.
            storage
                .append_vid(&convert_proposal(vid.to_proposal(&privkey).unwrap()))
                .await
                .unwrap();
            storage
                .append_da(&da_proposal, VidCommitment::V0(disperse.commit))
                .await
                .unwrap();

            // Legacy quorum proposal and QC rows are inserted directly into
            // the v1 tables.
            let leaf_hash = Committable::commit(&leaf);
            let mut tx = storage.db.write().await.expect("failed to start write tx");
            tx.upsert(
                "quorum_proposals",
                ["view", "leaf_hash", "data"],
                ["view"],
                [(i as i64, leaf_hash.to_string(), proposal_bytes)],
            )
            .await
            .expect("failed to upsert quorum proposal");

            let justify_qc = &proposal.data.justify_qc;
            let justify_qc_bytes = bincode::serialize(&justify_qc)
                .context("serializing QC")
                .unwrap();
            tx.upsert(
                "quorum_certificate",
                ["view", "leaf_hash", "data"],
                ["view"],
                [(
                    justify_qc.view_number.u64() as i64,
                    justify_qc.data.leaf_commit.to_string(),
                    &justify_qc_bytes,
                )],
            )
            .await
            .expect("failed to upsert qc");

            tx.commit().await.expect("failed to commit");
        }

        // Run the migration, then verify every legacy row landed in its v2 table.
        storage.migrate_storage().await.unwrap();

        let mut tx = storage.db.read().await.unwrap();
        let (anchor_leaf2_count,) = query_as::<(i64,)>("SELECT COUNT(*) from anchor_leaf2")
            .fetch_one(tx.as_mut())
            .await
            .unwrap();
        assert_eq!(
            anchor_leaf2_count, rows as i64,
            "anchor leaf count does not match rows",
        );

        let (da_proposal_count,) = query_as::<(i64,)>("SELECT COUNT(*) from da_proposal2")
            .fetch_one(tx.as_mut())
            .await
            .unwrap();
        assert_eq!(
            da_proposal_count, rows as i64,
            "da proposal count does not match rows",
        );

        let (vid_share_count,) = query_as::<(i64,)>("SELECT COUNT(*) from vid_share2")
            .fetch_one(tx.as_mut())
            .await
            .unwrap();
        assert_eq!(
            vid_share_count, rows as i64,
            "vid share count does not match rows"
        );

        let (quorum_proposals_count,) =
            query_as::<(i64,)>("SELECT COUNT(*) from quorum_proposals2")
                .fetch_one(tx.as_mut())
                .await
                .unwrap();
        assert_eq!(
            quorum_proposals_count, rows as i64,
            "quorum proposals count does not match rows",
        );

        let (quorum_certificates_count,) =
            query_as::<(i64,)>("SELECT COUNT(*) from quorum_certificate2")
                .fetch_one(tx.as_mut())
                .await
                .unwrap();
        assert_eq!(
            quorum_certificates_count, rows as i64,
            "quorum certificates count does not match rows",
        );

        let (state_cert_count,) = query_as::<(i64,)>("SELECT COUNT(*) from finalized_state_cert")
            .fetch_one(tx.as_mut())
            .await
            .unwrap();
        assert_eq!(
            state_cert_count, rows as i64,
            "Light client state update certificates count does not match rows",
        );
        // The cert with the highest epoch is the one `load_state_cert` returns.
        assert_eq!(
            storage.load_state_cert().await.unwrap().unwrap(),
            LightClientStateUpdateCertificateV2::<SeqTypes> {
                epoch: EpochNumber::new(rows - 1),
                light_client_state: Default::default(),
                next_stake_table_state: Default::default(),
                signatures: vec![],
                auth_root: Default::default(),
            },
            "Wrong light client state update certificate in the storage",
        );

        // Running the migration a second time must succeed (idempotency).
        storage.migrate_storage().await.unwrap();
    }
4231
4232 #[test_log::test(tokio::test(flavor = "multi_thread"))]
4255 async fn test_store_events_empty() {
4256 let tmp = Persistence::tmp_storage().await;
4257 let mut opt = Persistence::options(&tmp);
4258 let storage = opt.create().await.unwrap();
4259
4260 assert_eq!(storage.load_events(0, 100).await.unwrap(), (None, vec![]));
4261
4262 for i in 1..=2 {
4264 tracing::info!(i, "update l1 height");
4265 storage.store_events(i, vec![]).await.unwrap();
4266 assert_eq!(
4267 storage.load_events(0, 100).await.unwrap(),
4268 (Some(EventsPersistenceRead::UntilL1Block(i)), vec![])
4269 );
4270 }
4271 }
4272}
4273
#[cfg(test)]
#[cfg(not(feature = "embedded-db"))]
mod postgres_tests {
    //! Tests exercising Postgres-specific SQL functions; excluded when the
    //! embedded (SQLite) database feature is enabled.

    use espresso_types::{FeeAccount, Header, Leaf, NodeState, Transaction as Tx};
    use hotshot_example_types::node_types::TEST_VERSIONS;
    use hotshot_query_service::{
        availability::BlockQueryData, data_source::storage::UpdateAvailabilityStorage,
    };
    use hotshot_types::{
        data::vid_commitment,
        simple_certificate::QuorumCertificate,
        traits::{
            EncodeBytes,
            block_contents::{BlockHeader, BuilderFee, GENESIS_VID_NUM_STORAGE_NODES},
            election::Membership,
            signature_key::BuilderSignatureKey,
        },
    };

    use super::*;
    use crate::persistence::tests::TestablePersistence as _;

    /// Builds a block containing transactions in multiple namespaces, stores
    /// it, and checks that the `read_ns_id`/`get_ns_table` SQL functions
    /// recover each transaction's namespace id from the stored header.
    /// NOTE(review): these SQL functions are presumably installed by the
    /// Postgres migrations — confirm against the migration files.
    async fn test_postgres_read_ns_table(instance_state: NodeState) {
        // Epochs must be initialized before genesis certificates can be built.
        instance_state
            .coordinator
            .membership()
            .write()
            .await
            .set_first_epoch(EpochNumber::genesis(), Default::default());

        let tmp = Persistence::tmp_storage().await;
        let mut opt = Persistence::options(&tmp);
        let storage = opt.create().await.unwrap();

        // Three transactions across two namespaces (10001 twice, 10009 once).
        let txs = [
            Tx::new(10001u32.into(), vec![1, 2, 3]),
            Tx::new(10001u32.into(), vec![4, 5, 6]),
            Tx::new(10009u32.into(), vec![7, 8, 9]),
        ];

        let validated_state = Default::default();
        let justify_qc =
            QuorumCertificate::genesis(&validated_state, &instance_state, TEST_VERSIONS.test).await;
        let view_number: ViewNumber = justify_qc.view_number + 1;
        let parent_leaf = Leaf::genesis(&validated_state, &instance_state, TEST_VERSIONS.test.base)
            .await
            .into();

        // Build the payload and a header that commits to it (including a
        // zero-amount builder fee, signed so header validation passes).
        let (payload, ns_table) =
            Payload::from_transactions(txs.clone(), &validated_state, &instance_state)
                .await
                .unwrap();
        let payload_bytes = payload.encode();
        let payload_commitment = vid_commitment(
            &payload_bytes,
            &ns_table.encode(),
            GENESIS_VID_NUM_STORAGE_NODES,
            instance_state.current_version,
        );
        let builder_commitment = payload.builder_commitment(&ns_table);
        let (fee_account, fee_key) = FeeAccount::generated_from_seed_indexed([0; 32], 0);
        let fee_amount = 0;
        let fee_signature = FeeAccount::sign_fee(&fee_key, fee_amount, &ns_table).unwrap();
        let block_header = Header::new(
            &validated_state,
            &instance_state,
            &parent_leaf,
            payload_commitment,
            builder_commitment,
            ns_table,
            BuilderFee {
                fee_amount,
                fee_account,
                fee_signature,
            },
            instance_state.current_version,
            view_number.u64(),
        )
        .await
        .unwrap();
        let proposal = QuorumProposal {
            block_header: block_header.clone(),
            view_number,
            justify_qc: justify_qc.clone(),
            upgrade_certificate: None,
            proposal_certificate: None,
        };
        let leaf: Leaf2 = Leaf::from_quorum_proposal(&proposal).into();
        let mut qc = justify_qc.to_qc2();
        qc.data.leaf_commit = leaf.commit();
        qc.view_number = view_number;

        // Persist the leaf and block so the SQL below has rows to read.
        let mut tx = storage.db.write().await.unwrap();
        tx.insert_leaf(&LeafQueryData::new(leaf, qc).unwrap())
            .await
            .unwrap();
        tx.insert_block(&BlockQueryData::<SeqTypes>::new(block_header, payload))
            .await
            .unwrap();
        tx.commit().await.unwrap();

        // For every stored transaction, the namespace id recorded in the
        // `transactions` table must match what `read_ns_id` decodes from the
        // header's namespace table.
        let mut tx = storage.db.read().await.unwrap();
        let rows = query(
            "
            SELECT ns_id, read_ns_id(get_ns_table(h.data), t.ns_index) AS read_ns_id
            FROM header AS h
            JOIN transactions AS t ON t.block_height = h.height
            ORDER BY t.ns_index, t.position
            ",
        )
        .fetch_all(tx.as_mut())
        .await
        .unwrap();
        assert_eq!(rows.len(), txs.len());
        for (i, row) in rows.into_iter().enumerate() {
            let ns = u64::from(txs[i].namespace()) as i64;
            assert_eq!(row.get::<i64, _>("ns_id"), ns);
            assert_eq!(row.get::<i64, _>("read_ns_id"), ns);
        }
    }

    // Run the namespace-table check against each protocol version's mock state.
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_postgres_read_ns_table_v0_1() {
        test_postgres_read_ns_table(NodeState::mock()).await;
    }

    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_postgres_read_ns_table_v0_2() {
        test_postgres_read_ns_table(NodeState::mock_v2()).await;
    }

    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_postgres_read_ns_table_v0_3() {
        test_postgres_read_ns_table(NodeState::mock_v3().with_epoch_height(0)).await;
    }
}