espresso_node/persistence/
sql.rs

1use std::{
2    collections::BTreeMap,
3    path::PathBuf,
4    str::FromStr,
5    sync::Arc,
6    time::{Duration, Instant},
7};
8
9use alloy::primitives::Address;
10use anyhow::{Context, bail, ensure};
11use async_trait::async_trait;
12use clap::Parser;
13use committable::Committable;
14use derivative::Derivative;
15use derive_more::derive::{From, Into};
16use either::Either;
17use espresso_types::{
18    AuthenticatedValidatorMap, BackoffParams, BlockMerkleTree, FeeMerkleTree, Leaf, Leaf2,
19    NetworkConfig, Payload, PubKey, RegisteredValidatorMap, StakeTableHash, parse_duration,
20    parse_size,
21    traits::{EventsPersistenceRead, MembershipPersistence, StakeTuple},
22    v0::traits::{EventConsumer, PersistenceOptions, SequencerPersistence, StateCatchup},
23    v0_3::{
24        AuthenticatedValidator, EventKey, IndexedStake, RegisteredValidator, RewardAmount,
25        StakeTableEvent,
26    },
27    v0_4::{REWARD_MERKLE_TREE_V2_HEIGHT, RewardAccountV2, RewardMerkleTreeV2},
28};
29use futures::stream::StreamExt;
30use hotshot::InitializerEpochInfo;
31use hotshot_libp2p_networking::network::behaviours::dht::store::persistent::{
32    DhtPersistentStorage, SerializableRecord,
33};
34use hotshot_query_service::{
35    availability::{BlockId, LeafQueryData},
36    data_source::{
37        Transaction as _, VersionedDataSource,
38        storage::{
39            AvailabilityStorage,
40            pruning::PrunerCfg,
41            sql::{
42                Config, Db, Read, SqlStorage, StorageConnectionType, Transaction, TransactionMode,
43                Write, include_migrations, query_as, syntax_helpers::MAX_FN,
44            },
45        },
46    },
47    fetching::{
48        Provider,
49        request::{LeafRequest, PayloadRequest, VidCommonRequest},
50    },
51    merklized_state::MerklizedState,
52};
53use hotshot_types::{
54    data::{
55        DaProposal, DaProposal2, EpochNumber, QuorumProposal, QuorumProposalWrapper,
56        QuorumProposalWrapperLegacy, VidCommitment, VidCommon, VidDisperseShare, VidDisperseShare0,
57    },
58    drb::{DrbInput, DrbResult},
59    event::{Event, EventType, HotShotAction, LeafInfo},
60    message::{Proposal, convert_proposal},
61    simple_certificate::{
62        CertificatePair, LightClientStateUpdateCertificateV1, LightClientStateUpdateCertificateV2,
63        NextEpochQuorumCertificate2, QuorumCertificate, QuorumCertificate2, UpgradeCertificate,
64    },
65    traits::{
66        block_contents::{BlockHeader, BlockPayload},
67        metrics::Metrics,
68    },
69    vote::HasViewNumber,
70};
71use indexmap::IndexMap;
72use itertools::Itertools;
73use jf_merkle_tree_compat::MerkleTreeScheme;
74use sqlx::{Executor, QueryBuilder, Row, query};
75
76use crate::{
77    NodeType, RECENT_STAKE_TABLES_LIMIT, SeqTypes, ViewNumber,
78    api::RewardMerkleTreeV2Data,
79    catchup::SqlStateCatchup,
80    persistence::{migrate_network_config, persistence_metrics::PersistenceMetricsValue},
81};
82
/// Options for Postgres-backed persistence.
///
/// Every connection parameter is optional: values left unset here do not
/// override the corresponding setting derived from the database URI or the
/// `Config` defaults (see `TryFrom<&Options> for Config` below).
#[derive(Parser, Clone, Derivative)]
#[derivative(Debug)]
pub struct PostgresOptions {
    /// Hostname for the remote Postgres database server.
    #[clap(long, env = "ESPRESSO_SEQUENCER_POSTGRES_HOST")]
    pub(crate) host: Option<String>,

    /// Port for the remote Postgres database server.
    #[clap(long, env = "ESPRESSO_SEQUENCER_POSTGRES_PORT")]
    pub(crate) port: Option<u16>,

    /// Name of database to connect to.
    #[clap(long, env = "ESPRESSO_SEQUENCER_POSTGRES_DATABASE")]
    pub(crate) database: Option<String>,

    /// Postgres user to connect as.
    #[clap(long, env = "ESPRESSO_SEQUENCER_POSTGRES_USER")]
    pub(crate) user: Option<String>,

    /// Password for Postgres user.
    #[clap(long, env = "ESPRESSO_SEQUENCER_POSTGRES_PASSWORD")]
    // Hide from debug output since may contain sensitive data.
    #[derivative(Debug = "ignore")]
    pub(crate) password: Option<String>,

    /// Use TLS for an encrypted connection to the database.
    #[clap(long, env = "ESPRESSO_SEQUENCER_POSTGRES_USE_TLS")]
    pub(crate) use_tls: bool,
}
113
114impl Default for PostgresOptions {
115    fn default() -> Self {
116        Self::parse_from(std::iter::empty::<String>())
117    }
118}
119
#[derive(Parser, Clone, Derivative, Default, From, Into)]
#[derivative(Debug)]
pub struct SqliteOptions {
    /// Base directory for the SQLite database.
    /// The SQLite file will be created in the `sqlite` subdirectory with filename as `database`.
    ///
    /// Note: parsing this option has a filesystem side effect — the
    /// `build_sqlite_path` value parser creates the `sqlite` subdirectory if
    /// it does not already exist.
    #[clap(
        long,
        env = "ESPRESSO_SEQUENCER_STORAGE_PATH",
        value_parser = build_sqlite_path
    )]
    pub(crate) path: PathBuf,
}
132
133pub fn build_sqlite_path(path: &str) -> anyhow::Result<PathBuf> {
134    let sub_dir = PathBuf::from_str(path)?.join("sqlite");
135
136    // if `sqlite` sub dir does not exist then create it
137    if !sub_dir.exists() {
138        std::fs::create_dir_all(&sub_dir)
139            .with_context(|| format!("failed to create directory: {sub_dir:?}"))?;
140    }
141
142    Ok(sub_dir.join("database"))
143}
144
/// Options for database-backed persistence, supporting both Postgres and SQLite.
#[derive(Parser, Clone, Derivative, From, Into)]
#[derivative(Debug)]
pub struct Options {
    #[cfg(not(feature = "embedded-db"))]
    #[clap(flatten)]
    pub(crate) postgres_options: PostgresOptions,

    #[cfg(feature = "embedded-db")]
    #[clap(flatten)]
    pub(crate) sqlite_options: SqliteOptions,

    /// Database URI for Postgres or SQLite.
    ///
    /// This is a shorthand for setting a number of other options all at once. The URI has the
    /// following format ([brackets] indicate optional segments):
    ///
    /// - **Postgres:** `postgres[ql]://[username[:password]@][host[:port],]/database[?parameter_list]`
    /// - **SQLite:** `sqlite://path/to/db.sqlite`
    ///
    /// Options set explicitly via other env vars or flags will take precedence, so you can use this
    /// URI to set a baseline and then use other parameters to override or add configuration. In
    /// addition, there are some parameters which cannot be set via the URI, such as TLS.
    // Hide from debug output since may contain sensitive data.
    #[derivative(Debug = "ignore")]
    // NOTE(review): unlike the other fields, this one has no `#[clap(...)]`
    // attribute, even though the doc above implies it is user-settable via
    // flag/env — confirm whether an attribute was lost here.
    pub(crate) uri: Option<String>,

    /// This will enable the pruner and set the default pruning parameters unless provided.
    /// Default parameters:
    /// - pruning_threshold: 3 TB
    /// - minimum_retention: 1 day
    /// - target_retention: 7 days
    /// - batch_size: 1000
    /// - max_usage: 80%
    /// - interval: 1 hour
    #[clap(long, env = "ESPRESSO_SEQUENCER_DATABASE_PRUNE")]
    pub(crate) prune: bool,

    /// Pruning parameters.
    #[clap(flatten)]
    pub(crate) pruning: PruningOptions,

    /// Pruning parameters for ephemeral consensus storage.
    #[clap(flatten)]
    pub(crate) consensus_pruning: ConsensusPruningOptions,

    /// Specifies the maximum number of concurrent fetch requests allowed from peers.
    #[clap(long, env = "ESPRESSO_SEQUENCER_FETCH_RATE_LIMIT")]
    pub(crate) fetch_rate_limit: Option<usize>,

    /// The minimum delay between active fetches in a stream.
    #[clap(long, env = "ESPRESSO_SEQUENCER_ACTIVE_FETCH_DELAY", value_parser = parse_duration)]
    pub(crate) active_fetch_delay: Option<Duration>,

    /// The minimum delay between loading chunks in a stream.
    #[clap(long, env = "ESPRESSO_SEQUENCER_CHUNK_FETCH_DELAY", value_parser = parse_duration)]
    pub(crate) chunk_fetch_delay: Option<Duration>,

    /// The number of items to process in a single transaction when scanning the database for
    /// missing objects.
    #[clap(long, env = "ESPRESSO_SEQUENCER_SYNC_STATUS_CHUNK_SIZE")]
    pub(crate) sync_status_chunk_size: Option<usize>,

    /// Duration to cache sync status results for.
    #[clap(long, env = "ESPRESSO_SEQUENCER_SYNC_STATUS_TTL", value_parser = parse_duration)]
    pub(crate) sync_status_ttl: Option<Duration>,

    /// The number of items to process at a time when scanning for proactive fetching.
    #[clap(long, env = "ESPRESSO_SEQUENCER_PROACTIVE_SCAN_CHUNK_SIZE")]
    pub(crate) proactive_scan_chunk_size: Option<usize>,

    /// The time interval between proactive fetching scans.
    #[clap(long, env = "ESPRESSO_SEQUENCER_PROACTIVE_SCAN_INTERVAL", value_parser = parse_duration)]
    pub(crate) proactive_scan_interval: Option<Duration>,

    /// Disable the proactive scanner task.
    #[clap(long, env = "ESPRESSO_SEQUENCER_DISABLE_PROACTIVE_FETCHING")]
    pub(crate) disable_proactive_fetching: bool,

    /// Disable pruning and reconstruct previously pruned data.
    ///
    /// While running without pruning is the default behavior, the default will not try to
    /// reconstruct data that was pruned in a previous run where pruning was enabled. This option
    /// instructs the service to run without pruning _and_ reconstruct all previously pruned data by
    /// fetching from peers.
    #[clap(long, env = "ESPRESSO_SEQUENCER_ARCHIVE", conflicts_with = "prune")]
    pub(crate) archive: bool,

    /// Turns on leaf only data storage
    #[clap(
        long,
        env = "ESPRESSO_SEQUENCER_LIGHTWEIGHT",
        default_value_t = false,
        conflicts_with = "archive"
    )]
    pub(crate) lightweight: bool,

    /// The maximum idle time of a database connection.
    ///
    /// Any connection which has been open and unused longer than this duration will be
    /// automatically closed to reduce load on the server.
    #[clap(long, env = "ESPRESSO_SEQUENCER_DATABASE_IDLE_CONNECTION_TIMEOUT", value_parser = parse_duration, default_value = "10m")]
    pub(crate) idle_connection_timeout: Duration,

    /// The maximum lifetime of a database connection.
    ///
    /// Any connection which has been open longer than this duration will be automatically closed
    /// (and, if needed, replaced), even if it is otherwise healthy. It is good practice to refresh
    /// even healthy connections once in a while (e.g. daily) in case of resource leaks in the
    /// server implementation.
    #[clap(long, env = "ESPRESSO_SEQUENCER_DATABASE_CONNECTION_TIMEOUT", value_parser = parse_duration, default_value = "30m")]
    pub(crate) connection_timeout: Duration,

    /// Threshold above which a SQL statement's execution time is considered
    /// "slow" (presumably reported by the storage layer — confirm there).
    #[clap(long, env = "ESPRESSO_SEQUENCER_DATABASE_SLOW_STATEMENT_THRESHOLD", value_parser = parse_duration, default_value = "1s")]
    pub(crate) slow_statement_threshold: Duration,

    /// The maximum time a single SQL statement is allowed to run before being canceled.
    ///
    /// This helps prevent queries from running indefinitely and consuming resources.
    /// Set to 10 minutes by default
    #[clap(long, env = "ESPRESSO_SEQUENCER_DATABASE_STATEMENT_TIMEOUT", value_parser = parse_duration, default_value = "10m")]
    pub(crate) statement_timeout: Duration,

    /// The minimum number of database connections to maintain at any time.
    ///
    /// The database client will, to the best of its ability, maintain at least `min` open
    /// connections at all times. This can be used to reduce the latency hit of opening new
    /// connections when at least this many simultaneous connections are frequently needed.
    #[clap(
        long,
        env = "ESPRESSO_SEQUENCER_DATABASE_MIN_CONNECTIONS",
        default_value = "0"
    )]
    pub(crate) min_connections: u32,

    /// Allows setting a different minimum number of connections for query operations.
    /// Default value of None implies using the min_connections value.
    #[cfg(not(feature = "embedded-db"))]
    #[clap(long, env = "ESPRESSO_SEQUENCER_DATABASE_QUERY_MIN_CONNECTIONS", default_value = None)]
    pub(crate) query_min_connections: Option<u32>,

    /// The maximum number of database connections to maintain at any time.
    ///
    /// Once `max` connections are in use simultaneously, further attempts to acquire a connection
    /// (or begin a transaction) will block until one of the existing connections is released.
    #[clap(
        long,
        env = "ESPRESSO_SEQUENCER_DATABASE_MAX_CONNECTIONS",
        default_value = "25"
    )]
    pub(crate) max_connections: u32,

    /// Allows setting a different maximum number of connections for query operations.
    /// Default value of None implies using the max_connections value.
    #[cfg(not(feature = "embedded-db"))]
    #[clap(long, env = "ESPRESSO_SEQUENCER_DATABASE_QUERY_MAX_CONNECTIONS", default_value = None)]
    pub(crate) query_max_connections: Option<u32>,

    // Keep the database connection pool when persistence is created,
    // allowing it to be reused across multiple instances instead of creating
    // a new pool each time such as for API, consensus storage etc
    // This also ensures all storage instances adhere to the MAX_CONNECTIONS limit if set
    //
    // Note: Cloning the `Pool` is lightweight and efficient because it simply
    // creates a new reference-counted handle to the underlying pool state.
    #[clap(skip)]
    pub(crate) pool: Option<sqlx::Pool<Db>>,
}
313
314impl Default for Options {
315    fn default() -> Self {
316        Self::parse_from(std::iter::empty::<String>())
317    }
318}
319
320#[cfg(not(feature = "embedded-db"))]
321impl From<PostgresOptions> for Config {
322    fn from(opt: PostgresOptions) -> Self {
323        let mut cfg = Config::default();
324
325        if let Some(host) = opt.host {
326            cfg = cfg.host(host);
327        }
328
329        if let Some(port) = opt.port {
330            cfg = cfg.port(port);
331        }
332
333        if let Some(database) = &opt.database {
334            cfg = cfg.database(database);
335        }
336
337        if let Some(user) = &opt.user {
338            cfg = cfg.user(user);
339        }
340
341        if let Some(password) = &opt.password {
342            cfg = cfg.password(password);
343        }
344
345        if opt.use_tls {
346            cfg = cfg.tls();
347        }
348
349        cfg = cfg.max_connections(20);
350        cfg = cfg.idle_connection_timeout(Duration::from_secs(120));
351        cfg = cfg.connection_timeout(Duration::from_secs(10240));
352        cfg = cfg.slow_statement_threshold(Duration::from_secs(1));
353        cfg = cfg.statement_timeout(Duration::from_secs(600)); // 10 minutes default
354
355        cfg
356    }
357}
358
#[cfg(feature = "embedded-db")]
impl From<SqliteOptions> for Config {
    /// Build a query-service [`Config`] for the embedded SQLite database,
    /// pointing it at the resolved database file and applying a fixed
    /// pool-sizing and timeout policy.
    fn from(opt: SqliteOptions) -> Self {
        Config::default()
            .db_path(opt.path)
            .max_connections(20)
            .idle_connection_timeout(Duration::from_secs(120))
            .connection_timeout(Duration::from_secs(10240))
            .slow_statement_threshold(Duration::from_secs(2))
            .statement_timeout(Duration::from_secs(600))
    }
}
374
375#[cfg(not(feature = "embedded-db"))]
376impl From<PostgresOptions> for Options {
377    fn from(opt: PostgresOptions) -> Self {
378        Options {
379            postgres_options: opt,
380            max_connections: 20,
381            idle_connection_timeout: Duration::from_secs(120),
382            connection_timeout: Duration::from_secs(10240),
383            slow_statement_threshold: Duration::from_secs(1),
384            statement_timeout: Duration::from_secs(600),
385            ..Default::default()
386        }
387    }
388}
389
#[cfg(feature = "embedded-db")]
impl From<SqliteOptions> for Options {
    // NOTE(review): fields are initialized exhaustively rather than with
    // `..Default::default()` (as the Postgres counterpart does). Presumably
    // this avoids invoking `Options::default()`, which runs clap parsing and
    // reads environment variables — confirm before refactoring.
    //
    // NOTE(review): SQLite gets a smaller pool (5) than the Postgres
    // conversion (20) — likely intentional, but worth confirming.
    fn from(opt: SqliteOptions) -> Self {
        Options {
            sqlite_options: opt,
            max_connections: 5,
            idle_connection_timeout: Duration::from_secs(120),
            connection_timeout: Duration::from_secs(10240),
            slow_statement_threshold: Duration::from_secs(1),
            uri: None,
            statement_timeout: Duration::from_secs(600),
            prune: false,
            pruning: Default::default(),
            consensus_pruning: Default::default(),
            fetch_rate_limit: None,
            active_fetch_delay: None,
            chunk_fetch_delay: None,
            sync_status_chunk_size: None,
            sync_status_ttl: None,
            proactive_scan_chunk_size: None,
            proactive_scan_interval: None,
            disable_proactive_fetching: false,
            archive: false,
            lightweight: false,
            min_connections: 0,
            pool: None,
        }
    }
}
impl TryFrom<&Options> for Config {
    type Error = anyhow::Error;

    /// Build the storage [`Config`], layering sources in increasing
    /// precedence: the URI baseline, then a reused pool (if any), then
    /// explicit pool/timeout settings, then backend-specific connection
    /// parameters.
    fn try_from(opt: &Options) -> Result<Self, Self::Error> {
        // Baseline: parse the URI if provided, otherwise start from defaults.
        let mut cfg = match &opt.uri {
            Some(uri) => uri.parse()?,
            None => Self::default(),
        };

        // Reuse a pool stashed by a previous `create()` call instead of
        // opening a new one (cloning a pool handle is cheap).
        if let Some(pool) = &opt.pool {
            cfg = cfg.pool(pool.clone());
        }

        cfg = cfg.max_connections(opt.max_connections);
        cfg = cfg.idle_connection_timeout(opt.idle_connection_timeout);
        cfg = cfg.min_connections(opt.min_connections);

        // Separate query-pool limits exist only for Postgres; unset values
        // fall back to the general min/max connection limits.
        #[cfg(not(feature = "embedded-db"))]
        {
            cfg =
                cfg.query_max_connections(opt.query_max_connections.unwrap_or(opt.max_connections));
            cfg =
                cfg.query_min_connections(opt.query_min_connections.unwrap_or(opt.min_connections));
        }

        cfg = cfg.connection_timeout(opt.connection_timeout);
        cfg = cfg.slow_statement_threshold(opt.slow_statement_threshold);
        cfg = cfg.statement_timeout(opt.statement_timeout);

        // Backend-specific setup: migrations directory plus connection
        // parameters. Explicit flags/env override anything from the URI.
        #[cfg(not(feature = "embedded-db"))]
        {
            cfg = cfg.migrations(include_migrations!(
                "$CARGO_MANIFEST_DIR/api/migrations/postgres"
            ));

            let pg_options = &opt.postgres_options;

            if let Some(host) = &pg_options.host {
                cfg = cfg.host(host.clone());
            }

            if let Some(port) = pg_options.port {
                cfg = cfg.port(port);
            }

            if let Some(database) = &pg_options.database {
                cfg = cfg.database(database);
            }

            if let Some(user) = &pg_options.user {
                cfg = cfg.user(user);
            }

            if let Some(password) = &pg_options.password {
                cfg = cfg.password(password);
            }

            if pg_options.use_tls {
                cfg = cfg.tls();
            }
        }

        #[cfg(feature = "embedded-db")]
        {
            cfg = cfg.migrations(include_migrations!(
                "$CARGO_MANIFEST_DIR/api/migrations/sqlite"
            ));

            cfg = cfg.db_path(opt.sqlite_options.path.clone());
        }

        // `prune` and `archive` are mutually exclusive (enforced by clap via
        // `conflicts_with` on the Options struct).
        if opt.prune {
            cfg = cfg.pruner_cfg(PrunerCfg::from(opt.pruning))?;
        }
        if opt.archive {
            cfg = cfg.archive();
        }

        Ok(cfg)
    }
}
500
/// Pruning parameters.
///
/// Every parameter is optional; unset values fall back to the `PrunerCfg`
/// defaults (see `From<PruningOptions> for PrunerCfg` below).
#[derive(Parser, Clone, Copy, Debug)]
pub struct PruningOptions {
    /// Threshold for pruning, specified in bytes.
    /// If the disk usage surpasses this threshold, pruning is initiated for data older than the specified minimum retention period.
    /// Pruning continues until the disk usage drops below the MAX USAGE.
    #[clap(long, env = "ESPRESSO_SEQUENCER_PRUNER_PRUNING_THRESHOLD", value_parser = parse_size)]
    pruning_threshold: Option<u64>,

    /// Minimum retention period.
    /// Data is retained for at least this duration, even if there's no free disk space.
    #[clap(
        long,
        env = "ESPRESSO_SEQUENCER_PRUNER_MINIMUM_RETENTION",
        value_parser = parse_duration,
    )]
    minimum_retention: Option<Duration>,

    /// Target retention period.
    /// Data older than this is pruned to free up space.
    #[clap(
        long,
        env = "ESPRESSO_SEQUENCER_PRUNER_TARGET_RETENTION",
        value_parser = parse_duration,
    )]
    target_retention: Option<Duration>,

    /// Batch size for pruning.
    /// This is the number of blocks data to delete in a single transaction.
    #[clap(long, env = "ESPRESSO_SEQUENCER_PRUNER_BATCH_SIZE")]
    batch_size: Option<u64>,

    /// Maximum disk usage (in basis points).
    ///
    /// Pruning stops once the disk usage falls below this value, even if
    /// some data older than the `MINIMUM_RETENTION` remains. Values range
    /// from 0 (0%) to 10000 (100%).
    #[clap(long, env = "ESPRESSO_SEQUENCER_PRUNER_MAX_USAGE")]
    max_usage: Option<u16>,

    /// Interval for running the pruner.
    #[clap(
        long,
        env = "ESPRESSO_SEQUENCER_PRUNER_INTERVAL",
        value_parser = parse_duration,
    )]
    interval: Option<Duration>,

    /// Number of SQLite pages to vacuum from the freelist
    /// during each pruner cycle.
    /// This value corresponds to `N` in the SQLite PRAGMA `incremental_vacuum(N)`,
    /// so it presumably has no effect on Postgres deployments — confirm.
    #[clap(long, env = "ESPRESSO_SEQUENCER_PRUNER_INCREMENTAL_VACUUM_PAGES")]
    pages: Option<u64>,
}
555
556impl Default for PruningOptions {
557    fn default() -> Self {
558        Self::parse_from(std::iter::empty::<String>())
559    }
560}
561
562impl From<PruningOptions> for PrunerCfg {
563    fn from(opt: PruningOptions) -> Self {
564        let mut cfg = PrunerCfg::new();
565        if let Some(threshold) = opt.pruning_threshold {
566            cfg = cfg.with_pruning_threshold(threshold);
567        }
568        if let Some(min) = opt.minimum_retention {
569            cfg = cfg.with_minimum_retention(min);
570        }
571        if let Some(target) = opt.target_retention {
572            cfg = cfg.with_target_retention(target);
573        }
574        if let Some(batch) = opt.batch_size {
575            cfg = cfg.with_batch_size(batch);
576        }
577        if let Some(max) = opt.max_usage {
578            cfg = cfg.with_max_usage(max);
579        }
580        if let Some(interval) = opt.interval {
581            cfg = cfg.with_interval(interval);
582        }
583
584        if let Some(pages) = opt.pages {
585            cfg = cfg.with_incremental_vacuum_pages(pages)
586        }
587
588        cfg = cfg.with_state_tables(vec![
589            BlockMerkleTree::state_type().to_string(),
590            FeeMerkleTree::state_type().to_string(),
591        ]);
592
593        cfg
594    }
595}
596
/// Pruning parameters for ephemeral consensus storage.
///
/// Stored on [`Persistence`] (as `gc_opt`) when persistence is created, so
/// these bounds govern garbage collection of un-archived consensus data.
#[derive(Parser, Clone, Copy, Debug)]
pub struct ConsensusPruningOptions {
    /// Number of views to try to retain in consensus storage before data that hasn't been archived
    /// is garbage collected.
    ///
    /// The longer this is, the more certain that all data will eventually be archived, even if
    /// there are temporary problems with archive storage or partially missing data. This can be set
    /// very large, as most data is garbage collected as soon as it is finalized by consensus. This
    /// setting only applies to views which never get decided (ie forks in consensus) and views for
    /// which this node is partially offline. These should be exceptionally rare.
    ///
    /// Note that in extreme scenarios, data may be garbage collected even before TARGET_RETENTION
    /// views, if consensus storage exceeds TARGET_USAGE. For a hard lower bound on how long
    /// consensus data will be retained, see MINIMUM_RETENTION.
    ///
    /// The default of 302000 views equates to approximately 1 week (604800 seconds) at an average
    /// view time of 2s.
    #[clap(
        name = "TARGET_RETENTION",
        long = "consensus-storage-target-retention",
        env = "ESPRESSO_SEQUENCER_CONSENSUS_STORAGE_TARGET_RETENTION",
        default_value = "302000"
    )]
    target_retention: u64,

    /// Minimum number of views to try to retain in consensus storage before data that hasn't been
    /// archived is garbage collected.
    ///
    /// This bound allows data to be retained even if consensus storage occupies more than
    /// TARGET_USAGE. This can be used to ensure sufficient time to move consensus data to archival
    /// storage as necessary, even under extreme circumstances where otherwise garbage collection
    /// would kick in based on TARGET_RETENTION.
    ///
    /// The default of 130000 views equates to approximately 3 days (259200 seconds) at an average
    /// view time of 2s.
    #[clap(
        name = "MINIMUM_RETENTION",
        long = "consensus-storage-minimum-retention",
        env = "ESPRESSO_SEQUENCER_CONSENSUS_STORAGE_MINIMUM_RETENTION",
        default_value = "130000"
    )]
    minimum_retention: u64,

    /// Amount (in bytes) of data to retain in consensus storage before garbage collecting more
    /// aggressively.
    ///
    /// See also TARGET_RETENTION and MINIMUM_RETENTION.
    #[clap(
        name = "TARGET_USAGE",
        long = "consensus-storage-target-usage",
        env = "ESPRESSO_SEQUENCER_CONSENSUS_STORAGE_TARGET_USAGE",
        default_value = "1000000000"
    )]
    target_usage: u64,
}
653
654impl Default for ConsensusPruningOptions {
655    fn default() -> Self {
656        Self::parse_from(std::iter::empty::<String>())
657    }
658}
659
660#[async_trait]
661impl PersistenceOptions for Options {
662    type Persistence = Persistence;
663
664    fn set_view_retention(&mut self, view_retention: u64) {
665        self.consensus_pruning.target_retention = view_retention;
666        self.consensus_pruning.minimum_retention = view_retention;
667    }
668
669    async fn create(&mut self) -> anyhow::Result<Self::Persistence> {
670        let config = (&*self).try_into()?;
671        let persistence = Persistence {
672            db: SqlStorage::connect(config, StorageConnectionType::Sequencer).await?,
673            gc_opt: self.consensus_pruning,
674            internal_metrics: PersistenceMetricsValue::default(),
675        };
676        persistence.migrate_quorum_proposal_leaf_hashes().await?;
677        self.pool = Some(persistence.db.pool());
678        Ok(persistence)
679    }
680
681    async fn reset(self) -> anyhow::Result<()> {
682        SqlStorage::connect(
683            Config::try_from(&self)?.reset_schema(),
684            StorageConnectionType::Sequencer,
685        )
686        .await?;
687        Ok(())
688    }
689}
690
/// Identifier for a one-off data migration.
///
/// Presumably keys rows in the `data_migrations` tracking table (see
/// `is_migration_complete` / `mark_migration_complete`) — confirm at call
/// sites.
#[derive(Debug, Clone, Copy)]
pub enum DataMigration {
    /// Migration tracked under the name "x25519_keys".
    X25519Keys,
}

impl DataMigration {
    /// Stable string name for this migration.
    pub fn as_str(&self) -> &'static str {
        match self {
            Self::X25519Keys => "x25519_keys",
        }
    }
}
703
/// Postgres-backed persistence.
///
/// NOTE(review): despite the name, this struct appears to back onto either
/// Postgres or embedded SQLite depending on the `embedded-db` feature (the
/// backend is selected when building the [`Config`]) — consider renaming.
#[derive(Clone, Debug)]
pub struct Persistence {
    // Underlying SQL storage handle; its connection pool is shared via
    // `Options::pool`.
    db: SqlStorage,
    // Garbage-collection bounds for ephemeral consensus storage.
    gc_opt: ConsensusPruningOptions,
    /// A reference to the internal metrics
    internal_metrics: PersistenceMetricsValue,
}
712
713impl Persistence {
    /// Ensure the `leaf_hash` column is populated for all existing quorum proposals.
    ///
    /// This column was added in a migration, but because it requires computing a commitment of the
    /// existing data, it is not easy to populate in the SQL migration itself. Thus, on startup, we
    /// check if there are any just-migrated quorum proposals with a `NULL` value for this column,
    /// and if so we populate the column manually.
    async fn migrate_quorum_proposal_leaf_hashes(&self) -> anyhow::Result<()> {
        let mut tx = self.db.write().await?;

        let mut proposals = tx.fetch("SELECT * FROM quorum_proposals");

        // Collect updates first; the writes are issued after the row stream
        // is finished with the transaction.
        let mut updates = vec![];
        while let Some(row) = proposals.next().await {
            let row = row?;

            let hash: Option<String> = row.try_get("leaf_hash")?;
            if hash.is_none() {
                // Recompute the leaf commitment from the stored proposal
                // bytes for rows the SQL migration left NULL.
                let view: i64 = row.try_get("view")?;
                let data: Vec<u8> = row.try_get("data")?;
                let proposal: Proposal<SeqTypes, QuorumProposal<SeqTypes>> =
                    bincode::deserialize(&data)?;
                let leaf = Leaf::from_quorum_proposal(&proposal.data);
                let leaf_hash = Committable::commit(&leaf);
                tracing::info!(view, %leaf_hash, "populating quorum proposal leaf hash");
                updates.push((view, leaf_hash.to_string()));
            }
        }
        // Release the borrow of `tx` held by the row stream before writing.
        drop(proposals);

        tx.upsert("quorum_proposals", ["view", "leaf_hash"], ["view"], updates)
            .await?;

        tx.commit().await
    }
748
    /// Whether the data migration `name` has been marked complete for `table_name`.
    ///
    /// A tracking row is expected to already exist in `data_migrations`
    /// (presumably seeded by a schema migration, per the error context); a
    /// missing row is surfaced as an error rather than treated as
    /// "not complete".
    async fn is_migration_complete(&self, name: &str, table_name: &str) -> anyhow::Result<bool> {
        let mut tx = self.db.read().await?;
        let (completed,): (bool,) =
            query_as("SELECT completed FROM data_migrations WHERE name = $1 AND table_name = $2")
                .bind(name)
                .bind(table_name)
                .fetch_one(tx.as_mut())
                .await
                .context("migration tracking row missing - schema may be out of sync")?;
        Ok(completed)
    }
760
    /// Mark the data migration `name` as complete for `table_name`,
    /// recording how many rows were migrated.
    ///
    /// Runs within the caller's write transaction. Note: this is an UPDATE of
    /// an existing tracking row; if no row matches, it silently affects zero
    /// rows.
    async fn mark_migration_complete(
        tx: &mut Transaction<Write>,
        name: &str,
        table_name: &str,
        migrated_rows: usize,
    ) -> anyhow::Result<()> {
        tx.execute(
            query(
                "UPDATE data_migrations SET completed = true, migrated_rows = $1 WHERE name = $2 \
                 AND table_name = $3",
            )
            // usize -> i64 cast: assumes the row count fits in i64, which is
            // safe in practice for table sizes here.
            .bind(migrated_rows as i64)
            .bind(name)
            .bind(table_name),
        )
        .await?;
        Ok(())
    }
779
    /// Deliver `Decide` events to `consumer` for every run of consecutive decided leaves
    /// not yet processed, then garbage collect the per-view data covered by each event.
    ///
    /// Progress is tracked in the single-row `event_stream` table (`last_processed_view`).
    /// If we crash after delivering an event but before committing the cleanup write
    /// transaction, the same event may be re-delivered on the next call; this is fine
    /// because event consumers are required to be idempotent (see comment below).
    async fn generate_decide_events(
        &self,
        deciding_qc: Option<Arc<CertificatePair<SeqTypes>>>,
        consumer: &impl EventConsumer,
    ) -> anyhow::Result<()> {
        // Resume from the last view recorded in `event_stream` (absent on first run).
        let mut last_processed_view: Option<i64> = self
            .db
            .read()
            .await?
            .fetch_optional("SELECT last_processed_view FROM event_stream WHERE id = 1 LIMIT 1")
            .await?
            .map(|row| row.get("last_processed_view"));
        loop {
            // In SQLite, overlapping read and write transactions can lead to database errors. To
            // avoid this:
            // - start a read transaction to query and collect all the necessary data.
            // - Commit (or implicitly drop) the read transaction once the data is fetched.
            // - use the collected data to generate a "decide" event for the consumer.
            // - begin a write transaction to delete the data and update the event stream.
            let mut tx = self.db.read().await?;

            // Collect a chain of consecutive leaves, starting from the first view after the last
            // decide. This will correspond to a decide event, and defines a range of views which
            // can be garbage collected. This may even include views for which there was no leaf,
            // for which we might still have artifacts like proposals that never finalized.
            let from_view = match last_processed_view {
                Some(v) => v + 1,
                None => 0,
            };
            tracing::debug!(?from_view, "generate decide event");

            let mut parent = None;
            let mut rows = query(
                "SELECT leaf, qc, next_epoch_qc FROM anchor_leaf2 WHERE view >= $1 ORDER BY view",
            )
            .bind(from_view)
            .fetch(tx.as_mut());
            let mut leaves = vec![];
            let mut final_qc = None;
            while let Some(row) = rows.next().await {
                let row = match row {
                    Ok(row) => row,
                    Err(err) => {
                        // If there's an error getting a row, try generating an event with the rows
                        // we do have.
                        tracing::warn!("error loading row: {err:#}");
                        break;
                    },
                };

                let leaf_data: Vec<u8> = row.get("leaf");
                let leaf = bincode::deserialize::<Leaf2>(&leaf_data)?;
                let qc_data: Vec<u8> = row.get("qc");
                let qc = bincode::deserialize::<QuorumCertificate2<SeqTypes>>(&qc_data)?;
                // `next_epoch_qc` is a nullable column; absent for most leaves.
                let next_epoch_qc = match row.get::<Option<Vec<u8>>, _>("next_epoch_qc") {
                    Some(bytes) => {
                        Some(bincode::deserialize::<NextEpochQuorumCertificate2<SeqTypes>>(&bytes)?)
                    },
                    None => None,
                };
                let height = leaf.block_header().block_number();

                // Ensure we are only dealing with a consecutive chain of leaves. We don't want to
                // garbage collect any views for which we missed a leaf or decide event; at least
                // not right away, in case we need to recover that data later.
                if let Some(parent) = parent
                    && height != parent + 1
                {
                    tracing::debug!(
                        height,
                        parent,
                        "ending decide event at non-consecutive leaf"
                    );
                    break;
                }
                parent = Some(height);
                leaves.push(leaf);
                final_qc = Some(CertificatePair::new(qc, next_epoch_qc));
            }
            // Release the row stream's borrow of the transaction before further queries.
            drop(rows);

            let Some(final_qc) = final_qc else {
                // End event processing when there are no more decided views.
                tracing::debug!(from_view, "no new leaves at decide");
                return Ok(());
            };

            // Find the range of views encompassed by this leaf chain. All data in this range can be
            // processed by the consumer and then deleted.
            let from_view = leaves[0].view_number();
            let to_view = leaves[leaves.len() - 1].view_number();

            // Collect VID shares for the decide event.
            let mut vid_shares = tx
                .fetch_all(
                    query("SELECT view, data FROM vid_share2 where view >= $1 AND view <= $2")
                        .bind(from_view.u64() as i64)
                        .bind(to_view.u64() as i64),
                )
                .await?
                .into_iter()
                .map(|row| {
                    let view: i64 = row.get("view");
                    let data: Vec<u8> = row.get("data");
                    let vid_proposal = bincode::deserialize::<
                        Proposal<SeqTypes, VidDisperseShare<SeqTypes>>,
                    >(&data)?;
                    Ok((view as u64, vid_proposal.data))
                })
                .collect::<anyhow::Result<BTreeMap<_, _>>>()?;

            // Collect DA proposals for the decide event.
            let mut da_proposals = tx
                .fetch_all(
                    query("SELECT view, data FROM da_proposal2 where view >= $1 AND view <= $2")
                        .bind(from_view.u64() as i64)
                        .bind(to_view.u64() as i64),
                )
                .await?
                .into_iter()
                .map(|row| {
                    let view: i64 = row.get("view");
                    let data: Vec<u8> = row.get("data");
                    let da_proposal =
                        bincode::deserialize::<Proposal<SeqTypes, DaProposal2<SeqTypes>>>(&data)?;
                    Ok((view as u64, da_proposal.data))
                })
                .collect::<anyhow::Result<BTreeMap<_, _>>>()?;

            // Collect state certs for the decide event.
            let state_certs = Self::load_state_certs(&mut tx, from_view, to_view)
                .await
                .inspect_err(|err| {
                    tracing::error!(
                        ?from_view,
                        ?to_view,
                        "failed to load state certificates. error={err:#}"
                    );
                })?;
            drop(tx);

            // Collate all the information by view number and construct a chain of leaves.
            // NOTE(review): the maps built above (`vid_shares`, `da_proposals`) are keyed by
            // `u64`, and `state_certs` is keyed by *epoch* (see `load_state_certs`), yet the
            // lookups below pass a `ViewNumber` key — confirm the intended key types and that
            // the state-cert lookup should really be by view rather than by epoch.
            let leaf_chain = leaves
                .into_iter()
                // Go in reverse chronological order, as expected by Decide events.
                .rev()
                .map(|mut leaf| {
                    let view = leaf.view_number();

                    // Include the VID share if available.
                    let vid_share = vid_shares.remove(&view);
                    if vid_share.is_none() {
                        tracing::debug!(?view, "VID share not available at decide");
                    }

                    // Fill in the full block payload using the DA proposals we had persisted.
                    if let Some(proposal) = da_proposals.remove(&view) {
                        let payload =
                            Payload::from_bytes(&proposal.encoded_transactions, &proposal.metadata);
                        leaf.fill_block_payload_unchecked(payload);
                    } else if view == ViewNumber::genesis() {
                        // We don't get a DA proposal for the genesis view, but we know what the
                        // payload always is.
                        leaf.fill_block_payload_unchecked(Payload::empty().0);
                    } else {
                        tracing::debug!(?view, "DA proposal not available at decide");
                    }

                    let state_cert = state_certs
                        .get(&view)
                        .cloned();

                    LeafInfo {
                        leaf,
                        vid_share,
                        state_cert,
                        // Note: the following fields are not used in Decide event processing, and
                        // should be removed. For now, we just default them.
                        state: Default::default(),
                        delta: Default::default(),
                    }
                })
                .collect();

            {
                // Generate decide event for the consumer.
                tracing::debug!(
                    ?from_view,
                    ?to_view,
                    ?final_qc,
                    ?leaf_chain,
                    "generating decide event"
                );
                // Insert the deciding QC at the appropriate position, with the last decide event in
                // the chain.
                let deciding_qc = if let Some(deciding_qc) = &deciding_qc {
                    (deciding_qc.view_number() == final_qc.view_number() + 1)
                        .then_some(deciding_qc.clone())
                } else {
                    None
                };
                consumer
                    .handle_event(&Event {
                        view_number: to_view,
                        event: EventType::Decide {
                            leaf_chain: Arc::new(leaf_chain),
                            committing_qc: Arc::new(final_qc),
                            deciding_qc,
                            block_size: None,
                        },
                    })
                    .await?;
            }

            let mut tx = self.db.write().await?;

            // Now that we have definitely processed leaves up to `to_view`, we can update
            // `last_processed_view` so we don't process these leaves again. We may still fail at
            // this point, or shut down, and fail to complete this update. At worst this will lead
            // to us sending a duplicate decide event the next time we are called; this is fine as
            // the event consumer is required to be idempotent.
            tx.upsert(
                "event_stream",
                ["id", "last_processed_view"],
                ["id"],
                [(1i32, to_view.u64() as i64)],
            )
            .await?;

            // Store all the finalized state certs
            for (epoch, state_cert) in state_certs {
                let state_cert_bytes = bincode::serialize(&state_cert)?;
                tx.upsert(
                    "finalized_state_cert",
                    ["epoch", "state_cert"],
                    ["epoch"],
                    [(epoch as i64, state_cert_bytes)],
                )
                .await?;
            }

            // Delete the data that has been fully processed.
            tx.execute(
                query("DELETE FROM vid_share2 where view >= $1 AND view <= $2")
                    .bind(from_view.u64() as i64)
                    .bind(to_view.u64() as i64),
            )
            .await?;
            tx.execute(
                query("DELETE FROM da_proposal2 where view >= $1 AND view <= $2")
                    .bind(from_view.u64() as i64)
                    .bind(to_view.u64() as i64),
            )
            .await?;
            tx.execute(
                query("DELETE FROM quorum_proposals2 where view >= $1 AND view <= $2")
                    .bind(from_view.u64() as i64)
                    .bind(to_view.u64() as i64),
            )
            .await?;
            tx.execute(
                query("DELETE FROM quorum_certificate2 where view >= $1 AND view <= $2")
                    .bind(from_view.u64() as i64)
                    .bind(to_view.u64() as i64),
            )
            .await?;
            tx.execute(
                query("DELETE FROM state_cert where view >= $1 AND view <= $2")
                    .bind(from_view.u64() as i64)
                    .bind(to_view.u64() as i64),
            )
            .await?;

            // Clean up leaves, but do not delete the most recent one (all leaves with a view number
            // less than the given value). This is necessary to ensure that, in case of a restart,
            // we can resume from the last decided leaf.
            tx.execute(
                query("DELETE FROM anchor_leaf2 WHERE view >= $1 AND view < $2")
                    .bind(from_view.u64() as i64)
                    .bind(to_view.u64() as i64),
            )
            .await?;

            tx.commit().await?;
            last_processed_view = Some(to_view.u64() as i64);
        }
    }
1067
1068    async fn load_state_certs(
1069        tx: &mut Transaction<Read>,
1070        from_view: ViewNumber,
1071        to_view: ViewNumber,
1072    ) -> anyhow::Result<BTreeMap<u64, LightClientStateUpdateCertificateV2<SeqTypes>>> {
1073        let rows = tx
1074            .fetch_all(
1075                query("SELECT view, state_cert FROM state_cert WHERE view >= $1 AND view <= $2")
1076                    .bind(from_view.u64() as i64)
1077                    .bind(to_view.u64() as i64),
1078            )
1079            .await?;
1080
1081        let mut result = BTreeMap::new();
1082
1083        for row in rows {
1084            let data: Vec<u8> = row.get("state_cert");
1085
1086            let cert: LightClientStateUpdateCertificateV2<SeqTypes> = bincode::deserialize(&data)
1087                .or_else(|err_v2| {
1088                bincode::deserialize::<LightClientStateUpdateCertificateV1<SeqTypes>>(&data)
1089                    .map(Into::into)
1090                    .context(format!(
1091                        "Failed to deserialize LightClientStateUpdateCertificate: with v1 and v2. \
1092                         error: {err_v2}"
1093                    ))
1094            })?;
1095
1096            result.insert(cert.epoch.u64(), cert);
1097        }
1098
1099        Ok(result)
1100    }
1101
1102    #[tracing::instrument(skip(self))]
1103    async fn prune(&self, cur_view: ViewNumber) -> anyhow::Result<()> {
1104        let mut tx = self.db.write().await?;
1105
1106        // Prune everything older than the target retention period.
1107        prune_to_view(
1108            &mut tx,
1109            cur_view.u64().saturating_sub(self.gc_opt.target_retention),
1110        )
1111        .await?;
1112
1113        // Check our storage usage; if necessary we will prune more aggressively (up to the minimum
1114        // retention) to get below the target usage.
1115        #[cfg(feature = "embedded-db")]
1116        let usage_query = format!(
1117            "SELECT sum(pgsize) FROM dbstat WHERE name IN ({})",
1118            PRUNE_TABLES
1119                .iter()
1120                .map(|table| format!("'{table}'"))
1121                .join(",")
1122        );
1123
1124        #[cfg(not(feature = "embedded-db"))]
1125        let usage_query = {
1126            let table_sizes = PRUNE_TABLES
1127                .iter()
1128                .map(|table| format!("pg_table_size('{table}')"))
1129                .join(" + ");
1130            format!("SELECT {table_sizes}")
1131        };
1132
1133        let (usage,): (i64,) = query_as(&usage_query).fetch_one(tx.as_mut()).await?;
1134        tracing::debug!(usage, "consensus storage usage after pruning");
1135
1136        if (usage as u64) > self.gc_opt.target_usage {
1137            tracing::warn!(
1138                usage,
1139                gc_opt = ?self.gc_opt,
1140                "consensus storage is running out of space, pruning to minimum retention"
1141            );
1142            prune_to_view(
1143                &mut tx,
1144                cur_view.u64().saturating_sub(self.gc_opt.minimum_retention),
1145            )
1146            .await?;
1147        }
1148
1149        tx.commit().await
1150    }
1151}
1152
/// Consensus tables subject to view-based garbage collection; each has a `view`
/// column that `prune_to_view` filters on.
const PRUNE_TABLES: &[&str] = &[
    "anchor_leaf2",
    "vid_share2",
    "da_proposal2",
    "quorum_proposals2",
    "quorum_certificate2",
];
1160
1161async fn prune_to_view(tx: &mut Transaction<Write>, view: u64) -> anyhow::Result<()> {
1162    if view == 0 {
1163        // Nothing to prune, the entire chain is younger than the retention period.
1164        return Ok(());
1165    }
1166    tracing::debug!(view, "pruning consensus storage");
1167
1168    for table in PRUNE_TABLES {
1169        let res = query(&format!("DELETE FROM {table} WHERE view < $1"))
1170            .bind(view as i64)
1171            .execute(tx.as_mut())
1172            .await
1173            .context(format!("pruning {table}"))?;
1174        if res.rows_affected() > 0 {
1175            tracing::info!(
1176                "garbage collected {} rows from {table}",
1177                res.rows_affected()
1178            );
1179        }
1180    }
1181
1182    Ok(())
1183}
1184
1185#[async_trait]
1186impl SequencerPersistence for Persistence {
    /// One-time migration: rebuild the v2 reward Merkle tree from the historical
    /// `reward_merkle_tree_v2` rows and persist it, compactly serialized, into
    /// `reward_merkle_tree_v2_data`.
    ///
    /// Progress (batch offset) is tracked in the `epoch_migration` table so the row
    /// scan can resume after a restart. The rebuilt tree's commitment is verified
    /// against the block header at the migration height before anything is written.
    async fn migrate_reward_merkle_tree_v2(&self) -> anyhow::Result<()> {
        // Rows fetched per read transaction.
        let batch_size: i64 = 1000;

        // Check whether the migration already ran, and where the scan left off.
        let result = {
            let mut tx = self.db.read().await?;
            query_as::<(bool, i64)>(
                "SELECT completed, migrated_rows FROM epoch_migration WHERE table_name = \
                 'reward_merkle_tree_v2_data'",
            )
            .fetch_optional(tx.as_mut())
            .await?
        };

        let (is_completed, mut offset) = result.unwrap_or((false, 0));

        if is_completed {
            tracing::info!("reward_merkle_tree_v2 migration already done");
            return Ok(());
        }

        // Height of the newest snapshot; the rebuilt tree is validated against the
        // header at this height.
        let max_height: Option<i64> = {
            let mut tx = self.db.read().await?;
            query_as::<(Option<i64>,)>("SELECT MAX(created) FROM reward_merkle_tree_v2")
                .fetch_one(tx.as_mut())
                .await?
                .0
        };

        let max_height = match max_height {
            Some(h) => h,
            None => {
                tracing::info!("no reward data found in reward_merkle_tree_v2, skipping migration");
                return Ok(());
            },
        };

        tracing::warn!(
            "migrating reward_merkle_tree_v2 to reward_merkle_tree_v2_data at height \
             {max_height}..."
        );

        // NOTE(review): `balances` lives only in memory while `offset` is persisted; if the
        // process restarts mid-scan, earlier batches are lost and the rebuilt tree would be
        // incomplete. The commitment check below would then fail loudly rather than persist
        // bad data — confirm this recovery behavior is intended.
        let mut balances: Vec<(RewardAccountV2, RewardAmount)> = Vec::new();

        loop {
            let mut tx = self.db.read().await?;

            // Postgres: take the latest entry per account index via DISTINCT ON.
            #[cfg(not(feature = "embedded-db"))]
            let rows = query_as::<(serde_json::Value, serde_json::Value)>(
                "SELECT DISTINCT ON (idx) idx, entry
                   FROM reward_merkle_tree_v2
                  WHERE idx IS NOT NULL AND entry IS NOT NULL
                  ORDER BY idx, created DESC
                  LIMIT $1 OFFSET $2",
            )
            .bind(batch_size)
            .bind(offset)
            .fetch_all(tx.as_mut())
            .await
            .context("loading reward accounts from reward_merkle_tree_v2")?;

            // SQLite: no DISTINCT ON, so emulate it with ROW_NUMBER() per account index.
            #[cfg(feature = "embedded-db")]
            let rows = query_as::<(serde_json::Value, serde_json::Value)>(
                "SELECT idx, entry FROM (
                     SELECT idx, entry, ROW_NUMBER() OVER (PARTITION BY idx ORDER BY created DESC) \
                 as rn
                       FROM reward_merkle_tree_v2
                      WHERE idx IS NOT NULL AND entry IS NOT NULL
                 ) sub
                 WHERE rn = 1 ORDER BY idx
                 LIMIT $1 OFFSET $2",
            )
            .bind(batch_size)
            .bind(offset)
            .fetch_all(tx.as_mut())
            .await
            .context("loading reward accounts from reward_merkle_tree_v2")?;

            // End the read transaction before the write below (see SQLite note elsewhere).
            drop(tx);

            if rows.is_empty() {
                break;
            }

            let rows_count = rows.len();

            for (idx, entry) in rows {
                let account: RewardAccountV2 =
                    serde_json::from_value(idx).context("deserializing reward account")?;
                let balance: RewardAmount = serde_json::from_value(entry).context(format!(
                    "deserializing reward balance for account {account}"
                ))?;
                balances.push((account, balance));
            }

            // Persist scan progress (still marked incomplete) so a restart can resume.
            offset += rows_count as i64;
            let mut tx = self.db.write().await?;
            tx.upsert(
                "epoch_migration",
                ["table_name", "completed", "migrated_rows"],
                ["table_name"],
                [("reward_merkle_tree_v2_data".to_string(), false, offset)],
            )
            .await?;
            tx.commit().await?;

            tracing::info!(
                "reward_merkle_tree_v2 progress: rows={} offset={}",
                rows_count,
                offset
            );

            // A short batch means we exhausted the table.
            if rows_count < batch_size as usize {
                break;
            }
        }

        if balances.is_empty() {
            tracing::info!("no reward accounts found, skipping tree rebuild");
            return Ok(());
        }

        tracing::info!(
            "rebuilding RewardMerkleTreeV2 from {} accounts",
            balances.len()
        );

        let tree = RewardMerkleTreeV2::from_kv_set(REWARD_MERKLE_TREE_V2_HEIGHT, balances)
            .context("failed to rebuild RewardMerkleTreeV2 from balances")?;

        // Fetch the header at the migration height to validate the rebuilt tree root.
        let mut tx = self.db.read().await?;
        let header = tx
            .get_header(BlockId::<SeqTypes>::from(max_height as usize))
            .await
            .context(format!("header {max_height} not available"))?;
        drop(tx);

        // The header must carry a v2 reward root, and the rebuilt tree must match it
        // exactly; otherwise abort without persisting anything.
        match header.reward_merkle_tree_root() {
            Either::Right(expected_root) => {
                ensure!(
                    tree.commitment() == expected_root,
                    "rebuilt RewardMerkleTreeV2 commitment {} does not match header commitment {} \
                     at height {max_height}",
                    tree.commitment(),
                    expected_root,
                );
            },
            Either::Left(_) => {
                bail!(
                    "header at height {max_height} has a v1 reward merkle tree root, expected v2"
                );
            },
        }

        let tree_data: RewardMerkleTreeV2Data = (&tree)
            .try_into()
            .context("failed to convert RewardMerkleTreeV2 to RewardMerkleTreeV2Data")?;
        let serialized =
            bincode::serialize(&tree_data).context("failed to serialize RewardMerkleTreeV2Data")?;

        let mut tx = self.db.write().await?;
        tx.upsert(
            "reward_merkle_tree_v2_data",
            ["height", "balances"],
            ["height"],
            [(max_height, serialized)],
        )
        .await?;
        tx.commit().await?;

        // Mark migration as complete
        let mut tx = self.db.write().await?;
        tx.upsert(
            "epoch_migration",
            ["table_name", "completed", "migrated_rows"],
            ["table_name"],
            [("reward_merkle_tree_v2_data".to_string(), true, offset)],
        )
        .await?;
        tx.commit().await?;

        tracing::warn!("migrated reward_merkle_tree_v2 at height {max_height}");

        Ok(())
    }
1371
1372    fn into_catchup_provider(
1373        self,
1374        backoff: BackoffParams,
1375    ) -> anyhow::Result<Arc<dyn StateCatchup>> {
1376        Ok(Arc::new(SqlStateCatchup::new(Arc::new(self.db), backoff)))
1377    }
1378
1379    async fn load_config(&self) -> anyhow::Result<Option<NetworkConfig>> {
1380        tracing::info!("loading config from Postgres");
1381
1382        // Select the most recent config (although there should only be one).
1383        let Some(row) = self
1384            .db
1385            .read()
1386            .await?
1387            .fetch_optional("SELECT config FROM network_config ORDER BY id DESC LIMIT 1")
1388            .await?
1389        else {
1390            tracing::info!("config not found");
1391            return Ok(None);
1392        };
1393        let json = row.try_get("config")?;
1394
1395        let json = migrate_network_config(json).context("migration of network config failed")?;
1396        let config = serde_json::from_value(json).context("malformed config file")?;
1397
1398        Ok(Some(config))
1399    }
1400
1401    async fn save_config(&self, cfg: &NetworkConfig) -> anyhow::Result<()> {
1402        tracing::info!("saving config to database");
1403        let json = serde_json::to_value(cfg)?;
1404
1405        let mut tx = self.db.write().await?;
1406        tx.execute(query("INSERT INTO network_config (config) VALUES ($1)").bind(json))
1407            .await?;
1408        tx.commit().await
1409    }
1410
1411    async fn append_decided_leaves(
1412        &self,
1413        view: ViewNumber,
1414        leaf_chain: impl IntoIterator<Item = (&LeafInfo<SeqTypes>, CertificatePair<SeqTypes>)> + Send,
1415        deciding_qc: Option<Arc<CertificatePair<SeqTypes>>>,
1416        consumer: &(impl EventConsumer + 'static),
1417    ) -> anyhow::Result<()> {
1418        let values = leaf_chain
1419            .into_iter()
1420            .map(|(info, cert)| {
1421                // The leaf may come with a large payload attached. We don't care about this payload
1422                // because we already store it separately, as part of the DA proposal. Storing it
1423                // here contributes to load on the DB for no reason, so we remove it before
1424                // serializing the leaf.
1425                let mut leaf = info.leaf.clone();
1426                leaf.unfill_block_payload();
1427
1428                let view = cert.view_number().u64() as i64;
1429                let leaf_bytes = bincode::serialize(&leaf)?;
1430                let qc_bytes = bincode::serialize(cert.qc())?;
1431                let next_epoch_qc_bytes = match cert.next_epoch_qc() {
1432                    Some(qc) => Some(bincode::serialize(qc)?),
1433                    None => None,
1434                };
1435                Ok((view, leaf_bytes, qc_bytes, next_epoch_qc_bytes))
1436            })
1437            .collect::<anyhow::Result<Vec<_>>>()?;
1438
1439        // First, append the new leaves. We do this in its own transaction because even if GC or the
1440        // event consumer later fails, there is no need to abort the storage of the leaves.
1441        let mut tx = self.db.write().await?;
1442
1443        tx.upsert(
1444            "anchor_leaf2",
1445            ["view", "leaf", "qc", "next_epoch_qc"],
1446            ["view"],
1447            values,
1448        )
1449        .await?;
1450        tx.commit().await?;
1451
1452        // Generate an event for the new leaves and, only if it succeeds, clean up data we no longer
1453        // need.
1454        if let Err(err) = self.generate_decide_events(deciding_qc, consumer).await {
1455            // GC/event processing failure is not an error, since by this point we have at least
1456            // managed to persist the decided leaves successfully, and GC will just run again at the
1457            // next decide. Log an error but do not return it.
1458            tracing::warn!(?view, "event processing failed: {err:#}");
1459            return Ok(());
1460        }
1461
1462        // Garbage collect data which was not included in any decide event, but which at this point
1463        // is old enough to just forget about.
1464        if let Err(err) = self.prune(view).await {
1465            tracing::warn!(?view, "pruning failed: {err:#}");
1466        }
1467
1468        Ok(())
1469    }
1470
1471    async fn load_latest_acted_view(&self) -> anyhow::Result<Option<ViewNumber>> {
1472        Ok(self
1473            .db
1474            .read()
1475            .await?
1476            .fetch_optional(query("SELECT view FROM highest_voted_view WHERE id = 0"))
1477            .await?
1478            .map(|row| {
1479                let view: i64 = row.get("view");
1480                ViewNumber::new(view as u64)
1481            }))
1482    }
1483
1484    async fn load_restart_view(&self) -> anyhow::Result<Option<ViewNumber>> {
1485        Ok(self
1486            .db
1487            .read()
1488            .await?
1489            .fetch_optional(query("SELECT view FROM restart_view WHERE id = 0"))
1490            .await?
1491            .map(|row| {
1492                let view: i64 = row.get("view");
1493                ViewNumber::new(view as u64)
1494            }))
1495    }
1496
1497    async fn load_anchor_leaf(
1498        &self,
1499    ) -> anyhow::Result<Option<(Leaf2, QuorumCertificate2<SeqTypes>)>> {
1500        let Some(row) = self
1501            .db
1502            .read()
1503            .await?
1504            .fetch_optional("SELECT leaf, qc FROM anchor_leaf2 ORDER BY view DESC LIMIT 1")
1505            .await?
1506        else {
1507            return Ok(None);
1508        };
1509
1510        let leaf_bytes: Vec<u8> = row.get("leaf");
1511        let leaf2: Leaf2 = bincode::deserialize(&leaf_bytes)?;
1512
1513        let qc_bytes: Vec<u8> = row.get("qc");
1514        let qc2: QuorumCertificate2<SeqTypes> = bincode::deserialize(&qc_bytes)?;
1515
1516        Ok(Some((leaf2, qc2)))
1517    }
1518
1519    async fn load_anchor_view(&self) -> anyhow::Result<ViewNumber> {
1520        let mut tx = self.db.read().await?;
1521        let (view,) = query_as::<(i64,)>("SELECT coalesce(max(view), 0) FROM anchor_leaf2")
1522            .fetch_one(tx.as_mut())
1523            .await?;
1524        Ok(ViewNumber::new(view as u64))
1525    }
1526
1527    async fn load_da_proposal(
1528        &self,
1529        view: ViewNumber,
1530    ) -> anyhow::Result<Option<Proposal<SeqTypes, DaProposal2<SeqTypes>>>> {
1531        let result = self
1532            .db
1533            .read()
1534            .await?
1535            .fetch_optional(
1536                query("SELECT data FROM da_proposal2 where view = $1").bind(view.u64() as i64),
1537            )
1538            .await?;
1539
1540        result
1541            .map(|row| {
1542                let bytes: Vec<u8> = row.get("data");
1543                anyhow::Result::<_>::Ok(bincode::deserialize(&bytes)?)
1544            })
1545            .transpose()
1546    }
1547
1548    async fn load_vid_share(
1549        &self,
1550        view: ViewNumber,
1551    ) -> anyhow::Result<Option<Proposal<SeqTypes, VidDisperseShare<SeqTypes>>>> {
1552        let result = self
1553            .db
1554            .read()
1555            .await?
1556            .fetch_optional(
1557                query("SELECT data FROM vid_share2 where view = $1").bind(view.u64() as i64),
1558            )
1559            .await?;
1560
1561        result
1562            .map(|row| {
1563                let bytes: Vec<u8> = row.get("data");
1564                anyhow::Result::<_>::Ok(bincode::deserialize(&bytes)?)
1565            })
1566            .transpose()
1567    }
1568
    /// Load every stored quorum proposal, keyed by view number.
    ///
    /// Each row is decoded as the current `QuorumProposalWrapper` encoding
    /// first, falling back to the legacy wrapper encoding (converted via
    /// `convert_proposal`) for rows written by older versions.
    async fn load_quorum_proposals(
        &self,
    ) -> anyhow::Result<BTreeMap<ViewNumber, Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>>>>
    {
        let rows = self
            .db
            .read()
            .await?
            .fetch_all("SELECT * FROM quorum_proposals2")
            .await?;

        Ok(BTreeMap::from_iter(
            rows.into_iter()
                .map(|row| {
                    let view: i64 = row.get("view");
                    let view_number: ViewNumber = ViewNumber::new(view.try_into()?);
                    let bytes: Vec<u8> = row.get("data");
                    let proposal: Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>> =
                        bincode::deserialize(&bytes).or_else(|error| {
                            // Fall back to the legacy on-disk encoding.
                            bincode::deserialize::<
                                Proposal<SeqTypes, QuorumProposalWrapperLegacy<SeqTypes>>,
                            >(&bytes)
                            .map(convert_proposal)
                            .inspect_err(|err_v3| {
                                // NOTE(review): despite this message, a row that
                                // fails both decodes is NOT skipped — the `?`
                                // below propagates the error and the whole load
                                // fails. Confirm which behavior is intended.
                                tracing::warn!(
                                    ?view_number,
                                    %error,
                                    %err_v3,
                                    "ignoring malformed quorum proposal DB row"
                                );
                            })
                        })?;
                    Ok((view_number, proposal))
                })
                .collect::<anyhow::Result<Vec<_>>>()?,
        ))
    }
1606
1607    async fn load_quorum_proposal(
1608        &self,
1609        view: ViewNumber,
1610    ) -> anyhow::Result<Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>>> {
1611        let mut tx = self.db.read().await?;
1612        let (data,) =
1613            query_as::<(Vec<u8>,)>("SELECT data FROM quorum_proposals2 WHERE view = $1 LIMIT 1")
1614                .bind(view.u64() as i64)
1615                .fetch_one(tx.as_mut())
1616                .await?;
1617        let proposal: Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>> =
1618            bincode::deserialize(&data).or_else(|error| {
1619                bincode::deserialize::<Proposal<SeqTypes, QuorumProposalWrapperLegacy<SeqTypes>>>(
1620                    &data,
1621                )
1622                .map(convert_proposal)
1623                .context(format!(
1624                    "Failed to deserialize quorum proposal for view {view}. error={error}"
1625                ))
1626            })?;
1627
1628        Ok(proposal)
1629    }
1630
1631    async fn append_vid(
1632        &self,
1633        proposal: &Proposal<SeqTypes, VidDisperseShare<SeqTypes>>,
1634    ) -> anyhow::Result<()> {
1635        let view = proposal.data.view_number().u64();
1636        let payload_hash = proposal.data.payload_commitment();
1637        let data_bytes = bincode::serialize(proposal).unwrap();
1638
1639        let now = Instant::now();
1640        let mut tx = self.db.write().await?;
1641        tx.upsert(
1642            "vid_share2",
1643            ["view", "data", "payload_hash"],
1644            ["view"],
1645            [(view as i64, data_bytes, payload_hash.to_string())],
1646        )
1647        .await?;
1648        let res = tx.commit().await;
1649        self.internal_metrics
1650            .internal_append_vid_duration
1651            .add_point(now.elapsed().as_secs_f64());
1652        res
1653    }
1654
1655    async fn append_da(
1656        &self,
1657        proposal: &Proposal<SeqTypes, DaProposal<SeqTypes>>,
1658        vid_commit: VidCommitment,
1659    ) -> anyhow::Result<()> {
1660        let data = &proposal.data;
1661        let view = data.view_number().u64();
1662        let data_bytes = bincode::serialize(proposal).unwrap();
1663
1664        let now = Instant::now();
1665        let mut tx = self.db.write().await?;
1666        tx.upsert(
1667            "da_proposal",
1668            ["view", "data", "payload_hash"],
1669            ["view"],
1670            [(view as i64, data_bytes, vid_commit.to_string())],
1671        )
1672        .await?;
1673        let res = tx.commit().await;
1674        self.internal_metrics
1675            .internal_append_da_duration
1676            .add_point(now.elapsed().as_secs_f64());
1677        res
1678    }
1679
    /// Record that this node performed `action` in `view`.
    ///
    /// Maintains two monotonically increasing watermarks used on restart:
    /// - `highest_voted_view`: the highest view we proposed or voted in.
    /// - `restart_view`: the view to resume from (one past the last vote).
    ///
    /// `_epoch` is currently unused.
    async fn record_action(
        &self,
        view: ViewNumber,
        _epoch: Option<EpochNumber>,
        action: HotShotAction,
    ) -> anyhow::Result<()> {
        // Todo Remove this after https://github.com/EspressoSystems/espresso-network/issues/1931
        // Only proposals and votes affect the watermarks; other actions are
        // no-ops here.
        if !matches!(action, HotShotAction::Propose | HotShotAction::Vote) {
            return Ok(());
        }

        // `MAX_FN` abstracts the SQL dialect's two-argument max, so the stored
        // view only ever moves forward (single row, id = 0).
        let stmt = format!(
            "INSERT INTO highest_voted_view (id, view) VALUES (0, $1)
            ON CONFLICT (id) DO UPDATE SET view = {MAX_FN}(highest_voted_view.view, excluded.view)"
        );

        let mut tx = self.db.write().await?;
        tx.execute(query(&stmt).bind(view.u64() as i64)).await?;

        if matches!(action, HotShotAction::Vote) {
            // After voting in `view`, restart from the following view.
            let restart_view = view + 1;
            let stmt = format!(
                "INSERT INTO restart_view (id, view) VALUES (0, $1)
                ON CONFLICT (id) DO UPDATE SET view = {MAX_FN}(restart_view.view, excluded.view)"
            );
            tx.execute(query(&stmt).bind(restart_view.u64() as i64))
                .await?;
        }

        tx.commit().await
    }
1711
    /// Persist a quorum proposal, keyed by view, together with its justify QC.
    ///
    /// Both writes happen in a single transaction; the elapsed wall-clock time
    /// of the write is recorded in the internal metrics.
    async fn append_quorum_proposal2(
        &self,
        proposal: &Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>>,
    ) -> anyhow::Result<()> {
        let view_number = proposal.data.view_number().u64();

        let proposal_bytes = bincode::serialize(&proposal).context("serializing proposal")?;
        // The leaf hash is derived from the proposal so rows can be joined
        // against leaf-keyed tables.
        let leaf_hash = Committable::commit(&Leaf2::from_quorum_proposal(&proposal.data));

        let now = Instant::now();
        let mut tx = self.db.write().await?;
        tx.upsert(
            "quorum_proposals2",
            ["view", "leaf_hash", "data"],
            ["view"],
            [(view_number as i64, leaf_hash.to_string(), proposal_bytes)],
        )
        .await?;

        // We also keep track of any QC we see in case we need it to recover our archival storage.
        let justify_qc = proposal.data.justify_qc();
        let justify_qc_bytes = bincode::serialize(&justify_qc).context("serializing QC")?;
        tx.upsert(
            "quorum_certificate2",
            ["view", "leaf_hash", "data"],
            ["view"],
            [(
                justify_qc.view_number.u64() as i64,
                justify_qc.data.leaf_commit.to_string(),
                &justify_qc_bytes,
            )],
        )
        .await?;
        let res = tx.commit().await;
        // Record the duration even if the commit failed.
        self.internal_metrics
            .internal_append_quorum2_duration
            .add_point(now.elapsed().as_secs_f64());
        res
    }
1751
1752    async fn load_upgrade_certificate(
1753        &self,
1754    ) -> anyhow::Result<Option<UpgradeCertificate<SeqTypes>>> {
1755        let result = self
1756            .db
1757            .read()
1758            .await?
1759            .fetch_optional("SELECT * FROM upgrade_certificate where id = true")
1760            .await?;
1761
1762        result
1763            .map(|row| {
1764                let bytes: Vec<u8> = row.get("data");
1765                anyhow::Result::<_>::Ok(bincode::deserialize(&bytes)?)
1766            })
1767            .transpose()
1768    }
1769
1770    async fn store_upgrade_certificate(
1771        &self,
1772        decided_upgrade_certificate: Option<UpgradeCertificate<SeqTypes>>,
1773    ) -> anyhow::Result<()> {
1774        let certificate = match decided_upgrade_certificate {
1775            Some(cert) => cert,
1776            None => return Ok(()),
1777        };
1778        let upgrade_certificate_bytes =
1779            bincode::serialize(&certificate).context("serializing upgrade certificate")?;
1780        let mut tx = self.db.write().await?;
1781        tx.upsert(
1782            "upgrade_certificate",
1783            ["id", "data"],
1784            ["id"],
1785            [(true, upgrade_certificate_bytes)],
1786        )
1787        .await?;
1788        tx.commit().await
1789    }
1790
    /// Migrate decided leaves from `anchor_leaf` (`Leaf`/`QuorumCertificate`)
    /// to `anchor_leaf2` (`Leaf2`/`QuorumCertificate2`).
    ///
    /// Runs in batches of 10,000 rows and records progress (the last migrated
    /// view) in `epoch_migration`, so an interrupted migration resumes where
    /// it left off.
    async fn migrate_anchor_leaf(&self) -> anyhow::Result<()> {
        let batch_size: i64 = 10000;
        // NOTE(review): this read transaction is never explicitly dropped, so
        // it remains open for the whole migration loop below — confirm the
        // connection pool tolerates that.
        let mut tx = self.db.read().await?;

        // The SQL migration populates the table name and sets a default value of 0 for migrated rows.
        // so, fetch_one() would always return a row
        // The number of migrated rows is updated after each batch insert.
        // This allows the types migration to resume from where it left off.
        let (is_completed, mut offset) = query_as::<(bool, i64)>(
            "SELECT completed, migrated_rows from epoch_migration WHERE table_name = 'anchor_leaf'",
        )
        .fetch_one(tx.as_mut())
        .await?;

        if is_completed {
            tracing::info!("decided leaves already migrated");
            return Ok(());
        }

        tracing::warn!("migrating decided leaves..");
        loop {
            // Read one batch, starting from the last migrated view.
            let mut tx = self.db.read().await?;
            let rows = query(
                "SELECT view, leaf, qc FROM anchor_leaf WHERE view >= $1 ORDER BY view LIMIT $2",
            )
            .bind(offset)
            .bind(batch_size)
            .fetch_all(tx.as_mut())
            .await?;

            // Release the read transaction before converting and writing.
            drop(tx);
            if rows.is_empty() {
                break;
            }
            let mut values = Vec::new();

            for row in rows.iter() {
                let leaf: Vec<u8> = row.try_get("leaf")?;
                let qc: Vec<u8> = row.try_get("qc")?;
                let leaf1: Leaf = bincode::deserialize(&leaf)?;
                let qc1: QuorumCertificate<SeqTypes> = bincode::deserialize(&qc)?;
                let view: i64 = row.try_get("view")?;

                // Convert legacy types to their epoch-aware replacements.
                let leaf2: Leaf2 = leaf1.into();
                let qc2: QuorumCertificate2<SeqTypes> = qc1.to_qc2();

                let leaf2_bytes = bincode::serialize(&leaf2)?;
                let qc2_bytes = bincode::serialize(&qc2)?;

                values.push((view, leaf2_bytes, qc2_bytes));
            }

            let mut query_builder: sqlx::QueryBuilder<Db> =
                sqlx::QueryBuilder::new("INSERT INTO anchor_leaf2 (view, leaf, qc) ");

            // Resume from the highest view in this batch. The next batch
            // selects `view >= offset`, re-reading this row; the ON CONFLICT
            // clause below deduplicates it.
            offset = values.last().context("last row")?.0;

            query_builder.push_values(values, |mut b, (view, leaf, qc)| {
                b.push_bind(view).push_bind(leaf).push_bind(qc);
            });

            // Offset tracking prevents duplicate inserts
            // Added as a safeguard.
            query_builder.push(" ON CONFLICT DO NOTHING");

            let query = query_builder.build();

            // Insert the batch and persist the new offset atomically.
            let mut tx = self.db.write().await?;
            query.execute(tx.as_mut()).await?;

            tx.upsert(
                "epoch_migration",
                ["table_name", "completed", "migrated_rows"],
                ["table_name"],
                [("anchor_leaf".to_string(), false, offset)],
            )
            .await?;
            tx.commit().await?;

            tracing::info!(
                "anchor leaf migration progress: rows={} offset={}",
                rows.len(),
                offset
            );

            // A short batch means we reached the end of the table.
            if rows.len() < batch_size as usize {
                break;
            }
        }

        tracing::warn!("migrated decided leaves");

        // Mark the migration as complete.
        let mut tx = self.db.write().await?;
        tx.upsert(
            "epoch_migration",
            ["table_name", "completed", "migrated_rows"],
            ["table_name"],
            [("anchor_leaf".to_string(), true, offset)],
        )
        .await?;
        tx.commit().await?;

        tracing::info!("updated epoch_migration table for anchor_leaf");

        Ok(())
    }
1897
    /// Migrate DA proposals from `da_proposal` (`DaProposal`) to
    /// `da_proposal2` (`DaProposal2`), in resumable batches of 10,000 rows.
    ///
    /// Progress (the last migrated view) is tracked in `epoch_migration`
    /// under the `da_proposal` table name.
    async fn migrate_da_proposals(&self) -> anyhow::Result<()> {
        let batch_size: i64 = 10000;
        let mut tx = self.db.read().await?;

        // `epoch_migration` is pre-populated by the SQL migration, so this row
        // always exists; `migrated_rows` holds the resume offset.
        let (is_completed, mut offset) = query_as::<(bool, i64)>(
            "SELECT completed, migrated_rows from epoch_migration WHERE table_name = 'da_proposal'",
        )
        .fetch_one(tx.as_mut())
        .await?;

        if is_completed {
            tracing::info!("da proposals migration already done");
            return Ok(());
        }

        tracing::warn!("migrating da proposals..");

        loop {
            // Read one batch, starting from the last migrated view.
            let mut tx = self.db.read().await?;
            let rows = query(
                "SELECT payload_hash, data FROM da_proposal WHERE view >= $1 ORDER BY view LIMIT \
                 $2",
            )
            .bind(offset)
            .bind(batch_size)
            .fetch_all(tx.as_mut())
            .await?;

            // Release the read transaction before converting and writing.
            drop(tx);
            if rows.is_empty() {
                break;
            }
            let mut values = Vec::new();

            for row in rows.iter() {
                let data: Vec<u8> = row.try_get("data")?;
                let payload_hash: String = row.try_get("payload_hash")?;

                // Decode the legacy proposal and convert to the epoch-aware type.
                let da_proposal: Proposal<SeqTypes, DaProposal<SeqTypes>> =
                    bincode::deserialize(&data)?;
                let da_proposal2: Proposal<SeqTypes, DaProposal2<SeqTypes>> =
                    convert_proposal(da_proposal);

                let view = da_proposal2.data.view_number.u64() as i64;
                let data = bincode::serialize(&da_proposal2)?;

                values.push((view, payload_hash, data));
            }

            let mut query_builder: sqlx::QueryBuilder<Db> =
                sqlx::QueryBuilder::new("INSERT INTO da_proposal2 (view, payload_hash, data) ");

            // Resume from the highest view in this batch; `view >= offset`
            // re-reads the last row, which is deduplicated by ON CONFLICT.
            offset = values.last().context("last row")?.0;
            query_builder.push_values(values, |mut b, (view, payload_hash, data)| {
                b.push_bind(view).push_bind(payload_hash).push_bind(data);
            });
            query_builder.push(" ON CONFLICT DO NOTHING");
            let query = query_builder.build();

            // Insert the batch and persist the new offset atomically.
            let mut tx = self.db.write().await?;
            query.execute(tx.as_mut()).await?;

            tx.upsert(
                "epoch_migration",
                ["table_name", "completed", "migrated_rows"],
                ["table_name"],
                [("da_proposal".to_string(), false, offset)],
            )
            .await?;
            tx.commit().await?;

            tracing::info!(
                "DA proposals migration progress: rows={} offset={}",
                rows.len(),
                offset
            );
            // A short batch means we reached the end of the table.
            if rows.len() < batch_size as usize {
                break;
            }
        }

        tracing::warn!("migrated da proposals");

        // Mark the migration as complete.
        let mut tx = self.db.write().await?;
        tx.upsert(
            "epoch_migration",
            ["table_name", "completed", "migrated_rows"],
            ["table_name"],
            [("da_proposal".to_string(), true, offset)],
        )
        .await?;
        tx.commit().await?;

        tracing::info!("updated epoch_migration table for da_proposal");

        Ok(())
    }
1995
1996    async fn migrate_vid_shares(&self) -> anyhow::Result<()> {
1997        let batch_size: i64 = 10000;
1998
1999        let mut tx = self.db.read().await?;
2000
2001        let (is_completed, mut offset) = query_as::<(bool, i64)>(
2002            "SELECT completed, migrated_rows from epoch_migration WHERE table_name = 'vid_share'",
2003        )
2004        .fetch_one(tx.as_mut())
2005        .await?;
2006
2007        if is_completed {
2008            tracing::info!("vid_share migration already done");
2009            return Ok(());
2010        }
2011
2012        tracing::warn!("migrating vid shares..");
2013        loop {
2014            let mut tx = self.db.read().await?;
2015            let rows = query(
2016                "SELECT payload_hash, data FROM vid_share WHERE view >= $1 ORDER BY view LIMIT $2",
2017            )
2018            .bind(offset)
2019            .bind(batch_size)
2020            .fetch_all(tx.as_mut())
2021            .await?;
2022
2023            drop(tx);
2024            if rows.is_empty() {
2025                break;
2026            }
2027            let mut values = Vec::new();
2028
2029            for row in rows.iter() {
2030                let data: Vec<u8> = row.try_get("data")?;
2031                let payload_hash: String = row.try_get("payload_hash")?;
2032
2033                let vid_share: Proposal<SeqTypes, VidDisperseShare0<SeqTypes>> =
2034                    bincode::deserialize(&data)?;
2035                let vid_share2: Proposal<SeqTypes, VidDisperseShare<SeqTypes>> =
2036                    convert_proposal(vid_share);
2037
2038                let view = vid_share2.data.view_number().u64() as i64;
2039                let data = bincode::serialize(&vid_share2)?;
2040
2041                values.push((view, payload_hash, data));
2042            }
2043
2044            let mut query_builder: sqlx::QueryBuilder<Db> =
2045                sqlx::QueryBuilder::new("INSERT INTO vid_share2 (view, payload_hash, data) ");
2046
2047            offset = values.last().context("last row")?.0;
2048
2049            query_builder.push_values(values, |mut b, (view, payload_hash, data)| {
2050                b.push_bind(view).push_bind(payload_hash).push_bind(data);
2051            });
2052
2053            let query = query_builder.build();
2054
2055            let mut tx = self.db.write().await?;
2056            query.execute(tx.as_mut()).await?;
2057
2058            tx.upsert(
2059                "epoch_migration",
2060                ["table_name", "completed", "migrated_rows"],
2061                ["table_name"],
2062                [("vid_share".to_string(), false, offset)],
2063            )
2064            .await?;
2065            tx.commit().await?;
2066
2067            tracing::info!(
2068                "VID shares migration progress: rows={} offset={}",
2069                rows.len(),
2070                offset
2071            );
2072            if rows.len() < batch_size as usize {
2073                break;
2074            }
2075        }
2076
2077        tracing::warn!("migrated vid shares");
2078
2079        let mut tx = self.db.write().await?;
2080        tx.upsert(
2081            "epoch_migration",
2082            ["table_name", "completed", "migrated_rows"],
2083            ["table_name"],
2084            [("vid_share".to_string(), true, offset)],
2085        )
2086        .await?;
2087        tx.commit().await?;
2088
2089        tracing::info!("updated epoch_migration table for vid_share");
2090
2091        Ok(())
2092    }
2093
    /// Migrate quorum proposals from `quorum_proposals` (`QuorumProposal`) to
    /// `quorum_proposals2` (`QuorumProposalWrapper`), in resumable batches of
    /// 10,000 rows.
    ///
    /// Progress (the last migrated view) is tracked in `epoch_migration`
    /// under the `quorum_proposals` table name.
    async fn migrate_quorum_proposals(&self) -> anyhow::Result<()> {
        let batch_size: i64 = 10000;
        let mut tx = self.db.read().await?;

        // `epoch_migration` is pre-populated by the SQL migration, so this row
        // always exists; `migrated_rows` holds the resume offset.
        let (is_completed, mut offset) = query_as::<(bool, i64)>(
            "SELECT completed, migrated_rows from epoch_migration WHERE table_name = \
             'quorum_proposals'",
        )
        .fetch_one(tx.as_mut())
        .await?;

        if is_completed {
            tracing::info!("quorum proposals migration already done");
            return Ok(());
        }

        tracing::warn!("migrating quorum proposals..");

        loop {
            // Read one batch, starting from the last migrated view.
            let mut tx = self.db.read().await?;
            let rows = query(
                "SELECT view, leaf_hash, data FROM quorum_proposals WHERE view >= $1 ORDER BY \
                 view LIMIT $2",
            )
            .bind(offset)
            .bind(batch_size)
            .fetch_all(tx.as_mut())
            .await?;

            // Release the read transaction before converting and writing.
            drop(tx);

            if rows.is_empty() {
                break;
            }

            let mut values = Vec::new();

            for row in rows.iter() {
                let leaf_hash: String = row.try_get("leaf_hash")?;
                let data: Vec<u8> = row.try_get("data")?;

                // Decode the legacy proposal and convert to the wrapper type.
                let quorum_proposal: Proposal<SeqTypes, QuorumProposal<SeqTypes>> =
                    bincode::deserialize(&data)?;
                let quorum_proposal2: Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>> =
                    convert_proposal(quorum_proposal);

                let view = quorum_proposal2.data.view_number().u64() as i64;
                let data = bincode::serialize(&quorum_proposal2)?;

                values.push((view, leaf_hash, data));
            }

            let mut query_builder: sqlx::QueryBuilder<Db> =
                sqlx::QueryBuilder::new("INSERT INTO quorum_proposals2 (view, leaf_hash, data) ");

            // Resume from the highest view in this batch; `view >= offset`
            // re-reads the last row, which is deduplicated by ON CONFLICT.
            offset = values.last().context("last row")?.0;
            query_builder.push_values(values, |mut b, (view, leaf_hash, data)| {
                b.push_bind(view).push_bind(leaf_hash).push_bind(data);
            });

            query_builder.push(" ON CONFLICT DO NOTHING");

            let query = query_builder.build();

            // Insert the batch and persist the new offset atomically.
            let mut tx = self.db.write().await?;
            query.execute(tx.as_mut()).await?;

            tx.upsert(
                "epoch_migration",
                ["table_name", "completed", "migrated_rows"],
                ["table_name"],
                [("quorum_proposals".to_string(), false, offset)],
            )
            .await?;
            tx.commit().await?;

            tracing::info!(
                "quorum proposals migration progress: rows={} offset={}",
                rows.len(),
                offset
            );

            // A short batch means we reached the end of the table.
            if rows.len() < batch_size as usize {
                break;
            }
        }

        tracing::warn!("migrated quorum proposals");

        // Mark the migration as complete.
        let mut tx = self.db.write().await?;
        tx.upsert(
            "epoch_migration",
            ["table_name", "completed", "migrated_rows"],
            ["table_name"],
            [("quorum_proposals".to_string(), true, offset)],
        )
        .await?;
        tx.commit().await?;

        tracing::info!("updated epoch_migration table for quorum_proposals");

        Ok(())
    }
2197
    /// Migrate quorum certificates from `quorum_certificate`
    /// (`QuorumCertificate`) to `quorum_certificate2`
    /// (`QuorumCertificate2`), in resumable batches of 10,000 rows.
    ///
    /// Progress (the last migrated view) is tracked in `epoch_migration`
    /// under the `quorum_certificate` table name.
    async fn migrate_quorum_certificates(&self) -> anyhow::Result<()> {
        let batch_size: i64 = 10000;
        let mut tx = self.db.read().await?;

        // `epoch_migration` is pre-populated by the SQL migration, so this row
        // always exists; `migrated_rows` holds the resume offset.
        let (is_completed, mut offset) = query_as::<(bool, i64)>(
            "SELECT completed, migrated_rows from epoch_migration WHERE table_name = \
             'quorum_certificate'",
        )
        .fetch_one(tx.as_mut())
        .await?;

        if is_completed {
            tracing::info!("quorum certificates migration already done");
            return Ok(());
        }

        tracing::warn!("migrating quorum certificates..");
        loop {
            // Read one batch, starting from the last migrated view.
            let mut tx = self.db.read().await?;
            let rows = query(
                "SELECT view, leaf_hash, data FROM quorum_certificate WHERE view >= $1 ORDER BY \
                 view LIMIT $2",
            )
            .bind(offset)
            .bind(batch_size)
            .fetch_all(tx.as_mut())
            .await?;

            // Release the read transaction before converting and writing.
            drop(tx);
            if rows.is_empty() {
                break;
            }
            let mut values = Vec::new();

            for row in rows.iter() {
                let leaf_hash: String = row.try_get("leaf_hash")?;
                let data: Vec<u8> = row.try_get("data")?;

                // Decode the legacy QC and convert to the epoch-aware type.
                let qc: QuorumCertificate<SeqTypes> = bincode::deserialize(&data)?;
                let qc2: QuorumCertificate2<SeqTypes> = qc.to_qc2();

                let view = qc2.view_number().u64() as i64;
                let data = bincode::serialize(&qc2)?;

                values.push((view, leaf_hash, data));
            }

            let mut query_builder: sqlx::QueryBuilder<Db> =
                sqlx::QueryBuilder::new("INSERT INTO quorum_certificate2 (view, leaf_hash, data) ");

            // Resume from the highest view in this batch; `view >= offset`
            // re-reads the last row, which is deduplicated by ON CONFLICT.
            offset = values.last().context("last row")?.0;

            query_builder.push_values(values, |mut b, (view, leaf_hash, data)| {
                b.push_bind(view).push_bind(leaf_hash).push_bind(data);
            });

            query_builder.push(" ON CONFLICT DO NOTHING");
            let query = query_builder.build();

            // Insert the batch and persist the new offset atomically.
            let mut tx = self.db.write().await?;
            query.execute(tx.as_mut()).await?;

            tx.upsert(
                "epoch_migration",
                ["table_name", "completed", "migrated_rows"],
                ["table_name"],
                [("quorum_certificate".to_string(), false, offset)],
            )
            .await?;
            tx.commit().await?;

            tracing::info!(
                "Quorum certificates migration progress: rows={} offset={}",
                rows.len(),
                offset
            );

            // A short batch means we reached the end of the table.
            if rows.len() < batch_size as usize {
                break;
            }
        }

        tracing::warn!("migrated quorum certificates");

        // Mark the migration as complete.
        let mut tx = self.db.write().await?;
        tx.upsert(
            "epoch_migration",
            ["table_name", "completed", "migrated_rows"],
            ["table_name"],
            [("quorum_certificate".to_string(), true, offset)],
        )
        .await?;
        tx.commit().await?;
        tracing::info!("updated epoch_migration table for quorum_certificate");

        Ok(())
    }
2295
    /// Migrate stake table data to include x25519_key and p2p_addr fields.
    ///
    /// Data written before x25519 support lacks these fields. This migration
    /// deserializes legacy records and re-serializes them with the new fields set to None.
    ///
    /// Two independent stores are migrated, each guarded by its own entry in
    /// the migration-tracking table:
    /// 1. bincode blobs in `epoch_drb_and_root.stake`
    /// 2. JSONB rows in `stake_table_validators`
    async fn migrate_x25519_keys(&self) -> anyhow::Result<()> {
        use super::RegisteredValidatorNoX25519;

        let name = DataMigration::X25519Keys.as_str();

        // Migrate bincode storage (epoch_drb_and_root.stake).
        if !self
            .is_migration_complete(name, "epoch_drb_and_root")
            .await?
        {
            // Scope the read transaction so it is released before writing.
            let rows: Vec<(i64, Vec<u8>)> = {
                let mut tx = self.db.read().await?;
                query_as("SELECT epoch, stake FROM epoch_drb_and_root WHERE stake IS NOT NULL")
                    .fetch_all(tx.as_mut())
                    .await?
            };

            let num_rows = rows.len();
            let mut tx = self.db.write().await?;
            for (epoch, stake_bytes) in rows {
                // Try current format first
                if bincode::deserialize::<AuthenticatedValidatorMap>(&stake_bytes).is_ok() {
                    continue;
                }

                // Legacy format without x25519 fields
                let old_validators: IndexMap<Address, RegisteredValidatorNoX25519> =
                    bincode::deserialize(&stake_bytes)
                        .context("deserializing legacy stake table")?;
                let validators: AuthenticatedValidatorMap = old_validators
                    .into_iter()
                    .map(|(addr, v)| {
                        let registered = v.migrate();
                        (
                            addr,
                            AuthenticatedValidator::try_from(registered)
                                .expect("stake tables only contain authenticated validators"),
                        )
                    })
                    .collect();

                let new_bytes =
                    bincode::serialize(&validators).context("serializing stake table")?;

                tracing::debug!(epoch, "migrating x25519 keys in stake table");
                tx.execute(
                    query("UPDATE epoch_drb_and_root SET stake = $1 WHERE epoch = $2")
                        .bind(&new_bytes)
                        .bind(epoch),
                )
                .await?;
            }
            // Completion marker commits in the same transaction as the updates.
            Self::mark_migration_complete(&mut tx, name, "epoch_drb_and_root", num_rows).await?;
            tx.commit().await?;
            tracing::info!(
                num_rows,
                "x25519_keys migration completed for epoch_drb_and_root"
            );
        }

        // Migrate JSONB storage (stake_table_validators).
        if !self
            .is_migration_complete(name, "stake_table_validators")
            .await?
        {
            // Scope the read transaction so it is released before writing.
            let rows: Vec<(i64, String, serde_json::Value)> = {
                let mut tx = self.db.read().await?;
                query_as("SELECT epoch, address, validator FROM stake_table_validators")
                    .fetch_all(tx.as_mut())
                    .await?
            };

            let num_rows = rows.len();
            let mut tx = self.db.write().await?;
            for (epoch, address, validator_json) in rows {
                // Check if JSON already has x25519 fields (can't rely on deserialization
                // since serde_json fills missing Option<T> fields with None).
                if validator_json
                    .as_object()
                    .is_some_and(|obj| obj.contains_key("x25519_key"))
                {
                    continue;
                }

                // Deserialize (serde_json fills missing Option fields with None),
                // then re-serialize to ensure x25519_key and p2p_addr are present.
                let validator: RegisteredValidator<PubKey> =
                    serde_json::from_value(validator_json).context("deserializing validator")?;

                let new_json = serde_json::to_value(&validator).context("serializing validator")?;

                tracing::debug!(epoch, %address, "migrating x25519 keys for validator");
                tx.execute(
                    query(
                        "UPDATE stake_table_validators SET validator = $1 WHERE epoch = $2 AND \
                         address = $3",
                    )
                    .bind(&new_json)
                    .bind(epoch)
                    .bind(&address),
                )
                .await?;
            }
            // Completion marker commits in the same transaction as the updates.
            Self::mark_migration_complete(&mut tx, name, "stake_table_validators", num_rows)
                .await?;
            tx.commit().await?;
            tracing::info!(
                num_rows,
                "x25519_keys migration completed for stake_table_validators"
            );
        }

        Ok(())
    }
2414
2415    async fn store_next_epoch_quorum_certificate(
2416        &self,
2417        high_qc: NextEpochQuorumCertificate2<SeqTypes>,
2418    ) -> anyhow::Result<()> {
2419        let qc2_bytes = bincode::serialize(&high_qc).context("serializing next epoch qc")?;
2420        let mut tx = self.db.write().await?;
2421        tx.upsert(
2422            "next_epoch_quorum_certificate",
2423            ["id", "data"],
2424            ["id"],
2425            [(true, qc2_bytes)],
2426        )
2427        .await?;
2428        tx.commit().await
2429    }
2430
2431    async fn load_next_epoch_quorum_certificate(
2432        &self,
2433    ) -> anyhow::Result<Option<NextEpochQuorumCertificate2<SeqTypes>>> {
2434        let result = self
2435            .db
2436            .read()
2437            .await?
2438            .fetch_optional("SELECT * FROM next_epoch_quorum_certificate where id = true")
2439            .await?;
2440
2441        result
2442            .map(|row| {
2443                let bytes: Vec<u8> = row.get("data");
2444                anyhow::Result::<_>::Ok(bincode::deserialize(&bytes)?)
2445            })
2446            .transpose()
2447    }
2448
2449    async fn store_eqc(
2450        &self,
2451        high_qc: QuorumCertificate2<SeqTypes>,
2452        next_epoch_high_qc: NextEpochQuorumCertificate2<SeqTypes>,
2453    ) -> anyhow::Result<()> {
2454        let eqc_bytes =
2455            bincode::serialize(&(high_qc, next_epoch_high_qc)).context("serializing eqc")?;
2456        let mut tx = self.db.write().await?;
2457        tx.upsert("eqc", ["id", "data"], ["id"], [(true, eqc_bytes)])
2458            .await?;
2459        tx.commit().await
2460    }
2461
2462    async fn load_eqc(
2463        &self,
2464    ) -> Option<(
2465        QuorumCertificate2<SeqTypes>,
2466        NextEpochQuorumCertificate2<SeqTypes>,
2467    )> {
2468        let result = self
2469            .db
2470            .read()
2471            .await
2472            .ok()?
2473            .fetch_optional("SELECT * FROM eqc where id = true")
2474            .await
2475            .ok()?;
2476
2477        result
2478            .map(|row| {
2479                let bytes: Vec<u8> = row.get("data");
2480                bincode::deserialize(&bytes)
2481            })
2482            .transpose()
2483            .ok()?
2484    }
2485
2486    async fn append_da2(
2487        &self,
2488        proposal: &Proposal<SeqTypes, DaProposal2<SeqTypes>>,
2489        vid_commit: VidCommitment,
2490    ) -> anyhow::Result<()> {
2491        let data = &proposal.data;
2492        let view = data.view_number().u64();
2493        let data_bytes = bincode::serialize(proposal).unwrap();
2494
2495        let now = Instant::now();
2496        let mut tx = self.db.write().await?;
2497        tx.upsert(
2498            "da_proposal2",
2499            ["view", "data", "payload_hash"],
2500            ["view"],
2501            [(view as i64, data_bytes, vid_commit.to_string())],
2502        )
2503        .await?;
2504        let res = tx.commit().await;
2505        self.internal_metrics
2506            .internal_append_da2_duration
2507            .add_point(now.elapsed().as_secs_f64());
2508        res
2509    }
2510
2511    async fn store_drb_result(
2512        &self,
2513        epoch: EpochNumber,
2514        drb_result: DrbResult,
2515    ) -> anyhow::Result<()> {
2516        let drb_result_vec = Vec::from(drb_result);
2517        let mut tx = self.db.write().await?;
2518        tx.upsert(
2519            "epoch_drb_and_root",
2520            ["epoch", "drb_result"],
2521            ["epoch"],
2522            [(epoch.u64() as i64, drb_result_vec)],
2523        )
2524        .await?;
2525        tx.commit().await
2526    }
2527
2528    async fn store_epoch_root(
2529        &self,
2530        epoch: EpochNumber,
2531        block_header: <SeqTypes as NodeType>::BlockHeader,
2532    ) -> anyhow::Result<()> {
2533        let block_header_bytes =
2534            bincode::serialize(&block_header).context("serializing block header")?;
2535
2536        let mut tx = self.db.write().await?;
2537        tx.upsert(
2538            "epoch_drb_and_root",
2539            ["epoch", "block_header"],
2540            ["epoch"],
2541            [(epoch.u64() as i64, block_header_bytes)],
2542        )
2543        .await?;
2544        tx.commit().await
2545    }
2546
    /// Persist an in-progress DRB computation state for later resumption.
    ///
    /// Guards against regressing the stored state: if storage already holds a
    /// `DrbInput` for the same epoch at the same difficulty level with an
    /// iteration count at least as large, the update is refused with an
    /// error. A mismatched difficulty level is logged and then overwritten.
    async fn store_drb_input(&self, drb_input: DrbInput) -> anyhow::Result<()> {
        // A load failure here just means nothing usable is stored yet, so we
        // fall through and write unconditionally.
        if let Ok(loaded_drb_input) = self.load_drb_input(drb_input.epoch).await {
            if loaded_drb_input.difficulty_level != drb_input.difficulty_level {
                tracing::error!("Overwriting {loaded_drb_input:?} in storage with {drb_input:?}");
            } else if loaded_drb_input.iteration >= drb_input.iteration {
                anyhow::bail!(
                    "DrbInput in storage {:?} is more recent than {:?}, refusing to update",
                    loaded_drb_input,
                    drb_input
                )
            }
        }

        let drb_input_bytes = bincode::serialize(&drb_input)
            .context("Failed to serialize DrbInput. This is not fatal, but should never happen.")?;

        let mut tx = self.db.write().await?;

        // One row per epoch; later iterations overwrite earlier ones.
        tx.upsert(
            "drb",
            ["epoch", "drb_input"],
            ["epoch"],
            [(drb_input.epoch as i64, drb_input_bytes)],
        )
        .await?;
        tx.commit().await
    }
2574
2575    async fn load_drb_input(&self, epoch: u64) -> anyhow::Result<DrbInput> {
2576        let row = self
2577            .db
2578            .read()
2579            .await?
2580            .fetch_optional(query("SELECT drb_input FROM drb WHERE epoch = $1").bind(epoch as i64))
2581            .await?;
2582
2583        match row {
2584            None => anyhow::bail!("No DrbInput for epoch {} in storage", epoch),
2585            Some(row) => {
2586                let drb_input_bytes: Vec<u8> = row.try_get("drb_input")?;
2587                let drb_input = bincode::deserialize(&drb_input_bytes)
2588                    .context("Failed to deserialize drb_input from storage")?;
2589
2590                Ok(drb_input)
2591            },
2592        }
2593    }
2594
2595    async fn add_state_cert(
2596        &self,
2597        state_cert: LightClientStateUpdateCertificateV2<SeqTypes>,
2598    ) -> anyhow::Result<()> {
2599        let state_cert_bytes = bincode::serialize(&state_cert)
2600            .context("serializing light client state update certificate")?;
2601
2602        let mut tx = self.db.write().await?;
2603        tx.upsert(
2604            "state_cert",
2605            ["view", "state_cert"],
2606            ["view"],
2607            [(
2608                state_cert.light_client_state.view_number as i64,
2609                state_cert_bytes,
2610            )],
2611        )
2612        .await?;
2613        tx.commit().await
2614    }
2615
2616    async fn load_state_cert(
2617        &self,
2618    ) -> anyhow::Result<Option<LightClientStateUpdateCertificateV2<SeqTypes>>> {
2619        let Some(row) = self
2620            .db
2621            .read()
2622            .await?
2623            .fetch_optional(
2624                "SELECT state_cert FROM finalized_state_cert ORDER BY epoch DESC LIMIT 1",
2625            )
2626            .await?
2627        else {
2628            return Ok(None);
2629        };
2630        let bytes: Vec<u8> = row.get("state_cert");
2631
2632        let cert = match bincode::deserialize(&bytes) {
2633            Ok(cert) => cert,
2634            Err(err) => {
2635                tracing::info!(
2636                    error = %err,
2637                    "Failed to deserialize state certificate with v2. attempting with v1"
2638                );
2639
2640                let v1_cert =
2641                    bincode::deserialize::<LightClientStateUpdateCertificateV1<SeqTypes>>(&bytes)
2642                        .with_context(|| {
2643                        format!("Failed to deserialize using both v1 and v2. error: {err}")
2644                    })?;
2645
2646                v1_cert.into()
2647            },
2648        };
2649
2650        Ok(Some(cert))
2651    }
2652
2653    async fn get_state_cert_by_epoch(
2654        &self,
2655        epoch: u64,
2656    ) -> anyhow::Result<Option<LightClientStateUpdateCertificateV2<SeqTypes>>> {
2657        let Some(row) = self
2658            .db
2659            .read()
2660            .await?
2661            .fetch_optional(
2662                query("SELECT state_cert FROM finalized_state_cert WHERE epoch = $1")
2663                    .bind(epoch as i64),
2664            )
2665            .await?
2666        else {
2667            return Ok(None);
2668        };
2669        let bytes: Vec<u8> = row.get("state_cert");
2670
2671        let cert = match bincode::deserialize(&bytes) {
2672            Ok(cert) => cert,
2673            Err(err) => {
2674                tracing::info!(
2675                    error = %err,
2676                    "Failed to deserialize state certificate with v2. attempting with v1"
2677                );
2678
2679                let v1_cert =
2680                    bincode::deserialize::<LightClientStateUpdateCertificateV1<SeqTypes>>(&bytes)
2681                        .with_context(|| {
2682                        format!("Failed to deserialize using both v1 and v2. error: {err}")
2683                    })?;
2684
2685                v1_cert.into()
2686            },
2687        };
2688
2689        Ok(Some(cert))
2690    }
2691
2692    async fn insert_state_cert(
2693        &self,
2694        epoch: u64,
2695        cert: LightClientStateUpdateCertificateV2<SeqTypes>,
2696    ) -> anyhow::Result<()> {
2697        let bytes = bincode::serialize(&cert)
2698            .with_context(|| format!("Failed to serialize state cert for epoch {epoch}"))?;
2699
2700        let mut tx = self.db.write().await?;
2701
2702        tx.upsert(
2703            "finalized_state_cert",
2704            ["epoch", "state_cert"],
2705            ["epoch"],
2706            [(epoch as i64, bytes)],
2707        )
2708        .await
2709    }
2710
2711    async fn load_start_epoch_info(&self) -> anyhow::Result<Vec<InitializerEpochInfo<SeqTypes>>> {
2712        let rows = self
2713            .db
2714            .read()
2715            .await?
2716            .fetch_all(
2717                query("SELECT * from epoch_drb_and_root ORDER BY epoch DESC LIMIT $1")
2718                    .bind(RECENT_STAKE_TABLES_LIMIT as i64),
2719            )
2720            .await?;
2721
2722        // reverse the rows vector to return the most recent epochs, but in ascending order
2723        rows.into_iter()
2724            .rev()
2725            .map(|row| {
2726                let epoch: i64 = row.try_get("epoch")?;
2727                let drb_result: Option<Vec<u8>> = row.try_get("drb_result")?;
2728                let block_header: Option<Vec<u8>> = row.try_get("block_header")?;
2729                if let Some(drb_result) = drb_result {
2730                    let drb_result_array = drb_result
2731                        .try_into()
2732                        .or_else(|_| bail!("invalid drb result"))?;
2733                    let block_header: Option<<SeqTypes as NodeType>::BlockHeader> = block_header
2734                        .map(|data| bincode::deserialize(&data))
2735                        .transpose()?;
2736                    Ok(Some(InitializerEpochInfo::<SeqTypes> {
2737                        epoch: EpochNumber::new(epoch as u64),
2738                        drb_result: drb_result_array,
2739                        block_header,
2740                    }))
2741                } else {
2742                    // Right now we skip the epoch_drb_and_root row if there is no drb result.
2743                    // This seems reasonable based on the expected order of events, but please double check!
2744                    Ok(None)
2745                }
2746            })
2747            .filter_map(|e| match e {
2748                Err(v) => Some(Err(v)),
2749                Ok(Some(v)) => Some(Ok(v)),
2750                Ok(None) => None,
2751            })
2752            .collect()
2753    }
2754
    /// Replace the internal persistence metrics with a set registered against
    /// the given `Metrics` implementation.
    fn enable_metrics(&mut self, metrics: &dyn Metrics) {
        self.internal_metrics = PersistenceMetricsValue::new(metrics);
    }
2758}
2759
2760#[async_trait]
2761impl MembershipPersistence for Persistence {
2762    async fn load_stake(&self, epoch: EpochNumber) -> anyhow::Result<Option<StakeTuple>> {
2763        let result = self
2764            .db
2765            .read()
2766            .await?
2767            .fetch_optional(
2768                query(
2769                    "SELECT stake, block_reward, stake_table_hash FROM epoch_drb_and_root WHERE \
2770                     epoch = $1",
2771                )
2772                .bind(epoch.u64() as i64),
2773            )
2774            .await?;
2775
2776        result
2777            .map(|row| {
2778                let stake_table_bytes: Vec<u8> = row.get("stake");
2779                let reward_bytes: Option<Vec<u8>> = row.get("block_reward");
2780                let stake_table_hash_bytes: Option<Vec<u8>> = row.get("stake_table_hash");
2781                let stake_table: AuthenticatedValidatorMap =
2782                    bincode::deserialize(&stake_table_bytes)
2783                        .context("deserializing stake table")?;
2784                let reward: Option<RewardAmount> = reward_bytes
2785                    .map(|b| bincode::deserialize(&b).context("deserializing block_reward"))
2786                    .transpose()?;
2787                let stake_table_hash: Option<StakeTableHash> = stake_table_hash_bytes
2788                    .map(|b| bincode::deserialize(&b).context("deserializing stake table hash"))
2789                    .transpose()?;
2790
2791                Ok((stake_table, reward, stake_table_hash))
2792            })
2793            .transpose()
2794    }
2795
2796    async fn load_latest_stake(&self, limit: u64) -> anyhow::Result<Option<Vec<IndexedStake>>> {
2797        let mut tx = self.db.read().await?;
2798
2799        let rows = match query_as::<(i64, Vec<u8>, Option<Vec<u8>>, Option<Vec<u8>>)>(
2800            "SELECT epoch, stake, block_reward, stake_table_hash FROM epoch_drb_and_root WHERE \
2801             stake is NOT NULL ORDER BY epoch DESC LIMIT $1",
2802        )
2803        .bind(limit as i64)
2804        .fetch_all(tx.as_mut())
2805        .await
2806        {
2807            Ok(bytes) => bytes,
2808            Err(err) => {
2809                tracing::error!("error loading stake tables: {err:#}");
2810                bail!("{err:#}");
2811            },
2812        };
2813
2814        let stakes: anyhow::Result<Vec<IndexedStake>> = rows
2815            .into_iter()
2816            .map(
2817                |(id, stake_bytes, reward_bytes_opt, stake_table_hash_bytes_opt)| {
2818                    let stake_table: AuthenticatedValidatorMap =
2819                        bincode::deserialize(&stake_bytes).context("deserializing stake table")?;
2820
2821                    let block_reward: Option<RewardAmount> = reward_bytes_opt
2822                        .map(|b| bincode::deserialize(&b).context("deserializing block_reward"))
2823                        .transpose()?;
2824
2825                    let stake_table_hash: Option<StakeTableHash> = stake_table_hash_bytes_opt
2826                        .map(|b| bincode::deserialize(&b).context("deserializing stake table hash"))
2827                        .transpose()?;
2828
2829                    Ok((
2830                        EpochNumber::new(id as u64),
2831                        (stake_table, block_reward),
2832                        stake_table_hash,
2833                    ))
2834                },
2835            )
2836            .collect();
2837
2838        Ok(Some(stakes?))
2839    }
2840
2841    async fn store_stake(
2842        &self,
2843        epoch: EpochNumber,
2844        stake: AuthenticatedValidatorMap,
2845        block_reward: Option<RewardAmount>,
2846        stake_table_hash: Option<StakeTableHash>,
2847    ) -> anyhow::Result<()> {
2848        let mut tx = self.db.write().await?;
2849
2850        let stake_table_bytes = bincode::serialize(&stake).context("serializing stake table")?;
2851        let reward_bytes = block_reward
2852            .map(|r| bincode::serialize(&r).context("serializing block reward"))
2853            .transpose()?;
2854        let stake_table_hash_bytes = stake_table_hash
2855            .map(|h| bincode::serialize(&h).context("serializing stake table hash"))
2856            .transpose()?;
2857        tx.upsert(
2858            "epoch_drb_and_root",
2859            ["epoch", "stake", "block_reward", "stake_table_hash"],
2860            ["epoch"],
2861            [(
2862                epoch.u64() as i64,
2863                stake_table_bytes,
2864                reward_bytes,
2865                stake_table_hash_bytes,
2866            )],
2867        )
2868        .await?;
2869        tx.commit().await
2870    }
2871
2872    async fn store_events(
2873        &self,
2874        l1_finalized: u64,
2875        events: Vec<(EventKey, StakeTableEvent)>,
2876    ) -> anyhow::Result<()> {
2877        let mut tx = self.db.write().await?;
2878
2879        // check last l1 block if there is any
2880        let last_processed_l1_block = query_as::<(i64,)>(
2881            "SELECT last_l1_block FROM stake_table_events_l1_block where id = 0",
2882        )
2883        .fetch_optional(tx.as_mut())
2884        .await?
2885        .map(|(l1,)| l1);
2886
2887        tracing::debug!("last l1 finalizes in database = {last_processed_l1_block:?}");
2888
2889        // skip events storage if the database already has higher l1 block events
2890        if last_processed_l1_block > Some(l1_finalized.try_into()?) {
2891            tracing::debug!(
2892                ?last_processed_l1_block,
2893                ?l1_finalized,
2894                ?events,
2895                "last l1 finalized stored is already higher"
2896            );
2897            return Ok(());
2898        }
2899
2900        if !events.is_empty() {
2901            let mut query_builder: sqlx::QueryBuilder<Db> = sqlx::QueryBuilder::new(
2902                "INSERT INTO stake_table_events (l1_block, log_index, event) ",
2903            );
2904
2905            let events = events
2906                .into_iter()
2907                .map(|((block_number, index), event)| {
2908                    Ok((
2909                        i64::try_from(block_number)?,
2910                        i64::try_from(index)?,
2911                        serde_json::to_value(event).context("l1 event to value")?,
2912                    ))
2913                })
2914                .collect::<anyhow::Result<Vec<_>>>()?;
2915
2916            query_builder.push_values(events, |mut b, (l1_block, log_index, event)| {
2917                b.push_bind(l1_block).push_bind(log_index).push_bind(event);
2918            });
2919
2920            query_builder.push(" ON CONFLICT DO NOTHING");
2921            let query = query_builder.build();
2922
2923            query.execute(tx.as_mut()).await?;
2924        }
2925
2926        // update l1 block
2927        tx.upsert(
2928            "stake_table_events_l1_block",
2929            ["id", "last_l1_block"],
2930            ["id"],
2931            [(0_i32, l1_finalized as i64)],
2932        )
2933        .await?;
2934
2935        tx.commit().await?;
2936
2937        Ok(())
2938    }
2939
2940    /// Loads all events from persistent storage up to the specified L1 block.
2941    ///
2942    /// # Returns
2943    ///
2944    /// Returns a tuple containing:
2945    /// - `Option<u64>` - The queried L1 block for which all events have been successfully fetched.
2946    /// - `Vec<(EventKey, StakeTableEvent)>` - A list of events, where each entry is a tuple of the event key
2947    /// event key is (l1 block number, log index)
2948    ///   and the corresponding StakeTable event.
2949    ///
2950    async fn load_events(
2951        &self,
2952        from_l1_block: u64,
2953        to_l1_block: u64,
2954    ) -> anyhow::Result<(
2955        Option<EventsPersistenceRead>,
2956        Vec<(EventKey, StakeTableEvent)>,
2957    )> {
2958        let mut tx = self.db.read().await?;
2959
2960        // check last l1 block if there is any
2961        let res = query_as::<(i64,)>(
2962            "SELECT last_l1_block FROM stake_table_events_l1_block where id = 0",
2963        )
2964        .fetch_optional(tx.as_mut())
2965        .await?;
2966
2967        let Some((last_processed_l1_block,)) = res else {
2968            // this just means we dont have any events stored
2969            return Ok((None, Vec::new()));
2970        };
2971
2972        // Determine the L1 block for querying events.
2973        // If the last stored L1 block is greater than the requested block, limit the query to the requested block.
2974        // Otherwise, query up to the last stored block.
2975        let to_l1_block = to_l1_block.try_into()?;
2976        let query_l1_block = if last_processed_l1_block > to_l1_block {
2977            to_l1_block
2978        } else {
2979            last_processed_l1_block
2980        };
2981
2982        let rows = query(
2983            "SELECT l1_block, log_index, event FROM stake_table_events WHERE $1 <= l1_block AND \
2984             l1_block <= $2 ORDER BY l1_block ASC, log_index ASC",
2985        )
2986        .bind(i64::try_from(from_l1_block)?)
2987        .bind(query_l1_block)
2988        .fetch_all(tx.as_mut())
2989        .await?;
2990
2991        let events = rows
2992            .into_iter()
2993            .map(|row| {
2994                let l1_block: i64 = row.try_get("l1_block")?;
2995                let log_index: i64 = row.try_get("log_index")?;
2996                let event = serde_json::from_value(row.try_get("event")?)?;
2997
2998                Ok(((l1_block.try_into()?, log_index.try_into()?), event))
2999            })
3000            .collect::<anyhow::Result<Vec<_>>>()?;
3001
3002        // Determine the read state based on the queried block range.
3003        // - If the persistence returned events up to the requested block, the read is complete.
3004        // - Otherwise, indicate that the read is up to the last processed block.
3005        if query_l1_block == to_l1_block {
3006            Ok((Some(EventsPersistenceRead::Complete), events))
3007        } else {
3008            Ok((
3009                Some(EventsPersistenceRead::UntilL1Block(
3010                    query_l1_block.try_into()?,
3011                )),
3012                events,
3013            ))
3014        }
3015    }
3016
3017    async fn delete_stake_tables(&self) -> anyhow::Result<()> {
3018        let mut tx = self.db.write().await?;
3019        #[cfg(not(feature = "embedded-db"))]
3020        query(
3021            "TRUNCATE stake_table_events, stake_table_events_l1_block, epoch_drb_and_root, \
3022             stake_table_validators",
3023        )
3024        .execute(tx.as_mut())
3025        .await?;
3026        #[cfg(feature = "embedded-db")]
3027        {
3028            query("DELETE FROM stake_table_events")
3029                .execute(tx.as_mut())
3030                .await?;
3031            query("DELETE FROM stake_table_events_l1_block")
3032                .execute(tx.as_mut())
3033                .await?;
3034            query("DELETE FROM epoch_drb_and_root")
3035                .execute(tx.as_mut())
3036                .await?;
3037            query("DELETE FROM stake_table_validators")
3038                .execute(tx.as_mut())
3039                .await?;
3040        }
3041        tx.commit().await?;
3042        Ok(())
3043    }
3044
3045    async fn store_all_validators(
3046        &self,
3047        epoch: EpochNumber,
3048        all_validators: RegisteredValidatorMap,
3049    ) -> anyhow::Result<()> {
3050        let mut tx = self.db.write().await?;
3051
3052        if all_validators.is_empty() {
3053            return Ok(());
3054        }
3055
3056        let mut query_builder =
3057            QueryBuilder::new("INSERT INTO stake_table_validators (epoch, address, validator) ");
3058
3059        query_builder.push_values(all_validators, |mut b, (address, validator)| {
3060            let validator_json =
3061                serde_json::to_value(&validator).expect("cannot serialize validator to json");
3062            b.push_bind(epoch.u64() as i64)
3063                .push_bind(address.to_string())
3064                .push_bind(validator_json);
3065        });
3066
3067        query_builder
3068            .push(" ON CONFLICT (epoch, address) DO UPDATE SET validator = EXCLUDED.validator");
3069
3070        let query = query_builder.build();
3071
3072        query.execute(tx.as_mut()).await?;
3073
3074        tx.commit().await?;
3075        Ok(())
3076    }
3077
3078    async fn load_all_validators(
3079        &self,
3080        epoch: EpochNumber,
3081        offset: u64,
3082        limit: u64,
3083    ) -> anyhow::Result<Vec<RegisteredValidator<PubKey>>> {
3084        let mut tx = self.db.read().await?;
3085
3086        // Use LOWER(address) in ORDER BY to ensure consistent ordering for SQlite and Postgres.
3087        // Postgres sorts text case sensitively by default, while SQLite sorts case insensitively.
3088        // Applying LOWER() makes the result consistent.
3089        let rows = query(
3090            "SELECT address, validator
3091         FROM stake_table_validators
3092         WHERE epoch = $1
3093         ORDER BY LOWER(address) ASC
3094         LIMIT $2 OFFSET $3",
3095        )
3096        .bind(epoch.u64() as i64)
3097        .bind(limit as i64)
3098        .bind(offset as i64)
3099        .fetch_all(tx.as_mut())
3100        .await?;
3101        rows.into_iter()
3102            .map(|row| {
3103                let validator_json: serde_json::Value = row.try_get("validator")?;
3104                serde_json::from_value::<RegisteredValidator<PubKey>>(validator_json)
3105                    .map_err(Into::into)
3106            })
3107            .collect()
3108    }
3109}
3110
3111#[async_trait]
3112impl DhtPersistentStorage for Persistence {
3113    /// Save the DHT to the database
3114    ///
3115    /// # Errors
3116    /// - If we fail to serialize the records
3117    /// - If we fail to write the serialized records to the DB
3118    async fn save(&self, records: Vec<SerializableRecord>) -> anyhow::Result<()> {
3119        // Bincode-serialize the records
3120        let to_save =
3121            bincode::serialize(&records).with_context(|| "failed to serialize records")?;
3122
3123        // Prepare the statement
3124        let stmt = "INSERT INTO libp2p_dht (id, serialized_records) VALUES (0, $1) ON CONFLICT \
3125                    (id) DO UPDATE SET serialized_records = $1";
3126
3127        // Execute the query
3128        let mut tx = self
3129            .db
3130            .write()
3131            .await
3132            .with_context(|| "failed to start an atomic DB transaction")?;
3133        tx.execute(query(stmt).bind(to_save))
3134            .await
3135            .with_context(|| "failed to execute DB query")?;
3136
3137        // Commit the state
3138        tx.commit().await.with_context(|| "failed to commit to DB")
3139    }
3140
3141    /// Load the DHT from the database
3142    ///
3143    /// # Errors
3144    /// - If we fail to read from the DB
3145    /// - If we fail to deserialize the records
3146    async fn load(&self) -> anyhow::Result<Vec<SerializableRecord>> {
3147        // Fetch the results from the DB
3148        let result = self
3149            .db
3150            .read()
3151            .await
3152            .with_context(|| "failed to start a DB read transaction")?
3153            .fetch_one("SELECT * FROM libp2p_dht where id = 0")
3154            .await
3155            .with_context(|| "failed to fetch from DB")?;
3156
3157        // Get the `serialized_records` row
3158        let serialied_records: Vec<u8> = result.get("serialized_records");
3159
3160        // Deserialize it
3161        let records: Vec<SerializableRecord> = bincode::deserialize(&serialied_records)
3162            .with_context(|| "Failed to deserialize records")?;
3163
3164        Ok(records)
3165    }
3166}
3167
3168#[async_trait]
3169impl Provider<SeqTypes, VidCommonRequest> for Persistence {
3170    #[tracing::instrument(skip(self))]
3171    async fn fetch(&self, req: VidCommonRequest) -> Option<VidCommon> {
3172        let mut tx = match self.db.read().await {
3173            Ok(tx) => tx,
3174            Err(err) => {
3175                tracing::warn!("could not open transaction: {err:#}");
3176                return None;
3177            },
3178        };
3179
3180        let bytes = match query_as::<(Vec<u8>,)>(
3181            "SELECT data FROM vid_share2 WHERE payload_hash = $1 LIMIT 1",
3182        )
3183        .bind(req.0.to_string())
3184        .fetch_optional(tx.as_mut())
3185        .await
3186        {
3187            Ok(Some((bytes,))) => bytes,
3188            Ok(None) => return None,
3189            Err(err) => {
3190                tracing::error!("error loading VID share: {err:#}");
3191                return None;
3192            },
3193        };
3194
3195        let share: Proposal<SeqTypes, VidDisperseShare<SeqTypes>> =
3196            match bincode::deserialize(&bytes) {
3197                Ok(share) => share,
3198                Err(err) => {
3199                    tracing::warn!("error decoding VID share: {err:#}");
3200                    return None;
3201                },
3202            };
3203
3204        match share.data {
3205            VidDisperseShare::V0(vid) => Some(VidCommon::V0(vid.common)),
3206            VidDisperseShare::V1(vid) => Some(VidCommon::V1(vid.common)),
3207            VidDisperseShare::V2(vid) => Some(VidCommon::V2(vid.common)),
3208        }
3209    }
3210}
3211
3212#[async_trait]
3213impl Provider<SeqTypes, PayloadRequest> for Persistence {
3214    #[tracing::instrument(skip(self))]
3215    async fn fetch(&self, req: PayloadRequest) -> Option<Payload> {
3216        let mut tx = match self.db.read().await {
3217            Ok(tx) => tx,
3218            Err(err) => {
3219                tracing::warn!("could not open transaction: {err:#}");
3220                return None;
3221            },
3222        };
3223
3224        let bytes = match query_as::<(Vec<u8>,)>(
3225            "SELECT data FROM da_proposal2 WHERE payload_hash = $1 LIMIT 1",
3226        )
3227        .bind(req.0.to_string())
3228        .fetch_optional(tx.as_mut())
3229        .await
3230        {
3231            Ok(Some((bytes,))) => bytes,
3232            Ok(None) => return None,
3233            Err(err) => {
3234                tracing::warn!("error loading DA proposal: {err:#}");
3235                return None;
3236            },
3237        };
3238
3239        let proposal: Proposal<SeqTypes, DaProposal2<SeqTypes>> = match bincode::deserialize(&bytes)
3240        {
3241            Ok(proposal) => proposal,
3242            Err(err) => {
3243                tracing::error!("error decoding DA proposal: {err:#}");
3244                return None;
3245            },
3246        };
3247
3248        Some(Payload::from_bytes(
3249            &proposal.data.encoded_transactions,
3250            &proposal.data.metadata,
3251        ))
3252    }
3253}
3254
#[async_trait]
impl Provider<SeqTypes, LeafRequest<SeqTypes>> for Persistence {
    /// Serve a leaf (with its QC) from locally stored undecided proposals.
    /// Best effort: any failure yields `None`.
    #[tracing::instrument(skip(self))]
    async fn fetch(&self, req: LeafRequest<SeqTypes>) -> Option<LeafQueryData<SeqTypes>> {
        // Open a read transaction; treat failure as "not available".
        let mut tx = match self.db.read().await {
            Ok(tx) => tx,
            Err(err) => {
                tracing::warn!("could not open transaction: {err:#}");
                return None;
            },
        };

        // Find a matching (leaf, QC) pair among the undecided proposals;
        // `Ok(None)` (no match) short-circuits via `?`.
        let (leaf, qc) = match fetch_leaf_from_proposals(&mut tx, req).await {
            Ok(res) => res?,
            Err(err) => {
                tracing::info!("requested leaf not found in undecided proposals: {err:#}");
                return None;
            },
        };

        // Construction validates the pair; a mismatched QC/leaf is rejected.
        match LeafQueryData::new(leaf, qc) {
            Ok(leaf) => Some(leaf),
            Err(err) => {
                tracing::warn!("fetched invalid leaf: {err:#}");
                None
            },
        }
    }
}
3284
3285async fn fetch_leaf_from_proposals<Mode: TransactionMode>(
3286    tx: &mut Transaction<Mode>,
3287    req: LeafRequest<SeqTypes>,
3288) -> anyhow::Result<Option<(Leaf2, QuorumCertificate2<SeqTypes>)>> {
3289    // Look for a quorum proposal corresponding to this leaf.
3290    let Some((proposal_bytes,)) =
3291        query_as::<(Vec<u8>,)>("SELECT data FROM quorum_proposals2 WHERE leaf_hash = $1 LIMIT 1")
3292            .bind(req.expected_leaf.to_string())
3293            .fetch_optional(tx.as_mut())
3294            .await
3295            .context("fetching proposal")?
3296    else {
3297        return Ok(None);
3298    };
3299
3300    // Look for a QC corresponding to this leaf.
3301    let Some((qc_bytes,)) =
3302        query_as::<(Vec<u8>,)>("SELECT data FROM quorum_certificate2 WHERE leaf_hash = $1 LIMIT 1")
3303            .bind(req.expected_leaf.to_string())
3304            .fetch_optional(tx.as_mut())
3305            .await
3306            .context("fetching QC")?
3307    else {
3308        return Ok(None);
3309    };
3310
3311    let proposal: Proposal<SeqTypes, QuorumProposalWrapper<SeqTypes>> =
3312        bincode::deserialize(&proposal_bytes).context("deserializing quorum proposal")?;
3313    let qc: QuorumCertificate2<SeqTypes> =
3314        bincode::deserialize(&qc_bytes).context("deserializing quorum certificate")?;
3315
3316    let leaf = Leaf2::from_quorum_proposal(&proposal.data);
3317    Ok(Some((leaf, qc)))
3318}
3319
#[cfg(test)]
mod testing {
    //! Test-only support: plugs this SQL backend into the shared persistence
    //! test suite by backing it with a temporary database.
    use hotshot_query_service::data_source::storage::sql::testing::TmpDb;

    use super::*;
    use crate::persistence::tests::TestablePersistence;

    #[async_trait]
    impl TestablePersistence for Persistence {
        // `Arc` so multiple persistence handles in one test can share the same
        // temporary database.
        type Storage = Arc<TmpDb>;

        /// Spin up a fresh temporary database for a test.
        async fn tmp_storage() -> Self::Storage {
            Arc::new(TmpDb::init().await)
        }

        #[allow(refining_impl_trait)]
        /// Build connection options pointing at the temporary database. The
        /// backend is selected at compile time by the `embedded-db` feature.
        fn options(db: &Self::Storage) -> Options {
            // Postgres: connect to the ephemeral server managed by `TmpDb`
            // using its well-known test credentials.
            #[cfg(not(feature = "embedded-db"))]
            {
                PostgresOptions {
                    port: Some(db.port()),
                    host: Some(db.host()),
                    user: Some("postgres".into()),
                    password: Some("password".into()),
                    ..Default::default()
                }
                .into()
            }

            // SQLite: just point at the temporary database file.
            #[cfg(feature = "embedded-db")]
            {
                SqliteOptions { path: db.path() }.into()
            }
        }
    }
}
3356
3357#[cfg(test)]
3358mod test {
3359    use committable::{Commitment, CommitmentBoundsArkless};
3360    use espresso_types::{Header, Leaf, NodeState, ValidatedState, traits::NullEventConsumer};
3361    use futures::stream::TryStreamExt;
3362    use hotshot_example_types::node_types::TEST_VERSIONS;
3363    use hotshot_types::{
3364        data::{
3365            EpochNumber, QuorumProposal2, ns_table::parse_ns_table,
3366            vid_disperse::AvidMDisperseShare,
3367        },
3368        message::convert_proposal,
3369        simple_certificate::QuorumCertificate,
3370        simple_vote::QuorumData,
3371        traits::{
3372            EncodeBytes,
3373            block_contents::{BlockHeader, GENESIS_VID_NUM_STORAGE_NODES},
3374            signature_key::SignatureKey,
3375        },
3376        utils::EpochTransitionIndicator,
3377        vid::{
3378            advz::advz_scheme,
3379            avidm::{AvidMScheme, init_avidm_param},
3380        },
3381    };
3382    use jf_advz::VidScheme;
3383
3384    use super::*;
3385    use crate::{BLSPubKey, PubKey, persistence::tests::TestablePersistence as _};
3386
    /// Regression test for the `leaf_hash` backfill on `quorum_proposals`:
    /// proposals inserted without a leaf hash must have one computed and
    /// populated the next time persistence connects, without altering the
    /// stored proposal bytes.
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_quorum_proposals_leaf_hash_migration() {
        // Create some quorum proposals to test with.
        let leaf: Leaf2 = Leaf::genesis(
            &ValidatedState::default(),
            &NodeState::mock(),
            TEST_VERSIONS.test.base,
        )
        .await
        .into();
        let privkey = BLSPubKey::generated_from_seed_indexed([0; 32], 1).1;
        let signature = PubKey::sign(&privkey, &[]).unwrap();
        let mut quorum_proposal = Proposal {
            data: QuorumProposal2::<SeqTypes> {
                epoch: None,
                block_header: leaf.block_header().clone(),
                view_number: ViewNumber::genesis(),
                justify_qc: QuorumCertificate::genesis(
                    &ValidatedState::default(),
                    &NodeState::mock(),
                    TEST_VERSIONS.test,
                )
                .await
                .to_qc2(),
                upgrade_certificate: None,
                view_change_evidence: None,
                next_drb_result: None,
                next_epoch_justify_qc: None,
                state_cert: None,
            },
            signature,
            _pd: Default::default(),
        };

        // Two proposals with distinct views, converted to the legacy
        // `QuorumProposal` format that pre-migration rows stored.
        let qp1: Proposal<SeqTypes, QuorumProposal<SeqTypes>> =
            convert_proposal(quorum_proposal.clone());

        quorum_proposal.data.view_number = ViewNumber::new(1);

        let qp2: Proposal<SeqTypes, QuorumProposal<SeqTypes>> =
            convert_proposal(quorum_proposal.clone());
        let qps = [qp1, qp2];

        // Create persistence and add the quorum proposals with NULL leaf hash.
        let db = Persistence::tmp_storage().await;
        let persistence = Persistence::connect(&db).await;
        let mut tx = persistence.db.write().await.unwrap();
        let params = qps
            .iter()
            .map(|qp| {
                (
                    qp.data.view_number.u64() as i64,
                    bincode::serialize(&qp).unwrap(),
                )
            })
            .collect::<Vec<_>>();
        // Only `view` and `data` are written, leaving `leaf_hash` NULL so the
        // migration has something to fill in.
        tx.upsert("quorum_proposals", ["view", "data"], ["view"], params)
            .await
            .unwrap();
        tx.commit().await.unwrap();

        // Create a new persistence and ensure the commitments get populated.
        let persistence = Persistence::connect(&db).await;
        let mut tx = persistence.db.read().await.unwrap();
        let rows = tx
            .fetch("SELECT * FROM quorum_proposals ORDER BY view ASC")
            .try_collect::<Vec<_>>()
            .await
            .unwrap();
        assert_eq!(rows.len(), qps.len());
        for (row, qp) in rows.into_iter().zip(qps) {
            // View and raw proposal bytes must be untouched by the migration...
            assert_eq!(row.get::<i64, _>("view"), qp.data.view_number.u64() as i64);
            assert_eq!(
                row.get::<Vec<u8>, _>("data"),
                bincode::serialize(&qp).unwrap()
            );
            // ...while `leaf_hash` must now hold the commitment of the leaf
            // derived from the proposal.
            assert_eq!(
                row.get::<String, _>("leaf_hash"),
                Committable::commit(&Leaf::from_quorum_proposal(&qp.data)).to_string()
            );
        }
    }
3469
    /// Exercise the `x25519_keys` data migration: legacy validator records
    /// stored without the `x25519_key` field — both as JSON rows in
    /// `stake_table_validators` and as a bincode-encoded map in
    /// `epoch_drb_and_root` — must be rewritten into the new format (with a
    /// null/`None` key) when `migrate_storage` runs, and the migration must be
    /// recorded as completed.
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_x25519_keys_migration() {
        use std::collections::HashMap;

        use crate::persistence::RegisteredValidatorNoX25519;

        let mut validator = RegisteredValidator::mock();
        validator.delegators.clear();
        validator.stake = alloy::primitives::U256::from(1000u64);

        let epoch = 1i64;
        let address = validator.account;

        // Create legacy data without x25519 fields
        let legacy = RegisteredValidatorNoX25519 {
            account: validator.account,
            stake_table_key: validator.stake_table_key,
            state_ver_key: validator.state_ver_key.clone(),
            stake: validator.stake,
            commission: validator.commission,
            delegators: HashMap::new(),
            authenticated: true,
        };

        // Bincode: serialize as legacy map
        let mut legacy_map: IndexMap<Address, RegisteredValidatorNoX25519> = IndexMap::new();
        legacy_map.insert(address, legacy);
        let stake_bytes = bincode::serialize(&legacy_map).unwrap();

        // JSON: serialize without x25519 fields
        let json_legacy = RegisteredValidatorNoX25519 {
            account: validator.account,
            stake_table_key: validator.stake_table_key,
            state_ver_key: validator.state_ver_key.clone(),
            stake: validator.stake,
            commission: validator.commission,
            delegators: HashMap::new(),
            authenticated: true,
        };
        let validator_json = serde_json::to_value(&json_legacy).unwrap();

        let db = Persistence::tmp_storage().await;
        let persistence = Persistence::connect(&db).await;
        let mut tx = persistence.db.write().await.unwrap();

        // Insert the legacy JSON row directly with raw SQL, bypassing the
        // normal store path so it lands in the pre-migration shape.
        tx.execute(
            query(
                "INSERT INTO stake_table_validators (epoch, address, validator) VALUES ($1, $2, \
                 $3)",
            )
            .bind(epoch)
            .bind(format!("{:?}", address))
            .bind(&validator_json),
        )
        .await
        .unwrap();

        // Likewise insert the legacy bincode-encoded stake map.
        tx.execute(
            query("INSERT INTO epoch_drb_and_root (epoch, stake) VALUES ($1, $2)")
                .bind(epoch)
                .bind(&stake_bytes),
        )
        .await
        .unwrap();

        // Reset migration state so it runs on the newly inserted data
        tx.execute(query(
            "UPDATE data_migrations SET completed = false, migrated_rows = 0 WHERE name = \
             'x25519_keys'",
        ))
        .await
        .unwrap();

        tx.commit().await.unwrap();

        // Verify JSON was inserted without x25519_key
        {
            let mut tx = persistence.db.read().await.unwrap();
            let row: (serde_json::Value,) =
                query_as("SELECT validator FROM stake_table_validators WHERE epoch = $1")
                    .bind(epoch)
                    .fetch_one(tx.as_mut())
                    .await
                    .unwrap();
            let json_obj = row.0.as_object().unwrap();
            assert!(!json_obj.contains_key("x25519_key"));
        }

        // Run migrations
        let persistence = Persistence::connect(&db).await;
        persistence.migrate_storage().await.unwrap();

        // Verify stake_table_validators now has x25519_key field
        {
            let mut tx = persistence.db.read().await.unwrap();
            let row: (serde_json::Value,) =
                query_as("SELECT validator FROM stake_table_validators WHERE epoch = $1")
                    .bind(epoch)
                    .fetch_one(tx.as_mut())
                    .await
                    .unwrap();
            let json_obj = row.0.as_object().unwrap();
            assert!(json_obj.contains_key("x25519_key"));
            // Legacy rows have no real key, so the migration fills in null.
            assert!(json_obj.get("x25519_key").unwrap().is_null());
        }

        // Verify epoch_drb_and_root stake was migrated
        {
            let mut tx = persistence.db.read().await.unwrap();
            let row: (Vec<u8>,) = query_as("SELECT stake FROM epoch_drb_and_root WHERE epoch = $1")
                .bind(epoch)
                .fetch_one(tx.as_mut())
                .await
                .unwrap();
            // The bytes must now decode as the post-migration map type.
            let migrated_map: AuthenticatedValidatorMap = bincode::deserialize(&row.0).unwrap();
            assert!(migrated_map.contains_key(&address));
            let v = migrated_map.get(&address).unwrap();
            assert!(v.x25519_key.is_none());
        }

        // Verify migration tracking
        {
            let mut tx = persistence.db.read().await.unwrap();
            let row: (bool, i64) = query_as(
                "SELECT completed, migrated_rows FROM data_migrations WHERE name = 'x25519_keys' \
                 AND table_name = 'epoch_drb_and_root'",
            )
            .fetch_one(tx.as_mut())
            .await
            .unwrap();
            assert!(row.0);
            assert_eq!(row.1, 1);
        }
    }
3604
3605    #[test_log::test(tokio::test(flavor = "multi_thread"))]
3606    async fn test_store_all_validators_authenticated_and_unauthenticated() {
3607        use std::collections::HashMap;
3608
3609        use alloy::primitives::{Address, U256};
3610        use hotshot_types::light_client::StateVerKey;
3611        use indexmap::IndexMap;
3612
3613        let tmp = Persistence::tmp_storage().await;
3614        let storage = Persistence::connect(&tmp).await;
3615
3616        // Create an authenticated validator
3617        let authenticated_validator = RegisteredValidator {
3618            account: Address::random(),
3619            stake_table_key: BLSPubKey::generated_from_seed_indexed([0u8; 32], 0).0,
3620            state_ver_key: StateVerKey::default(),
3621            stake: U256::from(1000),
3622            commission: 100,
3623            delegators: HashMap::new(),
3624            authenticated: true,
3625            x25519_key: None,
3626            p2p_addr: None,
3627        };
3628
3629        // Create an unauthenticated validator
3630        let unauthenticated_validator = RegisteredValidator {
3631            account: Address::random(),
3632            stake_table_key: BLSPubKey::generated_from_seed_indexed([0u8; 32], 1).0,
3633            state_ver_key: StateVerKey::default(),
3634            stake: U256::from(2000),
3635            commission: 200,
3636            delegators: HashMap::new(),
3637            authenticated: false,
3638            x25519_key: None,
3639            p2p_addr: None,
3640        };
3641
3642        let mut validators: IndexMap<Address, RegisteredValidator<BLSPubKey>> = IndexMap::new();
3643        validators.insert(
3644            authenticated_validator.account,
3645            authenticated_validator.clone(),
3646        );
3647        validators.insert(
3648            unauthenticated_validator.account,
3649            unauthenticated_validator.clone(),
3650        );
3651
3652        // Store both validators
3653        storage
3654            .store_all_validators(EpochNumber::new(1), validators)
3655            .await
3656            .unwrap();
3657
3658        // Load and verify
3659        let loaded = storage
3660            .load_all_validators(EpochNumber::new(1), 0, 100)
3661            .await
3662            .unwrap();
3663        assert_eq!(loaded.len(), 2);
3664
3665        // Find each validator and verify authenticated state is preserved
3666        let loaded_auth = loaded
3667            .iter()
3668            .find(|v| v.account == authenticated_validator.account)
3669            .unwrap();
3670        assert!(
3671            loaded_auth.authenticated,
3672            "authenticated validator should remain authenticated"
3673        );
3674
3675        let loaded_unauth = loaded
3676            .iter()
3677            .find(|v| v.account == unauthenticated_validator.account)
3678            .unwrap();
3679        assert!(
3680            !loaded_unauth.authenticated,
3681            "unauthenticated validator should remain unauthenticated"
3682        );
3683    }
3684
    /// End-to-end check of the fetching `Provider` impls: after appending a
    /// VID share, a DA proposal, and quorum proposals, the corresponding
    /// `VidCommonRequest`, `PayloadRequest`, and `LeafRequest` fetches must
    /// all succeed and return the original data.
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_fetching_providers() {
        let tmp = Persistence::tmp_storage().await;
        let storage = Persistence::connect(&tmp).await;

        // Mock up some data.
        let leaf = Leaf2::genesis(
            &ValidatedState::default(),
            &NodeState::mock(),
            TEST_VERSIONS.test.base,
        )
        .await;
        let leaf_payload = leaf.block_payload().unwrap();
        let leaf_payload_bytes_arc = leaf_payload.encode();

        // Disperse the payload across two equally-weighted nodes.
        let avidm_param = init_avidm_param(2).unwrap();
        let weights = vec![1u32; 2];

        let ns_table = parse_ns_table(
            leaf_payload.byte_len().as_usize(),
            &leaf_payload.ns_table().encode(),
        );
        let (payload_commitment, shares) =
            AvidMScheme::ns_disperse(&avidm_param, &weights, &leaf_payload_bytes_arc, ns_table)
                .unwrap();
        let (pubkey, privkey) = BLSPubKey::generated_from_seed_indexed([0; 32], 1);
        let vid_share = convert_proposal(
            AvidMDisperseShare::<SeqTypes> {
                view_number: ViewNumber::new(0),
                payload_commitment,
                share: shares[0].clone(),
                recipient_key: pubkey,
                epoch: None,
                target_epoch: None,
                common: avidm_param.clone(),
            }
            .to_proposal(&privkey)
            .unwrap()
            .clone(),
        );

        let quorum_proposal = QuorumProposalWrapper::<SeqTypes> {
            proposal: QuorumProposal2::<SeqTypes> {
                block_header: leaf.block_header().clone(),
                view_number: leaf.view_number(),
                justify_qc: leaf.justify_qc(),
                upgrade_certificate: None,
                view_change_evidence: None,
                next_drb_result: None,
                next_epoch_justify_qc: None,
                epoch: None,
                state_cert: None,
            },
        };
        let quorum_proposal_signature =
            BLSPubKey::sign(&privkey, &bincode::serialize(&quorum_proposal).unwrap())
                .expect("Failed to sign quorum proposal");
        let quorum_proposal = Proposal {
            data: quorum_proposal,
            signature: quorum_proposal_signature,
            _pd: Default::default(),
        };

        let block_payload_signature = BLSPubKey::sign(&privkey, &leaf_payload_bytes_arc)
            .expect("Failed to sign block payload");
        let da_proposal = Proposal {
            data: DaProposal2::<SeqTypes> {
                encoded_transactions: leaf_payload_bytes_arc,
                metadata: leaf_payload.ns_table().clone(),
                view_number: ViewNumber::new(0),
                epoch: None,
                epoch_transition_indicator: EpochTransitionIndicator::NotInTransition,
            },
            signature: block_payload_signature,
            _pd: Default::default(),
        };

        // Build a successor proposal whose justify QC commits to `leaf`; the
        // `LeafRequest` fetch below needs a stored QC referencing the leaf.
        let mut next_quorum_proposal = quorum_proposal.clone();
        next_quorum_proposal.data.proposal.view_number += 1;
        next_quorum_proposal.data.proposal.justify_qc.view_number += 1;
        next_quorum_proposal
            .data
            .proposal
            .justify_qc
            .data
            .leaf_commit = Committable::commit(&leaf.clone());
        let qc = next_quorum_proposal.data.justify_qc();

        // Add to database.
        storage
            .append_da2(&da_proposal, VidCommitment::V1(payload_commitment))
            .await
            .unwrap();
        storage.append_vid(&vid_share).await.unwrap();
        storage
            .append_quorum_proposal2(&quorum_proposal)
            .await
            .unwrap();

        // Add an extra quorum proposal so we have a QC pointing back at `leaf`.
        storage
            .append_quorum_proposal2(&next_quorum_proposal)
            .await
            .unwrap();

        // Fetch it as if we were rebuilding an archive.
        assert_eq!(
            Some(VidCommon::V1(avidm_param)),
            storage
                .fetch(VidCommonRequest(vid_share.data.payload_commitment()))
                .await
        );
        assert_eq!(
            leaf_payload,
            storage
                .fetch(PayloadRequest(vid_share.data.payload_commitment()))
                .await
                .unwrap()
        );
        assert_eq!(
            LeafQueryData::new(leaf.clone(), qc.clone()).unwrap(),
            storage
                .fetch(LeafRequest::new(
                    leaf.block_header().block_number(),
                    Committable::commit(&leaf),
                    qc.clone().commit()
                ))
                .await
                .unwrap()
        );
    }
3816
    /// Test conditions that trigger pruning.
    ///
    /// This is a configurable test that can be used to test different configurations of GC,
    /// `pruning_opt`. The test populates the database with some data for view 1, asserts that it is
    /// retained for view 2, and then asserts that it is pruned by view 3. There are various
    /// different configurations that can achieve this behavior, such that the data is retained and
    /// then pruned due to different logic and code paths.
    async fn test_pruning_helper(pruning_opt: ConsensusPruningOptions) {
        let tmp = Persistence::tmp_storage().await;
        let mut opt = Persistence::options(&tmp);
        opt.consensus_pruning = pruning_opt;
        let storage = opt.create().await.unwrap();

        // All test data is stored under this single view.
        let data_view = ViewNumber::new(1);

        // Populate some data.
        let leaf = Leaf2::genesis(
            &ValidatedState::default(),
            &NodeState::mock(),
            TEST_VERSIONS.test.base,
        )
        .await;
        let leaf_payload = leaf.block_payload().unwrap();
        let leaf_payload_bytes_arc = leaf_payload.encode();

        // Disperse the payload across two equally-weighted nodes.
        let avidm_param = init_avidm_param(2).unwrap();
        let weights = vec![1u32; 2];

        let ns_table = parse_ns_table(
            leaf_payload.byte_len().as_usize(),
            &leaf_payload.ns_table().encode(),
        );
        let (payload_commitment, shares) =
            AvidMScheme::ns_disperse(&avidm_param, &weights, &leaf_payload_bytes_arc, ns_table)
                .unwrap();

        let (pubkey, privkey) = BLSPubKey::generated_from_seed_indexed([0; 32], 1);
        let vid = convert_proposal(
            AvidMDisperseShare::<SeqTypes> {
                view_number: data_view,
                payload_commitment,
                share: shares[0].clone(),
                recipient_key: pubkey,
                epoch: None,
                target_epoch: None,
                common: avidm_param,
            }
            .to_proposal(&privkey)
            .unwrap()
            .clone(),
        );
        let quorum_proposal = QuorumProposalWrapper::<SeqTypes> {
            proposal: QuorumProposal2::<SeqTypes> {
                epoch: None,
                block_header: leaf.block_header().clone(),
                view_number: data_view,
                justify_qc: QuorumCertificate2::genesis(
                    &ValidatedState::default(),
                    &NodeState::mock(),
                    TEST_VERSIONS.test,
                )
                .await,
                upgrade_certificate: None,
                view_change_evidence: None,
                next_drb_result: None,
                next_epoch_justify_qc: None,
                state_cert: None,
            },
        };
        let quorum_proposal_signature =
            BLSPubKey::sign(&privkey, &bincode::serialize(&quorum_proposal).unwrap())
                .expect("Failed to sign quorum proposal");
        let quorum_proposal = Proposal {
            data: quorum_proposal,
            signature: quorum_proposal_signature,
            _pd: Default::default(),
        };

        let block_payload_signature = BLSPubKey::sign(&privkey, &leaf_payload_bytes_arc)
            .expect("Failed to sign block payload");
        let da_proposal = Proposal {
            data: DaProposal2::<SeqTypes> {
                encoded_transactions: leaf_payload_bytes_arc.clone(),
                metadata: leaf_payload.ns_table().clone(),
                view_number: data_view,
                epoch: Some(EpochNumber::new(0)),
                epoch_transition_indicator: EpochTransitionIndicator::NotInTransition,
            },
            signature: block_payload_signature,
            _pd: Default::default(),
        };

        // Store all three artifacts (VID share, DA proposal, quorum proposal)
        // for `data_view`; pruning behavior is asserted on each of them below.
        tracing::info!(?vid, ?da_proposal, ?quorum_proposal, "append data");
        storage.append_vid(&vid).await.unwrap();
        storage
            .append_da2(&da_proposal, VidCommitment::V1(payload_commitment))
            .await
            .unwrap();
        storage
            .append_quorum_proposal2(&quorum_proposal)
            .await
            .unwrap();

        // The first decide doesn't trigger any garbage collection, even though our usage exceeds
        // the target, because of the minimum retention.
        tracing::info!("decide view 1");
        storage
            .append_decided_leaves(data_view + 1, [], None, &NullEventConsumer)
            .await
            .unwrap();
        assert_eq!(
            storage.load_vid_share(data_view).await.unwrap().unwrap(),
            vid
        );
        assert_eq!(
            storage.load_da_proposal(data_view).await.unwrap().unwrap(),
            da_proposal
        );
        assert_eq!(
            storage.load_quorum_proposal(data_view).await.unwrap(),
            quorum_proposal
        );

        // After another view, our data is beyond the minimum retention (though not the target
        // retention) so it gets pruned.
        tracing::info!("decide view 2");
        storage
            .append_decided_leaves(data_view + 2, [], None, &NullEventConsumer)
            .await
            .unwrap();
        // VID and DA loads return `None` once pruned; the quorum proposal load
        // errors instead of returning an option.
        assert!(storage.load_vid_share(data_view).await.unwrap().is_none(),);
        assert!(storage.load_da_proposal(data_view).await.unwrap().is_none());
        storage.load_quorum_proposal(data_view).await.unwrap_err();
    }
3951
3952    #[test_log::test(tokio::test(flavor = "multi_thread"))]
3953    async fn test_pruning_minimum_retention() {
3954        test_pruning_helper(ConsensusPruningOptions {
3955            // Use a very low target usage, to show that we still retain data up to the minimum
3956            // retention even when usage is above target.
3957            target_usage: 0,
3958            minimum_retention: 1,
3959            // Use a very high target retention, so that pruning is only triggered by the minimum
3960            // retention.
3961            target_retention: u64::MAX,
3962        })
3963        .await
3964    }
3965
3966    #[test_log::test(tokio::test(flavor = "multi_thread"))]
3967    async fn test_pruning_target_retention() {
3968        test_pruning_helper(ConsensusPruningOptions {
3969            target_retention: 1,
3970            // Use a very low minimum retention, so that data is only kept around due to the target
3971            // retention.
3972            minimum_retention: 0,
3973            // Use a very high target usage, so that pruning is only triggered by the target
3974            // retention.
3975            target_usage: u64::MAX,
3976        })
3977        .await
3978    }
3979
3980    #[test_log::test(tokio::test(flavor = "multi_thread"))]
3981    async fn test_consensus_migration() {
3982        let tmp = Persistence::tmp_storage().await;
3983        let mut opt = Persistence::options(&tmp);
3984
3985        let storage = opt.create().await.unwrap();
3986
3987        let rows = 300;
3988
3989        assert!(storage.load_state_cert().await.unwrap().is_none());
3990
3991        for i in 0..rows {
3992            let view = ViewNumber::new(i);
3993            let validated_state = ValidatedState::default();
3994            let instance_state = NodeState::default();
3995
3996            let (pubkey, privkey) = BLSPubKey::generated_from_seed_indexed([0; 32], i);
3997            let (payload, metadata) =
3998                Payload::from_transactions([], &validated_state, &instance_state)
3999                    .await
4000                    .unwrap();
4001
4002            let payload_bytes = payload.encode();
4003
4004            let block_header = Header::genesis(
4005                &instance_state,
4006                payload.clone(),
4007                &metadata,
4008                TEST_VERSIONS.test.base,
4009            );
4010
4011            let null_quorum_data = QuorumData {
4012                leaf_commit: Commitment::<Leaf>::default_commitment_no_preimage(),
4013            };
4014
4015            let justify_qc = QuorumCertificate::new(
4016                null_quorum_data.clone(),
4017                null_quorum_data.commit(),
4018                view,
4019                None,
4020                std::marker::PhantomData,
4021            );
4022
4023            let quorum_proposal = QuorumProposal {
4024                block_header,
4025                view_number: view,
4026                justify_qc: justify_qc.clone(),
4027                upgrade_certificate: None,
4028                proposal_certificate: None,
4029            };
4030
4031            let quorum_proposal_signature =
4032                BLSPubKey::sign(&privkey, &bincode::serialize(&quorum_proposal).unwrap())
4033                    .expect("Failed to sign quorum proposal");
4034
4035            let proposal = Proposal {
4036                data: quorum_proposal.clone(),
4037                signature: quorum_proposal_signature,
4038                _pd: std::marker::PhantomData::<SeqTypes>,
4039            };
4040
4041            let proposal_bytes = bincode::serialize(&proposal)
4042                .context("serializing proposal")
4043                .unwrap();
4044
4045            let mut leaf = Leaf::from_quorum_proposal(&quorum_proposal);
4046            leaf.fill_block_payload(
4047                payload,
4048                GENESIS_VID_NUM_STORAGE_NODES,
4049                TEST_VERSIONS.test.base,
4050            )
4051            .unwrap();
4052
4053            let mut tx = storage.db.write().await.unwrap();
4054
4055            let qc_bytes = bincode::serialize(&justify_qc).unwrap();
4056            let leaf_bytes = bincode::serialize(&leaf).unwrap();
4057
4058            tx.upsert(
4059                "anchor_leaf",
4060                ["view", "leaf", "qc"],
4061                ["view"],
4062                [(i as i64, leaf_bytes, qc_bytes)],
4063            )
4064            .await
4065            .unwrap();
4066
4067            let state_cert = LightClientStateUpdateCertificateV2::<SeqTypes> {
4068                epoch: EpochNumber::new(i),
4069                light_client_state: Default::default(), // filling arbitrary value
4070                next_stake_table_state: Default::default(), // filling arbitrary value
4071                signatures: vec![],                     // filling arbitrary value
4072                auth_root: Default::default(),
4073            };
4074            // manually upsert the state cert to the finalized database
4075            let state_cert_bytes = bincode::serialize(&state_cert).unwrap();
4076            tx.upsert(
4077                "finalized_state_cert",
4078                ["epoch", "state_cert"],
4079                ["epoch"],
4080                [(i as i64, state_cert_bytes)],
4081            )
4082            .await
4083            .unwrap();
4084
4085            tx.commit().await.unwrap();
4086
4087            let disperse = advz_scheme(GENESIS_VID_NUM_STORAGE_NODES)
4088                .disperse(payload_bytes.clone())
4089                .unwrap();
4090
4091            let vid = VidDisperseShare0::<SeqTypes> {
4092                view_number: ViewNumber::new(i),
4093                payload_commitment: Default::default(),
4094                share: disperse.shares[0].clone(),
4095                common: disperse.common,
4096                recipient_key: pubkey,
4097            };
4098
4099            let (payload, metadata) =
4100                Payload::from_transactions([], &ValidatedState::default(), &NodeState::default())
4101                    .await
4102                    .unwrap();
4103
4104            let da = DaProposal::<SeqTypes> {
4105                encoded_transactions: payload.encode(),
4106                metadata,
4107                view_number: ViewNumber::new(i),
4108            };
4109
4110            let block_payload_signature =
4111                BLSPubKey::sign(&privkey, &payload_bytes).expect("Failed to sign block payload");
4112
4113            let da_proposal = Proposal {
4114                data: da,
4115                signature: block_payload_signature,
4116                _pd: Default::default(),
4117            };
4118
4119            storage
4120                .append_vid(&convert_proposal(vid.to_proposal(&privkey).unwrap()))
4121                .await
4122                .unwrap();
4123            storage
4124                .append_da(&da_proposal, VidCommitment::V0(disperse.commit))
4125                .await
4126                .unwrap();
4127
4128            let leaf_hash = Committable::commit(&leaf);
4129            let mut tx = storage.db.write().await.expect("failed to start write tx");
4130            tx.upsert(
4131                "quorum_proposals",
4132                ["view", "leaf_hash", "data"],
4133                ["view"],
4134                [(i as i64, leaf_hash.to_string(), proposal_bytes)],
4135            )
4136            .await
4137            .expect("failed to upsert quorum proposal");
4138
4139            let justify_qc = &proposal.data.justify_qc;
4140            let justify_qc_bytes = bincode::serialize(&justify_qc)
4141                .context("serializing QC")
4142                .unwrap();
4143            tx.upsert(
4144                "quorum_certificate",
4145                ["view", "leaf_hash", "data"],
4146                ["view"],
4147                [(
4148                    justify_qc.view_number.u64() as i64,
4149                    justify_qc.data.leaf_commit.to_string(),
4150                    &justify_qc_bytes,
4151                )],
4152            )
4153            .await
4154            .expect("failed to upsert qc");
4155
4156            tx.commit().await.expect("failed to commit");
4157        }
4158
4159        storage.migrate_storage().await.unwrap();
4160
4161        let mut tx = storage.db.read().await.unwrap();
4162        let (anchor_leaf2_count,) = query_as::<(i64,)>("SELECT COUNT(*) from anchor_leaf2")
4163            .fetch_one(tx.as_mut())
4164            .await
4165            .unwrap();
4166        assert_eq!(
4167            anchor_leaf2_count, rows as i64,
4168            "anchor leaf count does not match rows",
4169        );
4170
4171        let (da_proposal_count,) = query_as::<(i64,)>("SELECT COUNT(*) from da_proposal2")
4172            .fetch_one(tx.as_mut())
4173            .await
4174            .unwrap();
4175        assert_eq!(
4176            da_proposal_count, rows as i64,
4177            "da proposal count does not match rows",
4178        );
4179
4180        let (vid_share_count,) = query_as::<(i64,)>("SELECT COUNT(*) from vid_share2")
4181            .fetch_one(tx.as_mut())
4182            .await
4183            .unwrap();
4184        assert_eq!(
4185            vid_share_count, rows as i64,
4186            "vid share count does not match rows"
4187        );
4188
4189        let (quorum_proposals_count,) =
4190            query_as::<(i64,)>("SELECT COUNT(*) from quorum_proposals2")
4191                .fetch_one(tx.as_mut())
4192                .await
4193                .unwrap();
4194        assert_eq!(
4195            quorum_proposals_count, rows as i64,
4196            "quorum proposals count does not match rows",
4197        );
4198
4199        let (quorum_certificates_count,) =
4200            query_as::<(i64,)>("SELECT COUNT(*) from quorum_certificate2")
4201                .fetch_one(tx.as_mut())
4202                .await
4203                .unwrap();
4204        assert_eq!(
4205            quorum_certificates_count, rows as i64,
4206            "quorum certificates count does not match rows",
4207        );
4208
4209        let (state_cert_count,) = query_as::<(i64,)>("SELECT COUNT(*) from finalized_state_cert")
4210            .fetch_one(tx.as_mut())
4211            .await
4212            .unwrap();
4213        assert_eq!(
4214            state_cert_count, rows as i64,
4215            "Light client state update certificates count does not match rows",
4216        );
4217        assert_eq!(
4218            storage.load_state_cert().await.unwrap().unwrap(),
4219            LightClientStateUpdateCertificateV2::<SeqTypes> {
4220                epoch: EpochNumber::new(rows - 1),
4221                light_client_state: Default::default(),
4222                next_stake_table_state: Default::default(),
4223                signatures: vec![],
4224                auth_root: Default::default(),
4225            },
4226            "Wrong light client state update certificate in the storage",
4227        );
4228
4229        storage.migrate_storage().await.unwrap();
4230    }
4231
4232    /// Regression test for an ambiguous behavior in `store_events`/`load_events`.
4233    ///
4234    /// Previously, `store_events` did nothing when given an empty events list (in fact,
4235    /// `fetch_and_store_stake_table_events` was not even calling it). But this means that the
4236    /// `stake_table_events_l1_block` column does not get updated when we enter a new epoch with no
4237    /// new stake table events. This makes it impossible to distinguish between two very different
4238    /// scenarios:
4239    ///
4240    /// 1. The node has successfully processed events through the latest L1 finalized block, but
4241    ///    there are no new events from the last epoch.
4242    /// 2. The node is lagging behind the latest L1 finalized block, and is possibly missing some
4243    ///    new events.
4244    ///
4245    /// In scenario 1, clients of this node should be able to treat the empty list of stake table
4246    /// events as authoritative, and derive the stake table for the next epoch (which will end up
    /// being the same as the previous one). However, in scenario 2, clients need to wait, because we
4248    /// don't yet know whether there could be any events that modify the stake table. Thus,
4249    /// distinguishing these two scenarios is important.
4250    ///
4251    /// This regression test ensures that even if there are no new events, at least the
4252    /// `stake_table_events_l1_block` column gets updated. We can then distinguish the two scenarios
    /// using the `EventsPersistenceRead` return value from `load_events`.
4254    #[test_log::test(tokio::test(flavor = "multi_thread"))]
4255    async fn test_store_events_empty() {
4256        let tmp = Persistence::tmp_storage().await;
4257        let mut opt = Persistence::options(&tmp);
4258        let storage = opt.create().await.unwrap();
4259
4260        assert_eq!(storage.load_events(0, 100).await.unwrap(), (None, vec![]));
4261
4262        // Storing an empty events list still updates the latest L1 block.
4263        for i in 1..=2 {
4264            tracing::info!(i, "update l1 height");
4265            storage.store_events(i, vec![]).await.unwrap();
4266            assert_eq!(
4267                storage.load_events(0, 100).await.unwrap(),
4268                (Some(EventsPersistenceRead::UntilL1Block(i)), vec![])
4269            );
4270        }
4271    }
4272}
4273
// Tests that run only against a Postgres backend; the `embedded-db` (SQLite)
// build is excluded because it lacks the SQL helpers exercised here.
#[cfg(test)]
#[cfg(not(feature = "embedded-db"))]
mod postgres_tests {
    use espresso_types::{FeeAccount, Header, Leaf, NodeState, Transaction as Tx};
    use hotshot_example_types::node_types::TEST_VERSIONS;
    use hotshot_query_service::{
        availability::BlockQueryData, data_source::storage::UpdateAvailabilityStorage,
    };
    use hotshot_types::{
        data::vid_commitment,
        simple_certificate::QuorumCertificate,
        traits::{
            EncodeBytes,
            block_contents::{BlockHeader, BuilderFee, GENESIS_VID_NUM_STORAGE_NODES},
            election::Membership,
            signature_key::BuilderSignatureKey,
        },
    };

    use super::*;
    use crate::persistence::tests::TestablePersistence as _;

    /// Shared driver for the per-version `#[test]` wrappers below.
    ///
    /// Builds a block containing three transactions, persists its leaf and
    /// payload, then checks that the database-side helpers `get_ns_table` and
    /// `read_ns_id` (NOTE(review): presumably SQL functions installed by a
    /// migration — confirm) decode, for every stored transaction, the same
    /// namespace id that the Rust side reports via `Tx::namespace()`.
    ///
    /// `instance_state` selects the header version the block is built with,
    /// which is the only thing varying between the wrappers.
    async fn test_postgres_read_ns_table(instance_state: NodeState) {
        // Give the membership a first epoch so the genesis helpers used
        // below can run. (NOTE(review): assumed prerequisite — confirm.)
        instance_state
            .coordinator
            .membership()
            .write()
            .await
            .set_first_epoch(EpochNumber::genesis(), Default::default());

        let tmp = Persistence::tmp_storage().await;
        let mut opt = Persistence::options(&tmp);
        let storage = opt.create().await.unwrap();

        // Three transactions across two namespaces: two in 10001, one in 10009.
        let txs = [
            Tx::new(10001u32.into(), vec![1, 2, 3]),
            Tx::new(10001u32.into(), vec![4, 5, 6]),
            Tx::new(10009u32.into(), vec![7, 8, 9]),
        ];

        // Build a genesis QC and genesis leaf to parent the block under test.
        let validated_state = Default::default();
        let justify_qc =
            QuorumCertificate::genesis(&validated_state, &instance_state, TEST_VERSIONS.test).await;
        let view_number: ViewNumber = justify_qc.view_number + 1;
        let parent_leaf = Leaf::genesis(&validated_state, &instance_state, TEST_VERSIONS.test.base)
            .await
            .into();

        // Construct the payload and its namespace table from the transactions,
        // then the commitments needed to assemble a header.
        let (payload, ns_table) =
            Payload::from_transactions(txs.clone(), &validated_state, &instance_state)
                .await
                .unwrap();
        let payload_bytes = payload.encode();
        let payload_commitment = vid_commitment(
            &payload_bytes,
            &ns_table.encode(),
            GENESIS_VID_NUM_STORAGE_NODES,
            instance_state.current_version,
        );
        let builder_commitment = payload.builder_commitment(&ns_table);
        // Zero builder fee, signed with a deterministic seeded test key.
        let (fee_account, fee_key) = FeeAccount::generated_from_seed_indexed([0; 32], 0);
        let fee_amount = 0;
        let fee_signature = FeeAccount::sign_fee(&fee_key, fee_amount, &ns_table).unwrap();
        let block_header = Header::new(
            &validated_state,
            &instance_state,
            &parent_leaf,
            payload_commitment,
            builder_commitment,
            ns_table,
            BuilderFee {
                fee_amount,
                fee_account,
                fee_signature,
            },
            instance_state.current_version,
            view_number.u64(),
        )
        .await
        .unwrap();
        let proposal = QuorumProposal {
            block_header: block_header.clone(),
            view_number,
            justify_qc: justify_qc.clone(),
            upgrade_certificate: None,
            proposal_certificate: None,
        };
        // Derive the leaf from the proposal and upgrade the QC to v2,
        // re-pointing it at the new leaf's commitment and view.
        let leaf: Leaf2 = Leaf::from_quorum_proposal(&proposal).into();
        let mut qc = justify_qc.to_qc2();
        qc.data.leaf_commit = leaf.commit();
        qc.view_number = view_number;

        // Persist the leaf and the block so the query below has a header row
        // and per-transaction rows to read.
        let mut tx = storage.db.write().await.unwrap();
        tx.insert_leaf(&LeafQueryData::new(leaf, qc).unwrap())
            .await
            .unwrap();
        tx.insert_block(&BlockQueryData::<SeqTypes>::new(block_header, payload))
            .await
            .unwrap();
        tx.commit().await.unwrap();

        // Decode the namespace table straight out of the stored header bytes
        // on the database side, producing one row per stored transaction.
        let mut tx = storage.db.read().await.unwrap();
        let rows = query(
            "
            SELECT ns_id, read_ns_id(get_ns_table(h.data), t.ns_index) AS read_ns_id
              FROM header AS h
              JOIN transactions AS t ON t.block_height = h.height
              ORDER BY t.ns_index, t.position
        ",
        )
        .fetch_all(tx.as_mut())
        .await
        .unwrap();
        assert_eq!(rows.len(), txs.len());
        // Each row's stored namespace id, and the id re-read from the header's
        // namespace table, must match the Rust-side namespace of the matching
        // transaction. (NOTE(review): relies on (ns_index, position) ordering
        // matching the order of `txs` — confirm.)
        for (i, row) in rows.into_iter().enumerate() {
            let ns = u64::from(txs[i].namespace()) as i64;
            assert_eq!(row.get::<i64, _>("ns_id"), ns);
            assert_eq!(row.get::<i64, _>("read_ns_id"), ns);
        }
    }

    /// Namespace-table decoding for headers built from the default mock state.
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_postgres_read_ns_table_v0_1() {
        test_postgres_read_ns_table(NodeState::mock()).await;
    }

    /// Same check for headers built from the v2 mock state.
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_postgres_read_ns_table_v0_2() {
        test_postgres_read_ns_table(NodeState::mock_v2()).await;
    }

    /// Same check for headers built from the v3 mock state (epoch height 0).
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_postgres_read_ns_table_v0_3() {
        test_postgres_read_ns_table(NodeState::mock_v3().with_epoch_height(0)).await;
    }
}