Skip to main content

espresso_node/persistence/
sql.rs

1use std::{
2    collections::BTreeMap,
3    path::PathBuf,
4    str::FromStr,
5    sync::Arc,
6    time::{Duration, Instant},
7};
8
9use alloy::primitives::Address;
10use anyhow::{Context, bail, ensure};
11use async_trait::async_trait;
12use clap::Parser;
13use committable::Committable;
14use derivative::Derivative;
15use derive_more::derive::{From, Into};
16use either::Either;
17use espresso_types::{
18    AuthenticatedValidatorMap, BackoffParams, BlockMerkleTree, FeeMerkleTree, Leaf, Leaf2,
19    NetworkConfig, Payload, PubKey, Ratio, RegisteredValidatorMap, StakeTableHash, parse_duration,
20    parse_size,
21    traits::{EventsPersistenceRead, MembershipPersistence, StakeTuple},
22    v0::traits::{EventConsumer, PersistenceOptions, SequencerPersistence, StateCatchup},
23    v0_3::{
24        AuthenticatedValidator, EventKey, IndexedStake, RegisteredValidator, RewardAmount,
25        StakeTableEvent,
26    },
27    v0_4::{REWARD_MERKLE_TREE_V2_HEIGHT, RewardAccountV2, RewardMerkleTreeV2},
28};
29use futures::stream::StreamExt;
30use hotshot::InitializerEpochInfo;
31use hotshot_libp2p_networking::network::behaviours::dht::store::persistent::{
32    DhtPersistentStorage, SerializableRecord,
33};
34use hotshot_new_protocol::message::Certificate2;
35use hotshot_query_service::{
36    availability::BlockId,
37    data_source::{
38        Transaction as _, VersionedDataSource,
39        storage::{
40            AvailabilityStorage,
41            pruning::PrunerCfg,
42            sql::{
43                Config, Db, Read, SqlStorage, StorageConnectionType, Transaction, Write,
44                include_migrations, query_as, syntax_helpers::MAX_FN,
45            },
46        },
47    },
48    fetching::{
49        Provider,
50        request::{PayloadRequest, VidCommonRequest},
51    },
52    merklized_state::MerklizedState,
53};
54use hotshot_types::{
55    data::{
56        DaProposal, DaProposal2, EpochNumber, QuorumProposal, QuorumProposalWrapper,
57        QuorumProposalWrapperLegacy, VidCommitment, VidCommon, VidDisperseShare, VidDisperseShare0,
58    },
59    drb::{DrbInput, DrbResult},
60    event::{Event, EventType, HotShotAction, LeafInfo},
61    message::{Proposal, convert_proposal},
62    new_protocol::CoordinatorEvent,
63    simple_certificate::{
64        CertificatePair, LightClientStateUpdateCertificateV1, LightClientStateUpdateCertificateV2,
65        NextEpochQuorumCertificate2, QuorumCertificate, QuorumCertificate2, UpgradeCertificate,
66    },
67    traits::{
68        block_contents::{BlockHeader, BlockPayload},
69        metrics::Metrics,
70    },
71    vote::HasViewNumber,
72};
73use indexmap::IndexMap;
74use itertools::Itertools;
75use jf_merkle_tree_compat::MerkleTreeScheme;
76use sqlx::{Executor, QueryBuilder, Row, query};
77
78use crate::{
79    NodeType, RECENT_STAKE_TABLES_LIMIT, SeqTypes, ViewNumber,
80    api::RewardMerkleTreeV2Data,
81    catchup::SqlStateCatchup,
82    persistence::{migrate_network_config, persistence_metrics::PersistenceMetricsValue},
83};
84
/// Options for Postgres-backed persistence.
// NOTE: with clap's derive, the `///` comments on this struct and its fields double as the
// generated CLI help text, so rewording them changes `--help` output.
//
// Every connection parameter is optional. The `Config` conversions in this file only apply the
// values that are `Some` (and `use_tls` only when it is `true`), leaving whatever the base
// `Config` already holds otherwise.
#[derive(Parser, Clone, Derivative)]
#[derivative(Debug)]
pub struct PostgresOptions {
    /// Hostname for the remote Postgres database server.
    #[clap(long, env = "ESPRESSO_NODE_POSTGRES_HOST")]
    pub(crate) host: Option<String>,

    /// Port for the remote Postgres database server.
    #[clap(long, env = "ESPRESSO_NODE_POSTGRES_PORT")]
    pub(crate) port: Option<u16>,

    /// Name of database to connect to.
    #[clap(long, env = "ESPRESSO_NODE_POSTGRES_DATABASE")]
    pub(crate) database: Option<String>,

    /// Postgres user to connect as.
    #[clap(long, env = "ESPRESSO_NODE_POSTGRES_USER")]
    pub(crate) user: Option<String>,

    /// Password for Postgres user.
    #[clap(long, env = "ESPRESSO_NODE_POSTGRES_PASSWORD")]
    // Hide from debug output since may contain sensitive data.
    #[derivative(Debug = "ignore")]
    pub(crate) password: Option<String>,

    /// Use TLS for an encrypted connection to the database.
    #[clap(long, env = "ESPRESSO_NODE_POSTGRES_USE_TLS")]
    pub(crate) use_tls: bool,

    /// Disable `DEFERRABLE` on read transactions for the query service.
    ///
    /// When true, read transactions on Postgres start with `SERIALIZABLE READ ONLY` (no
    /// `DEFERRABLE`), so they begin immediately rather than waiting for a safe serializable
    /// snapshot. This trades start-up latency for the chance of a serialization-error retry,
    /// and is opt-in.
    // This flag is not stored in `Config`; the conversions in this file forward it to the query
    // service's `set_no_deferrable_on_read` function instead.
    #[clap(
        long,
        env = "ESPRESSO_NODE_POSTGRES_NO_DEFERRABLE",
        default_value_t = false
    )]
    pub(crate) no_deferrable: bool,
}
128
129impl Default for PostgresOptions {
130    fn default() -> Self {
131        Self::parse_from(std::iter::empty::<String>())
132    }
133}
134
// Options for SQLite-backed persistence (used when the `embedded-db` feature is enabled).
//
// Written as a plain comment on purpose: with clap's derive, `///` comments become CLI help
// text, and this struct currently declares none at the struct level.
//
// `Default` is derived, so the default `path` is an empty `PathBuf`; it does not pass through
// `build_sqlite_path`.
#[derive(Parser, Clone, Derivative, Default, From, Into)]
#[derivative(Debug)]
pub struct SqliteOptions {
    /// Base directory for the SQLite database.
    /// The SQLite file will be created in the `sqlite` subdirectory with filename as `database`.
    // The value parser rewrites the user-supplied base directory into the final database file
    // path (`<base>/sqlite/database`), so `path` already points at the database file.
    #[clap(
        long,
        env = "ESPRESSO_NODE_STORAGE_PATH",
        value_parser = build_sqlite_path
    )]
    pub(crate) path: PathBuf,
}
147
148pub fn build_sqlite_path(path: &str) -> anyhow::Result<PathBuf> {
149    let sub_dir = PathBuf::from_str(path)?.join("sqlite");
150
151    // if `sqlite` sub dir does not exist then create it
152    if !sub_dir.exists() {
153        std::fs::create_dir_all(&sub_dir)
154            .with_context(|| format!("failed to create directory: {sub_dir:?}"))?;
155    }
156
157    Ok(sub_dir.join("database"))
158}
159
160/// Options for database-backed persistence, supporting both Postgres and SQLite.
161#[derive(Parser, Clone, Derivative, From, Into)]
162#[derivative(Debug)]
163pub struct Options {
164    #[cfg(not(feature = "embedded-db"))]
165    #[clap(flatten)]
166    pub(crate) postgres_options: PostgresOptions,
167
168    #[cfg(feature = "embedded-db")]
169    #[clap(flatten)]
170    pub(crate) sqlite_options: SqliteOptions,
171
172    /// Database URI for Postgres or SQLite.
173    ///
174    /// This is a shorthand for setting a number of other options all at once. The URI has the
175    /// following format ([brackets] indicate optional segments):
176    ///
177    /// - **Postgres:** `postgres[ql]://[username[:password]@][host[:port],]/database[?parameter_list]`
178    /// - **SQLite:** `sqlite://path/to/db.sqlite`
179    ///
180    /// Options set explicitly via other env vars or flags will take precedence, so you can use this
181    /// URI to set a baseline and then use other parameters to override or add configuration. In
182    /// addition, there are some parameters which cannot be set via the URI, such as TLS.
183    // Hide from debug output since may contain sensitive data.
184    #[derivative(Debug = "ignore")]
185    pub(crate) uri: Option<String>,
186
187    /// This will enable the pruner and set the default pruning parameters unless provided.
188    /// Default parameters:
189    /// - pruning_threshold: 3 TB
190    /// - minimum_retention: 1 day
191    /// - target_retention: 7 days
192    /// - batch_size: 1000
193    /// - max_usage: 80%
194    /// - interval: 1 hour
195    #[clap(long, env = "ESPRESSO_NODE_DATABASE_PRUNE")]
196    pub(crate) prune: bool,
197
198    /// Pruning parameters.
199    #[clap(flatten)]
200    pub(crate) pruning: PruningOptions,
201
202    /// Pruning parameters for ephemeral consensus storage.
203    #[clap(flatten)]
204    pub(crate) consensus_pruning: ConsensusPruningOptions,
205
206    /// Specifies the maximum number of concurrent fetch requests allowed from peers.
207    #[clap(long, env = "ESPRESSO_NODE_FETCH_RATE_LIMIT")]
208    pub(crate) fetch_rate_limit: Option<usize>,
209
210    /// The minimum delay between active fetches in a stream.
211    #[clap(long, env = "ESPRESSO_NODE_ACTIVE_FETCH_DELAY", value_parser = parse_duration)]
212    pub(crate) active_fetch_delay: Option<Duration>,
213
214    /// The minimum delay between loading chunks in a stream.
215    #[clap(long, env = "ESPRESSO_NODE_CHUNK_FETCH_DELAY", value_parser = parse_duration)]
216    pub(crate) chunk_fetch_delay: Option<Duration>,
217
218    /// The number of items to process in a single transaction when scanning the database for
219    /// missing objects.
220    #[clap(long, env = "ESPRESSO_NODE_SYNC_STATUS_CHUNK_SIZE")]
221    pub(crate) sync_status_chunk_size: Option<usize>,
222
223    /// Duration to cache sync status results for.
224    #[clap(long, env = "ESPRESSO_NODE_SYNC_STATUS_TTL", value_parser = parse_duration)]
225    pub(crate) sync_status_ttl: Option<Duration>,
226
227    /// The number of items to process at a time when scanning for proactive fetching.
228    #[clap(long, env = "ESPRESSO_NODE_PROACTIVE_SCAN_CHUNK_SIZE")]
229    pub(crate) proactive_scan_chunk_size: Option<usize>,
230
231    /// The time interval between proactive fetching scans.
232    #[clap(long, env = "ESPRESSO_NODE_PROACTIVE_SCAN_INTERVAL", value_parser = parse_duration)]
233    pub(crate) proactive_scan_interval: Option<Duration>,
234
235    /// Disable the proactive scanner task.
236    #[clap(long, env = "ESPRESSO_NODE_DISABLE_PROACTIVE_FETCHING")]
237    pub(crate) disable_proactive_fetching: bool,
238
239    /// Disable pruning and reconstruct previously pruned data.
240    ///
241    /// While running without pruning is the default behavior, the default will not try to
242    /// reconstruct data that was pruned in a previous run where pruning was enabled. This option
243    /// instructs the service to run without pruning _and_ reconstruct all previously pruned data by
244    /// fetching from peers.
245    #[clap(long, env = "ESPRESSO_NODE_ARCHIVE", conflicts_with = "prune")]
246    pub(crate) archive: bool,
247
248    /// Turns on leaf only data storage
249    #[clap(
250        long,
251        env = "ESPRESSO_NODE_LIGHTWEIGHT",
252        default_value_t = false,
253        conflicts_with = "archive"
254    )]
255    pub(crate) lightweight: bool,
256
257    /// The maximum idle time of a database connection.
258    ///
259    /// Any connection which has been open and unused longer than this duration will be
260    /// automatically closed to reduce load on the server.
261    #[clap(long, env = "ESPRESSO_NODE_DATABASE_IDLE_CONNECTION_TIMEOUT", value_parser = parse_duration, default_value = "10m")]
262    pub(crate) idle_connection_timeout: Duration,
263
264    /// The maximum lifetime of a database connection.
265    ///
266    /// Any connection which has been open longer than this duration will be automatically closed
267    /// (and, if needed, replaced), even if it is otherwise healthy. It is good practice to refresh
268    /// even healthy connections once in a while (e.g. daily) in case of resource leaks in the
269    /// server implementation.
270    #[clap(long, env = "ESPRESSO_NODE_DATABASE_CONNECTION_TIMEOUT", value_parser = parse_duration, default_value = "30m")]
271    pub(crate) connection_timeout: Duration,
272
273    #[clap(long, env = "ESPRESSO_NODE_DATABASE_SLOW_STATEMENT_THRESHOLD", value_parser = parse_duration, default_value = "1s")]
274    pub(crate) slow_statement_threshold: Duration,
275
276    /// The maximum time a single SQL statement is allowed to run before being canceled.
277    ///
278    /// This helps prevent queries from running indefinitely and consuming resources.
279    /// Set to 10 minutes by default
280    #[clap(long, env = "ESPRESSO_NODE_DATABASE_STATEMENT_TIMEOUT", value_parser = parse_duration, default_value = "10m")]
281    pub(crate) statement_timeout: Duration,
282
283    /// The minimum number of database connections to maintain at any time.
284    ///
285    /// The database client will, to the best of its ability, maintain at least `min` open
286    /// connections at all times. This can be used to reduce the latency hit of opening new
287    /// connections when at least this many simultaneous connections are frequently needed.
288    #[clap(
289        long,
290        env = "ESPRESSO_NODE_DATABASE_MIN_CONNECTIONS",
291        default_value = "0"
292    )]
293    pub(crate) min_connections: u32,
294
295    /// Allows setting a different maximum number of connections for query operations.
296    /// Default value of None implies using the min_connections value.
297    #[cfg(not(feature = "embedded-db"))]
298    #[clap(long, env = "ESPRESSO_NODE_DATABASE_QUERY_MIN_CONNECTIONS", default_value = None)]
299    pub(crate) query_min_connections: Option<u32>,
300
301    /// The maximum number of database connections to maintain at any time.
302    ///
303    /// Once `max` connections are in use simultaneously, further attempts to acquire a connection
304    /// (or begin a transaction) will block until one of the existing connections is released.
305    #[clap(
306        long,
307        env = "ESPRESSO_NODE_DATABASE_MAX_CONNECTIONS",
308        default_value = "25"
309    )]
310    pub(crate) max_connections: u32,
311
312    /// Allows setting a different maximum number of connections for query operations.
313    /// Default value of None implies using the max_connections value.
314    #[cfg(not(feature = "embedded-db"))]
315    #[clap(long, env = "ESPRESSO_NODE_DATABASE_QUERY_MAX_CONNECTIONS", default_value = None)]
316    pub(crate) query_max_connections: Option<u32>,
317
318    // Keep the database connection pool when persistence is created,
319    // allowing it to be reused across multiple instances instead of creating
320    // a new pool each time such as for API, consensus storage etc
321    // This also ensures all storage instances adhere to the MAX_CONNECTIONS limit if set
322    //
323    // Note: Cloning the `Pool` is lightweight and efficient because it simply
324    // creates a new reference-counted handle to the underlying pool state.
325    #[clap(skip)]
326    pub(crate) pool: Option<sqlx::Pool<Db>>,
327}
328
329impl Default for Options {
330    fn default() -> Self {
331        Self::parse_from(std::iter::empty::<String>())
332    }
333}
334
335#[cfg(not(feature = "embedded-db"))]
336impl From<PostgresOptions> for Config {
337    fn from(opt: PostgresOptions) -> Self {
338        let mut cfg = Config::default();
339
340        if let Some(host) = opt.host {
341            cfg = cfg.host(host);
342        }
343
344        if let Some(port) = opt.port {
345            cfg = cfg.port(port);
346        }
347
348        if let Some(database) = &opt.database {
349            cfg = cfg.database(database);
350        }
351
352        if let Some(user) = &opt.user {
353            cfg = cfg.user(user);
354        }
355
356        if let Some(password) = &opt.password {
357            cfg = cfg.password(password);
358        }
359
360        if opt.use_tls {
361            cfg = cfg.tls();
362        }
363
364        cfg = cfg.max_connections(20);
365        cfg = cfg.idle_connection_timeout(Duration::from_secs(120));
366        cfg = cfg.connection_timeout(Duration::from_secs(10240));
367        cfg = cfg.slow_statement_threshold(Duration::from_secs(1));
368        cfg = cfg.statement_timeout(Duration::from_secs(600)); // 10 minutes default
369
370        hotshot_query_service::data_source::storage::sql::set_no_deferrable_on_read(
371            opt.no_deferrable,
372        );
373
374        cfg
375    }
376}
377
#[cfg(feature = "embedded-db")]
impl From<SqliteOptions> for Config {
    /// Build a query-service [`Config`] from standalone SQLite options.
    ///
    /// Starts from `Config::default()`, points it at the configured database path, and sets fixed
    /// pool and timeout values.
    fn from(opt: SqliteOptions) -> Self {
        let SqliteOptions { path } = opt;

        Config::default()
            .db_path(path)
            .max_connections(20)
            .idle_connection_timeout(Duration::from_secs(120))
            .connection_timeout(Duration::from_secs(10240))
            .slow_statement_threshold(Duration::from_secs(2))
            .statement_timeout(Duration::from_secs(600))
    }
}
393
394#[cfg(not(feature = "embedded-db"))]
395impl From<PostgresOptions> for Options {
396    fn from(opt: PostgresOptions) -> Self {
397        Options {
398            postgres_options: opt,
399            max_connections: 20,
400            idle_connection_timeout: Duration::from_secs(120),
401            connection_timeout: Duration::from_secs(10240),
402            slow_statement_threshold: Duration::from_secs(1),
403            statement_timeout: Duration::from_secs(600),
404            ..Default::default()
405        }
406    }
407}
408
#[cfg(feature = "embedded-db")]
impl From<SqliteOptions> for Options {
    /// Wrap standalone SQLite options in a full [`Options`] value.
    ///
    /// Every field is spelled out explicitly (listed here in declaration order): pruning,
    /// archival, lightweight mode and all fetch tuning knobs are off or unset, and the pool and
    /// timeout settings are pinned to fixed values.
    fn from(opt: SqliteOptions) -> Self {
        Options {
            sqlite_options: opt,
            uri: None,

            // Pruning is disabled; the parameter structs keep their own defaults.
            prune: false,
            pruning: PruningOptions::default(),
            consensus_pruning: ConsensusPruningOptions::default(),

            // No fetch or scan tuning overrides.
            fetch_rate_limit: None,
            active_fetch_delay: None,
            chunk_fetch_delay: None,
            sync_status_chunk_size: None,
            sync_status_ttl: None,
            proactive_scan_chunk_size: None,
            proactive_scan_interval: None,
            disable_proactive_fetching: false,

            archive: false,
            lightweight: false,

            // Connection pool sizing and timeouts.
            idle_connection_timeout: Duration::from_secs(120),
            connection_timeout: Duration::from_secs(10240),
            slow_statement_threshold: Duration::from_secs(1),
            statement_timeout: Duration::from_secs(600),
            min_connections: 0,
            // NOTE(review): `From<SqliteOptions> for Config` uses 20 here — confirm the
            // difference is intentional.
            max_connections: 5,

            pool: None,
        }
    }
}
438impl TryFrom<&Options> for Config {
439    type Error = anyhow::Error;
440
441    fn try_from(opt: &Options) -> Result<Self, Self::Error> {
442        let mut cfg = match &opt.uri {
443            Some(uri) => uri.parse()?,
444            None => Self::default(),
445        };
446
447        if let Some(pool) = &opt.pool {
448            cfg = cfg.pool(pool.clone());
449        }
450
451        cfg = cfg.max_connections(opt.max_connections);
452        cfg = cfg.idle_connection_timeout(opt.idle_connection_timeout);
453        cfg = cfg.min_connections(opt.min_connections);
454
455        #[cfg(not(feature = "embedded-db"))]
456        {
457            cfg =
458                cfg.query_max_connections(opt.query_max_connections.unwrap_or(opt.max_connections));
459            cfg =
460                cfg.query_min_connections(opt.query_min_connections.unwrap_or(opt.min_connections));
461
462            hotshot_query_service::data_source::storage::sql::set_no_deferrable_on_read(
463                opt.postgres_options.no_deferrable,
464            );
465        }
466
467        cfg = cfg.connection_timeout(opt.connection_timeout);
468        cfg = cfg.slow_statement_threshold(opt.slow_statement_threshold);
469        cfg = cfg.statement_timeout(opt.statement_timeout);
470
471        #[cfg(not(feature = "embedded-db"))]
472        {
473            cfg = cfg.migrations(include_migrations!(
474                "$CARGO_MANIFEST_DIR/api/migrations/postgres"
475            ));
476
477            let pg_options = &opt.postgres_options;
478
479            if let Some(host) = &pg_options.host {
480                cfg = cfg.host(host.clone());
481            }
482
483            if let Some(port) = pg_options.port {
484                cfg = cfg.port(port);
485            }
486
487            if let Some(database) = &pg_options.database {
488                cfg = cfg.database(database);
489            }
490
491            if let Some(user) = &pg_options.user {
492                cfg = cfg.user(user);
493            }
494
495            if let Some(password) = &pg_options.password {
496                cfg = cfg.password(password);
497            }
498
499            if pg_options.use_tls {
500                cfg = cfg.tls();
501            }
502        }
503
504        #[cfg(feature = "embedded-db")]
505        {
506            cfg = cfg.migrations(include_migrations!(
507                "$CARGO_MANIFEST_DIR/api/migrations/sqlite"
508            ));
509
510            cfg = cfg.db_path(opt.sqlite_options.path.clone());
511        }
512
513        if opt.prune {
514            cfg = cfg.pruner_cfg(PrunerCfg::from(opt.pruning))?;
515        }
516        if opt.archive {
517            cfg = cfg.archive();
518        }
519
520        Ok(cfg)
521    }
522}
523
/// Pruning parameters.
// NOTE: with clap's derive, the `///` comments on this struct and its fields double as the
// generated CLI help text, so rewording them changes `--help` output.
//
// Every field is optional: when converted into a `PrunerCfg` (see the `From` impl in this
// file), only the fields that are `Some` override the values from `PrunerCfg::new()`.
#[derive(Parser, Clone, Copy, Debug)]
pub struct PruningOptions {
    /// Threshold for pruning, specified in bytes.
    /// If the disk usage surpasses this threshold, pruning is initiated for data older than the specified minimum retention period.
    /// Pruning continues until the disk usage drops below the MAX USAGE.
    #[clap(long, env = "ESPRESSO_NODE_PRUNER_PRUNING_THRESHOLD", value_parser = parse_size)]
    pub(crate) pruning_threshold: Option<u64>,

    /// Minimum retention period.
    /// Data is retained for at least this duration, even if there's no free disk space.
    #[clap(
        long,
        env = "ESPRESSO_NODE_PRUNER_MINIMUM_RETENTION",
        value_parser = parse_duration,
    )]
    pub(crate) minimum_retention: Option<Duration>,

    /// Target retention period.
    /// Data older than this is pruned to free up space.
    #[clap(
        long,
        env = "ESPRESSO_NODE_PRUNER_TARGET_RETENTION",
        value_parser = parse_duration,
    )]
    pub(crate) target_retention: Option<Duration>,

    /// Batch size for pruning.
    /// This is the number of blocks data to delete in a single transaction.
    #[clap(long, env = "ESPRESSO_NODE_PRUNER_BATCH_SIZE")]
    pub(crate) batch_size: Option<u64>,

    /// Maximum disk usage (in basis points).
    ///
    /// Pruning stops once the disk usage falls below this value, even if
    /// some data older than the `MINIMUM_RETENTION` remains. Values range
    /// from 0 (0%) to 10000 (100%).
    // NOTE(review): the documented 0..=10000 range is not enforced by this declaration (any
    // `u16` parses) — confirm whether `PrunerCfg` validates it.
    #[clap(long, env = "ESPRESSO_NODE_PRUNER_MAX_USAGE")]
    pub(crate) max_usage: Option<u16>,

    /// Interval for running the pruner.
    #[clap(
        long,
        env = "ESPRESSO_NODE_PRUNER_INTERVAL",
        value_parser = parse_duration,
    )]
    pub(crate) interval: Option<Duration>,

    /// Number of SQLite pages to vacuum from the freelist
    /// during each pruner cycle.
    /// This value corresponds to `N` in the SQLite PRAGMA `incremental_vacuum(N)`,
    #[clap(long, env = "ESPRESSO_NODE_PRUNER_INCREMENTAL_VACUUM_PAGES")]
    pub(crate) pages: Option<u64>,
}
578
579impl Default for PruningOptions {
580    fn default() -> Self {
581        Self::parse_from(std::iter::empty::<String>())
582    }
583}
584
585impl From<PruningOptions> for PrunerCfg {
586    fn from(opt: PruningOptions) -> Self {
587        let mut cfg = PrunerCfg::new();
588        if let Some(threshold) = opt.pruning_threshold {
589            cfg = cfg.with_pruning_threshold(threshold);
590        }
591        if let Some(min) = opt.minimum_retention {
592            cfg = cfg.with_minimum_retention(min);
593        }
594        if let Some(target) = opt.target_retention {
595            cfg = cfg.with_target_retention(target);
596        }
597        if let Some(batch) = opt.batch_size {
598            cfg = cfg.with_batch_size(batch);
599        }
600        if let Some(max) = opt.max_usage {
601            cfg = cfg.with_max_usage(max);
602        }
603        if let Some(interval) = opt.interval {
604            cfg = cfg.with_interval(interval);
605        }
606
607        if let Some(pages) = opt.pages {
608            cfg = cfg.with_incremental_vacuum_pages(pages)
609        }
610
611        cfg = cfg.with_state_tables(vec![
612            BlockMerkleTree::state_type().to_string(),
613            FeeMerkleTree::state_type().to_string(),
614        ]);
615
616        cfg
617    }
618}
619
/// Pruning parameters for ephemeral consensus storage.
// NOTE: with clap's derive, the `///` comments on this struct and its fields double as the
// generated CLI help text, so rewording them changes `--help` output.
//
// The explicit `name`/`long` values below differ from the field names. `PruningOptions`, which
// is flattened into the same `Options` struct, also has `target_retention` and
// `minimum_retention` fields, so this is presumably to keep the argument ids and flags from
// colliding — confirm.
#[derive(Parser, Clone, Copy, Debug)]
pub struct ConsensusPruningOptions {
    /// Number of views to try to retain in consensus storage before data that hasn't been archived
    /// is garbage collected.
    ///
    /// The longer this is, the more certain that all data will eventually be archived, even if
    /// there are temporary problems with archive storage or partially missing data. This can be set
    /// very large, as most data is garbage collected as soon as it is finalized by consensus. This
    /// setting only applies to views which never get decided (ie forks in consensus) and views for
    /// which this node is partially offline. These should be exceptionally rare.
    ///
    /// Note that in extreme scenarios, data may be garbage collected even before TARGET_RETENTION
    /// views, if consensus storage exceeds TARGET_USAGE. For a hard lower bound on how long
    /// consensus data will be retained, see MINIMUM_RETENTION.
    ///
    /// The default of 302000 views equates to approximately 1 week (604800 seconds) at an average
    /// view time of 2s.
    // `PersistenceOptions::set_view_retention` overwrites this value at runtime.
    #[clap(
        name = "TARGET_RETENTION",
        long = "consensus-storage-target-retention",
        env = "ESPRESSO_NODE_CONSENSUS_STORAGE_TARGET_RETENTION",
        default_value = "302000"
    )]
    pub(crate) target_retention: u64,

    /// Minimum number of views to try to retain in consensus storage before data that hasn't been
    /// archived is garbage collected.
    ///
    /// This bound allows data to be retained even if consensus storage occupies more than
    /// TARGET_USAGE. This can be used to ensure sufficient time to move consensus data to archival
    /// storage as necessary, even under extreme circumstances where otherwise garbage collection
    /// would kick in based on TARGET_RETENTION.
    ///
    /// The default of 130000 views equates to approximately 3 days (259200 seconds) at an average
    /// view time of 2s.
    // `PersistenceOptions::set_view_retention` overwrites this value at runtime.
    #[clap(
        name = "MINIMUM_RETENTION",
        long = "consensus-storage-minimum-retention",
        env = "ESPRESSO_NODE_CONSENSUS_STORAGE_MINIMUM_RETENTION",
        default_value = "130000"
    )]
    pub(crate) minimum_retention: u64,

    /// Amount (in bytes) of data to retain in consensus storage before garbage collecting more
    /// aggressively.
    ///
    /// See also TARGET_RETENTION and MINIMUM_RETENTION.
    // The default below is 1_000_000_000 bytes.
    #[clap(
        name = "TARGET_USAGE",
        long = "consensus-storage-target-usage",
        env = "ESPRESSO_NODE_CONSENSUS_STORAGE_TARGET_USAGE",
        default_value = "1000000000"
    )]
    pub(crate) target_usage: u64,
}
676
677impl Default for ConsensusPruningOptions {
678    fn default() -> Self {
679        Self::parse_from(std::iter::empty::<String>())
680    }
681}
682
683#[async_trait]
684impl PersistenceOptions for Options {
685    type Persistence = Persistence;
686
687    fn set_view_retention(&mut self, view_retention: u64) {
688        self.consensus_pruning.target_retention = view_retention;
689        self.consensus_pruning.minimum_retention = view_retention;
690    }
691
692    async fn create(&mut self) -> anyhow::Result<Self::Persistence> {
693        let config = (&*self).try_into()?;
694        let persistence = Persistence {
695            db: SqlStorage::connect(config, StorageConnectionType::Sequencer).await?,
696            gc_opt: self.consensus_pruning,
697            internal_metrics: PersistenceMetricsValue::default(),
698        };
699        persistence.migrate_quorum_proposal_leaf_hashes().await?;
700        self.pool = Some(persistence.db.pool());
701        Ok(persistence)
702    }
703
704    async fn reset(self) -> anyhow::Result<()> {
705        SqlStorage::connect(
706            Config::try_from(&self)?.reset_schema(),
707            StorageConnectionType::Sequencer,
708        )
709        .await?;
710        Ok(())
711    }
712}
713
/// Identifies a data migration by a stable string name.
#[derive(Debug, Clone, Copy)]
pub enum DataMigration {
    X25519Keys,
}

impl DataMigration {
    /// Returns the static string name of this migration.
    ///
    /// NOTE(review): presumably this is the `name` stored in the `data_migrations` table —
    /// confirm against callers.
    pub fn as_str(&self) -> &'static str {
        match *self {
            DataMigration::X25519Keys => "x25519_keys",
        }
    }
}
726
/// SQL-backed persistence.
///
/// The backend is Postgres by default, or SQLite when the `embedded-db` feature is enabled.
#[derive(Clone, Debug)]
pub struct Persistence {
    /// Handle to the underlying SQL storage.
    db: SqlStorage,
    /// Pruning parameters for ephemeral consensus storage, copied from the options at creation.
    gc_opt: ConsensusPruningOptions,
    /// A reference to the internal metrics
    internal_metrics: PersistenceMetricsValue,
}
735
/// PostgreSQL error code (SQLSTATE) for serialization failures under SERIALIZABLE isolation.
/// Transactions that fail with this code are safe to retry from scratch.
const PG_SERIALIZATION_FAILURE_CODE: &str = "40001";
739
/// A decided leaf paired with a certificate for it, as consumed by
/// `decide_events_from_chain`.
#[derive(Debug)]
struct DecidedLeaf {
    /// The leaf and its associated information.
    info: LeafInfo<SeqTypes>,
    /// Certificate pair attached to this leaf.
    cert: CertificatePair<SeqTypes>,
}
745
746fn decide_events_from_chain(
747    mut chain: Vec<DecidedLeaf>,
748    cert2: Option<Certificate2<SeqTypes>>,
749    deciding_qc: Option<Arc<CertificatePair<SeqTypes>>>,
750) -> Vec<CoordinatorEvent<SeqTypes>> {
751    let split_idx = chain
752        .iter()
753        .position(|leaf| leaf.info.leaf.block_header().version() < versions::NEW_PROTOCOL_VERSION)
754        .unwrap_or(chain.len());
755    let legacy_leaves = chain.split_off(split_idx);
756    let new_leaves = chain;
757
758    let mut events = Vec::with_capacity(2);
759    if !legacy_leaves.is_empty() {
760        let committing_qc = legacy_leaves[0].cert.clone();
761        let deciding_qc = new_leaves
762            .is_empty()
763            .then_some(deciding_qc)
764            .flatten()
765            .filter(|qc| qc.view_number() == committing_qc.view_number() + 1);
766        let view_number = legacy_leaves[0].info.leaf.view_number();
767        let leaf_chain = legacy_leaves
768            .into_iter()
769            .map(|leaf| leaf.info)
770            .collect::<Vec<_>>();
771
772        events.push(CoordinatorEvent::LegacyEvent(Event {
773            view_number,
774            event: EventType::Decide {
775                leaf_chain: Arc::new(leaf_chain),
776                committing_qc: Arc::new(committing_qc),
777                deciding_qc,
778                block_size: None,
779            },
780        }));
781    }
782
783    if new_leaves.is_empty() && cert2.is_some() {
784        tracing::warn!(
785            "decide_events_from_chain called with cert2 but no new-protocol leaves; cert2 will be \
786             dropped"
787        );
788    }
789
790    if !new_leaves.is_empty() {
791        // cert1 is the QC for the newest leaf
792        // ancestors are certified by
793        // their successor's justify_qc. cert2 finalizes the newest leaf.
794        // update() uses cert1 to build LeafQueryData for
795        // the newest leaf and only attaches cert2 to it.
796        let cert1 = new_leaves[0].cert.qc().clone();
797        let leaf_infos = new_leaves.into_iter().map(|leaf| leaf.info).collect();
798
799        events.push(CoordinatorEvent::NewDecide {
800            leaf_infos,
801            cert1,
802            cert2,
803        });
804    }
805
806    events
807}
808
809impl Persistence {
810    /// Ensure the `leaf_hash` column is populated for all existing quorum proposals.
811    ///
812    /// This column was added in a migration, but because it requires computing a commitment of the
813    /// existing data, it is not easy to populate in the SQL migration itself. Thus, on startup, we
814    /// check if there are any just-migrated quorum proposals with a `NULL` value for this column,
815    /// and if so we populate the column manually.
816    async fn migrate_quorum_proposal_leaf_hashes(&self) -> anyhow::Result<()> {
817        WRITE_BACKOFF
818            .retry_if(WRITE_RETRY_MAX, is_serialization_error, || async {
819                let mut tx = self.db.write().await?;
820
821                let mut proposals = tx.fetch("SELECT * FROM quorum_proposals");
822
823                let mut updates = vec![];
824                while let Some(row) = proposals.next().await {
825                    let row = row?;
826
827                    let hash: Option<String> = row.try_get("leaf_hash")?;
828                    if hash.is_none() {
829                        let view: i64 = row.try_get("view")?;
830                        let data: Vec<u8> = row.try_get("data")?;
831                        let proposal: Proposal<SeqTypes, QuorumProposal<SeqTypes>> =
832                            bincode::deserialize(&data)?;
833                        let leaf = Leaf::from_quorum_proposal(&proposal.data);
834                        let leaf_hash = Committable::commit(&leaf);
835                        tracing::info!(view, %leaf_hash, "populating quorum proposal leaf hash");
836                        updates.push((view, leaf_hash.to_string()));
837                    }
838                }
839                drop(proposals);
840
841                tx.upsert("quorum_proposals", ["view", "leaf_hash"], ["view"], updates)
842                    .await?;
843
844                tx.commit().await
845            })
846            .await
847    }
848
849    async fn is_migration_complete(&self, name: &str, table_name: &str) -> anyhow::Result<bool> {
850        let mut tx = self.db.read().await?;
851        let (completed,): (bool,) =
852            query_as("SELECT completed FROM data_migrations WHERE name = $1 AND table_name = $2")
853                .bind(name)
854                .bind(table_name)
855                .fetch_one(tx.as_mut())
856                .await
857                .context("migration tracking row missing - schema may be out of sync")?;
858        Ok(completed)
859    }
860
861    async fn mark_migration_complete(
862        tx: &mut Transaction<Write>,
863        name: &str,
864        table_name: &str,
865        migrated_rows: usize,
866    ) -> anyhow::Result<()> {
867        tx.execute(
868            query(
869                "UPDATE data_migrations SET completed = true, migrated_rows = $1 WHERE name = $2 \
870                 AND table_name = $3",
871            )
872            .bind(migrated_rows as i64)
873            .bind(name)
874            .bind(table_name),
875        )
876        .await?;
877        Ok(())
878    }
879
    /// Generate decide events for every decided leaf that has not been processed yet, hand them
    /// to `consumer`, and garbage collect the data those events covered.
    ///
    /// The starting point is `last_processed_view` from the `event_stream` table. Each loop
    /// iteration:
    /// 1. reads, in a read transaction, a chain of leaves from `anchor_leaf2` with consecutive
    ///    block heights, plus the VID shares, DA proposals, state certs and (for the newest view)
    ///    the `decided_cert2` entry in the same view range;
    /// 2. builds the events with `decide_events_from_chain` and passes each to
    ///    `consumer.handle_event`;
    /// 3. in a retried write transaction, records the new `last_processed_view`, stores the
    ///    state certs in `finalized_state_cert`, and deletes the processed rows, keeping the
    ///    newest anchor leaf.
    ///
    /// The loop ends with `Ok(())` once no leaf at or after the next unprocessed view is found.
    ///
    /// # Errors
    ///
    /// Database, deserialization and consumer errors are propagated. A consumer error aborts the
    /// iteration before anything is deleted or `last_processed_view` is advanced.
    async fn generate_decide_events(
        &self,
        deciding_qc: Option<Arc<CertificatePair<SeqTypes>>>,
        consumer: &impl EventConsumer,
    ) -> anyhow::Result<()> {
        let mut last_processed_view: Option<i64> = self
            .db
            .read()
            .await?
            .fetch_optional("SELECT last_processed_view FROM event_stream WHERE id = 1 LIMIT 1")
            .await?
            .map(|row| row.get("last_processed_view"));
        loop {
            // In SQLite, overlapping read and write transactions can lead to database errors. To
            // avoid this:
            // - start a read transaction to query and collect all the necessary data.
            // - Commit (or implicitly drop) the read transaction once the data is fetched.
            // - use the collected data to generate a "decide" event for the consumer.
            // - begin a write transaction to delete the data and update the event stream.
            let mut tx = self.db.read().await?;

            // Collect a chain of consecutive leaves, starting from the first view after the last
            // decide. This will correspond to a decide event, and defines a range of views which
            // can be garbage collected. This may even include views for which there was no leaf,
            // for which we might still have artifacts like proposals that never finalized.
            let from_view = match last_processed_view {
                Some(v) => v + 1,
                None => 0,
            };
            tracing::debug!(?from_view, "generate decide event");

            // Block height of the previously accepted leaf; `None` until the first row is read.
            let mut parent = None;
            let mut rows = query(
                "SELECT leaf, qc, next_epoch_qc FROM anchor_leaf2 WHERE view >= $1 ORDER BY view",
            )
            .bind(from_view)
            .fetch(tx.as_mut());
            let mut leaves: Vec<(Leaf2, CertificatePair<SeqTypes>)> = vec![];
            // Certificate of the last leaf accepted into the chain; stays `None` if no leaf was
            // accepted.
            let mut final_qc = None;
            while let Some(row) = rows.next().await {
                let row = match row {
                    Ok(row) => row,
                    Err(err) => {
                        // If there's an error getting a row, try generating an event with the rows
                        // we do have.
                        tracing::warn!("error loading row: {err:#}");
                        break;
                    },
                };

                let leaf_data: Vec<u8> = row.get("leaf");
                let leaf = bincode::deserialize::<Leaf2>(&leaf_data)?;
                let qc_data: Vec<u8> = row.get("qc");
                let qc = bincode::deserialize::<QuorumCertificate2<SeqTypes>>(&qc_data)?;
                let next_epoch_qc = match row.get::<Option<Vec<u8>>, _>("next_epoch_qc") {
                    Some(bytes) => {
                        Some(bincode::deserialize::<NextEpochQuorumCertificate2<SeqTypes>>(&bytes)?)
                    },
                    None => None,
                };
                let height = leaf.block_header().block_number();

                // Ensure we are only dealing with a consecutive chain of leaves. We don't want to
                // garbage collect any views for which we missed a leaf or decide event; at least
                // not right away, in case we need to recover that data later.
                if let Some(parent) = parent
                    && height != parent + 1
                {
                    tracing::debug!(
                        height,
                        parent,
                        "ending decide event at non-consecutive leaf"
                    );
                    break;
                }
                parent = Some(height);
                let cert = CertificatePair::new(qc, next_epoch_qc);
                final_qc = Some(cert.clone());
                leaves.push((leaf, cert));
            }
            // The row stream borrows `tx`; release it so `tx` can be used for further queries.
            drop(rows);

            let Some(final_qc) = final_qc else {
                // End event processing when there are no more decided views.
                tracing::debug!(from_view, "no new leaves at decide");
                return Ok(());
            };

            // Find the range of views encompassed by this leaf chain. All data in this range can be
            // processed by the consumer and then deleted.
            // `leaves` is non-empty here: `final_qc` is only set when a leaf is pushed.
            let from_view = leaves[0].0.view_number();
            let to_view = leaves[leaves.len() - 1].0.view_number();

            // Collect VID shares for the decide event.
            let mut vid_shares = tx
                .fetch_all(
                    query("SELECT view, data FROM vid_share2 where view >= $1 AND view <= $2")
                        .bind(from_view.u64() as i64)
                        .bind(to_view.u64() as i64),
                )
                .await?
                .into_iter()
                .map(|row| {
                    let view: i64 = row.get("view");
                    let data: Vec<u8> = row.get("data");
                    let vid_proposal = bincode::deserialize::<
                        Proposal<SeqTypes, VidDisperseShare<SeqTypes>>,
                    >(&data)?;
                    Ok((view as u64, vid_proposal))
                })
                .collect::<anyhow::Result<BTreeMap<_, _>>>()?;

            // Collect DA proposals for the decide event.
            let mut da_proposals = tx
                .fetch_all(
                    query("SELECT view, data FROM da_proposal2 where view >= $1 AND view <= $2")
                        .bind(from_view.u64() as i64)
                        .bind(to_view.u64() as i64),
                )
                .await?
                .into_iter()
                .map(|row| {
                    let view: i64 = row.get("view");
                    let data: Vec<u8> = row.get("data");
                    let da_proposal =
                        bincode::deserialize::<Proposal<SeqTypes, DaProposal2<SeqTypes>>>(&data)?;
                    Ok((view as u64, da_proposal.data))
                })
                .collect::<anyhow::Result<BTreeMap<_, _>>>()?;

            // Collect state certs for the decide event.
            let state_certs = Self::load_state_certs(&mut tx, from_view, to_view)
                .await
                .inspect_err(|err| {
                    tracing::error!(
                        ?from_view,
                        ?to_view,
                        "failed to load state certificates. error={err:#}"
                    );
                })?;

            // Only the newest view in the chain is checked for a stored cert2.
            let cert2 = tx
                .fetch_optional(
                    query("SELECT data FROM decided_cert2 WHERE view = $1")
                        .bind(to_view.u64() as i64),
                )
                .await?
                .map(|row| {
                    let bytes: Vec<u8> = row.get("data");
                    bincode::deserialize::<Certificate2<SeqTypes>>(&bytes)
                        .context("deserializing decided cert2")
                })
                .transpose()?;
            // End the read transaction before the consumer runs and before any write begins.
            drop(tx);

            // Collate all the information by view number and construct a chain of leaves.
            let chain = leaves
                .into_iter()
                // Go in reverse chronological order, as expected by Decide events.
                .rev()
                .map(|(mut leaf, cert)| {
                    let view = leaf.view_number();

                    // Include the VID share if available.
                    let vid_proposal = vid_shares.remove(&view);
                    if vid_proposal.is_none() {
                        tracing::debug!(?view, "VID share not available at decide");
                    }
                    let vid_share = vid_proposal.as_ref().map(|proposal| proposal.data.clone());

                    // Fill in the full block payload using the DA proposals we had persisted.
                    if let Some(proposal) = da_proposals.remove(&view) {
                        let payload =
                            Payload::from_bytes(&proposal.encoded_transactions, &proposal.metadata);
                        leaf.fill_block_payload_unchecked(payload);
                    } else if view == ViewNumber::genesis() {
                        // We don't get a DA proposal for the genesis view, but we know what the
                        // payload always is.
                        leaf.fill_block_payload_unchecked(Payload::empty().0);
                    } else {
                        tracing::debug!(?view, "DA proposal not available at decide");
                    }

                    // NOTE(review): `load_state_certs` keys this map by `cert.epoch`, while the
                    // lookup here uses the leaf's view number -- confirm this is intended.
                    let state_cert = state_certs.get(&view).cloned();

                    let info = LeafInfo {
                        leaf,
                        vid_share,
                        state_cert,
                        // Note: the following fields are not used in Decide event processing,
                        // and should be removed. For now, we just default them.
                        state: Default::default(),
                        delta: Default::default(),
                    };
                    DecidedLeaf { info, cert }
                })
                .collect();

            tracing::debug!(
                ?from_view,
                ?to_view,
                ?final_qc,
                ?chain,
                "generating decide event"
            );

            // Deliver the events before touching storage; a consumer failure returns here and
            // leaves all persisted data in place.
            for event in decide_events_from_chain(chain, cert2, deciding_qc.clone()) {
                consumer.handle_event(&event).await?;
            }

            let from_view_i64 = from_view.u64() as i64;
            let to_view_i64 = to_view.u64() as i64;
            // Serialize once up front so the retried write closure below only clones bytes.
            let serialized_state_certs = state_certs
                .into_iter()
                .map(|(epoch, cert)| Ok((epoch as i64, bincode::serialize(&cert)?)))
                .collect::<anyhow::Result<Vec<(i64, Vec<u8>)>>>()?;

            // Now that we have definitely processed leaves up to `to_view`, we can update
            // `last_processed_view` so we don't process these leaves again. We may still fail at
            // this point, or shut down, and fail to complete this update. At worst this will lead
            // to us sending a duplicate decide event the next time we are called; this is fine as
            // the event consumer is required to be idempotent.
            WRITE_BACKOFF
                .retry_if(WRITE_RETRY_MAX, is_serialization_error, || async {
                    let mut tx = self.db.write().await?;
                    tx.upsert(
                        "event_stream",
                        ["id", "last_processed_view"],
                        ["id"],
                        [(1i32, to_view_i64)],
                    )
                    .await?;

                    // Store all the finalized state certs
                    for (epoch, state_cert_bytes) in &serialized_state_certs {
                        tx.upsert(
                            "finalized_state_cert",
                            ["epoch", "state_cert"],
                            ["epoch"],
                            [(*epoch, state_cert_bytes.clone())],
                        )
                        .await?;
                    }

                    // Delete the data that has been fully processed.
                    tx.execute(
                        query("DELETE FROM vid_share2 where view >= $1 AND view <= $2")
                            .bind(from_view_i64)
                            .bind(to_view_i64),
                    )
                    .await?;
                    tx.execute(
                        query("DELETE FROM da_proposal2 where view >= $1 AND view <= $2")
                            .bind(from_view_i64)
                            .bind(to_view_i64),
                    )
                    .await?;
                    tx.execute(
                        query("DELETE FROM quorum_proposals2 where view >= $1 AND view <= $2")
                            .bind(from_view_i64)
                            .bind(to_view_i64),
                    )
                    .await?;
                    tx.execute(
                        query("DELETE FROM quorum_certificate2 where view >= $1 AND view <= $2")
                            .bind(from_view_i64)
                            .bind(to_view_i64),
                    )
                    .await?;
                    tx.execute(
                        query("DELETE FROM state_cert where view >= $1 AND view <= $2")
                            .bind(from_view_i64)
                            .bind(to_view_i64),
                    )
                    .await?;
                    tx.execute(
                        query("DELETE FROM decided_cert2 where view >= $1 AND view <= $2")
                            .bind(from_view_i64)
                            .bind(to_view_i64),
                    )
                    .await?;

                    // Clean up leaves, but do not delete the most recent one (all leaves with a view
                    // number less than the given value). This is necessary to ensure that, in case of
                    // a restart, we can resume from the last decided leaf.
                    tx.execute(
                        query("DELETE FROM anchor_leaf2 WHERE view >= $1 AND view < $2")
                            .bind(from_view_i64)
                            .bind(to_view_i64),
                    )
                    .await?;

                    tx.commit().await?;
                    Ok(())
                })
                .await?;
            // Advance the in-memory cursor so the next iteration starts after `to_view`.
            last_processed_view = Some(to_view_i64);
        }
    }
1179
1180    async fn load_state_certs(
1181        tx: &mut Transaction<Read>,
1182        from_view: ViewNumber,
1183        to_view: ViewNumber,
1184    ) -> anyhow::Result<BTreeMap<u64, LightClientStateUpdateCertificateV2<SeqTypes>>> {
1185        let rows = tx
1186            .fetch_all(
1187                query("SELECT view, state_cert FROM state_cert WHERE view >= $1 AND view <= $2")
1188                    .bind(from_view.u64() as i64)
1189                    .bind(to_view.u64() as i64),
1190            )
1191            .await?;
1192
1193        let mut result = BTreeMap::new();
1194
1195        for row in rows {
1196            let data: Vec<u8> = row.get("state_cert");
1197
1198            let cert: LightClientStateUpdateCertificateV2<SeqTypes> = bincode::deserialize(&data)
1199                .or_else(|err_v2| {
1200                bincode::deserialize::<LightClientStateUpdateCertificateV1<SeqTypes>>(&data)
1201                    .map(Into::into)
1202                    .context(format!(
1203                        "Failed to deserialize LightClientStateUpdateCertificate: with v1 and v2. \
1204                         error: {err_v2}"
1205                    ))
1206            })?;
1207
1208            result.insert(cert.epoch.u64(), cert);
1209        }
1210
1211        Ok(result)
1212    }
1213
1214    #[tracing::instrument(skip(self))]
1215    async fn prune(&self, cur_view: ViewNumber) -> anyhow::Result<()> {
1216        WRITE_BACKOFF
1217            .retry_if(WRITE_RETRY_MAX, is_serialization_error, || async {
1218                let mut tx = self.db.write().await?;
1219
1220                // Prune everything older than the target retention period.
1221                prune_to_view(
1222                    &mut tx,
1223                    cur_view.u64().saturating_sub(self.gc_opt.target_retention),
1224                )
1225                .await?;
1226
1227                // Check our storage usage; if necessary we will prune more aggressively (up to the
1228                // minimum retention) to get below the target usage.
1229                #[cfg(feature = "embedded-db")]
1230                let usage_query = format!(
1231                    "SELECT sum(pgsize) FROM dbstat WHERE name IN ({})",
1232                    PRUNE_TABLES
1233                        .iter()
1234                        .map(|table| format!("'{table}'"))
1235                        .join(",")
1236                );
1237
1238                #[cfg(not(feature = "embedded-db"))]
1239                let usage_query = {
1240                    let table_sizes = PRUNE_TABLES
1241                        .iter()
1242                        .map(|table| format!("pg_table_size('{table}')"))
1243                        .join(" + ");
1244                    format!("SELECT {table_sizes}")
1245                };
1246
1247                let (usage,): (i64,) = query_as(&usage_query).fetch_one(tx.as_mut()).await?;
1248                tracing::debug!(usage, "consensus storage usage after pruning");
1249
1250                if (usage as u64) > self.gc_opt.target_usage {
1251                    tracing::warn!(
1252                        usage,
1253                        gc_opt = ?self.gc_opt,
1254                        "consensus storage is running out of space, pruning to minimum retention"
1255                    );
1256                    prune_to_view(
1257                        &mut tx,
1258                        cur_view.u64().saturating_sub(self.gc_opt.minimum_retention),
1259                    )
1260                    .await?;
1261                }
1262
1263                tx.commit().await
1264            })
1265            .await
1266    }
1267}
1268
/// Maximum number of retries on PostgreSQL serialization conflicts (error 40001).
///
/// Passed as the retry limit to the `WRITE_BACKOFF.retry_if(..)` calls that wrap write
/// transactions in this module.
const WRITE_RETRY_MAX: u32 = 5;
1271
/// Backoff parameters for write-transaction retries.
///
/// Used together with `WRITE_RETRY_MAX` and `is_serialization_error` to retry write
/// transactions in this module.
///
/// NOTE(review): the positional arguments are presumably the initial delay (10 ms), the maximum
/// delay (500 ms), the growth factor (2) and a jitter ratio (5/10) -- confirm against
/// `BackoffParams::new`.
const WRITE_BACKOFF: BackoffParams = BackoffParams::new(
    Duration::from_millis(10),
    Duration::from_millis(500),
    2,
    Ratio {
        numerator: 5,
        denominator: 10,
    },
);
1282
1283fn is_serialization_error(err: &anyhow::Error) -> bool {
1284    err.chain()
1285        .filter_map(|e| e.downcast_ref::<sqlx::Error>())
1286        .filter_map(|e| e.as_database_error())
1287        .any(|e| e.code().as_deref() == Some(PG_SERIALIZATION_FAILURE_CODE))
1288}
1289
/// Tables subject to view-based garbage collection.
///
/// `prune_to_view` deletes old rows from each of these tables by their `view` column, and
/// `Persistence::prune` sums their sizes to decide whether more aggressive pruning is needed.
const PRUNE_TABLES: &[&str] = &[
    "anchor_leaf2",
    "vid_share2",
    "da_proposal2",
    "quorum_proposals2",
    "quorum_certificate2",
];
1297
1298async fn prune_to_view(tx: &mut Transaction<Write>, view: u64) -> anyhow::Result<()> {
1299    if view == 0 {
1300        // Nothing to prune, the entire chain is younger than the retention period.
1301        return Ok(());
1302    }
1303    tracing::debug!(view, "pruning consensus storage");
1304
1305    for table in PRUNE_TABLES {
1306        let res = query(&format!("DELETE FROM {table} WHERE view < $1"))
1307            .bind(view as i64)
1308            .execute(tx.as_mut())
1309            .await
1310            .context(format!("pruning {table}"))?;
1311        if res.rows_affected() > 0 {
1312            tracing::info!(
1313                "garbage collected {} rows from {table}",
1314                res.rows_affected()
1315            );
1316        }
1317    }
1318
1319    Ok(())
1320}
1321
1322#[async_trait]
1323impl SequencerPersistence for Persistence {
1324    async fn migrate_reward_merkle_tree_v2(&self) -> anyhow::Result<()> {
1325        let batch_size: i64 = 1000;
1326
1327        let result = {
1328            let mut tx = self.db.read().await?;
1329            query_as::<(bool, i64)>(
1330                "SELECT completed, migrated_rows FROM epoch_migration WHERE table_name = \
1331                 'reward_merkle_tree_v2_data'",
1332            )
1333            .fetch_optional(tx.as_mut())
1334            .await?
1335        };
1336
1337        let (is_completed, mut offset) = result.unwrap_or((false, 0));
1338
1339        if is_completed {
1340            tracing::info!("reward_merkle_tree_v2 migration already done");
1341            return Ok(());
1342        }
1343
1344        let max_height: Option<i64> = {
1345            let mut tx = self.db.read().await?;
1346            query_as::<(Option<i64>,)>("SELECT MAX(created) FROM reward_merkle_tree_v2")
1347                .fetch_one(tx.as_mut())
1348                .await?
1349                .0
1350        };
1351
1352        let max_height = match max_height {
1353            Some(h) => h,
1354            None => {
1355                tracing::info!("no reward data found in reward_merkle_tree_v2, skipping migration");
1356                return Ok(());
1357            },
1358        };
1359
1360        tracing::warn!(
1361            "migrating reward_merkle_tree_v2 to reward_merkle_tree_v2_data at height \
1362             {max_height}..."
1363        );
1364
1365        let mut balances: Vec<(RewardAccountV2, RewardAmount)> = Vec::new();
1366
1367        loop {
1368            let mut tx = self.db.read().await?;
1369
1370            #[cfg(not(feature = "embedded-db"))]
1371            let rows = query_as::<(serde_json::Value, serde_json::Value)>(
1372                "SELECT DISTINCT ON (idx) idx, entry
1373                   FROM reward_merkle_tree_v2
1374                  WHERE idx IS NOT NULL AND entry IS NOT NULL
1375                  ORDER BY idx, created DESC
1376                  LIMIT $1 OFFSET $2",
1377            )
1378            .bind(batch_size)
1379            .bind(offset)
1380            .fetch_all(tx.as_mut())
1381            .await
1382            .context("loading reward accounts from reward_merkle_tree_v2")?;
1383
1384            #[cfg(feature = "embedded-db")]
1385            let rows = query_as::<(serde_json::Value, serde_json::Value)>(
1386                "SELECT idx, entry FROM (
1387                     SELECT idx, entry, ROW_NUMBER() OVER (PARTITION BY idx ORDER BY created DESC) \
1388                 as rn
1389                       FROM reward_merkle_tree_v2
1390                      WHERE idx IS NOT NULL AND entry IS NOT NULL
1391                 ) sub
1392                 WHERE rn = 1 ORDER BY idx
1393                 LIMIT $1 OFFSET $2",
1394            )
1395            .bind(batch_size)
1396            .bind(offset)
1397            .fetch_all(tx.as_mut())
1398            .await
1399            .context("loading reward accounts from reward_merkle_tree_v2")?;
1400
1401            drop(tx);
1402
1403            if rows.is_empty() {
1404                break;
1405            }
1406
1407            let rows_count = rows.len();
1408
1409            for (idx, entry) in rows {
1410                let account: RewardAccountV2 =
1411                    serde_json::from_value(idx).context("deserializing reward account")?;
1412                let balance: RewardAmount = serde_json::from_value(entry).context(format!(
1413                    "deserializing reward balance for account {account}"
1414                ))?;
1415                balances.push((account, balance));
1416            }
1417
1418            offset += rows_count as i64;
1419            let mut tx = self.db.write().await?;
1420            tx.upsert(
1421                "epoch_migration",
1422                ["table_name", "completed", "migrated_rows"],
1423                ["table_name"],
1424                [("reward_merkle_tree_v2_data".to_string(), false, offset)],
1425            )
1426            .await?;
1427            tx.commit().await?;
1428
1429            tracing::info!(
1430                "reward_merkle_tree_v2 progress: rows={} offset={}",
1431                rows_count,
1432                offset
1433            );
1434
1435            if rows_count < batch_size as usize {
1436                break;
1437            }
1438        }
1439
1440        if balances.is_empty() {
1441            tracing::info!("no reward accounts found, skipping tree rebuild");
1442            return Ok(());
1443        }
1444
1445        tracing::info!(
1446            "rebuilding RewardMerkleTreeV2 from {} accounts",
1447            balances.len()
1448        );
1449
1450        let tree = RewardMerkleTreeV2::from_kv_set(REWARD_MERKLE_TREE_V2_HEIGHT, balances)
1451            .context("failed to rebuild RewardMerkleTreeV2 from balances")?;
1452
1453        let mut tx = self.db.read().await?;
1454        let header = tx
1455            .get_header(BlockId::<SeqTypes>::from(max_height as usize))
1456            .await
1457            .context(format!("header {max_height} not available"))?;
1458        drop(tx);
1459
1460        match header.reward_merkle_tree_root() {
1461            Either::Right(expected_root) => {
1462                ensure!(
1463                    tree.commitment() == expected_root,
1464                    "rebuilt RewardMerkleTreeV2 commitment {} does not match header commitment {} \
1465                     at height {max_height}",
1466                    tree.commitment(),
1467                    expected_root,
1468                );
1469            },
1470            Either::Left(_) => {
1471                bail!(
1472                    "header at height {max_height} has a v1 reward merkle tree root, expected v2"
1473                );
1474            },
1475        }
1476
1477        let tree_data: RewardMerkleTreeV2Data = (&tree)
1478            .try_into()
1479            .context("failed to convert RewardMerkleTreeV2 to RewardMerkleTreeV2Data")?;
1480        let serialized =
1481            bincode::serialize(&tree_data).context("failed to serialize RewardMerkleTreeV2Data")?;
1482
1483        let mut tx = self.db.write().await?;
1484        tx.upsert(
1485            "reward_merkle_tree_v2_data",
1486            ["height", "balances"],
1487            ["height"],
1488            [(max_height, serialized)],
1489        )
1490        .await?;
1491        tx.commit().await?;
1492
1493        // Mark migration as complete, and clean up old tables.
1494        let mut tx = self.db.write().await?;
1495        tx.upsert(
1496            "epoch_migration",
1497            ["table_name", "completed", "migrated_rows"],
1498            ["table_name"],
1499            [("reward_merkle_tree_v2_data".to_string(), true, offset)],
1500        )
1501        .await?;
1502        let truncate = if cfg!(feature = "embedded-db") {
1503            "DELETE FROM"
1504        } else {
1505            "TRUNCATE"
1506        };
1507        query(&format!("{truncate} reward_merkle_tree_v2"))
1508            .execute(tx.as_mut())
1509            .await?;
1510        query(&format!("{truncate} reward_merkle_tree"))
1511            .execute(tx.as_mut())
1512            .await?;
1513        tx.commit().await?;
1514
1515        tracing::warn!("migrated reward_merkle_tree_v2 at height {max_height}");
1516
1517        Ok(())
1518    }
1519
1520    fn into_catchup_provider(
1521        self,
1522        backoff: BackoffParams,
1523    ) -> anyhow::Result<Arc<dyn StateCatchup>> {
1524        Ok(Arc::new(SqlStateCatchup::new(Arc::new(self.db), backoff)))
1525    }
1526
1527    async fn load_config(&self) -> anyhow::Result<Option<NetworkConfig>> {
1528        tracing::info!("loading config from Postgres");
1529
1530        // Select the most recent config (although there should only be one).
1531        let Some(row) = self
1532            .db
1533            .read()
1534            .await?
1535            .fetch_optional("SELECT config FROM network_config ORDER BY id DESC LIMIT 1")
1536            .await?
1537        else {
1538            tracing::info!("config not found");
1539            return Ok(None);
1540        };
1541        let json = row.try_get("config")?;
1542
1543        let json = migrate_network_config(json).context("migration of network config failed")?;
1544        let config = serde_json::from_value(json).context("malformed config file")?;
1545
1546        Ok(Some(config))
1547    }
1548
1549    async fn save_config(&self, cfg: &NetworkConfig) -> anyhow::Result<()> {
1550        tracing::info!("saving config to database");
1551        let json = serde_json::to_value(cfg)?;
1552
1553        WRITE_BACKOFF
1554            .retry_if(WRITE_RETRY_MAX, is_serialization_error, || async {
1555                let mut tx = self.db.write().await?;
1556                tx.execute(
1557                    query("INSERT INTO network_config (config) VALUES ($1)").bind(json.clone()),
1558                )
1559                .await?;
1560                tx.commit().await
1561            })
1562            .await
1563    }
1564
1565    async fn append_decided_leaves(
1566        &self,
1567        view: ViewNumber,
1568        leaf_chain: impl IntoIterator<Item = (&LeafInfo<SeqTypes>, CertificatePair<SeqTypes>)> + Send,
1569        deciding_qc: Option<Arc<CertificatePair<SeqTypes>>>,
1570        consumer: &(impl EventConsumer + 'static),
1571    ) -> anyhow::Result<()> {
1572        let values = leaf_chain
1573            .into_iter()
1574            .map(|(info, cert)| {
1575                // The leaf may come with a large payload attached. We don't care about this payload
1576                // because we already store it separately, as part of the DA proposal. Storing it
1577                // here contributes to load on the DB for no reason, so we remove it before
1578                // serializing the leaf.
1579                let mut leaf = info.leaf.clone();
1580                leaf.unfill_block_payload();
1581
1582                let view = cert.view_number().u64() as i64;
1583                let leaf_bytes = bincode::serialize(&leaf)?;
1584                let qc_bytes = bincode::serialize(cert.qc())?;
1585                let next_epoch_qc_bytes = match cert.next_epoch_qc() {
1586                    Some(qc) => Some(bincode::serialize(qc)?),
1587                    None => None,
1588                };
1589                Ok((view, leaf_bytes, qc_bytes, next_epoch_qc_bytes))
1590            })
1591            .collect::<anyhow::Result<Vec<_>>>()?;
1592
1593        // First, append the new leaves. We do this in its own transaction because even if GC or the
1594        // event consumer later fails, there is no need to abort the storage of the leaves.
1595        WRITE_BACKOFF
1596            .retry_if(WRITE_RETRY_MAX, is_serialization_error, || async {
1597                let mut tx = self.db.write().await?;
1598                tx.upsert(
1599                    "anchor_leaf2",
1600                    ["view", "leaf", "qc", "next_epoch_qc"],
1601                    ["view"],
1602                    values.clone(),
1603                )
1604                .await?;
1605                tx.commit().await
1606            })
1607            .await?;
1608
1609        // Generate an event for the new leaves and, only if it succeeds, clean up data we no longer
1610        // need.
1611        if let Err(err) = self.generate_decide_events(deciding_qc, consumer).await {
1612            // GC/event processing failure is not an error, since by this point we have at least
1613            // managed to persist the decided leaves successfully, and GC will just run again at the
1614            // next decide. Log an error but do not return it.
1615            tracing::warn!(?view, "event processing failed: {err:#}");
1616            return Ok(());
1617        }
1618
1619        // Garbage collect data which was not included in any decide event, but which at this point
1620        // is old enough to just forget about.
1621        if let Err(err) = self.prune(view).await {
1622            tracing::warn!(?view, "pruning failed: {err:#}");
1623        }
1624
1625        Ok(())
1626    }
1627
1628    async fn load_latest_acted_view(&self) -> anyhow::Result<Option<ViewNumber>> {
1629        Ok(self
1630            .db
1631            .read()
1632            .await?
1633            .fetch_optional(query("SELECT view FROM highest_voted_view WHERE id = 0"))
1634            .await?
1635            .map(|row| {
1636                let view: i64 = row.get("view");
1637                ViewNumber::new(view as u64)
1638            }))
1639    }
1640
1641    async fn load_restart_view(&self) -> anyhow::Result<Option<ViewNumber>> {
1642        Ok(self
1643            .db
1644            .read()
1645            .await?
1646            .fetch_optional(query("SELECT view FROM restart_view WHERE id = 0"))
1647            .await?
1648            .map(|row| {
1649                let view: i64 = row.get("view");
1650                ViewNumber::new(view as u64)
1651            }))
1652    }
1653
1654    async fn load_anchor_leaf(
1655        &self,
1656    ) -> anyhow::Result<Option<(Leaf2, QuorumCertificate2<SeqTypes>)>> {
1657        let Some(row) = self
1658            .db
1659            .read()
1660            .await?
1661            .fetch_optional("SELECT leaf, qc FROM anchor_leaf2 ORDER BY view DESC LIMIT 1")
1662            .await?
1663        else {
1664            return Ok(None);
1665        };
1666
1667        let leaf_bytes: Vec<u8> = row.get("leaf");
1668        let leaf2: Leaf2 = bincode::deserialize(&leaf_bytes)?;
1669
1670        let qc_bytes: Vec<u8> = row.get("qc");
1671        let qc2: QuorumCertificate2<SeqTypes> = bincode::deserialize(&qc_bytes)?;
1672
1673        Ok(Some((leaf2, qc2)))
1674    }
1675
1676    async fn load_anchor_view(&self) -> anyhow::Result<ViewNumber> {
1677        let mut tx = self.db.read().await?;
1678        let (view,) = query_as::<(i64,)>("SELECT coalesce(max(view), 0) FROM anchor_leaf2")
1679            .fetch_one(tx.as_mut())
1680            .await?;
1681        Ok(ViewNumber::new(view as u64))
1682    }
1683
1684    async fn load_da_proposal(
1685        &self,
1686        view: ViewNumber,
1687    ) -> anyhow::Result<Option<Proposal<SeqTypes, DaProposal2<SeqTypes>>>> {
1688        let result = self
1689            .db
1690            .read()
1691            .await?
1692            .fetch_optional(
1693                query("SELECT data FROM da_proposal2 where view = $1").bind(view.u64() as i64),
1694            )
1695            .await?;
1696
1697        result
1698            .map(|row| {
1699                let bytes: Vec<u8> = row.get("data");
1700                anyhow::Result::<_>::Ok(bincode::deserialize(&bytes)?)
1701            })
1702            .transpose()
1703    }
1704
1705    async fn load_vid_share(
1706        &self,
1707        view: ViewNumber,
1708    ) -> anyhow::Result<Option<Proposal<SeqTypes, VidDisperseShare<SeqTypes>>>> {
1709        let result = self
1710            .db
1711            .read()
1712            .await?
1713            .fetch_optional(
1714                query("SELECT data FROM vid_share2 where view = $1").bind(view.u64() as i64),
1715            )
1716            .await?;
1717
1718        result
1719            .map(|row| {
1720                let bytes: Vec<u8> = row.get("data");
1721                anyhow::Result::<_>::Ok(bincode::deserialize(&bytes)?)
1722            })
1723            .transpose()
1724    }
1725
1726    async fn load_quorum_proposals(
1727        &self,
1728    ) -> anyhow::Result<BTreeMap<ViewNumber, Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>>>>
1729    {
1730        let rows = self
1731            .db
1732            .read()
1733            .await?
1734            .fetch_all("SELECT * FROM quorum_proposals2")
1735            .await?;
1736
1737        Ok(BTreeMap::from_iter(
1738            rows.into_iter()
1739                .map(|row| {
1740                    let view: i64 = row.get("view");
1741                    let view_number: ViewNumber = ViewNumber::new(view.try_into()?);
1742                    let bytes: Vec<u8> = row.get("data");
1743                    let proposal: Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>> =
1744                        bincode::deserialize(&bytes).or_else(|error| {
1745                            bincode::deserialize::<
1746                                Proposal<SeqTypes, QuorumProposalWrapperLegacy<SeqTypes>>,
1747                            >(&bytes)
1748                            .map(convert_proposal)
1749                            .inspect_err(|err_v3| {
1750                                tracing::warn!(
1751                                    ?view_number,
1752                                    %error,
1753                                    %err_v3,
1754                                    "ignoring malformed quorum proposal DB row"
1755                                );
1756                            })
1757                        })?;
1758                    Ok((view_number, proposal))
1759                })
1760                .collect::<anyhow::Result<Vec<_>>>()?,
1761        ))
1762    }
1763
1764    async fn load_quorum_proposal(
1765        &self,
1766        view: ViewNumber,
1767    ) -> anyhow::Result<Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>>> {
1768        let mut tx = self.db.read().await?;
1769        let (data,) =
1770            query_as::<(Vec<u8>,)>("SELECT data FROM quorum_proposals2 WHERE view = $1 LIMIT 1")
1771                .bind(view.u64() as i64)
1772                .fetch_one(tx.as_mut())
1773                .await?;
1774        let proposal: Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>> =
1775            bincode::deserialize(&data).or_else(|error| {
1776                bincode::deserialize::<Proposal<SeqTypes, QuorumProposalWrapperLegacy<SeqTypes>>>(
1777                    &data,
1778                )
1779                .map(convert_proposal)
1780                .context(format!(
1781                    "Failed to deserialize quorum proposal for view {view}. error={error}"
1782                ))
1783            })?;
1784
1785        Ok(proposal)
1786    }
1787
1788    async fn append_vid(
1789        &self,
1790        proposal: &Proposal<SeqTypes, VidDisperseShare<SeqTypes>>,
1791    ) -> anyhow::Result<()> {
1792        let view = proposal.data.view_number().u64();
1793        let payload_hash = proposal.data.payload_commitment();
1794        let data_bytes = bincode::serialize(proposal).unwrap();
1795
1796        let now = Instant::now();
1797        let res = WRITE_BACKOFF
1798            .retry_if(WRITE_RETRY_MAX, is_serialization_error, || async {
1799                let mut tx = self.db.write().await?;
1800                tx.upsert(
1801                    "vid_share2",
1802                    ["view", "data", "payload_hash"],
1803                    ["view"],
1804                    [(view as i64, data_bytes.clone(), payload_hash.to_string())],
1805                )
1806                .await?;
1807                tx.commit().await
1808            })
1809            .await;
1810        self.internal_metrics
1811            .internal_append_vid_duration
1812            .add_point(now.elapsed().as_secs_f64());
1813        res
1814    }
1815
1816    async fn append_da(
1817        &self,
1818        proposal: &Proposal<SeqTypes, DaProposal<SeqTypes>>,
1819        vid_commit: VidCommitment,
1820    ) -> anyhow::Result<()> {
1821        let data = &proposal.data;
1822        let view = data.view_number().u64();
1823        let data_bytes = bincode::serialize(proposal).unwrap();
1824
1825        let now = Instant::now();
1826        let res = WRITE_BACKOFF
1827            .retry_if(WRITE_RETRY_MAX, is_serialization_error, || async {
1828                let mut tx = self.db.write().await?;
1829                tx.upsert(
1830                    "da_proposal",
1831                    ["view", "data", "payload_hash"],
1832                    ["view"],
1833                    [(view as i64, data_bytes.clone(), vid_commit.to_string())],
1834                )
1835                .await?;
1836                tx.commit().await
1837            })
1838            .await;
1839        self.internal_metrics
1840            .internal_append_da_duration
1841            .add_point(now.elapsed().as_secs_f64());
1842        res
1843    }
1844
1845    async fn record_action(
1846        &self,
1847        view: ViewNumber,
1848        _epoch: Option<EpochNumber>,
1849        action: HotShotAction,
1850    ) -> anyhow::Result<()> {
1851        // Todo Remove this after https://github.com/EspressoSystems/espresso-network/issues/1931
1852        if !matches!(action, HotShotAction::Propose | HotShotAction::Vote) {
1853            return Ok(());
1854        }
1855
1856        WRITE_BACKOFF
1857            .retry_if(WRITE_RETRY_MAX, is_serialization_error, || async {
1858                let stmt = format!(
1859                    "INSERT INTO highest_voted_view (id, view) VALUES (0, $1)
1860                ON CONFLICT (id) DO UPDATE SET view = {MAX_FN}(highest_voted_view.view, \
1861                     excluded.view)"
1862                );
1863
1864                let mut tx = self.db.write().await?;
1865                tx.execute(query(&stmt).bind(view.u64() as i64)).await?;
1866
1867                if matches!(action, HotShotAction::Vote) {
1868                    let restart_view = view + 1;
1869                    let stmt = format!(
1870                        "INSERT INTO restart_view (id, view) VALUES (0, $1)
1871                    ON CONFLICT (id) DO UPDATE SET view = {MAX_FN}(restart_view.view, \
1872                         excluded.view)"
1873                    );
1874                    tx.execute(query(&stmt).bind(restart_view.u64() as i64))
1875                        .await?;
1876                }
1877
1878                tx.commit().await
1879            })
1880            .await
1881    }
1882
1883    async fn append_quorum_proposal2(
1884        &self,
1885        proposal: &Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>>,
1886    ) -> anyhow::Result<()> {
1887        let view_number = proposal.data.view_number().u64();
1888
1889        let proposal_bytes = bincode::serialize(&proposal).context("serializing proposal")?;
1890        let leaf_hash = Committable::commit(&Leaf2::from_quorum_proposal(&proposal.data));
1891
1892        // We also keep track of any QC we see in case we need it to recover our archival storage.
1893        let justify_qc = proposal.data.justify_qc();
1894        let justify_qc_bytes = bincode::serialize(&justify_qc).context("serializing QC")?;
1895        let justify_qc_view = justify_qc.view_number.u64() as i64;
1896        let justify_qc_leaf_commit = justify_qc.data.leaf_commit.to_string();
1897
1898        let now = Instant::now();
1899        let res = WRITE_BACKOFF
1900            .retry_if(WRITE_RETRY_MAX, is_serialization_error, || async {
1901                let mut tx = self.db.write().await?;
1902                tx.upsert(
1903                    "quorum_proposals2",
1904                    ["view", "leaf_hash", "data"],
1905                    ["view"],
1906                    [(
1907                        view_number as i64,
1908                        leaf_hash.to_string(),
1909                        proposal_bytes.clone(),
1910                    )],
1911                )
1912                .await?;
1913                tx.upsert(
1914                    "quorum_certificate2",
1915                    ["view", "leaf_hash", "data"],
1916                    ["view"],
1917                    [(
1918                        justify_qc_view,
1919                        justify_qc_leaf_commit.clone(),
1920                        justify_qc_bytes.clone(),
1921                    )],
1922                )
1923                .await?;
1924                tx.commit().await
1925            })
1926            .await;
1927        self.internal_metrics
1928            .internal_append_quorum2_duration
1929            .add_point(now.elapsed().as_secs_f64());
1930        res
1931    }
1932
1933    async fn append_cert2(
1934        &self,
1935        view: ViewNumber,
1936        cert2: Certificate2<SeqTypes>,
1937    ) -> anyhow::Result<()> {
1938        let data = bincode::serialize(&cert2).context("serializing cert2")?;
1939        let view_i64 = view.u64() as i64;
1940        WRITE_BACKOFF
1941            .retry_if(WRITE_RETRY_MAX, is_serialization_error, || async {
1942                let mut tx = self.db.write().await?;
1943                tx.upsert(
1944                    "decided_cert2",
1945                    ["view", "data"],
1946                    ["view"],
1947                    [(view_i64, data.clone())],
1948                )
1949                .await?;
1950                tx.commit().await
1951            })
1952            .await
1953    }
1954
1955    async fn load_cert2(&self, view: ViewNumber) -> anyhow::Result<Option<Certificate2<SeqTypes>>> {
1956        let row = self
1957            .db
1958            .read()
1959            .await?
1960            .fetch_optional(
1961                query("SELECT data FROM decided_cert2 WHERE view = $1").bind(view.u64() as i64),
1962            )
1963            .await?;
1964        row.map(|row| {
1965            let bytes: Vec<u8> = row.get("data");
1966            bincode::deserialize::<Certificate2<SeqTypes>>(&bytes).context("deserializing cert2")
1967        })
1968        .transpose()
1969    }
1970
1971    async fn load_upgrade_certificate(
1972        &self,
1973    ) -> anyhow::Result<Option<UpgradeCertificate<SeqTypes>>> {
1974        let result = self
1975            .db
1976            .read()
1977            .await?
1978            .fetch_optional("SELECT * FROM upgrade_certificate where id = true")
1979            .await?;
1980
1981        result
1982            .map(|row| {
1983                let bytes: Vec<u8> = row.get("data");
1984                anyhow::Result::<_>::Ok(bincode::deserialize(&bytes)?)
1985            })
1986            .transpose()
1987    }
1988
1989    async fn store_upgrade_certificate(
1990        &self,
1991        decided_upgrade_certificate: Option<UpgradeCertificate<SeqTypes>>,
1992    ) -> anyhow::Result<()> {
1993        let certificate = match decided_upgrade_certificate {
1994            Some(cert) => cert,
1995            None => return Ok(()),
1996        };
1997        let upgrade_certificate_bytes =
1998            bincode::serialize(&certificate).context("serializing upgrade certificate")?;
1999        WRITE_BACKOFF
2000            .retry_if(WRITE_RETRY_MAX, is_serialization_error, || async {
2001                let mut tx = self.db.write().await?;
2002                tx.upsert(
2003                    "upgrade_certificate",
2004                    ["id", "data"],
2005                    ["id"],
2006                    [(true, upgrade_certificate_bytes.clone())],
2007                )
2008                .await?;
2009                tx.commit().await
2010            })
2011            .await
2012    }
2013
2014    async fn migrate_anchor_leaf(&self) -> anyhow::Result<()> {
2015        let batch_size: i64 = 10000;
2016        let mut tx = self.db.read().await?;
2017
2018        // The SQL migration populates the table name and sets a default value of 0 for migrated rows.
2019        // so, fetch_one() would always return a row
2020        // The number of migrated rows is updated after each batch insert.
2021        // This allows the types migration to resume from where it left off.
2022        let (is_completed, mut offset) = query_as::<(bool, i64)>(
2023            "SELECT completed, migrated_rows from epoch_migration WHERE table_name = 'anchor_leaf'",
2024        )
2025        .fetch_one(tx.as_mut())
2026        .await?;
2027
2028        if is_completed {
2029            tracing::info!("decided leaves already migrated");
2030            return Ok(());
2031        }
2032
2033        tracing::warn!("migrating decided leaves..");
2034        loop {
2035            let mut tx = self.db.read().await?;
2036            let rows = query(
2037                "SELECT view, leaf, qc FROM anchor_leaf WHERE view >= $1 ORDER BY view LIMIT $2",
2038            )
2039            .bind(offset)
2040            .bind(batch_size)
2041            .fetch_all(tx.as_mut())
2042            .await?;
2043
2044            drop(tx);
2045            if rows.is_empty() {
2046                break;
2047            }
2048            let mut values = Vec::new();
2049
2050            for row in rows.iter() {
2051                let leaf: Vec<u8> = row.try_get("leaf")?;
2052                let qc: Vec<u8> = row.try_get("qc")?;
2053                let leaf1: Leaf = bincode::deserialize(&leaf)?;
2054                let qc1: QuorumCertificate<SeqTypes> = bincode::deserialize(&qc)?;
2055                let view: i64 = row.try_get("view")?;
2056
2057                let leaf2: Leaf2 = leaf1.into();
2058                let qc2: QuorumCertificate2<SeqTypes> = qc1.to_qc2();
2059
2060                let leaf2_bytes = bincode::serialize(&leaf2)?;
2061                let qc2_bytes = bincode::serialize(&qc2)?;
2062
2063                values.push((view, leaf2_bytes, qc2_bytes));
2064            }
2065
2066            let mut query_builder: sqlx::QueryBuilder<Db> =
2067                sqlx::QueryBuilder::new("INSERT INTO anchor_leaf2 (view, leaf, qc) ");
2068
2069            offset = values.last().context("last row")?.0;
2070
2071            query_builder.push_values(values, |mut b, (view, leaf, qc)| {
2072                b.push_bind(view).push_bind(leaf).push_bind(qc);
2073            });
2074
2075            // Offset tracking prevents duplicate inserts
2076            // Added as a safeguard.
2077            query_builder.push(" ON CONFLICT DO NOTHING");
2078
2079            let query = query_builder.build();
2080
2081            let mut tx = self.db.write().await?;
2082            query.execute(tx.as_mut()).await?;
2083
2084            tx.upsert(
2085                "epoch_migration",
2086                ["table_name", "completed", "migrated_rows"],
2087                ["table_name"],
2088                [("anchor_leaf".to_string(), false, offset)],
2089            )
2090            .await?;
2091            tx.commit().await?;
2092
2093            tracing::info!(
2094                "anchor leaf migration progress: rows={} offset={}",
2095                rows.len(),
2096                offset
2097            );
2098
2099            if rows.len() < batch_size as usize {
2100                break;
2101            }
2102        }
2103
2104        tracing::warn!("migrated decided leaves");
2105
2106        let mut tx = self.db.write().await?;
2107        tx.upsert(
2108            "epoch_migration",
2109            ["table_name", "completed", "migrated_rows"],
2110            ["table_name"],
2111            [("anchor_leaf".to_string(), true, offset)],
2112        )
2113        .await?;
2114        tx.commit().await?;
2115
2116        tracing::info!("updated epoch_migration table for anchor_leaf");
2117
2118        Ok(())
2119    }
2120
2121    async fn migrate_da_proposals(&self) -> anyhow::Result<()> {
2122        let batch_size: i64 = 10000;
2123        let mut tx = self.db.read().await?;
2124
2125        let (is_completed, mut offset) = query_as::<(bool, i64)>(
2126            "SELECT completed, migrated_rows from epoch_migration WHERE table_name = 'da_proposal'",
2127        )
2128        .fetch_one(tx.as_mut())
2129        .await?;
2130
2131        if is_completed {
2132            tracing::info!("da proposals migration already done");
2133            return Ok(());
2134        }
2135
2136        tracing::warn!("migrating da proposals..");
2137
2138        loop {
2139            let mut tx = self.db.read().await?;
2140            let rows = query(
2141                "SELECT payload_hash, data FROM da_proposal WHERE view >= $1 ORDER BY view LIMIT \
2142                 $2",
2143            )
2144            .bind(offset)
2145            .bind(batch_size)
2146            .fetch_all(tx.as_mut())
2147            .await?;
2148
2149            drop(tx);
2150            if rows.is_empty() {
2151                break;
2152            }
2153            let mut values = Vec::new();
2154
2155            for row in rows.iter() {
2156                let data: Vec<u8> = row.try_get("data")?;
2157                let payload_hash: String = row.try_get("payload_hash")?;
2158
2159                let da_proposal: Proposal<SeqTypes, DaProposal<SeqTypes>> =
2160                    bincode::deserialize(&data)?;
2161                let da_proposal2: Proposal<SeqTypes, DaProposal2<SeqTypes>> =
2162                    convert_proposal(da_proposal);
2163
2164                let view = da_proposal2.data.view_number.u64() as i64;
2165                let data = bincode::serialize(&da_proposal2)?;
2166
2167                values.push((view, payload_hash, data));
2168            }
2169
2170            let mut query_builder: sqlx::QueryBuilder<Db> =
2171                sqlx::QueryBuilder::new("INSERT INTO da_proposal2 (view, payload_hash, data) ");
2172
2173            offset = values.last().context("last row")?.0;
2174            query_builder.push_values(values, |mut b, (view, payload_hash, data)| {
2175                b.push_bind(view).push_bind(payload_hash).push_bind(data);
2176            });
2177            query_builder.push(" ON CONFLICT DO NOTHING");
2178            let query = query_builder.build();
2179
2180            let mut tx = self.db.write().await?;
2181            query.execute(tx.as_mut()).await?;
2182
2183            tx.upsert(
2184                "epoch_migration",
2185                ["table_name", "completed", "migrated_rows"],
2186                ["table_name"],
2187                [("da_proposal".to_string(), false, offset)],
2188            )
2189            .await?;
2190            tx.commit().await?;
2191
2192            tracing::info!(
2193                "DA proposals migration progress: rows={} offset={}",
2194                rows.len(),
2195                offset
2196            );
2197            if rows.len() < batch_size as usize {
2198                break;
2199            }
2200        }
2201
2202        tracing::warn!("migrated da proposals");
2203
2204        let mut tx = self.db.write().await?;
2205        tx.upsert(
2206            "epoch_migration",
2207            ["table_name", "completed", "migrated_rows"],
2208            ["table_name"],
2209            [("da_proposal".to_string(), true, offset)],
2210        )
2211        .await?;
2212        tx.commit().await?;
2213
2214        tracing::info!("updated epoch_migration table for da_proposal");
2215
2216        Ok(())
2217    }
2218
2219    async fn migrate_vid_shares(&self) -> anyhow::Result<()> {
2220        let batch_size: i64 = 10000;
2221
2222        let mut tx = self.db.read().await?;
2223
2224        let (is_completed, mut offset) = query_as::<(bool, i64)>(
2225            "SELECT completed, migrated_rows from epoch_migration WHERE table_name = 'vid_share'",
2226        )
2227        .fetch_one(tx.as_mut())
2228        .await?;
2229
2230        if is_completed {
2231            tracing::info!("vid_share migration already done");
2232            return Ok(());
2233        }
2234
2235        tracing::warn!("migrating vid shares..");
2236        loop {
2237            let mut tx = self.db.read().await?;
2238            let rows = query(
2239                "SELECT payload_hash, data FROM vid_share WHERE view >= $1 ORDER BY view LIMIT $2",
2240            )
2241            .bind(offset)
2242            .bind(batch_size)
2243            .fetch_all(tx.as_mut())
2244            .await?;
2245
2246            drop(tx);
2247            if rows.is_empty() {
2248                break;
2249            }
2250            let mut values = Vec::new();
2251
2252            for row in rows.iter() {
2253                let data: Vec<u8> = row.try_get("data")?;
2254                let payload_hash: String = row.try_get("payload_hash")?;
2255
2256                let vid_share: Proposal<SeqTypes, VidDisperseShare0<SeqTypes>> =
2257                    bincode::deserialize(&data)?;
2258                let vid_share2: Proposal<SeqTypes, VidDisperseShare<SeqTypes>> =
2259                    convert_proposal(vid_share);
2260
2261                let view = vid_share2.data.view_number().u64() as i64;
2262                let data = bincode::serialize(&vid_share2)?;
2263
2264                values.push((view, payload_hash, data));
2265            }
2266
2267            let mut query_builder: sqlx::QueryBuilder<Db> =
2268                sqlx::QueryBuilder::new("INSERT INTO vid_share2 (view, payload_hash, data) ");
2269
2270            offset = values.last().context("last row")?.0;
2271
2272            query_builder.push_values(values, |mut b, (view, payload_hash, data)| {
2273                b.push_bind(view).push_bind(payload_hash).push_bind(data);
2274            });
2275
2276            let query = query_builder.build();
2277
2278            let mut tx = self.db.write().await?;
2279            query.execute(tx.as_mut()).await?;
2280
2281            tx.upsert(
2282                "epoch_migration",
2283                ["table_name", "completed", "migrated_rows"],
2284                ["table_name"],
2285                [("vid_share".to_string(), false, offset)],
2286            )
2287            .await?;
2288            tx.commit().await?;
2289
2290            tracing::info!(
2291                "VID shares migration progress: rows={} offset={}",
2292                rows.len(),
2293                offset
2294            );
2295            if rows.len() < batch_size as usize {
2296                break;
2297            }
2298        }
2299
2300        tracing::warn!("migrated vid shares");
2301
2302        let mut tx = self.db.write().await?;
2303        tx.upsert(
2304            "epoch_migration",
2305            ["table_name", "completed", "migrated_rows"],
2306            ["table_name"],
2307            [("vid_share".to_string(), true, offset)],
2308        )
2309        .await?;
2310        tx.commit().await?;
2311
2312        tracing::info!("updated epoch_migration table for vid_share");
2313
2314        Ok(())
2315    }
2316
2317    async fn migrate_quorum_proposals(&self) -> anyhow::Result<()> {
2318        let batch_size: i64 = 10000;
2319        let mut tx = self.db.read().await?;
2320
2321        let (is_completed, mut offset) = query_as::<(bool, i64)>(
2322            "SELECT completed, migrated_rows from epoch_migration WHERE table_name = \
2323             'quorum_proposals'",
2324        )
2325        .fetch_one(tx.as_mut())
2326        .await?;
2327
2328        if is_completed {
2329            tracing::info!("quorum proposals migration already done");
2330            return Ok(());
2331        }
2332
2333        tracing::warn!("migrating quorum proposals..");
2334
2335        loop {
2336            let mut tx = self.db.read().await?;
2337            let rows = query(
2338                "SELECT view, leaf_hash, data FROM quorum_proposals WHERE view >= $1 ORDER BY \
2339                 view LIMIT $2",
2340            )
2341            .bind(offset)
2342            .bind(batch_size)
2343            .fetch_all(tx.as_mut())
2344            .await?;
2345
2346            drop(tx);
2347
2348            if rows.is_empty() {
2349                break;
2350            }
2351
2352            let mut values = Vec::new();
2353
2354            for row in rows.iter() {
2355                let leaf_hash: String = row.try_get("leaf_hash")?;
2356                let data: Vec<u8> = row.try_get("data")?;
2357
2358                let quorum_proposal: Proposal<SeqTypes, QuorumProposal<SeqTypes>> =
2359                    bincode::deserialize(&data)?;
2360                let quorum_proposal2: Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>> =
2361                    convert_proposal(quorum_proposal);
2362
2363                let view = quorum_proposal2.data.view_number().u64() as i64;
2364                let data = bincode::serialize(&quorum_proposal2)?;
2365
2366                values.push((view, leaf_hash, data));
2367            }
2368
2369            let mut query_builder: sqlx::QueryBuilder<Db> =
2370                sqlx::QueryBuilder::new("INSERT INTO quorum_proposals2 (view, leaf_hash, data) ");
2371
2372            offset = values.last().context("last row")?.0;
2373            query_builder.push_values(values, |mut b, (view, leaf_hash, data)| {
2374                b.push_bind(view).push_bind(leaf_hash).push_bind(data);
2375            });
2376
2377            query_builder.push(" ON CONFLICT DO NOTHING");
2378
2379            let query = query_builder.build();
2380
2381            let mut tx = self.db.write().await?;
2382            query.execute(tx.as_mut()).await?;
2383
2384            tx.upsert(
2385                "epoch_migration",
2386                ["table_name", "completed", "migrated_rows"],
2387                ["table_name"],
2388                [("quorum_proposals".to_string(), false, offset)],
2389            )
2390            .await?;
2391            tx.commit().await?;
2392
2393            tracing::info!(
2394                "quorum proposals migration progress: rows={} offset={}",
2395                rows.len(),
2396                offset
2397            );
2398
2399            if rows.len() < batch_size as usize {
2400                break;
2401            }
2402        }
2403
2404        tracing::warn!("migrated quorum proposals");
2405
2406        let mut tx = self.db.write().await?;
2407        tx.upsert(
2408            "epoch_migration",
2409            ["table_name", "completed", "migrated_rows"],
2410            ["table_name"],
2411            [("quorum_proposals".to_string(), true, offset)],
2412        )
2413        .await?;
2414        tx.commit().await?;
2415
2416        tracing::info!("updated epoch_migration table for quorum_proposals");
2417
2418        Ok(())
2419    }
2420
2421    async fn migrate_quorum_certificates(&self) -> anyhow::Result<()> {
2422        let batch_size: i64 = 10000;
2423        let mut tx = self.db.read().await?;
2424
2425        let (is_completed, mut offset) = query_as::<(bool, i64)>(
2426            "SELECT completed, migrated_rows from epoch_migration WHERE table_name = \
2427             'quorum_certificate'",
2428        )
2429        .fetch_one(tx.as_mut())
2430        .await?;
2431
2432        if is_completed {
2433            tracing::info!("quorum certificates migration already done");
2434            return Ok(());
2435        }
2436
2437        tracing::warn!("migrating quorum certificates..");
2438        loop {
2439            let mut tx = self.db.read().await?;
2440            let rows = query(
2441                "SELECT view, leaf_hash, data FROM quorum_certificate WHERE view >= $1 ORDER BY \
2442                 view LIMIT $2",
2443            )
2444            .bind(offset)
2445            .bind(batch_size)
2446            .fetch_all(tx.as_mut())
2447            .await?;
2448
2449            drop(tx);
2450            if rows.is_empty() {
2451                break;
2452            }
2453            let mut values = Vec::new();
2454
2455            for row in rows.iter() {
2456                let leaf_hash: String = row.try_get("leaf_hash")?;
2457                let data: Vec<u8> = row.try_get("data")?;
2458
2459                let qc: QuorumCertificate<SeqTypes> = bincode::deserialize(&data)?;
2460                let qc2: QuorumCertificate2<SeqTypes> = qc.to_qc2();
2461
2462                let view = qc2.view_number().u64() as i64;
2463                let data = bincode::serialize(&qc2)?;
2464
2465                values.push((view, leaf_hash, data));
2466            }
2467
2468            let mut query_builder: sqlx::QueryBuilder<Db> =
2469                sqlx::QueryBuilder::new("INSERT INTO quorum_certificate2 (view, leaf_hash, data) ");
2470
2471            offset = values.last().context("last row")?.0;
2472
2473            query_builder.push_values(values, |mut b, (view, leaf_hash, data)| {
2474                b.push_bind(view).push_bind(leaf_hash).push_bind(data);
2475            });
2476
2477            query_builder.push(" ON CONFLICT DO NOTHING");
2478            let query = query_builder.build();
2479
2480            let mut tx = self.db.write().await?;
2481            query.execute(tx.as_mut()).await?;
2482
2483            tx.upsert(
2484                "epoch_migration",
2485                ["table_name", "completed", "migrated_rows"],
2486                ["table_name"],
2487                [("quorum_certificate".to_string(), false, offset)],
2488            )
2489            .await?;
2490            tx.commit().await?;
2491
2492            tracing::info!(
2493                "Quorum certificates migration progress: rows={} offset={}",
2494                rows.len(),
2495                offset
2496            );
2497
2498            if rows.len() < batch_size as usize {
2499                break;
2500            }
2501        }
2502
2503        tracing::warn!("migrated quorum certificates");
2504
2505        let mut tx = self.db.write().await?;
2506        tx.upsert(
2507            "epoch_migration",
2508            ["table_name", "completed", "migrated_rows"],
2509            ["table_name"],
2510            [("quorum_certificate".to_string(), true, offset)],
2511        )
2512        .await?;
2513        tx.commit().await?;
2514        tracing::info!("updated epoch_migration table for quorum_certificate");
2515
2516        Ok(())
2517    }
2518
2519    /// Migrate stake table data to include x25519_key and p2p_addr fields.
2520    ///
2521    /// Data written before x25519 support lacks these fields. This migration
2522    /// deserializes legacy records and re-serializes them with the new fields set to None.
2523    async fn migrate_x25519_keys(&self) -> anyhow::Result<()> {
2524        use super::RegisteredValidatorNoX25519;
2525
2526        let name = DataMigration::X25519Keys.as_str();
2527
2528        // Migrate bincode storage (epoch_drb_and_root.stake).
2529        if !self
2530            .is_migration_complete(name, "epoch_drb_and_root")
2531            .await?
2532        {
2533            let rows: Vec<(i64, Vec<u8>)> = {
2534                let mut tx = self.db.read().await?;
2535                query_as("SELECT epoch, stake FROM epoch_drb_and_root WHERE stake IS NOT NULL")
2536                    .fetch_all(tx.as_mut())
2537                    .await?
2538            };
2539
2540            let num_rows = rows.len();
2541            let mut tx = self.db.write().await?;
2542            for (epoch, stake_bytes) in rows {
2543                // Try current format first
2544                if bincode::deserialize::<AuthenticatedValidatorMap>(&stake_bytes).is_ok() {
2545                    continue;
2546                }
2547
2548                // Legacy format without x25519 fields
2549                let old_validators: IndexMap<Address, RegisteredValidatorNoX25519> =
2550                    bincode::deserialize(&stake_bytes)
2551                        .context("deserializing legacy stake table")?;
2552                let validators: AuthenticatedValidatorMap = old_validators
2553                    .into_iter()
2554                    .map(|(addr, v)| {
2555                        let registered = v.migrate();
2556                        (
2557                            addr,
2558                            AuthenticatedValidator::try_from(registered)
2559                                .expect("stake tables only contain authenticated validators"),
2560                        )
2561                    })
2562                    .collect();
2563
2564                let new_bytes =
2565                    bincode::serialize(&validators).context("serializing stake table")?;
2566
2567                tracing::debug!(epoch, "migrating x25519 keys in stake table");
2568                tx.execute(
2569                    query("UPDATE epoch_drb_and_root SET stake = $1 WHERE epoch = $2")
2570                        .bind(&new_bytes)
2571                        .bind(epoch),
2572                )
2573                .await?;
2574            }
2575            Self::mark_migration_complete(&mut tx, name, "epoch_drb_and_root", num_rows).await?;
2576            tx.commit().await?;
2577            tracing::info!(
2578                num_rows,
2579                "x25519_keys migration completed for epoch_drb_and_root"
2580            );
2581        }
2582
2583        // Migrate JSONB storage (stake_table_validators).
2584        if !self
2585            .is_migration_complete(name, "stake_table_validators")
2586            .await?
2587        {
2588            let rows: Vec<(i64, String, serde_json::Value)> = {
2589                let mut tx = self.db.read().await?;
2590                query_as("SELECT epoch, address, validator FROM stake_table_validators")
2591                    .fetch_all(tx.as_mut())
2592                    .await?
2593            };
2594
2595            let num_rows = rows.len();
2596            let mut tx = self.db.write().await?;
2597            for (epoch, address, validator_json) in rows {
2598                // Check if JSON already has x25519 fields (can't rely on deserialization
2599                // since serde_json fills missing Option<T> fields with None).
2600                if validator_json
2601                    .as_object()
2602                    .is_some_and(|obj| obj.contains_key("x25519_key"))
2603                {
2604                    continue;
2605                }
2606
2607                // Deserialize (serde_json fills missing Option fields with None),
2608                // then re-serialize to ensure x25519_key and p2p_addr are present.
2609                let validator: RegisteredValidator<PubKey> =
2610                    serde_json::from_value(validator_json).context("deserializing validator")?;
2611
2612                let new_json = serde_json::to_value(&validator).context("serializing validator")?;
2613
2614                tracing::debug!(epoch, %address, "migrating x25519 keys for validator");
2615                tx.execute(
2616                    query(
2617                        "UPDATE stake_table_validators SET validator = $1 WHERE epoch = $2 AND \
2618                         address = $3",
2619                    )
2620                    .bind(&new_json)
2621                    .bind(epoch)
2622                    .bind(&address),
2623                )
2624                .await?;
2625            }
2626            Self::mark_migration_complete(&mut tx, name, "stake_table_validators", num_rows)
2627                .await?;
2628            tx.commit().await?;
2629            tracing::info!(
2630                num_rows,
2631                "x25519_keys migration completed for stake_table_validators"
2632            );
2633        }
2634
2635        Ok(())
2636    }
2637
2638    async fn store_next_epoch_quorum_certificate(
2639        &self,
2640        high_qc: NextEpochQuorumCertificate2<SeqTypes>,
2641    ) -> anyhow::Result<()> {
2642        let qc2_bytes = bincode::serialize(&high_qc).context("serializing next epoch qc")?;
2643        WRITE_BACKOFF
2644            .retry_if(WRITE_RETRY_MAX, is_serialization_error, || async {
2645                let mut tx = self.db.write().await?;
2646                tx.upsert(
2647                    "next_epoch_quorum_certificate",
2648                    ["id", "data"],
2649                    ["id"],
2650                    [(true, qc2_bytes.clone())],
2651                )
2652                .await?;
2653                tx.commit().await
2654            })
2655            .await
2656    }
2657
2658    async fn load_next_epoch_quorum_certificate(
2659        &self,
2660    ) -> anyhow::Result<Option<NextEpochQuorumCertificate2<SeqTypes>>> {
2661        let result = self
2662            .db
2663            .read()
2664            .await?
2665            .fetch_optional("SELECT * FROM next_epoch_quorum_certificate where id = true")
2666            .await?;
2667
2668        result
2669            .map(|row| {
2670                let bytes: Vec<u8> = row.get("data");
2671                anyhow::Result::<_>::Ok(bincode::deserialize(&bytes)?)
2672            })
2673            .transpose()
2674    }
2675
2676    async fn store_eqc(
2677        &self,
2678        high_qc: QuorumCertificate2<SeqTypes>,
2679        next_epoch_high_qc: NextEpochQuorumCertificate2<SeqTypes>,
2680    ) -> anyhow::Result<()> {
2681        let eqc_bytes =
2682            bincode::serialize(&(high_qc, next_epoch_high_qc)).context("serializing eqc")?;
2683        WRITE_BACKOFF
2684            .retry_if(WRITE_RETRY_MAX, is_serialization_error, || async {
2685                let mut tx = self.db.write().await?;
2686                tx.upsert("eqc", ["id", "data"], ["id"], [(true, eqc_bytes.clone())])
2687                    .await?;
2688                tx.commit().await
2689            })
2690            .await
2691    }
2692
2693    async fn load_eqc(
2694        &self,
2695    ) -> Option<(
2696        QuorumCertificate2<SeqTypes>,
2697        NextEpochQuorumCertificate2<SeqTypes>,
2698    )> {
2699        let result = self
2700            .db
2701            .read()
2702            .await
2703            .ok()?
2704            .fetch_optional("SELECT * FROM eqc where id = true")
2705            .await
2706            .ok()?;
2707
2708        result
2709            .map(|row| {
2710                let bytes: Vec<u8> = row.get("data");
2711                bincode::deserialize(&bytes)
2712            })
2713            .transpose()
2714            .ok()?
2715    }
2716
2717    async fn append_da2(
2718        &self,
2719        proposal: &Proposal<SeqTypes, DaProposal2<SeqTypes>>,
2720        vid_commit: VidCommitment,
2721    ) -> anyhow::Result<()> {
2722        let data = &proposal.data;
2723        let view = data.view_number().u64();
2724        let data_bytes = bincode::serialize(proposal).unwrap();
2725
2726        let now = Instant::now();
2727        let res = WRITE_BACKOFF
2728            .retry_if(WRITE_RETRY_MAX, is_serialization_error, || async {
2729                let mut tx = self.db.write().await?;
2730                tx.upsert(
2731                    "da_proposal2",
2732                    ["view", "data", "payload_hash"],
2733                    ["view"],
2734                    [(view as i64, data_bytes.clone(), vid_commit.to_string())],
2735                )
2736                .await?;
2737                tx.commit().await
2738            })
2739            .await;
2740        self.internal_metrics
2741            .internal_append_da2_duration
2742            .add_point(now.elapsed().as_secs_f64());
2743        res
2744    }
2745
2746    async fn store_drb_result(
2747        &self,
2748        epoch: EpochNumber,
2749        drb_result: DrbResult,
2750    ) -> anyhow::Result<()> {
2751        let epoch_i64 = epoch.u64() as i64;
2752        let drb_result_vec = Vec::from(drb_result);
2753        WRITE_BACKOFF
2754            .retry_if(WRITE_RETRY_MAX, is_serialization_error, || async {
2755                let mut tx = self.db.write().await?;
2756                tx.upsert(
2757                    "epoch_drb_and_root",
2758                    ["epoch", "drb_result"],
2759                    ["epoch"],
2760                    [(epoch_i64, drb_result_vec.clone())],
2761                )
2762                .await?;
2763                tx.commit().await
2764            })
2765            .await
2766    }
2767
2768    async fn store_epoch_root(
2769        &self,
2770        epoch: EpochNumber,
2771        block_header: <SeqTypes as NodeType>::BlockHeader,
2772    ) -> anyhow::Result<()> {
2773        let epoch_i64 = epoch.u64() as i64;
2774        let block_header_bytes =
2775            bincode::serialize(&block_header).context("serializing block header")?;
2776
2777        WRITE_BACKOFF
2778            .retry_if(WRITE_RETRY_MAX, is_serialization_error, || async {
2779                let mut tx = self.db.write().await?;
2780                tx.upsert(
2781                    "epoch_drb_and_root",
2782                    ["epoch", "block_header"],
2783                    ["epoch"],
2784                    [(epoch_i64, block_header_bytes.clone())],
2785                )
2786                .await?;
2787                tx.commit().await
2788            })
2789            .await
2790    }
2791
2792    async fn store_drb_input(&self, drb_input: DrbInput) -> anyhow::Result<()> {
2793        if let Ok(loaded_drb_input) = self.load_drb_input(drb_input.epoch).await {
2794            if loaded_drb_input.difficulty_level != drb_input.difficulty_level {
2795                tracing::error!("Overwriting {loaded_drb_input:?} in storage with {drb_input:?}");
2796            } else if loaded_drb_input.iteration >= drb_input.iteration {
2797                anyhow::bail!(
2798                    "DrbInput in storage {:?} is more recent than {:?}, refusing to update",
2799                    loaded_drb_input,
2800                    drb_input
2801                )
2802            }
2803        }
2804
2805        let drb_epoch_i64 = drb_input.epoch as i64;
2806        let drb_input_bytes = bincode::serialize(&drb_input)
2807            .context("Failed to serialize DrbInput. This is not fatal, but should never happen.")?;
2808
2809        WRITE_BACKOFF
2810            .retry_if(WRITE_RETRY_MAX, is_serialization_error, || async {
2811                let mut tx = self.db.write().await?;
2812                tx.upsert(
2813                    "drb",
2814                    ["epoch", "drb_input"],
2815                    ["epoch"],
2816                    [(drb_epoch_i64, drb_input_bytes.clone())],
2817                )
2818                .await?;
2819                tx.commit().await
2820            })
2821            .await
2822    }
2823
2824    async fn load_drb_input(&self, epoch: u64) -> anyhow::Result<DrbInput> {
2825        let row = self
2826            .db
2827            .read()
2828            .await?
2829            .fetch_optional(query("SELECT drb_input FROM drb WHERE epoch = $1").bind(epoch as i64))
2830            .await?;
2831
2832        match row {
2833            None => anyhow::bail!("No DrbInput for epoch {} in storage", epoch),
2834            Some(row) => {
2835                let drb_input_bytes: Vec<u8> = row.try_get("drb_input")?;
2836                let drb_input = bincode::deserialize(&drb_input_bytes)
2837                    .context("Failed to deserialize drb_input from storage")?;
2838
2839                Ok(drb_input)
2840            },
2841        }
2842    }
2843
2844    async fn add_state_cert(
2845        &self,
2846        state_cert: LightClientStateUpdateCertificateV2<SeqTypes>,
2847    ) -> anyhow::Result<()> {
2848        let view_number = state_cert.light_client_state.view_number as i64;
2849        let state_cert_bytes = bincode::serialize(&state_cert)
2850            .context("serializing light client state update certificate")?;
2851
2852        WRITE_BACKOFF
2853            .retry_if(WRITE_RETRY_MAX, is_serialization_error, || async {
2854                let mut tx = self.db.write().await?;
2855                tx.upsert(
2856                    "state_cert",
2857                    ["view", "state_cert"],
2858                    ["view"],
2859                    [(view_number, state_cert_bytes.clone())],
2860                )
2861                .await?;
2862                tx.commit().await
2863            })
2864            .await
2865    }
2866
2867    async fn load_state_cert(
2868        &self,
2869    ) -> anyhow::Result<Option<LightClientStateUpdateCertificateV2<SeqTypes>>> {
2870        let Some(row) = self
2871            .db
2872            .read()
2873            .await?
2874            .fetch_optional(
2875                "SELECT state_cert FROM finalized_state_cert ORDER BY epoch DESC LIMIT 1",
2876            )
2877            .await?
2878        else {
2879            return Ok(None);
2880        };
2881        let bytes: Vec<u8> = row.get("state_cert");
2882
2883        let cert = match bincode::deserialize(&bytes) {
2884            Ok(cert) => cert,
2885            Err(err) => {
2886                tracing::info!(
2887                    error = %err,
2888                    "Failed to deserialize state certificate with v2. attempting with v1"
2889                );
2890
2891                let v1_cert =
2892                    bincode::deserialize::<LightClientStateUpdateCertificateV1<SeqTypes>>(&bytes)
2893                        .with_context(|| {
2894                        format!("Failed to deserialize using both v1 and v2. error: {err}")
2895                    })?;
2896
2897                v1_cert.into()
2898            },
2899        };
2900
2901        Ok(Some(cert))
2902    }
2903
2904    async fn get_state_cert_by_epoch(
2905        &self,
2906        epoch: u64,
2907    ) -> anyhow::Result<Option<LightClientStateUpdateCertificateV2<SeqTypes>>> {
2908        let Some(row) = self
2909            .db
2910            .read()
2911            .await?
2912            .fetch_optional(
2913                query("SELECT state_cert FROM finalized_state_cert WHERE epoch = $1")
2914                    .bind(epoch as i64),
2915            )
2916            .await?
2917        else {
2918            return Ok(None);
2919        };
2920        let bytes: Vec<u8> = row.get("state_cert");
2921
2922        let cert = match bincode::deserialize(&bytes) {
2923            Ok(cert) => cert,
2924            Err(err) => {
2925                tracing::info!(
2926                    error = %err,
2927                    "Failed to deserialize state certificate with v2. attempting with v1"
2928                );
2929
2930                let v1_cert =
2931                    bincode::deserialize::<LightClientStateUpdateCertificateV1<SeqTypes>>(&bytes)
2932                        .with_context(|| {
2933                        format!("Failed to deserialize using both v1 and v2. error: {err}")
2934                    })?;
2935
2936                v1_cert.into()
2937            },
2938        };
2939
2940        Ok(Some(cert))
2941    }
2942
2943    async fn insert_state_cert(
2944        &self,
2945        epoch: u64,
2946        cert: LightClientStateUpdateCertificateV2<SeqTypes>,
2947    ) -> anyhow::Result<()> {
2948        let epoch_i64 = epoch as i64;
2949        let bytes = bincode::serialize(&cert)
2950            .with_context(|| format!("Failed to serialize state cert for epoch {epoch}"))?;
2951
2952        WRITE_BACKOFF
2953            .retry_if(WRITE_RETRY_MAX, is_serialization_error, || async {
2954                let mut tx = self.db.write().await?;
2955                tx.upsert(
2956                    "finalized_state_cert",
2957                    ["epoch", "state_cert"],
2958                    ["epoch"],
2959                    [(epoch_i64, bytes.clone())],
2960                )
2961                .await?;
2962                tx.commit().await
2963            })
2964            .await
2965    }
2966
2967    async fn load_start_epoch_info(&self) -> anyhow::Result<Vec<InitializerEpochInfo<SeqTypes>>> {
2968        let rows = self
2969            .db
2970            .read()
2971            .await?
2972            .fetch_all(
2973                query("SELECT * from epoch_drb_and_root ORDER BY epoch DESC LIMIT $1")
2974                    .bind(RECENT_STAKE_TABLES_LIMIT as i64),
2975            )
2976            .await?;
2977
2978        // reverse the rows vector to return the most recent epochs, but in ascending order
2979        rows.into_iter()
2980            .rev()
2981            .map(|row| {
2982                let epoch: i64 = row.try_get("epoch")?;
2983                let drb_result: Option<Vec<u8>> = row.try_get("drb_result")?;
2984                let block_header: Option<Vec<u8>> = row.try_get("block_header")?;
2985                if let Some(drb_result) = drb_result {
2986                    let drb_result_array = drb_result
2987                        .try_into()
2988                        .or_else(|_| bail!("invalid drb result"))?;
2989                    let block_header: Option<<SeqTypes as NodeType>::BlockHeader> = block_header
2990                        .map(|data| bincode::deserialize(&data))
2991                        .transpose()?;
2992                    Ok(Some(InitializerEpochInfo::<SeqTypes> {
2993                        epoch: EpochNumber::new(epoch as u64),
2994                        drb_result: drb_result_array,
2995                        block_header,
2996                    }))
2997                } else {
2998                    // Right now we skip the epoch_drb_and_root row if there is no drb result.
2999                    // This seems reasonable based on the expected order of events, but please double check!
3000                    Ok(None)
3001                }
3002            })
3003            .filter_map(|e| match e {
3004                Err(v) => Some(Err(v)),
3005                Ok(Some(v)) => Some(Ok(v)),
3006                Ok(None) => None,
3007            })
3008            .collect()
3009    }
3010
3011    fn enable_metrics(&mut self, metrics: &dyn Metrics) {
3012        self.internal_metrics = PersistenceMetricsValue::new(metrics);
3013    }
3014}
3015
3016#[async_trait]
3017impl MembershipPersistence for Persistence {
3018    async fn load_stake(&self, epoch: EpochNumber) -> anyhow::Result<Option<StakeTuple>> {
3019        let result = self
3020            .db
3021            .read()
3022            .await?
3023            .fetch_optional(
3024                query(
3025                    "SELECT stake, block_reward, stake_table_hash FROM epoch_drb_and_root WHERE \
3026                     epoch = $1",
3027                )
3028                .bind(epoch.u64() as i64),
3029            )
3030            .await?;
3031
3032        result
3033            .map(|row| {
3034                let stake_table_bytes: Vec<u8> = row.get("stake");
3035                let reward_bytes: Option<Vec<u8>> = row.get("block_reward");
3036                let stake_table_hash_bytes: Option<Vec<u8>> = row.get("stake_table_hash");
3037                let stake_table: AuthenticatedValidatorMap =
3038                    bincode::deserialize(&stake_table_bytes)
3039                        .context("deserializing stake table")?;
3040                let reward: Option<RewardAmount> = reward_bytes
3041                    .map(|b| bincode::deserialize(&b).context("deserializing block_reward"))
3042                    .transpose()?;
3043                let stake_table_hash: Option<StakeTableHash> = stake_table_hash_bytes
3044                    .map(|b| bincode::deserialize(&b).context("deserializing stake table hash"))
3045                    .transpose()?;
3046
3047                Ok((stake_table, reward, stake_table_hash))
3048            })
3049            .transpose()
3050    }
3051
3052    async fn load_latest_stake(&self, limit: u64) -> anyhow::Result<Option<Vec<IndexedStake>>> {
3053        let mut tx = self.db.read().await?;
3054
3055        let rows = match query_as::<(i64, Vec<u8>, Option<Vec<u8>>, Option<Vec<u8>>)>(
3056            "SELECT epoch, stake, block_reward, stake_table_hash FROM epoch_drb_and_root WHERE \
3057             stake is NOT NULL ORDER BY epoch DESC LIMIT $1",
3058        )
3059        .bind(limit as i64)
3060        .fetch_all(tx.as_mut())
3061        .await
3062        {
3063            Ok(bytes) => bytes,
3064            Err(err) => {
3065                tracing::error!("error loading stake tables: {err:#}");
3066                bail!("{err:#}");
3067            },
3068        };
3069
3070        let stakes: anyhow::Result<Vec<IndexedStake>> = rows
3071            .into_iter()
3072            .map(
3073                |(id, stake_bytes, reward_bytes_opt, stake_table_hash_bytes_opt)| {
3074                    let stake_table: AuthenticatedValidatorMap =
3075                        bincode::deserialize(&stake_bytes).context("deserializing stake table")?;
3076
3077                    let block_reward: Option<RewardAmount> = reward_bytes_opt
3078                        .map(|b| bincode::deserialize(&b).context("deserializing block_reward"))
3079                        .transpose()?;
3080
3081                    let stake_table_hash: Option<StakeTableHash> = stake_table_hash_bytes_opt
3082                        .map(|b| bincode::deserialize(&b).context("deserializing stake table hash"))
3083                        .transpose()?;
3084
3085                    Ok((
3086                        EpochNumber::new(id as u64),
3087                        (stake_table, block_reward),
3088                        stake_table_hash,
3089                    ))
3090                },
3091            )
3092            .collect();
3093
3094        Ok(Some(stakes?))
3095    }
3096
3097    async fn store_stake(
3098        &self,
3099        epoch: EpochNumber,
3100        stake: AuthenticatedValidatorMap,
3101        block_reward: Option<RewardAmount>,
3102        stake_table_hash: Option<StakeTableHash>,
3103    ) -> anyhow::Result<()> {
3104        let epoch_i64 = epoch.u64() as i64;
3105        let stake_table_bytes = bincode::serialize(&stake).context("serializing stake table")?;
3106        let reward_bytes = block_reward
3107            .map(|r| bincode::serialize(&r).context("serializing block reward"))
3108            .transpose()?;
3109        let stake_table_hash_bytes = stake_table_hash
3110            .map(|h| bincode::serialize(&h).context("serializing stake table hash"))
3111            .transpose()?;
3112        WRITE_BACKOFF
3113            .retry_if(WRITE_RETRY_MAX, is_serialization_error, || async {
3114                let mut tx = self.db.write().await?;
3115                tx.upsert(
3116                    "epoch_drb_and_root",
3117                    ["epoch", "stake", "block_reward", "stake_table_hash"],
3118                    ["epoch"],
3119                    [(
3120                        epoch_i64,
3121                        stake_table_bytes.clone(),
3122                        reward_bytes.clone(),
3123                        stake_table_hash_bytes.clone(),
3124                    )],
3125                )
3126                .await?;
3127                tx.commit().await
3128            })
3129            .await
3130    }
3131
3132    async fn store_events(
3133        &self,
3134        l1_finalized: u64,
3135        events: Vec<(EventKey, StakeTableEvent)>,
3136    ) -> anyhow::Result<()> {
3137        let l1_finalized_i64: i64 = l1_finalized.try_into()?;
3138        let serialized_events = events
3139            .into_iter()
3140            .map(|((block_number, index), event)| {
3141                Ok((
3142                    i64::try_from(block_number)?,
3143                    i64::try_from(index)?,
3144                    serde_json::to_value(event).context("l1 event to value")?,
3145                ))
3146            })
3147            .collect::<anyhow::Result<Vec<_>>>()?;
3148
3149        WRITE_BACKOFF
3150            .retry_if(WRITE_RETRY_MAX, is_serialization_error, || async {
3151                let mut tx = self.db.write().await?;
3152
3153                // check last l1 block if there is any
3154                let last_processed_l1_block = query_as::<(i64,)>(
3155                    "SELECT last_l1_block FROM stake_table_events_l1_block where id = 0",
3156                )
3157                .fetch_optional(tx.as_mut())
3158                .await?
3159                .map(|(l1,)| l1);
3160
3161                tracing::debug!("last l1 finalizes in database = {last_processed_l1_block:?}");
3162
3163                // skip events storage if the database already has higher l1 block events
3164                let serialized_events_len = serialized_events.len();
3165                if last_processed_l1_block > Some(l1_finalized_i64) {
3166                    tracing::debug!(
3167                        ?last_processed_l1_block,
3168                        l1_finalized,
3169                        serialized_events_len,
3170                        "last l1 finalized stored is already higher"
3171                    );
3172                    return Ok(());
3173                }
3174
3175                if !serialized_events.is_empty() {
3176                    let mut query_builder: sqlx::QueryBuilder<Db> = sqlx::QueryBuilder::new(
3177                        "INSERT INTO stake_table_events (l1_block, log_index, event) ",
3178                    );
3179
3180                    query_builder.push_values(
3181                        serialized_events.iter().cloned(),
3182                        |mut b, (l1_block, log_index, event)| {
3183                            b.push_bind(l1_block).push_bind(log_index).push_bind(event);
3184                        },
3185                    );
3186
3187                    query_builder.push(" ON CONFLICT DO NOTHING");
3188                    let query = query_builder.build();
3189
3190                    query.execute(tx.as_mut()).await?;
3191                }
3192
3193                // update l1 block
3194                tx.upsert(
3195                    "stake_table_events_l1_block",
3196                    ["id", "last_l1_block"],
3197                    ["id"],
3198                    [(0_i32, l1_finalized_i64)],
3199                )
3200                .await?;
3201
3202                tx.commit().await?;
3203
3204                Ok(())
3205            })
3206            .await
3207    }
3208
3209    /// Loads all events from persistent storage up to the specified L1 block.
3210    ///
3211    /// # Returns
3212    ///
3213    /// Returns a tuple containing:
3214    /// - `Option<u64>` - The queried L1 block for which all events have been successfully fetched.
3215    /// - `Vec<(EventKey, StakeTableEvent)>` - A list of events, where each entry is a tuple of the event key
3216    /// event key is (l1 block number, log index)
3217    ///   and the corresponding StakeTable event.
3218    ///
3219    async fn load_events(
3220        &self,
3221        from_l1_block: u64,
3222        to_l1_block: u64,
3223    ) -> anyhow::Result<(
3224        Option<EventsPersistenceRead>,
3225        Vec<(EventKey, StakeTableEvent)>,
3226    )> {
3227        let mut tx = self.db.read().await?;
3228
3229        // check last l1 block if there is any
3230        let res = query_as::<(i64,)>(
3231            "SELECT last_l1_block FROM stake_table_events_l1_block where id = 0",
3232        )
3233        .fetch_optional(tx.as_mut())
3234        .await?;
3235
3236        let Some((last_processed_l1_block,)) = res else {
3237            // this just means we dont have any events stored
3238            return Ok((None, Vec::new()));
3239        };
3240
3241        // Determine the L1 block for querying events.
3242        // If the last stored L1 block is greater than the requested block, limit the query to the requested block.
3243        // Otherwise, query up to the last stored block.
3244        let to_l1_block = to_l1_block.try_into()?;
3245        let query_l1_block = if last_processed_l1_block > to_l1_block {
3246            to_l1_block
3247        } else {
3248            last_processed_l1_block
3249        };
3250
3251        let rows = query(
3252            "SELECT l1_block, log_index, event FROM stake_table_events WHERE $1 <= l1_block AND \
3253             l1_block <= $2 ORDER BY l1_block ASC, log_index ASC",
3254        )
3255        .bind(i64::try_from(from_l1_block)?)
3256        .bind(query_l1_block)
3257        .fetch_all(tx.as_mut())
3258        .await?;
3259
3260        let events = rows
3261            .into_iter()
3262            .map(|row| {
3263                let l1_block: i64 = row.try_get("l1_block")?;
3264                let log_index: i64 = row.try_get("log_index")?;
3265                let event = serde_json::from_value(row.try_get("event")?)?;
3266
3267                Ok(((l1_block.try_into()?, log_index.try_into()?), event))
3268            })
3269            .collect::<anyhow::Result<Vec<_>>>()?;
3270
3271        // Determine the read state based on the queried block range.
3272        // - If the persistence returned events up to the requested block, the read is complete.
3273        // - Otherwise, indicate that the read is up to the last processed block.
3274        if query_l1_block == to_l1_block {
3275            Ok((Some(EventsPersistenceRead::Complete), events))
3276        } else {
3277            Ok((
3278                Some(EventsPersistenceRead::UntilL1Block(
3279                    query_l1_block.try_into()?,
3280                )),
3281                events,
3282            ))
3283        }
3284    }
3285
3286    async fn delete_stake_tables(&self) -> anyhow::Result<()> {
3287        WRITE_BACKOFF
3288            .retry_if(WRITE_RETRY_MAX, is_serialization_error, || async {
3289                let mut tx = self.db.write().await?;
3290                #[cfg(not(feature = "embedded-db"))]
3291                query(
3292                    "TRUNCATE stake_table_events, stake_table_events_l1_block, \
3293                     epoch_drb_and_root, stake_table_validators",
3294                )
3295                .execute(tx.as_mut())
3296                .await?;
3297                #[cfg(feature = "embedded-db")]
3298                {
3299                    query("DELETE FROM stake_table_events")
3300                        .execute(tx.as_mut())
3301                        .await?;
3302                    query("DELETE FROM stake_table_events_l1_block")
3303                        .execute(tx.as_mut())
3304                        .await?;
3305                    query("DELETE FROM epoch_drb_and_root")
3306                        .execute(tx.as_mut())
3307                        .await?;
3308                    query("DELETE FROM stake_table_validators")
3309                        .execute(tx.as_mut())
3310                        .await?;
3311                }
3312                tx.commit().await?;
3313                Ok(())
3314            })
3315            .await
3316    }
3317
3318    async fn store_all_validators(
3319        &self,
3320        epoch: EpochNumber,
3321        all_validators: RegisteredValidatorMap,
3322    ) -> anyhow::Result<()> {
3323        if all_validators.is_empty() {
3324            return Ok(());
3325        }
3326
3327        let epoch_i64 = epoch.u64() as i64;
3328        let serialized_validators = all_validators
3329            .into_iter()
3330            .map(|(address, validator)| {
3331                let validator_json =
3332                    serde_json::to_value(&validator).context("serializing validator to json")?;
3333                Ok((address.to_string(), validator_json))
3334            })
3335            .collect::<anyhow::Result<Vec<_>>>()?;
3336
3337        WRITE_BACKOFF
3338            .retry_if(WRITE_RETRY_MAX, is_serialization_error, || async {
3339                let mut tx = self.db.write().await?;
3340
3341                let mut query_builder = QueryBuilder::new(
3342                    "INSERT INTO stake_table_validators (epoch, address, validator) ",
3343                );
3344
3345                query_builder.push_values(
3346                    serialized_validators.iter().cloned(),
3347                    |mut b, (address, validator)| {
3348                        b.push_bind(epoch_i64)
3349                            .push_bind(address)
3350                            .push_bind(validator);
3351                    },
3352                );
3353
3354                query_builder.push(
3355                    " ON CONFLICT (epoch, address) DO UPDATE SET validator = EXCLUDED.validator",
3356                );
3357
3358                let query = query_builder.build();
3359
3360                query.execute(tx.as_mut()).await?;
3361
3362                tx.commit().await?;
3363                Ok(())
3364            })
3365            .await
3366    }
3367
3368    async fn load_all_validators(
3369        &self,
3370        epoch: EpochNumber,
3371        offset: u64,
3372        limit: u64,
3373    ) -> anyhow::Result<Vec<RegisteredValidator<PubKey>>> {
3374        let mut tx = self.db.read().await?;
3375
3376        // Use LOWER(address) in ORDER BY to ensure consistent ordering for SQlite and Postgres.
3377        // Postgres sorts text case sensitively by default, while SQLite sorts case insensitively.
3378        // Applying LOWER() makes the result consistent.
3379        let rows = query(
3380            "SELECT address, validator
3381         FROM stake_table_validators
3382         WHERE epoch = $1
3383         ORDER BY LOWER(address) ASC
3384         LIMIT $2 OFFSET $3",
3385        )
3386        .bind(epoch.u64() as i64)
3387        .bind(limit as i64)
3388        .bind(offset as i64)
3389        .fetch_all(tx.as_mut())
3390        .await?;
3391        rows.into_iter()
3392            .map(|row| {
3393                let validator_json: serde_json::Value = row.try_get("validator")?;
3394                serde_json::from_value::<RegisteredValidator<PubKey>>(validator_json)
3395                    .map_err(Into::into)
3396            })
3397            .collect()
3398    }
3399}
3400
3401#[async_trait]
3402impl DhtPersistentStorage for Persistence {
3403    /// Save the DHT to the database
3404    ///
3405    /// # Errors
3406    /// - If we fail to serialize the records
3407    /// - If we fail to write the serialized records to the DB
3408    async fn save(&self, records: Vec<SerializableRecord>) -> anyhow::Result<()> {
3409        // Bincode-serialize the records
3410        let to_save =
3411            bincode::serialize(&records).with_context(|| "failed to serialize records")?;
3412
3413        // Prepare the statement
3414        let stmt = "INSERT INTO libp2p_dht (id, serialized_records) VALUES (0, $1) ON CONFLICT \
3415                    (id) DO UPDATE SET serialized_records = $1";
3416
3417        WRITE_BACKOFF
3418            .retry_if(WRITE_RETRY_MAX, is_serialization_error, || async {
3419                // Execute the query
3420                let mut tx = self
3421                    .db
3422                    .write()
3423                    .await
3424                    .with_context(|| "failed to start an atomic DB transaction")?;
3425                tx.execute(query(stmt).bind(to_save.clone()))
3426                    .await
3427                    .with_context(|| "failed to execute DB query")?;
3428
3429                // Commit the state
3430                tx.commit().await.with_context(|| "failed to commit to DB")
3431            })
3432            .await
3433    }
3434
3435    /// Load the DHT from the database
3436    ///
3437    /// # Errors
3438    /// - If we fail to read from the DB
3439    /// - If we fail to deserialize the records
3440    async fn load(&self) -> anyhow::Result<Vec<SerializableRecord>> {
3441        // Fetch the results from the DB
3442        let result = self
3443            .db
3444            .read()
3445            .await
3446            .with_context(|| "failed to start a DB read transaction")?
3447            .fetch_one("SELECT * FROM libp2p_dht where id = 0")
3448            .await
3449            .with_context(|| "failed to fetch from DB")?;
3450
3451        // Get the `serialized_records` row
3452        let serialied_records: Vec<u8> = result.get("serialized_records");
3453
3454        // Deserialize it
3455        let records: Vec<SerializableRecord> = bincode::deserialize(&serialied_records)
3456            .with_context(|| "Failed to deserialize records")?;
3457
3458        Ok(records)
3459    }
3460}
3461
3462#[async_trait]
3463impl Provider<SeqTypes, VidCommonRequest> for Persistence {
3464    #[tracing::instrument(skip(self))]
3465    async fn fetch(&self, req: VidCommonRequest) -> Option<VidCommon> {
3466        let mut tx = match self.db.read().await {
3467            Ok(tx) => tx,
3468            Err(err) => {
3469                tracing::warn!("could not open transaction: {err:#}");
3470                return None;
3471            },
3472        };
3473
3474        let bytes = match query_as::<(Vec<u8>,)>(
3475            "SELECT data FROM vid_share2 WHERE payload_hash = $1 LIMIT 1",
3476        )
3477        .bind(req.0.to_string())
3478        .fetch_optional(tx.as_mut())
3479        .await
3480        {
3481            Ok(Some((bytes,))) => bytes,
3482            Ok(None) => return None,
3483            Err(err) => {
3484                tracing::error!("error loading VID share: {err:#}");
3485                return None;
3486            },
3487        };
3488
3489        let share: Proposal<SeqTypes, VidDisperseShare<SeqTypes>> =
3490            match bincode::deserialize(&bytes) {
3491                Ok(share) => share,
3492                Err(err) => {
3493                    tracing::warn!("error decoding VID share: {err:#}");
3494                    return None;
3495                },
3496            };
3497
3498        match share.data {
3499            VidDisperseShare::V0(vid) => Some(VidCommon::V0(vid.common)),
3500            VidDisperseShare::V1(vid) => Some(VidCommon::V1(vid.common)),
3501            VidDisperseShare::V2(vid) => Some(VidCommon::V2(vid.common)),
3502        }
3503    }
3504}
3505
3506#[async_trait]
3507impl Provider<SeqTypes, PayloadRequest> for Persistence {
3508    #[tracing::instrument(skip(self))]
3509    async fn fetch(&self, req: PayloadRequest) -> Option<Payload> {
3510        let mut tx = match self.db.read().await {
3511            Ok(tx) => tx,
3512            Err(err) => {
3513                tracing::warn!("could not open transaction: {err:#}");
3514                return None;
3515            },
3516        };
3517
3518        let bytes = match query_as::<(Vec<u8>,)>(
3519            "SELECT data FROM da_proposal2 WHERE payload_hash = $1 LIMIT 1",
3520        )
3521        .bind(req.0.to_string())
3522        .fetch_optional(tx.as_mut())
3523        .await
3524        {
3525            Ok(Some((bytes,))) => bytes,
3526            Ok(None) => return None,
3527            Err(err) => {
3528                tracing::warn!("error loading DA proposal: {err:#}");
3529                return None;
3530            },
3531        };
3532
3533        let proposal: Proposal<SeqTypes, DaProposal2<SeqTypes>> = match bincode::deserialize(&bytes)
3534        {
3535            Ok(proposal) => proposal,
3536            Err(err) => {
3537                tracing::error!("error decoding DA proposal: {err:#}");
3538                return None;
3539            },
3540        };
3541
3542        Some(Payload::from_bytes(
3543            &proposal.data.encoded_transactions,
3544            &proposal.data.metadata,
3545        ))
3546    }
3547}
3548
#[cfg(test)]
mod testing {
    use hotshot_query_service::data_source::storage::sql::testing::TmpDb;

    use super::*;
    use crate::persistence::tests::TestablePersistence;

    /// Test support: runs `Persistence` against a shared temporary database.
    #[async_trait]
    impl TestablePersistence for Persistence {
        type Storage = Arc<TmpDb>;

        /// Initializes a fresh temporary database.
        async fn tmp_storage() -> Self::Storage {
            let db = TmpDb::init().await;
            Arc::new(db)
        }

        /// Builds persistence options pointing at `db`; which backend options are produced
        /// depends on the `embedded-db` feature.
        #[allow(refining_impl_trait)]
        fn options(db: &Self::Storage) -> Options {
            #[cfg(not(feature = "embedded-db"))]
            let backend = PostgresOptions {
                port: Some(db.port()),
                host: Some(db.host()),
                user: Some("postgres".into()),
                password: Some("password".into()),
                ..Default::default()
            };

            #[cfg(feature = "embedded-db")]
            let backend = SqliteOptions { path: db.path() };

            backend.into()
        }
    }
}
3585
3586#[cfg(test)]
3587mod test {
3588    use std::sync::atomic::{AtomicU32, Ordering};
3589
3590    use committable::{Commitment, CommitmentBoundsArkless};
3591    use espresso_types::{Header, Leaf, NodeState, ValidatedState, traits::NullEventConsumer};
3592    use futures::stream::TryStreamExt;
3593    use hotshot_example_types::node_types::TEST_VERSIONS;
3594    use hotshot_types::{
3595        data::{
3596            EpochNumber, QuorumProposal2, ns_table::parse_ns_table,
3597            vid_disperse::AvidMDisperseShare,
3598        },
3599        message::convert_proposal,
3600        simple_certificate::QuorumCertificate,
3601        simple_vote::QuorumData,
3602        traits::{
3603            EncodeBytes,
3604            block_contents::{BlockHeader, GENESIS_VID_NUM_STORAGE_NODES},
3605            signature_key::SignatureKey,
3606        },
3607        utils::EpochTransitionIndicator,
3608        vid::{
3609            advz::advz_scheme,
3610            avidm::{AvidMScheme, init_avidm_param},
3611        },
3612    };
3613    use jf_advz::VidScheme;
3614
3615    use super::*;
3616    use crate::{BLSPubKey, PubKey, persistence::tests::TestablePersistence as _};
3617
3618    #[test_log::test(tokio::test(flavor = "multi_thread"))]
3619    async fn test_quorum_proposals_leaf_hash_migration() {
3620        // Create some quorum proposals to test with.
3621        let leaf: Leaf2 = Leaf::genesis(
3622            &ValidatedState::default(),
3623            &NodeState::mock(),
3624            TEST_VERSIONS.test.base,
3625        )
3626        .await
3627        .into();
3628        let privkey = BLSPubKey::generated_from_seed_indexed([0; 32], 1).1;
3629        let signature = PubKey::sign(&privkey, &[]).unwrap();
3630        let mut quorum_proposal = Proposal {
3631            data: QuorumProposal2::<SeqTypes> {
3632                epoch: None,
3633                block_header: leaf.block_header().clone(),
3634                view_number: ViewNumber::genesis(),
3635                justify_qc: QuorumCertificate::genesis(
3636                    &ValidatedState::default(),
3637                    &NodeState::mock(),
3638                    TEST_VERSIONS.test,
3639                )
3640                .await
3641                .to_qc2(),
3642                upgrade_certificate: None,
3643                view_change_evidence: None,
3644                next_drb_result: None,
3645                next_epoch_justify_qc: None,
3646                state_cert: None,
3647            },
3648            signature,
3649            _pd: Default::default(),
3650        };
3651
3652        let qp1: Proposal<SeqTypes, QuorumProposal<SeqTypes>> =
3653            convert_proposal(quorum_proposal.clone());
3654
3655        quorum_proposal.data.view_number = ViewNumber::new(1);
3656
3657        let qp2: Proposal<SeqTypes, QuorumProposal<SeqTypes>> =
3658            convert_proposal(quorum_proposal.clone());
3659        let qps = [qp1, qp2];
3660
3661        // Create persistence and add the quorum proposals with NULL leaf hash.
3662        let db = Persistence::tmp_storage().await;
3663        let persistence = Persistence::connect(&db).await;
3664        let mut tx = persistence.db.write().await.unwrap();
3665        let params = qps
3666            .iter()
3667            .map(|qp| {
3668                (
3669                    qp.data.view_number.u64() as i64,
3670                    bincode::serialize(&qp).unwrap(),
3671                )
3672            })
3673            .collect::<Vec<_>>();
3674        tx.upsert("quorum_proposals", ["view", "data"], ["view"], params)
3675            .await
3676            .unwrap();
3677        tx.commit().await.unwrap();
3678
3679        // Create a new persistence and ensure the commitments get populated.
3680        let persistence = Persistence::connect(&db).await;
3681        let mut tx = persistence.db.read().await.unwrap();
3682        let rows = tx
3683            .fetch("SELECT * FROM quorum_proposals ORDER BY view ASC")
3684            .try_collect::<Vec<_>>()
3685            .await
3686            .unwrap();
3687        assert_eq!(rows.len(), qps.len());
3688        for (row, qp) in rows.into_iter().zip(qps) {
3689            assert_eq!(row.get::<i64, _>("view"), qp.data.view_number.u64() as i64);
3690            assert_eq!(
3691                row.get::<Vec<u8>, _>("data"),
3692                bincode::serialize(&qp).unwrap()
3693            );
3694            assert_eq!(
3695                row.get::<String, _>("leaf_hash"),
3696                Committable::commit(&Leaf::from_quorum_proposal(&qp.data)).to_string()
3697            );
3698        }
3699    }
3700
3701    #[test_log::test(tokio::test(flavor = "multi_thread"))]
3702    async fn test_x25519_keys_migration() {
3703        use std::collections::HashMap;
3704
3705        use crate::persistence::RegisteredValidatorNoX25519;
3706
3707        let mut validator = RegisteredValidator::mock();
3708        validator.delegators.clear();
3709        validator.stake = alloy::primitives::U256::from(1000u64);
3710
3711        let epoch = 1i64;
3712        let address = validator.account;
3713
3714        // Create legacy data without x25519 fields
3715        let legacy = RegisteredValidatorNoX25519 {
3716            account: validator.account,
3717            stake_table_key: validator.stake_table_key,
3718            state_ver_key: validator.state_ver_key.clone(),
3719            stake: validator.stake,
3720            commission: validator.commission,
3721            delegators: HashMap::new(),
3722            authenticated: true,
3723        };
3724
3725        // Bincode: serialize as legacy map
3726        let mut legacy_map: IndexMap<Address, RegisteredValidatorNoX25519> = IndexMap::new();
3727        legacy_map.insert(address, legacy);
3728        let stake_bytes = bincode::serialize(&legacy_map).unwrap();
3729
3730        // JSON: serialize without x25519 fields
3731        let json_legacy = RegisteredValidatorNoX25519 {
3732            account: validator.account,
3733            stake_table_key: validator.stake_table_key,
3734            state_ver_key: validator.state_ver_key.clone(),
3735            stake: validator.stake,
3736            commission: validator.commission,
3737            delegators: HashMap::new(),
3738            authenticated: true,
3739        };
3740        let validator_json = serde_json::to_value(&json_legacy).unwrap();
3741
3742        let db = Persistence::tmp_storage().await;
3743        let persistence = Persistence::connect(&db).await;
3744        let mut tx = persistence.db.write().await.unwrap();
3745
3746        tx.execute(
3747            query(
3748                "INSERT INTO stake_table_validators (epoch, address, validator) VALUES ($1, $2, \
3749                 $3)",
3750            )
3751            .bind(epoch)
3752            .bind(format!("{:?}", address))
3753            .bind(&validator_json),
3754        )
3755        .await
3756        .unwrap();
3757
3758        tx.execute(
3759            query("INSERT INTO epoch_drb_and_root (epoch, stake) VALUES ($1, $2)")
3760                .bind(epoch)
3761                .bind(&stake_bytes),
3762        )
3763        .await
3764        .unwrap();
3765
3766        // Reset migration state so it runs on the newly inserted data
3767        tx.execute(query(
3768            "UPDATE data_migrations SET completed = false, migrated_rows = 0 WHERE name = \
3769             'x25519_keys'",
3770        ))
3771        .await
3772        .unwrap();
3773
3774        tx.commit().await.unwrap();
3775
3776        // Verify JSON was inserted without x25519_key
3777        {
3778            let mut tx = persistence.db.read().await.unwrap();
3779            let row: (serde_json::Value,) =
3780                query_as("SELECT validator FROM stake_table_validators WHERE epoch = $1")
3781                    .bind(epoch)
3782                    .fetch_one(tx.as_mut())
3783                    .await
3784                    .unwrap();
3785            let json_obj = row.0.as_object().unwrap();
3786            assert!(!json_obj.contains_key("x25519_key"));
3787        }
3788
3789        // Run migrations
3790        let persistence = Persistence::connect(&db).await;
3791        persistence.migrate_storage().await.unwrap();
3792
3793        // Verify stake_table_validators now has x25519_key field
3794        {
3795            let mut tx = persistence.db.read().await.unwrap();
3796            let row: (serde_json::Value,) =
3797                query_as("SELECT validator FROM stake_table_validators WHERE epoch = $1")
3798                    .bind(epoch)
3799                    .fetch_one(tx.as_mut())
3800                    .await
3801                    .unwrap();
3802            let json_obj = row.0.as_object().unwrap();
3803            assert!(json_obj.contains_key("x25519_key"));
3804            assert!(json_obj.get("x25519_key").unwrap().is_null());
3805        }
3806
3807        // Verify epoch_drb_and_root stake was migrated
3808        {
3809            let mut tx = persistence.db.read().await.unwrap();
3810            let row: (Vec<u8>,) = query_as("SELECT stake FROM epoch_drb_and_root WHERE epoch = $1")
3811                .bind(epoch)
3812                .fetch_one(tx.as_mut())
3813                .await
3814                .unwrap();
3815            let migrated_map: AuthenticatedValidatorMap = bincode::deserialize(&row.0).unwrap();
3816            assert!(migrated_map.contains_key(&address));
3817            let v = migrated_map.get(&address).unwrap();
3818            assert!(v.x25519_key.is_none());
3819        }
3820
3821        // Verify migration tracking
3822        {
3823            let mut tx = persistence.db.read().await.unwrap();
3824            let row: (bool, i64) = query_as(
3825                "SELECT completed, migrated_rows FROM data_migrations WHERE name = 'x25519_keys' \
3826                 AND table_name = 'epoch_drb_and_root'",
3827            )
3828            .fetch_one(tx.as_mut())
3829            .await
3830            .unwrap();
3831            assert!(row.0);
3832            assert_eq!(row.1, 1);
3833        }
3834    }
3835
3836    #[test_log::test(tokio::test(flavor = "multi_thread"))]
3837    async fn test_store_all_validators_authenticated_and_unauthenticated() {
3838        use std::collections::HashMap;
3839
3840        use alloy::primitives::{Address, U256};
3841        use hotshot_types::light_client::StateVerKey;
3842        use indexmap::IndexMap;
3843
3844        let tmp = Persistence::tmp_storage().await;
3845        let storage = Persistence::connect(&tmp).await;
3846
3847        // Create an authenticated validator
3848        let authenticated_validator = RegisteredValidator {
3849            account: Address::random(),
3850            stake_table_key: BLSPubKey::generated_from_seed_indexed([0u8; 32], 0).0,
3851            state_ver_key: StateVerKey::default(),
3852            stake: U256::from(1000),
3853            commission: 100,
3854            delegators: HashMap::new(),
3855            authenticated: true,
3856            x25519_key: None,
3857            p2p_addr: None,
3858        };
3859
3860        // Create an unauthenticated validator
3861        let unauthenticated_validator = RegisteredValidator {
3862            account: Address::random(),
3863            stake_table_key: BLSPubKey::generated_from_seed_indexed([0u8; 32], 1).0,
3864            state_ver_key: StateVerKey::default(),
3865            stake: U256::from(2000),
3866            commission: 200,
3867            delegators: HashMap::new(),
3868            authenticated: false,
3869            x25519_key: None,
3870            p2p_addr: None,
3871        };
3872
3873        let mut validators: IndexMap<Address, RegisteredValidator<BLSPubKey>> = IndexMap::new();
3874        validators.insert(
3875            authenticated_validator.account,
3876            authenticated_validator.clone(),
3877        );
3878        validators.insert(
3879            unauthenticated_validator.account,
3880            unauthenticated_validator.clone(),
3881        );
3882
3883        // Store both validators
3884        storage
3885            .store_all_validators(EpochNumber::new(1), validators)
3886            .await
3887            .unwrap();
3888
3889        // Load and verify
3890        let loaded = storage
3891            .load_all_validators(EpochNumber::new(1), 0, 100)
3892            .await
3893            .unwrap();
3894        assert_eq!(loaded.len(), 2);
3895
3896        // Find each validator and verify authenticated state is preserved
3897        let loaded_auth = loaded
3898            .iter()
3899            .find(|v| v.account == authenticated_validator.account)
3900            .unwrap();
3901        assert!(
3902            loaded_auth.authenticated,
3903            "authenticated validator should remain authenticated"
3904        );
3905
3906        let loaded_unauth = loaded
3907            .iter()
3908            .find(|v| v.account == unauthenticated_validator.account)
3909            .unwrap();
3910        assert!(
3911            !loaded_unauth.authenticated,
3912            "unauthenticated validator should remain unauthenticated"
3913        );
3914    }
3915
3916    #[test_log::test(tokio::test(flavor = "multi_thread"))]
3917    async fn test_fetching_providers() {
3918        let tmp = Persistence::tmp_storage().await;
3919        let storage = Persistence::connect(&tmp).await;
3920
3921        // Mock up some data.
3922        let leaf = Leaf2::genesis(
3923            &ValidatedState::default(),
3924            &NodeState::mock(),
3925            TEST_VERSIONS.test.base,
3926        )
3927        .await;
3928        let leaf_payload = leaf.block_payload().unwrap();
3929        let leaf_payload_bytes_arc = leaf_payload.encode();
3930
3931        let avidm_param = init_avidm_param(2).unwrap();
3932        let weights = vec![1u32; 2];
3933
3934        let ns_table = parse_ns_table(
3935            leaf_payload.byte_len().as_usize(),
3936            &leaf_payload.ns_table().encode(),
3937        );
3938        let (payload_commitment, shares) =
3939            AvidMScheme::ns_disperse(&avidm_param, &weights, &leaf_payload_bytes_arc, ns_table)
3940                .unwrap();
3941        let (pubkey, privkey) = BLSPubKey::generated_from_seed_indexed([0; 32], 1);
3942        let vid_share = convert_proposal(
3943            AvidMDisperseShare::<SeqTypes> {
3944                view_number: ViewNumber::new(0),
3945                payload_commitment,
3946                share: shares[0].clone(),
3947                recipient_key: pubkey,
3948                epoch: None,
3949                target_epoch: None,
3950                common: avidm_param.clone(),
3951            }
3952            .to_proposal(&privkey)
3953            .unwrap()
3954            .clone(),
3955        );
3956
3957        let quorum_proposal = QuorumProposalWrapper::<SeqTypes> {
3958            proposal: QuorumProposal2::<SeqTypes> {
3959                block_header: leaf.block_header().clone(),
3960                view_number: leaf.view_number(),
3961                justify_qc: leaf.justify_qc(),
3962                upgrade_certificate: None,
3963                view_change_evidence: None,
3964                next_drb_result: None,
3965                next_epoch_justify_qc: None,
3966                epoch: None,
3967                state_cert: None,
3968            },
3969        };
3970        let quorum_proposal_signature =
3971            BLSPubKey::sign(&privkey, &bincode::serialize(&quorum_proposal).unwrap())
3972                .expect("Failed to sign quorum proposal");
3973        let quorum_proposal = Proposal {
3974            data: quorum_proposal,
3975            signature: quorum_proposal_signature,
3976            _pd: Default::default(),
3977        };
3978
3979        let block_payload_signature = BLSPubKey::sign(&privkey, &leaf_payload_bytes_arc)
3980            .expect("Failed to sign block payload");
3981        let da_proposal = Proposal {
3982            data: DaProposal2::<SeqTypes> {
3983                encoded_transactions: leaf_payload_bytes_arc,
3984                metadata: leaf_payload.ns_table().clone(),
3985                view_number: ViewNumber::new(0),
3986                epoch: None,
3987                epoch_transition_indicator: EpochTransitionIndicator::NotInTransition,
3988            },
3989            signature: block_payload_signature,
3990            _pd: Default::default(),
3991        };
3992
3993        let mut next_quorum_proposal = quorum_proposal.clone();
3994        next_quorum_proposal.data.proposal.view_number += 1;
3995        next_quorum_proposal.data.proposal.justify_qc.view_number += 1;
3996        next_quorum_proposal
3997            .data
3998            .proposal
3999            .justify_qc
4000            .data
4001            .leaf_commit = Committable::commit(&leaf.clone());
4002
4003        // Add to database.
4004        storage
4005            .append_da2(&da_proposal, VidCommitment::V1(payload_commitment))
4006            .await
4007            .unwrap();
4008        storage.append_vid(&vid_share).await.unwrap();
4009        storage
4010            .append_quorum_proposal2(&quorum_proposal)
4011            .await
4012            .unwrap();
4013
4014        // Add an extra quorum proposal so we have a QC pointing back at `leaf`.
4015        storage
4016            .append_quorum_proposal2(&next_quorum_proposal)
4017            .await
4018            .unwrap();
4019
4020        // Fetch it as if we were rebuilding an archive.
4021        assert_eq!(
4022            Some(VidCommon::V1(avidm_param)),
4023            storage
4024                .fetch(VidCommonRequest(vid_share.data.payload_commitment()))
4025                .await
4026        );
4027        assert_eq!(
4028            leaf_payload,
4029            storage
4030                .fetch(PayloadRequest(vid_share.data.payload_commitment()))
4031                .await
4032                .unwrap()
4033        );
4034    }
4035
4036    /// Test conditions that trigger pruning.
4037    ///
4038    /// This is a configurable test that can be used to test different configurations of GC,
4039    /// `pruning_opt`. The test populates the database with some data for view 1, asserts that it is
4040    /// retained for view 2, and then asserts that it is pruned by view 3. There are various
4041    /// different configurations that can achieve this behavior, such that the data is retained and
4042    /// then pruned due to different logic and code paths.
4043    async fn test_pruning_helper(pruning_opt: ConsensusPruningOptions) {
4044        let tmp = Persistence::tmp_storage().await;
4045        let mut opt = Persistence::options(&tmp);
4046        opt.consensus_pruning = pruning_opt;
4047        let storage = opt.create().await.unwrap();
4048
4049        let data_view = ViewNumber::new(1);
4050
4051        // Populate some data.
4052        let leaf = Leaf2::genesis(
4053            &ValidatedState::default(),
4054            &NodeState::mock(),
4055            TEST_VERSIONS.test.base,
4056        )
4057        .await;
4058        let leaf_payload = leaf.block_payload().unwrap();
4059        let leaf_payload_bytes_arc = leaf_payload.encode();
4060
4061        let avidm_param = init_avidm_param(2).unwrap();
4062        let weights = vec![1u32; 2];
4063
4064        let ns_table = parse_ns_table(
4065            leaf_payload.byte_len().as_usize(),
4066            &leaf_payload.ns_table().encode(),
4067        );
4068        let (payload_commitment, shares) =
4069            AvidMScheme::ns_disperse(&avidm_param, &weights, &leaf_payload_bytes_arc, ns_table)
4070                .unwrap();
4071
4072        let (pubkey, privkey) = BLSPubKey::generated_from_seed_indexed([0; 32], 1);
4073        let vid = convert_proposal(
4074            AvidMDisperseShare::<SeqTypes> {
4075                view_number: data_view,
4076                payload_commitment,
4077                share: shares[0].clone(),
4078                recipient_key: pubkey,
4079                epoch: None,
4080                target_epoch: None,
4081                common: avidm_param,
4082            }
4083            .to_proposal(&privkey)
4084            .unwrap()
4085            .clone(),
4086        );
4087        let quorum_proposal = QuorumProposalWrapper::<SeqTypes> {
4088            proposal: QuorumProposal2::<SeqTypes> {
4089                epoch: None,
4090                block_header: leaf.block_header().clone(),
4091                view_number: data_view,
4092                justify_qc: QuorumCertificate2::genesis(
4093                    &ValidatedState::default(),
4094                    &NodeState::mock(),
4095                    TEST_VERSIONS.test,
4096                )
4097                .await,
4098                upgrade_certificate: None,
4099                view_change_evidence: None,
4100                next_drb_result: None,
4101                next_epoch_justify_qc: None,
4102                state_cert: None,
4103            },
4104        };
4105        let quorum_proposal_signature =
4106            BLSPubKey::sign(&privkey, &bincode::serialize(&quorum_proposal).unwrap())
4107                .expect("Failed to sign quorum proposal");
4108        let quorum_proposal = Proposal {
4109            data: quorum_proposal,
4110            signature: quorum_proposal_signature,
4111            _pd: Default::default(),
4112        };
4113
4114        let block_payload_signature = BLSPubKey::sign(&privkey, &leaf_payload_bytes_arc)
4115            .expect("Failed to sign block payload");
4116        let da_proposal = Proposal {
4117            data: DaProposal2::<SeqTypes> {
4118                encoded_transactions: leaf_payload_bytes_arc.clone(),
4119                metadata: leaf_payload.ns_table().clone(),
4120                view_number: data_view,
4121                epoch: Some(EpochNumber::new(0)),
4122                epoch_transition_indicator: EpochTransitionIndicator::NotInTransition,
4123            },
4124            signature: block_payload_signature,
4125            _pd: Default::default(),
4126        };
4127
4128        tracing::info!(?vid, ?da_proposal, ?quorum_proposal, "append data");
4129        storage.append_vid(&vid).await.unwrap();
4130        storage
4131            .append_da2(&da_proposal, VidCommitment::V1(payload_commitment))
4132            .await
4133            .unwrap();
4134        storage
4135            .append_quorum_proposal2(&quorum_proposal)
4136            .await
4137            .unwrap();
4138
4139        // The first decide doesn't trigger any garbage collection, even though our usage exceeds
4140        // the target, because of the minimum retention.
4141        tracing::info!("decide view 1");
4142        storage
4143            .append_decided_leaves(data_view + 1, [], None, &NullEventConsumer)
4144            .await
4145            .unwrap();
4146        assert_eq!(
4147            storage.load_vid_share(data_view).await.unwrap().unwrap(),
4148            vid
4149        );
4150        assert_eq!(
4151            storage.load_da_proposal(data_view).await.unwrap().unwrap(),
4152            da_proposal
4153        );
4154        assert_eq!(
4155            storage.load_quorum_proposal(data_view).await.unwrap(),
4156            quorum_proposal
4157        );
4158
4159        // After another view, our data is beyond the minimum retention (though not the target
4160        // retention) so it gets pruned.
4161        tracing::info!("decide view 2");
4162        storage
4163            .append_decided_leaves(data_view + 2, [], None, &NullEventConsumer)
4164            .await
4165            .unwrap();
4166        assert!(storage.load_vid_share(data_view).await.unwrap().is_none(),);
4167        assert!(storage.load_da_proposal(data_view).await.unwrap().is_none());
4168        storage.load_quorum_proposal(data_view).await.unwrap_err();
4169    }
4170
4171    #[test_log::test(tokio::test(flavor = "multi_thread"))]
4172    async fn test_pruning_minimum_retention() {
4173        test_pruning_helper(ConsensusPruningOptions {
4174            // Use a very low target usage, to show that we still retain data up to the minimum
4175            // retention even when usage is above target.
4176            target_usage: 0,
4177            minimum_retention: 1,
4178            // Use a very high target retention, so that pruning is only triggered by the minimum
4179            // retention.
4180            target_retention: u64::MAX,
4181        })
4182        .await
4183    }
4184
4185    #[test_log::test(tokio::test(flavor = "multi_thread"))]
4186    async fn test_pruning_target_retention() {
4187        test_pruning_helper(ConsensusPruningOptions {
4188            target_retention: 1,
4189            // Use a very low minimum retention, so that data is only kept around due to the target
4190            // retention.
4191            minimum_retention: 0,
4192            // Use a very high target usage, so that pruning is only triggered by the target
4193            // retention.
4194            target_usage: u64::MAX,
4195        })
4196        .await
4197    }
4198
4199    #[test_log::test(tokio::test(flavor = "multi_thread"))]
4200    async fn test_consensus_migration() {
4201        let tmp = Persistence::tmp_storage().await;
4202        let mut opt = Persistence::options(&tmp);
4203
4204        let storage = opt.create().await.unwrap();
4205
4206        let rows = 300;
4207
4208        assert!(storage.load_state_cert().await.unwrap().is_none());
4209
4210        for i in 0..rows {
4211            let view = ViewNumber::new(i);
4212            let validated_state = ValidatedState::default();
4213            let instance_state = NodeState::default();
4214
4215            let (pubkey, privkey) = BLSPubKey::generated_from_seed_indexed([0; 32], i);
4216            let (payload, metadata) =
4217                Payload::from_transactions([], &validated_state, &instance_state)
4218                    .await
4219                    .unwrap();
4220
4221            let payload_bytes = payload.encode();
4222
4223            let block_header = Header::genesis(
4224                &instance_state,
4225                payload.clone(),
4226                &metadata,
4227                TEST_VERSIONS.test.base,
4228            );
4229
4230            let null_quorum_data = QuorumData {
4231                leaf_commit: Commitment::<Leaf>::default_commitment_no_preimage(),
4232            };
4233
4234            let justify_qc = QuorumCertificate::new(
4235                null_quorum_data.clone(),
4236                null_quorum_data.commit(),
4237                view,
4238                None,
4239                std::marker::PhantomData,
4240            );
4241
4242            let quorum_proposal = QuorumProposal {
4243                block_header,
4244                view_number: view,
4245                justify_qc: justify_qc.clone(),
4246                upgrade_certificate: None,
4247                proposal_certificate: None,
4248            };
4249
4250            let quorum_proposal_signature =
4251                BLSPubKey::sign(&privkey, &bincode::serialize(&quorum_proposal).unwrap())
4252                    .expect("Failed to sign quorum proposal");
4253
4254            let proposal = Proposal {
4255                data: quorum_proposal.clone(),
4256                signature: quorum_proposal_signature,
4257                _pd: std::marker::PhantomData::<SeqTypes>,
4258            };
4259
4260            let proposal_bytes = bincode::serialize(&proposal)
4261                .context("serializing proposal")
4262                .unwrap();
4263
4264            let mut leaf = Leaf::from_quorum_proposal(&quorum_proposal);
4265            leaf.fill_block_payload(
4266                payload,
4267                GENESIS_VID_NUM_STORAGE_NODES,
4268                TEST_VERSIONS.test.base,
4269            )
4270            .unwrap();
4271
4272            let mut tx = storage.db.write().await.unwrap();
4273
4274            let qc_bytes = bincode::serialize(&justify_qc).unwrap();
4275            let leaf_bytes = bincode::serialize(&leaf).unwrap();
4276
4277            tx.upsert(
4278                "anchor_leaf",
4279                ["view", "leaf", "qc"],
4280                ["view"],
4281                [(i as i64, leaf_bytes, qc_bytes)],
4282            )
4283            .await
4284            .unwrap();
4285
4286            let state_cert = LightClientStateUpdateCertificateV2::<SeqTypes> {
4287                epoch: EpochNumber::new(i),
4288                light_client_state: Default::default(), // filling arbitrary value
4289                next_stake_table_state: Default::default(), // filling arbitrary value
4290                signatures: vec![],                     // filling arbitrary value
4291                auth_root: Default::default(),
4292            };
4293            // manually upsert the state cert to the finalized database
4294            let state_cert_bytes = bincode::serialize(&state_cert).unwrap();
4295            tx.upsert(
4296                "finalized_state_cert",
4297                ["epoch", "state_cert"],
4298                ["epoch"],
4299                [(i as i64, state_cert_bytes)],
4300            )
4301            .await
4302            .unwrap();
4303
4304            tx.commit().await.unwrap();
4305
4306            let disperse = advz_scheme(GENESIS_VID_NUM_STORAGE_NODES)
4307                .disperse(payload_bytes.clone())
4308                .unwrap();
4309
4310            let vid = VidDisperseShare0::<SeqTypes> {
4311                view_number: ViewNumber::new(i),
4312                payload_commitment: Default::default(),
4313                share: disperse.shares[0].clone(),
4314                common: disperse.common,
4315                recipient_key: pubkey,
4316            };
4317
4318            let (payload, metadata) =
4319                Payload::from_transactions([], &ValidatedState::default(), &NodeState::default())
4320                    .await
4321                    .unwrap();
4322
4323            let da = DaProposal::<SeqTypes> {
4324                encoded_transactions: payload.encode(),
4325                metadata,
4326                view_number: ViewNumber::new(i),
4327            };
4328
4329            let block_payload_signature =
4330                BLSPubKey::sign(&privkey, &payload_bytes).expect("Failed to sign block payload");
4331
4332            let da_proposal = Proposal {
4333                data: da,
4334                signature: block_payload_signature,
4335                _pd: Default::default(),
4336            };
4337
4338            storage
4339                .append_vid(&convert_proposal(vid.to_proposal(&privkey).unwrap()))
4340                .await
4341                .unwrap();
4342            storage
4343                .append_da(&da_proposal, VidCommitment::V0(disperse.commit))
4344                .await
4345                .unwrap();
4346
4347            let leaf_hash = Committable::commit(&leaf);
4348            let mut tx = storage.db.write().await.expect("failed to start write tx");
4349            tx.upsert(
4350                "quorum_proposals",
4351                ["view", "leaf_hash", "data"],
4352                ["view"],
4353                [(i as i64, leaf_hash.to_string(), proposal_bytes)],
4354            )
4355            .await
4356            .expect("failed to upsert quorum proposal");
4357
4358            let justify_qc = &proposal.data.justify_qc;
4359            let justify_qc_bytes = bincode::serialize(&justify_qc)
4360                .context("serializing QC")
4361                .unwrap();
4362            tx.upsert(
4363                "quorum_certificate",
4364                ["view", "leaf_hash", "data"],
4365                ["view"],
4366                [(
4367                    justify_qc.view_number.u64() as i64,
4368                    justify_qc.data.leaf_commit.to_string(),
4369                    &justify_qc_bytes,
4370                )],
4371            )
4372            .await
4373            .expect("failed to upsert qc");
4374
4375            tx.commit().await.expect("failed to commit");
4376        }
4377
4378        storage.migrate_storage().await.unwrap();
4379
4380        let mut tx = storage.db.read().await.unwrap();
4381        let (anchor_leaf2_count,) = query_as::<(i64,)>("SELECT COUNT(*) from anchor_leaf2")
4382            .fetch_one(tx.as_mut())
4383            .await
4384            .unwrap();
4385        assert_eq!(
4386            anchor_leaf2_count, rows as i64,
4387            "anchor leaf count does not match rows",
4388        );
4389
4390        let (da_proposal_count,) = query_as::<(i64,)>("SELECT COUNT(*) from da_proposal2")
4391            .fetch_one(tx.as_mut())
4392            .await
4393            .unwrap();
4394        assert_eq!(
4395            da_proposal_count, rows as i64,
4396            "da proposal count does not match rows",
4397        );
4398
4399        let (vid_share_count,) = query_as::<(i64,)>("SELECT COUNT(*) from vid_share2")
4400            .fetch_one(tx.as_mut())
4401            .await
4402            .unwrap();
4403        assert_eq!(
4404            vid_share_count, rows as i64,
4405            "vid share count does not match rows"
4406        );
4407
4408        let (quorum_proposals_count,) =
4409            query_as::<(i64,)>("SELECT COUNT(*) from quorum_proposals2")
4410                .fetch_one(tx.as_mut())
4411                .await
4412                .unwrap();
4413        assert_eq!(
4414            quorum_proposals_count, rows as i64,
4415            "quorum proposals count does not match rows",
4416        );
4417
4418        let (quorum_certificates_count,) =
4419            query_as::<(i64,)>("SELECT COUNT(*) from quorum_certificate2")
4420                .fetch_one(tx.as_mut())
4421                .await
4422                .unwrap();
4423        assert_eq!(
4424            quorum_certificates_count, rows as i64,
4425            "quorum certificates count does not match rows",
4426        );
4427
4428        let (state_cert_count,) = query_as::<(i64,)>("SELECT COUNT(*) from finalized_state_cert")
4429            .fetch_one(tx.as_mut())
4430            .await
4431            .unwrap();
4432        assert_eq!(
4433            state_cert_count, rows as i64,
4434            "Light client state update certificates count does not match rows",
4435        );
4436        assert_eq!(
4437            storage.load_state_cert().await.unwrap().unwrap(),
4438            LightClientStateUpdateCertificateV2::<SeqTypes> {
4439                epoch: EpochNumber::new(rows - 1),
4440                light_client_state: Default::default(),
4441                next_stake_table_state: Default::default(),
4442                signatures: vec![],
4443                auth_root: Default::default(),
4444            },
4445            "Wrong light client state update certificate in the storage",
4446        );
4447
4448        storage.migrate_storage().await.unwrap();
4449    }
4450
4451    /// Regression test for an ambiguous behavior in `store_events`/`load_events`.
4452    ///
4453    /// Previously, `store_events` did nothing when given an empty events list (in fact,
4454    /// `fetch_and_store_stake_table_events` was not even calling it). But this means that the
4455    /// `stake_table_events_l1_block` column does not get updated when we enter a new epoch with no
4456    /// new stake table events. This makes it impossible to distinguish between two very different
4457    /// scenarios:
4458    ///
4459    /// 1. The node has successfully processed events through the latest L1 finalized block, but
4460    ///    there are no new events from the last epoch.
4461    /// 2. The node is lagging behind the latest L1 finalized block, and is possibly missing some
4462    ///    new events.
4463    ///
4464    /// In scenario 1, clients of this node should be able to treat the empty list of stake table
4465    /// events as authoritative, and derive the stake table for the next epoch (which will end up
4466    /// being the same as the previous one. However, in scenario 2, clients need to wait, because we
4467    /// don't yet know whether there could be any events that modify the stake table. Thus,
4468    /// distinguishing these two scenarios is important.
4469    ///
4470    /// This regression test ensures that even if there are no new events, at least the
4471    /// `stake_table_events_l1_block` column gets updated. We can then distinguish the two scenarios
4472    /// using the `EventsPersistenceRead`` return value from load_events.
4473    #[test_log::test(tokio::test(flavor = "multi_thread"))]
4474    async fn test_store_events_empty() {
4475        let tmp = Persistence::tmp_storage().await;
4476        let mut opt = Persistence::options(&tmp);
4477        let storage = opt.create().await.unwrap();
4478
4479        assert_eq!(storage.load_events(0, 100).await.unwrap(), (None, vec![]));
4480
4481        // Storing an empty events list still updates the latest L1 block.
4482        for i in 1..=2 {
4483            tracing::info!(i, "update l1 height");
4484            storage.store_events(i, vec![]).await.unwrap();
4485            assert_eq!(
4486                storage.load_events(0, 100).await.unwrap(),
4487                (Some(EventsPersistenceRead::UntilL1Block(i)), vec![])
4488            );
4489        }
4490    }
4491
4492    // ---------------------------------------------------------------------------
4493    // Tests for retry_if / is_serialization_error
4494    // ---------------------------------------------------------------------------
4495
4496    /// Minimal `DatabaseError` implementation that lets tests construct a
4497    /// `sqlx::Error::Database(...)` with an arbitrary SQLSTATE code without
4498    /// needing a live database connection.
4499    #[derive(Debug)]
4500    struct MockDatabaseError {
4501        code: &'static str,
4502    }
4503
4504    impl std::fmt::Display for MockDatabaseError {
4505        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
4506            write!(f, "mock db error (code {})", self.code)
4507        }
4508    }
4509
4510    impl std::error::Error for MockDatabaseError {}
4511
4512    impl sqlx::error::DatabaseError for MockDatabaseError {
4513        fn message(&self) -> &str {
4514            "mock db error"
4515        }
4516
4517        fn code(&self) -> Option<std::borrow::Cow<'_, str>> {
4518            Some(std::borrow::Cow::Borrowed(self.code))
4519        }
4520
4521        fn kind(&self) -> sqlx::error::ErrorKind {
4522            sqlx::error::ErrorKind::Other
4523        }
4524
4525        fn as_error(&self) -> &(dyn std::error::Error + Send + Sync + 'static) {
4526            self
4527        }
4528
4529        fn as_error_mut(&mut self) -> &mut (dyn std::error::Error + Send + Sync + 'static) {
4530            self
4531        }
4532
4533        fn into_error(self: Box<Self>) -> Box<dyn std::error::Error + Send + Sync + 'static> {
4534            self
4535        }
4536    }
4537
4538    fn mock_serialization_error() -> anyhow::Error {
4539        anyhow::Error::from(sqlx::Error::Database(Box::new(MockDatabaseError {
4540            code: "40001",
4541        })))
4542    }
4543
4544    #[test]
4545    fn test_is_serialization_error() {
4546        // PostgreSQL error code 40001 must be recognised as a serialization failure.
4547        assert!(is_serialization_error(&mock_serialization_error()));
4548
4549        // Any other database error code must NOT match.
4550        let unique_violation =
4551            anyhow::Error::from(sqlx::Error::Database(Box::new(MockDatabaseError {
4552                code: "23505",
4553            })));
4554        assert!(!is_serialization_error(&unique_violation));
4555
4556        // Non-database errors must not match.
4557        assert!(!is_serialization_error(&anyhow::anyhow!("plain error")));
4558    }
4559
4560    #[test_log::test(tokio::test(flavor = "multi_thread"))]
4561    async fn test_retry_if_succeeds_immediately() {
4562        let calls = Arc::new(AtomicU32::new(0));
4563        let calls_clone = calls.clone();
4564
4565        let result = WRITE_BACKOFF
4566            .retry_if(WRITE_RETRY_MAX, is_serialization_error, || {
4567                let calls = calls_clone.clone();
4568                async move {
4569                    calls.fetch_add(1, Ordering::SeqCst);
4570                    Ok(())
4571                }
4572            })
4573            .await;
4574
4575        assert!(result.is_ok());
4576        assert_eq!(calls.load(Ordering::SeqCst), 1);
4577    }
4578
4579    #[test_log::test(tokio::test(flavor = "multi_thread"))]
4580    async fn test_retry_if_retries_on_serialization_error() {
4581        let calls = Arc::new(AtomicU32::new(0));
4582        let calls_clone = calls.clone();
4583
4584        // The closure fails twice with a serialization error, then succeeds on the third attempt.
4585        let result = WRITE_BACKOFF
4586            .retry_if(WRITE_RETRY_MAX, is_serialization_error, || {
4587                let calls = calls_clone.clone();
4588                async move {
4589                    let n = calls.fetch_add(1, Ordering::SeqCst);
4590                    if n < 2 {
4591                        Err(mock_serialization_error())
4592                    } else {
4593                        Ok(())
4594                    }
4595                }
4596            })
4597            .await;
4598
4599        assert!(result.is_ok());
4600        assert_eq!(calls.load(Ordering::SeqCst), 3);
4601    }
4602
4603    #[test_log::test(tokio::test(flavor = "multi_thread"))]
4604    async fn test_retry_if_exhausts_retries() {
4605        let calls = Arc::new(AtomicU32::new(0));
4606        let calls_clone = calls.clone();
4607
4608        // The closure always fails; retry must give up after WRITE_RETRY_MAX (5) retries.
4609        let result: anyhow::Result<()> = WRITE_BACKOFF
4610            .retry_if(WRITE_RETRY_MAX, is_serialization_error, || {
4611                let calls = calls_clone.clone();
4612                async move {
4613                    calls.fetch_add(1, Ordering::SeqCst);
4614                    Err(mock_serialization_error())
4615                }
4616            })
4617            .await;
4618
4619        assert!(result.is_err());
4620        // 1 initial attempt + 5 retries = 6 total calls.
4621        assert_eq!(calls.load(Ordering::SeqCst), 6);
4622    }
4623
4624    #[test_log::test(tokio::test(flavor = "multi_thread"))]
4625    async fn test_retry_if_no_retry_on_other_errors() {
4626        let calls = Arc::new(AtomicU32::new(0));
4627        let calls_clone = calls.clone();
4628
4629        // Non-serialization errors must not be retried.
4630        let result: anyhow::Result<()> = WRITE_BACKOFF
4631            .retry_if(WRITE_RETRY_MAX, is_serialization_error, || {
4632                let calls = calls_clone.clone();
4633                async move {
4634                    calls.fetch_add(1, Ordering::SeqCst);
4635                    Err(anyhow::anyhow!("unrelated error"))
4636                }
4637            })
4638            .await;
4639
4640        assert!(result.is_err());
4641        assert_eq!(calls.load(Ordering::SeqCst), 1);
4642    }
4643}
4644
#[cfg(test)]
#[cfg(not(feature = "embedded-db"))]
mod postgres_tests {
    use espresso_types::{FeeAccount, Header, Leaf, NodeState, Transaction as Tx};
    use hotshot_example_types::node_types::TEST_VERSIONS;
    use hotshot_query_service::{
        availability::{BlockQueryData, LeafQueryData},
        data_source::storage::UpdateAvailabilityStorage,
    };
    use hotshot_types::{
        data::vid_commitment,
        simple_certificate::QuorumCertificate,
        traits::{
            EncodeBytes,
            block_contents::{BlockHeader, BuilderFee, GENESIS_VID_NUM_STORAGE_NODES},
            election::Membership,
            signature_key::BuilderSignatureKey,
        },
    };

    use super::*;
    use crate::persistence::tests::TestablePersistence as _;

    /// Stores one block holding three transactions and checks that the SQL helpers
    /// `get_ns_table`/`read_ns_id` report, for every transaction row, the same namespace ID as
    /// the `ns_id` column and as the transaction itself.
    ///
    /// `instance_state` selects the node state (and with it the header version) under which the
    /// block is built.
    async fn test_postgres_read_ns_table(instance_state: NodeState) {
        instance_state
            .coordinator
            .membership()
            .set_first_epoch(EpochNumber::genesis(), Default::default());

        let tmp = Persistence::tmp_storage().await;
        let mut opt = Persistence::options(&tmp);
        let storage = opt.create().await.unwrap();

        // Two transactions share a namespace; the third uses a different one.
        let transactions = [
            Tx::new(10001u32.into(), vec![1, 2, 3]),
            Tx::new(10001u32.into(), vec![4, 5, 6]),
            Tx::new(10009u32.into(), vec![7, 8, 9]),
        ];

        let validated_state = Default::default();
        let justify_qc =
            QuorumCertificate::genesis(&validated_state, &instance_state, TEST_VERSIONS.test).await;
        let view_number: ViewNumber = justify_qc.view_number + 1;
        let parent_leaf = Leaf::genesis(&validated_state, &instance_state, TEST_VERSIONS.test.base)
            .await
            .into();

        // Build the payload and everything the header needs to commit to it.
        let (payload, ns_table) =
            Payload::from_transactions(transactions.clone(), &validated_state, &instance_state)
                .await
                .unwrap();
        let encoded_payload = payload.encode();
        let payload_commitment = vid_commitment(
            &encoded_payload,
            &ns_table.encode(),
            GENESIS_VID_NUM_STORAGE_NODES,
            instance_state.current_version,
        );
        let builder_commitment = payload.builder_commitment(&ns_table);

        let (fee_account, fee_key) = FeeAccount::generated_from_seed_indexed([0; 32], 0);
        let fee_amount = 0;
        let fee_signature = FeeAccount::sign_fee(&fee_key, fee_amount, &ns_table).unwrap();
        let builder_fee = BuilderFee {
            fee_amount,
            fee_account,
            fee_signature,
        };

        let block_header = Header::new(
            &validated_state,
            &instance_state,
            &parent_leaf,
            payload_commitment,
            builder_commitment,
            ns_table,
            builder_fee,
            instance_state.current_version,
            view_number.u64(),
        )
        .await
        .unwrap();

        // Wrap the header in a leaf, with a QC adjusted to point at that leaf.
        let proposal = QuorumProposal {
            block_header: block_header.clone(),
            view_number,
            justify_qc: justify_qc.clone(),
            upgrade_certificate: None,
            proposal_certificate: None,
        };
        let leaf: Leaf2 = Leaf::from_quorum_proposal(&proposal).into();
        let mut qc = justify_qc.to_qc2();
        qc.data.leaf_commit = leaf.commit();
        qc.view_number = view_number;

        let leaf_data = LeafQueryData::new(leaf, qc).unwrap();
        let block_data = BlockQueryData::<SeqTypes>::new(block_header, payload);

        let mut tx = storage.db.write().await.unwrap();
        tx.insert_leaf(&leaf_data).await.unwrap();
        tx.insert_block(&block_data).await.unwrap();
        tx.commit().await.unwrap();

        let mut tx = storage.db.read().await.unwrap();
        let ns_rows = query(
            "
            SELECT ns_id, read_ns_id(get_ns_table(h.data), t.ns_index) AS read_ns_id
              FROM header AS h
              JOIN transactions AS t ON t.block_height = h.height
              ORDER BY t.ns_index, t.position
        ",
        )
        .fetch_all(tx.as_mut())
        .await
        .unwrap();

        // The lengths are checked first, so pairing rows with transactions drops nothing.
        assert_eq!(ns_rows.len(), transactions.len());
        for (row, submitted) in ns_rows.into_iter().zip(&transactions) {
            let expected_ns = u64::from(submitted.namespace()) as i64;
            assert_eq!(row.get::<i64, _>("ns_id"), expected_ns);
            assert_eq!(row.get::<i64, _>("read_ns_id"), expected_ns);
        }
    }

    /// Namespace table round trip using `NodeState::mock()`.
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_postgres_read_ns_table_v0_1() {
        let instance_state = NodeState::mock();
        test_postgres_read_ns_table(instance_state).await;
    }

    /// Namespace table round trip using `NodeState::mock_v2()`.
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_postgres_read_ns_table_v0_2() {
        let instance_state = NodeState::mock_v2();
        test_postgres_read_ns_table(instance_state).await;
    }

    /// Namespace table round trip using `NodeState::mock_v3()` with an epoch height of 0.
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_postgres_read_ns_table_v0_3() {
        let instance_state = NodeState::mock_v3().with_epoch_height(0);
        test_postgres_read_ns_table(instance_state).await;
    }

    /// Verify that concurrent calls to `record_action` all succeed under
    /// PostgreSQL SERIALIZABLE isolation. `WRITE_BACKOFF.retry_if` handles any
    /// 40001 serialization failures that arise when many tasks race to update
    /// the same row.
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_record_action_concurrent() {
        let tmp = Persistence::tmp_storage().await;
        let storage = Arc::new(Persistence::connect(&tmp).await);

        // Spawn every task before awaiting any of them, so that the writes can race.
        let mut tasks = Vec::new();
        for view in 0u64..20 {
            let storage = Arc::clone(&storage);
            tasks.push(tokio::spawn(async move {
                storage
                    .record_action(ViewNumber::new(view), None, HotShotAction::Vote)
                    .await
            }));
        }

        // The outer `unwrap` covers the join result, the inner one the action itself.
        for task in tasks {
            task.await.unwrap().unwrap();
        }

        // The highest view recorded by any task must be the latest acted view.
        let latest = storage.load_latest_acted_view().await.unwrap();
        assert_eq!(latest, Some(ViewNumber::new(19)));
    }
}
4807}