Skip to main content

hotshot_query_service/data_source/storage/sql/
transaction.rs

1// Copyright (c) 2022 Espresso Systems (espressosys.com)
2// This file is part of the HotShot Query Service library.
3//
4// This program is free software: you can redistribute it and/or modify it under the terms of the GNU
5// General Public License as published by the Free Software Foundation, either version 3 of the
6// License, or (at your option) any later version.
7// This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without
8// even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
9// General Public License for more details.
10// You should have received a copy of the GNU General Public License along with this program. If not,
11// see <https://www.gnu.org/licenses/>.
12
13//! SQL transactions
14//!
15//! A transaction encapsulates all the mutable functionality provided by the SQL database, and
16//! allows for mutable operations to be combined into complex updates that affect the main database
17//! atomically. A transaction also provides all the immutable query functionality of a regular
18//! database connection, so that the updated state of the database can be queried midway through a
19//! transaction.
20
21use std::{collections::HashMap, marker::PhantomData, time::Instant};
22
23use anyhow::{Context, bail};
24use async_trait::async_trait;
25use committable::Committable;
26use derive_more::{Deref, DerefMut};
27use futures::future::Future;
28#[cfg(feature = "embedded-db")]
29use futures::stream::TryStreamExt;
30use hotshot_types::{
31    data::VidShare,
32    simple_certificate::CertificatePair,
33    traits::{
34        EncodeBytes,
35        block_contents::BlockHeader,
36        metrics::{Counter, Gauge, Histogram, Metrics},
37        node_implementation::NodeType,
38    },
39};
40use itertools::Itertools;
41use jf_merkle_tree_compat::prelude::MerkleProof;
42pub use sqlx::Executor;
43use sqlx::{Encode, Execute, FromRow, QueryBuilder, Type, pool::Pool, query_builder::Separated};
44use tracing::instrument;
45
46#[cfg(not(feature = "embedded-db"))]
47use super::queries::state::batch_insert_hashes;
48#[cfg(feature = "embedded-db")]
49use super::queries::state::build_hash_batch_insert;
50use super::{
51    Database, Db,
52    queries::{
53        self,
54        state::{Node, collect_nodes_from_proofs},
55    },
56};
57use crate::{
58    Header, Payload, QueryError, QueryResult,
59    availability::{
60        BlockQueryData, Certificate2, LeafQueryData, QueryableHeader, QueryablePayload,
61        VidCommonQueryData,
62    },
63    data_source::{
64        storage::{NodeStorage, UpdateAvailabilityStorage, pruning::PrunedHeightStorage},
65        update,
66    },
67    merklized_state::{MerklizedState, UpdateStateData},
68    types::HeightIndexed,
69};
70
/// Process-wide switch controlling whether Postgres read transactions omit `DEFERRABLE`.
///
/// While this is `true`, read transactions begin with
/// `SET TRANSACTION ISOLATION LEVEL SERIALIZABLE, READ ONLY` (no `DEFERRABLE`), so they start
/// immediately instead of waiting for a safe serializable snapshot. The initial value is `false`.
#[cfg(not(feature = "embedded-db"))]
static NO_DEFERRABLE_ON_READ: std::sync::atomic::AtomicBool =
    std::sync::atomic::AtomicBool::new(false);

/// Choose whether read transactions on Postgres are opened without `DEFERRABLE`.
///
/// Passing `true` makes `Read::begin` issue
/// `SET TRANSACTION ISOLATION LEVEL SERIALIZABLE, READ ONLY` (no `DEFERRABLE`), so the transaction
/// starts right away rather than waiting for a safe serializable snapshot. The default is `false`.
/// This is intended to be called once at startup, based on the operator's chosen configuration.
#[cfg(not(feature = "embedded-db"))]
pub fn set_no_deferrable_on_read(value: bool) {
    use std::sync::atomic::Ordering;

    NO_DEFERRABLE_ON_READ.store(value, Ordering::Relaxed);
}
88
/// A `sqlx` query against the configured [`Db`] backend, with lifetime `'q`.
pub type Query<'q> = sqlx::query::Query<'q, Db, <Db as Database>::Arguments<'q>>;
/// Like [`Query`], but carrying an output type `T` (see [`query_as`] for the bound on `T`).
pub type QueryAs<'q, T> = sqlx::query::QueryAs<'q, Db, T, <Db as Database>::Arguments<'q>>;
91
92pub fn query(sql: &str) -> Query<'_> {
93    sqlx::query(sql)
94}
95
96pub fn query_as<'q, T>(sql: &'q str) -> QueryAs<'q, T>
97where
98    T: for<'r> FromRow<'r, <Db as Database>::Row>,
99{
100    sqlx::query_as(sql)
101}
102
/// Marker type indicating a transaction with read-write access to the database.
///
/// Its [`TransactionMode::begin`] implementation sets SERIALIZABLE isolation on Postgres and, with
/// the `embedded-db` feature, issues a no-op `UPDATE` so the first statement is a write.
#[derive(Clone, Copy, Debug, Default)]
pub struct Write;
106
/// Marker type indicating a transaction with read-only access to the database.
///
/// Its [`TransactionMode::begin`] implementation sets `SERIALIZABLE, READ ONLY` (optionally
/// `DEFERRABLE`) on Postgres and does nothing with the `embedded-db` feature.
#[derive(Clone, Copy, Debug, Default)]
pub struct Read;
110
/// Marker type indicating a transaction used for pruning deletes.
///
/// On Postgres this uses READ COMMITTED isolation instead of SERIALIZABLE to avoid predicate lock
/// conflicts between pruning DELETE and consensus INSERT operations. With the `embedded-db`
/// feature it begins exactly like [`Write`], with a no-op `UPDATE`.
#[derive(Clone, Copy, Debug, Default)]
pub struct Prune;
117
/// Trait for marker types indicating what type of access a transaction has to the database.
///
/// Implemented in this file by [`Write`], [`Read`] and [`Prune`].
pub trait TransactionMode: Send + Sync {
    /// Configure a freshly opened transaction on `conn` for this mode.
    ///
    /// `Transaction::new` calls this right after the pool has begun the transaction, and
    /// propagates any error it returns.
    fn begin(
        conn: &mut <Db as Database>::Connection,
    ) -> impl Future<Output = anyhow::Result<()>> + Send;

    /// Short name of the mode; recorded as the `mode` field when a transaction begin is traced.
    fn display() -> &'static str;
}
126
127impl TransactionMode for Write {
128    #[allow(unused_variables)]
129    async fn begin(conn: &mut <Db as Database>::Connection) -> anyhow::Result<()> {
130        // SQLite automatically sets the read/write mode of a transactions based on the statements
131        // in it. However, there is still a good reason to explicitly enable write mode right from
132        // the start: if a transaction first executes a read statement and then a write statement,
133        // it will be upgraded from a read transaction to a write transaction. Because this involves
134        // obtaining a different kind of lock while already holding one, it can cause a deadlock,
135        // e.g.:
136        // * Transaction A executes a read statement, obtaining a read lock
137        // * Transaction B executes a write statement and begins waiting for a write lock
138        // * Transaction A executes a write statement and begins waiting for a write lock
139        //
140        // Transaction A can never obtain its write lock because it must first wait for transaction
141        // B to get a write lock, which cannot happen because B is in turn waiting for A to release
142        // its read lock.
143        //
144        // This type of deadlock cannot happen if transaction A immediately starts as a write, since
145        // it will then only ever try to acquire one type of lock (a write lock). By working with
146        // this restriction (transactions are either readers or writers, but never upgradable), we
147        // avoid deadlock, we more closely imitate the concurrency semantics of postgres, and we
148        // take advantage of the SQLite busy timeout, which may allow a transaction to acquire a
149        // lock and succeed (after a small delay), even when there was a conflicting transaction in
150        // progress. Whereas a deadlock is always an automatic rollback.
151        //
152        // The proper way to begin a write transaction in SQLite is with `BEGIN IMMEDIATE`. However,
153        // sqlx does not expose any way to customize the `BEGIN` statement that starts a
154        // transaction. A serviceable workaround is to perform some write statement before performing
155        // any read statement, ensuring that the first lock we acquire is exclusive. A write
156        // statement that has no actual effect on the database is suitable for this purpose, hence
157        // the `WHERE false`.
158        #[cfg(feature = "embedded-db")]
159        conn.execute("UPDATE pruned_height SET id = id WHERE false")
160            .await?;
161
162        // With Postgres things are much more straightforward: just tell Postgres we want a write
163        // transaction immediately after opening it.
164        #[cfg(not(feature = "embedded-db"))]
165        conn.execute("SET TRANSACTION ISOLATION LEVEL SERIALIZABLE")
166            .await?;
167
168        Ok(())
169    }
170
171    fn display() -> &'static str {
172        "write"
173    }
174}
175
176impl TransactionMode for Prune {
177    #[allow(unused_variables)]
178    async fn begin(conn: &mut <Db as Database>::Connection) -> anyhow::Result<()> {
179        // SQLite: same as Write -- acquire an exclusive lock immediately to avoid deadlocks.
180        #[cfg(feature = "embedded-db")]
181        conn.execute("UPDATE pruned_height SET id = id WHERE false")
182            .await?;
183
184        // Postgres: use READ COMMITTED to avoid predicate lock conflicts between pruning
185        // DELETE and concurrent consensus INSERT operations. Pruning does not need SERIALIZABLE
186        // guarantees since it only removes old data that is no longer read by consensus.
187        #[cfg(not(feature = "embedded-db"))]
188        conn.execute("SET TRANSACTION ISOLATION LEVEL READ COMMITTED")
189            .await?;
190
191        Ok(())
192    }
193
194    fn display() -> &'static str {
195        "prune"
196    }
197}
198
199impl TransactionMode for Read {
200    #[allow(unused_variables)]
201    async fn begin(conn: &mut <Db as Database>::Connection) -> anyhow::Result<()> {
202        // With Postgres, we explicitly set the transaction mode to specify that we want the
203        // strongest possible consistency semantics in case of competing transactions
204        // (SERIALIZABLE), and we want to wait until this is possible rather than failing
205        // (DEFERRABLE).
206        //
207        // Setting `ESPRESSO_NODE_POSTGRES_NO_DEFERRABLE=true` disables the DEFERRABLE
208        // option, so that read transactions start immediately and may instead fail with a
209        // serialization error if they conflict with a concurrent write. This trades start-up
210        // latency for the chance of a retry, and is opt-in.
211        //
212        // With SQLite, there is nothing to be done here, as SQLite automatically starts
213        // transactions in read-only mode, and always has serializable concurrency unless we
214        // explicitly opt in to dirty reads with a pragma.
215        #[cfg(not(feature = "embedded-db"))]
216        {
217            let sql = if NO_DEFERRABLE_ON_READ.load(std::sync::atomic::Ordering::Relaxed) {
218                "SET TRANSACTION ISOLATION LEVEL SERIALIZABLE, READ ONLY"
219            } else {
220                "SET TRANSACTION ISOLATION LEVEL SERIALIZABLE, READ ONLY, DEFERRABLE"
221            };
222            conn.execute(sql).await?;
223        }
224
225        Ok(())
226    }
227
228    fn display() -> &'static str {
229        "read-only"
230    }
231}
232
/// How a transaction ended, as recorded by its metrics guard.
#[derive(Clone, Copy, Debug)]
enum CloseType {
    /// Set by `commit` after the underlying commit succeeded.
    Commit,
    /// Set by `revert` after the underlying rollback completed.
    Revert,
    /// Initial value; still in place if the guard is dropped without `set_closed` being called.
    Drop,
}
239
/// Guard which updates pool metrics for one transaction when it is created and when it is dropped.
#[derive(Debug)]
struct TransactionMetricsGuard<Mode> {
    // Time at which the guard was created; used to compute the transaction duration on drop.
    started_at: Instant,
    // Metrics updated on begin and on drop.
    metrics: PoolMetrics,
    // Selects which counter (commits, reverts or drops) is incremented on drop.
    close_type: CloseType,
    // Ties the guard to a transaction mode without storing a value of it.
    _mode: PhantomData<Mode>,
}
247
248impl<Mode: TransactionMode> TransactionMetricsGuard<Mode> {
249    fn begin(metrics: PoolMetrics) -> Self {
250        let started_at = Instant::now();
251        tracing::trace!(mode = Mode::display(), ?started_at, "begin");
252        metrics.open_transactions.update(1);
253
254        Self {
255            started_at,
256            metrics,
257            close_type: CloseType::Drop,
258            _mode: Default::default(),
259        }
260    }
261
262    fn set_closed(&mut self, t: CloseType) {
263        self.close_type = t;
264    }
265}
266
267impl<Mode> Drop for TransactionMetricsGuard<Mode> {
268    fn drop(&mut self) {
269        self.metrics
270            .transaction_durations
271            .add_point((self.started_at.elapsed().as_millis() as f64) / 1000.);
272        self.metrics.open_transactions.update(-1);
273        match self.close_type {
274            CloseType::Commit => self.metrics.commits.add(1),
275            CloseType::Revert => self.metrics.reverts.add(1),
276            CloseType::Drop => self.metrics.drops.add(1),
277        }
278        tracing::trace!(started_at = ?self.started_at, reason = ?self.close_type, "close");
279    }
280}
281
/// An atomic SQL transaction.
///
/// Dereferences (mutably and immutably) to the underlying `sqlx` transaction. The `Mode` type
/// parameter is one of the [`TransactionMode`] marker types.
#[derive(Debug, Deref, DerefMut)]
pub struct Transaction<Mode> {
    // The underlying database transaction; target of `Deref`/`DerefMut`.
    #[deref]
    #[deref_mut]
    inner: sqlx::Transaction<'static, Db>,
    // Updates pool metrics when this transaction is dropped.
    metrics: TransactionMetricsGuard<Mode>,
}
290
291impl<Mode: TransactionMode> Transaction<Mode> {
292    pub(super) async fn new(pool: &Pool<Db>, metrics: PoolMetrics) -> anyhow::Result<Self> {
293        let mut inner = pool.begin().await?;
294        let metrics = TransactionMetricsGuard::begin(metrics);
295        Mode::begin(inner.as_mut()).await?;
296        Ok(Self { inner, metrics })
297    }
298}
299
300impl<Mode: TransactionMode> update::Transaction for Transaction<Mode> {
301    async fn commit(mut self) -> anyhow::Result<()> {
302        self.inner.commit().await?;
303        self.metrics.set_closed(CloseType::Commit);
304        Ok(())
305    }
306    fn revert(mut self) -> impl Future + Send {
307        async move {
308            self.inner.rollback().await.unwrap();
309            self.metrics.set_closed(CloseType::Revert);
310        }
311    }
312}
313
/// A collection of parameters which can be bound to a SQL query.
///
/// This trait allows us to carry around heterogeneous lists of parameters (e.g. tuples) and bind
/// them to a query at the last moment before executing. This means we can manipulate the
/// parameters independently of the query before executing it. For example, by requiring a trait
/// bound of `Params<'p> + Clone`, we get a list (or tuple) of parameters which can be cloned and
/// then bound to a query, which allows us to keep a copy of the parameters around in order to
/// retry the query if it fails.
///
/// # Lifetimes
///
/// A SQL [`Query`] with lifetime `'q` borrows from both its SQL statement (`&'q str`) and its
/// parameters (bound via `bind<'q>`). Sometimes, though, it is necessary for the statement and its
/// parameters to have different (but overlapping) lifetimes. For example, the parameters might be
/// passed in and owned by the caller, while the query string is constructed in the callee and its
/// lifetime is limited to the callee scope. (See for example the [`upsert`](Transaction::upsert)
/// function which does exactly this.)
///
/// We could rectify this situation with a trait bound like `P: for<'q> Params<'q>`, meaning `P`
/// must be bindable to a query with a lifetime chosen by the callee. However, when `P` is an
/// associated type, such as an element of an iterator, as in
/// `<I as IntoIter>::Item: for<'q> Params<'q>`, [a current limitation](https://blog.rust-lang.org/2022/10/28/gats-stabilization.html#implied-static-requirement-from-higher-ranked-trait-bounds.)
/// in the Rust compiler then requires `P: 'static`, which we don't necessarily want: the caller
/// should be able to pass in a reference to avoid expensive cloning.
///
/// So, instead, we work around this by making it explicit in the [`Params`] trait that the
/// lifetime `'r` of the [`Separated`] we're binding to may be different than the lifetime of the
/// parameters (`'p`), as long as the parameters outlive it (the `'p: 'r` bound on the
/// [`bind`](Self::bind) function).
pub trait Params<'p> {
    /// Bind the parameters in `self` to `q`, returning `q` so that calls can be chained.
    fn bind<'q, 'r>(
        self,
        q: &'q mut Separated<'r, 'p, Db, &'static str>,
    ) -> &'q mut Separated<'r, 'p, Db, &'static str>
    where
        'p: 'r;
}
351
/// A collection of parameters with a statically known length.
///
/// This is a simple trick for enforcing at compile time that a list of parameters has a certain
/// length, such as matching the length of a list of column names. This can prevent easy mistakes
/// like leaving out a parameter. It is implemented for tuples up to length 8.
///
/// The const parameter `N` is the number of parameters in the collection.
pub trait FixedLengthParams<'p, const N: usize>: Params<'p> {}
358
// Implements `Params` and `FixedLengthParams` for a tuple type.
//
// `$n` is the tuple's arity, used as the `N` of `FixedLengthParams`. The `$t` identifiers name
// the tuple's element types and are reused as the names of the destructured values inside
// `bind`.
macro_rules! impl_tuple_params {
    ($n:literal, ($($t:ident,)+)) => {
        impl<'p,  $($t),+> Params<'p> for ($($t,)+)
        where $(
            $t: 'p +  Encode<'p, Db> + Type<Db>
        ),+{
            fn bind<'q, 'r>(self, q: &'q mut Separated<'r, 'p, Db, &'static str>) ->   &'q mut Separated<'r, 'p, Db, &'static str>
            where 'p: 'r,
            {
                // The type identifiers double as variable names here, hence the lint exception.
                #[allow(non_snake_case)]
                let ($($t,)+) = self;
                // Bind each element in tuple order; the final `push_bind` yields `q` back.
                q $(
                    .push_bind($t)
                )+

            }
        }

        // Note that this impl asks for `Encode<'q, Db>` for every `'q`, whereas the `Params`
        // impl above only asks for `Encode<'p, Db>`.
        impl<'p, $($t),+> FixedLengthParams<'p, $n> for ($($t,)+)
        where $(
            $t: 'p + for<'q> Encode<'q, Db> + Type<Db>
        ),+ {
        }
    };
}

// Tuples of length 1 through 8.
impl_tuple_params!(1, (T,));
impl_tuple_params!(2, (T1, T2,));
impl_tuple_params!(3, (T1, T2, T3,));
impl_tuple_params!(4, (T1, T2, T3, T4,));
impl_tuple_params!(5, (T1, T2, T3, T4, T5,));
impl_tuple_params!(6, (T1, T2, T3, T4, T5, T6,));
impl_tuple_params!(7, (T1, T2, T3, T4, T5, T6, T7,));
impl_tuple_params!(8, (T1, T2, T3, T4, T5, T6, T7, T8,));
393
394pub fn build_where_in<'a, I>(
395    query: &'a str,
396    column: &'a str,
397    values: I,
398) -> QueryResult<(queries::QueryBuilder<'a>, String)>
399where
400    I: IntoIterator,
401    I::Item: 'a + Encode<'a, Db> + Type<Db>,
402{
403    let mut builder = queries::QueryBuilder::default();
404    let params = values
405        .into_iter()
406        .map(|v| Ok(format!("{} ", builder.bind(v)?)))
407        .collect::<QueryResult<Vec<String>>>()?;
408
409    if params.is_empty() {
410        return Err(QueryError::Error {
411            message: "failed to build WHERE IN query. No parameter found ".to_string(),
412        });
413    }
414
415    let sql = format!(
416        "{query} where {column} IN ({}) ",
417        params.into_iter().join(",")
418    );
419
420    Ok((builder, sql))
421}
422
423/// Low-level, general database queries and mutation.
424impl Transaction<Write> {
425    /// Maximum number of parameters allowed in a single query.
426    ///
427    /// This should be safely under the hard limit imposed by the Postgres client, which is either
428    /// 32,767 or 65,535.
429    const STATEMENT_MAX_PARAMETERS: usize = 30_000;
430
431    pub async fn upsert<'p, const N: usize, R>(
432        &mut self,
433        table: &str,
434        columns: [&str; N],
435        pk: impl IntoIterator<Item = &str>,
436        rows: R,
437    ) -> anyhow::Result<()>
438    where
439        R: IntoIterator,
440        R::Item: 'p + FixedLengthParams<'p, N>,
441    {
442        let set_columns = columns
443            .iter()
444            .map(|col| format!("{col} = excluded.{col}"))
445            .join(",");
446
447        let columns_str = columns.iter().map(|col| format!("\"{col}\"")).join(",");
448
449        let pk = pk.into_iter().join(",");
450
451        let rows: Vec<_> = rows.into_iter().collect();
452        let num_rows = rows.len();
453
454        if num_rows == 0 {
455            tracing::warn!("trying to upsert 0 rows into {table}, this has no effect");
456            return Ok(());
457        }
458
459        // For very large upserts, we might need to proceed in chunks to avoid exceeding the maximum
460        // number of parameters in a single statement.
461        let rows_per_chunk = Self::STATEMENT_MAX_PARAMETERS / N;
462        let mut rows = rows.into_iter();
463        loop {
464            let chunk = rows.by_ref().take(rows_per_chunk).collect::<Vec<_>>();
465            if chunk.is_empty() {
466                break;
467            }
468            let num_rows = chunk.len();
469            tracing::debug!(num_rows, "upsert chunk");
470
471            let mut query_builder =
472                QueryBuilder::new(format!("INSERT INTO \"{table}\" ({columns_str}) "));
473            query_builder.push_values(chunk, |mut b, row| {
474                row.bind(&mut b);
475            });
476            query_builder.push(format!(" ON CONFLICT ({pk}) DO UPDATE SET {set_columns}"));
477
478            let query = query_builder.build();
479            let statement = query.sql();
480
481            let res = self.execute(query).await.inspect_err(|err| {
482                tracing::error!(statement, "error in statement execution: {err:#}");
483            })?;
484            let rows_modified = res.rows_affected() as usize;
485            if rows_modified != num_rows {
486                let error = format!(
487                    "unexpected number of rows modified: expected {num_rows}, got \
488                     {rows_modified}. query: {statement}"
489                );
490                tracing::error!(error);
491                bail!(error);
492            }
493        }
494        Ok(())
495    }
496}
497
498/// Pruning mutations, run under READ COMMITTED isolation on Postgres.
499impl Transaction<Prune> {
500    /// Delete a batch of data for pruning.
501    ///
502    /// Payloads/vid_common are GC'd after header deletion using NOT EXISTS. Under READ
503    /// COMMITTED, if a concurrent insert holds a lock on a payload row, the DELETE waits
504    /// and re-evaluates with a fresh snapshot after the insert commits. If the payload was
505    /// already deleted, the inserting SERIALIZABLE transaction gets a serialization error
506    /// and retries, recreating the payload.
507    #[instrument(skip(self))]
508    pub(super) async fn delete_batch(&mut self, height: u64) -> anyhow::Result<()> {
509        let res = query("DELETE FROM transactions WHERE block_height <= $1")
510            .bind(height as i64)
511            .execute(self.as_mut())
512            .await
513            .context("deleting transactions")?;
514        tracing::debug!(rows_affected = res.rows_affected(), "pruned transactions");
515
516        let res = query("DELETE FROM leaf2 WHERE height <= $1")
517            .bind(height as i64)
518            .execute(self.as_mut())
519            .await
520            .context("deleting leaf2")?;
521        tracing::debug!(rows_affected = res.rows_affected(), "pruned leaf2");
522
523        let res = query("DELETE FROM header WHERE height <= $1")
524            .bind(height as i64)
525            .execute(self.as_mut())
526            .await
527            .context("deleting headers")?;
528        tracing::debug!(rows_affected = res.rows_affected(), "pruned headers");
529
530        let res = query(
531            "DELETE FROM payload AS p
532             WHERE NOT EXISTS (
533                SELECT 1 FROM header AS h
534                WHERE h.payload_hash = p.hash AND h.ns_table = p.ns_table
535             )",
536        )
537        .execute(self.as_mut())
538        .await
539        .context("garbage collecting payloads")?;
540        tracing::debug!(
541            rows_affected = res.rows_affected(),
542            "garbage collected payloads"
543        );
544
545        let res = query(
546            "DELETE FROM vid_common AS v
547             WHERE NOT EXISTS (
548                SELECT 1 FROM header AS h
549                WHERE h.payload_hash = v.hash
550             )",
551        )
552        .execute(self.as_mut())
553        .await
554        .context("garbage collecting VID common")?;
555        tracing::debug!(
556            rows_affected = res.rows_affected(),
557            "garbage collected VID common"
558        );
559
560        Ok(())
561    }
562
563    /// Prune merklized state tables.
564    ///
565    /// Only deletes nodes having `created <= height` that are not the newest node at their position.
566    #[instrument(skip(self))]
567    pub(super) async fn delete_state_batch(
568        &mut self,
569        state_tables: Vec<String>,
570        height: u64,
571    ) -> anyhow::Result<()> {
572        for state_table in state_tables {
573            self.execute(
574                query(&format!(
575                    "
576                DELETE FROM {state_table}
577                WHERE {state_table}.created <= $1
578                  AND EXISTS (
579                    SELECT 1 FROM {state_table} AS t2
580                    WHERE t2.path = {state_table}.path
581                      AND t2.created > {state_table}.created
582                      AND t2.created <= $1
583                  )"
584                ))
585                .bind(height as i64),
586            )
587            .await?;
588        }
589
590        Ok(())
591    }
592}
593
594/// Query service specific mutations.
595impl Transaction<Write> {
596    /// Record the height of the latest pruned header.
597    pub(crate) async fn save_pruned_height(&mut self, height: u64) -> anyhow::Result<()> {
598        // id is set to 1 so that there is only one row in the table.
599        // height is updated if the row already exists.
600        self.upsert(
601            "pruned_height",
602            ["id", "last_height"],
603            ["id"],
604            [(1i32, height as i64)],
605        )
606        .await
607    }
608}
609
610impl<Types> UpdateAvailabilityStorage<Types> for Transaction<Write>
611where
612    Types: NodeType,
613    Payload<Types>: QueryablePayload<Types>,
614    Header<Types>: QueryableHeader<Types>,
615{
616    async fn insert_qc_chain(
617        &mut self,
618        height: u64,
619        qc_chain: Option<[CertificatePair<Types>; 2]>,
620    ) -> anyhow::Result<()> {
621        let block_height = NodeStorage::<Types>::block_height(self).await? as u64;
622        if height + 1 >= block_height {
623            // If this QC chain is for the latest leaf we know about, store it so that we can prove
624            // to clients that the corresponding leaf is finalized. (If it is not the latest leaf,
625            // this is unnecessary, since we can prove it is an ancestor of some later, finalized
626            // leaf.)
627            let qcs = serde_json::to_value(&qc_chain)?;
628            self.upsert("latest_qc_chain", ["id", "qcs"], ["id"], [(1i32, qcs)])
629                .await
630                .context("inserting QC chain")?;
631        }
632
633        Ok(())
634    }
635
636    async fn insert_cert2(
637        &mut self,
638        height: u64,
639        cert2: Certificate2<Types>,
640    ) -> anyhow::Result<()> {
641        let cert2_json = serde_json::to_value(&cert2)?;
642        self.upsert(
643            "cert2",
644            ["height", "data"],
645            ["height"],
646            [(height as i64, cert2_json)],
647        )
648        .await
649        .context("inserting cert2")?;
650        Ok(())
651    }
652
653    async fn insert_leaf_range<'a>(
654        &mut self,
655        leaves: impl Send + IntoIterator<IntoIter: Send, Item = &'a LeafQueryData<Types>>,
656    ) -> anyhow::Result<()> {
657        let leaves = leaves.into_iter();
658
659        // Ignore leaves below the pruned height.
660        let pruned_height = self.load_pruned_height().await?;
661        let leaves = leaves.skip_while(|leaf| pruned_height.is_some_and(|h| leaf.height() <= h));
662
663        // While we don't necessarily have the full block for these leaves yet, we can initialize
664        // the header and leaf tables with block metadata taken from the leaves.
665        let (header_rows, leaf_rows): (Vec<_>, Vec<_>) = leaves
666            .map(|leaf| {
667                let header_json = serde_json::to_value(leaf.leaf().block_header())
668                    .context("failed to serialize header")?;
669                let header_row = (
670                    leaf.height() as i64,
671                    leaf.block_hash().to_string(),
672                    leaf.leaf().block_header().payload_commitment().to_string(),
673                    leaf.leaf().block_header().ns_table(),
674                    header_json,
675                    leaf.leaf().block_header().timestamp() as i64,
676                );
677
678                let leaf_json =
679                    serde_json::to_value(leaf.leaf()).context("failed to serialize leaf")?;
680                let qc_json = serde_json::to_value(leaf.qc()).context("failed to serialize QC")?;
681                let leaf_row = (
682                    leaf.height() as i64,
683                    leaf.hash().to_string(),
684                    leaf.block_hash().to_string(),
685                    leaf_json,
686                    qc_json,
687                );
688
689                anyhow::Ok((header_row, leaf_row))
690            })
691            .process_results(|iter| iter.unzip())?;
692
693        self.upsert(
694            "header",
695            [
696                "height",
697                "hash",
698                "payload_hash",
699                "ns_table",
700                "data",
701                "timestamp",
702            ],
703            ["height"],
704            header_rows,
705        )
706        .await
707        .context("inserting headers")?;
708
709        // Insert the leaves themselves, which reference the header rows we created.
710        self.upsert(
711            "leaf2",
712            ["height", "hash", "block_hash", "leaf", "qc"],
713            ["height"],
714            leaf_rows,
715        )
716        .await
717        .context("inserting leaves")?;
718
719        Ok(())
720    }
721
    /// Insert the payloads of a range of blocks and index the transactions they contain.
    ///
    /// Leading blocks at or below the pruned height are skipped. One row per distinct
    /// `(hash, ns_table)` pair is upserted into `payload`, and one row per transaction is
    /// upserted into `transactions`.
    ///
    /// # Errors
    ///
    /// Fails if the pruned height cannot be loaded or if either upsert fails.
    ///
    /// # Panics
    ///
    /// Panics if a block's header has no namespace ID for the namespace index of one of the
    /// block's transactions.
    async fn insert_block_range<'a>(
        &mut self,
        blocks: impl Send + IntoIterator<IntoIter: Send, Item = &'a BlockQueryData<Types>>,
    ) -> anyhow::Result<()> {
        let blocks = blocks.into_iter();

        // Ignore blocks at or below the pruned height. Only the leading run of such blocks is
        // skipped.
        let pruned_height = self.load_pruned_height().await?;
        let blocks = blocks.skip_while(|block| pruned_height.is_some_and(|h| block.height() <= h));

        // For each block, build its payload row eagerly and a lazy iterator over its transaction
        // rows; the latter are only evaluated when flattened below.
        let (payload_rows, tx_rows): (Vec<_>, Vec<_>) = blocks
            .map(|block| {
                // NOTE(review): `size` and `num_transactions` are narrowed to `i32` with `as`;
                // confirm that values beyond `i32::MAX` cannot occur.
                let payload_row = (
                    block.payload_hash().to_string(),
                    block.header().ns_table(),
                    block.size() as i32,
                    block.num_transactions() as i32,
                    block.payload.encode().as_ref().to_vec(),
                );

                let tx_rows = block.enumerate().map(|(txn_ix, txn)| {
                    // The namespace index comes from enumerating this same block, so the lookup
                    // is presumably expected to always succeed.
                    let ns_id = block.header().namespace_id(&txn_ix.ns_index).unwrap();
                    (
                        txn.commit().to_string(),
                        block.height() as i64,
                        txn_ix.ns_index.into(),
                        ns_id.into(),
                        txn_ix.position as i64,
                    )
                });

                (payload_row, tx_rows)
            })
            .unzip();
        let tx_rows = tx_rows.into_iter().flatten().collect::<Vec<_>>();

        // Multiple blocks in the range might have the same payload. We must filter out such
        // duplicates, because SQL does not allow conflicting rows in a single upsert statement.
        let payload_rows = payload_rows
            .into_iter()
            .unique_by(|(hash, ns_table, ..)| (hash.clone(), ns_table.clone()));

        self.upsert(
            "payload",
            ["hash", "ns_table", "size", "num_transactions", "data"],
            ["hash", "ns_table"],
            payload_rows,
        )
        .await
        .context("inserting payloads")?;

        // Index the transactions and namespaces in the block. This is skipped when there are no
        // transactions at all, since `upsert` would otherwise log a warning about 0 rows.
        if !tx_rows.is_empty() {
            self.upsert(
                "transactions",
                ["hash", "block_height", "ns_index", "ns_id", "position"],
                ["block_height", "ns_id", "position"],
                tx_rows,
            )
            .await
            .context("inserting transactions")?;
        }

        Ok(())
    }
787
788    async fn insert_vid_range<'a>(
789        &mut self,
790        vid: impl Send
791        + IntoIterator<
792            IntoIter: Send,
793            Item = (&'a VidCommonQueryData<Types>, Option<&'a VidShare>),
794        >,
795    ) -> anyhow::Result<()> {
796        let vid = vid.into_iter();
797
798        // Ignore objects below the pruned height.
799        let pruned_height = self.load_pruned_height().await?;
800        let vid = vid.skip_while(|(common, _)| pruned_height.is_some_and(|h| common.height() <= h));
801
802        let (common_rows, share_rows): (Vec<_>, Vec<_>) = vid
803            .map(|(common, share)| {
804                let common_data = bincode::serialize(common.common())
805                    .context("failed to serialize VID common data")?;
806                let common_row = (common.payload_hash().to_string(), common_data);
807
808                let share_row = if let Some(share) = share {
809                    let share_data =
810                        bincode::serialize(&share).context("failed to serialize VID share")?;
811                    Some((common.height() as i64, share_data))
812                } else {
813                    None
814                };
815
816                anyhow::Ok((common_row, share_row))
817            })
818            .process_results(|iter| iter.unzip())?;
819        let share_rows = share_rows.into_iter().flatten().collect::<Vec<_>>();
820
821        // Multiple blocks in the range might have the same VID common. We must filter out such
822        // duplicates, because SQL does not allow conflicting rows in a single upsert statement.
823        let common_rows = common_rows.into_iter().unique_by(|(hash, ..)| hash.clone());
824
825        self.upsert("vid_common", ["hash", "data"], ["hash"], common_rows)
826            .await
827            .context("inserting VID common")?;
828
829        if !share_rows.is_empty() {
830            let mut q = QueryBuilder::new("WITH rows (height, share) AS (");
831            q.push_values(share_rows, |mut q, (height, share)| {
832                q.push_bind(height).push_bind(share);
833            });
834            q.push(
835                ") UPDATE header SET vid_share = rows.share
836                FROM rows
837                WHERE header.height = rows.height",
838            );
839            q.build()
840                .execute(self.as_mut())
841                .await
842                .context("inserting VID shares")?;
843        }
844
845        Ok(())
846    }
847}
848
849#[async_trait]
850impl<Types: NodeType, State: MerklizedState<Types, ARITY>, const ARITY: usize>
851    UpdateStateData<Types, State, ARITY> for Transaction<Write>
852{
853    async fn set_last_state_height(&mut self, height: usize) -> anyhow::Result<()> {
854        self.upsert(
855            "last_merklized_state_height",
856            ["id", "height"],
857            ["id"],
858            [(1i32, height as i64)],
859        )
860        .await?;
861
862        Ok(())
863    }
864
865    async fn insert_merkle_nodes(
866        &mut self,
867        proof: MerkleProof<State::Entry, State::Key, State::T, ARITY>,
868        traversal_path: Vec<usize>,
869        block_number: u64,
870    ) -> anyhow::Result<()> {
871        let proofs = vec![(proof, traversal_path)];
872        UpdateStateData::<Types, State, ARITY>::insert_merkle_nodes_batch(
873            self,
874            proofs,
875            block_number,
876        )
877        .await
878    }
879
880    async fn insert_merkle_nodes_batch(
881        &mut self,
882        proofs: Vec<(
883            MerkleProof<State::Entry, State::Key, State::T, ARITY>,
884            Vec<usize>,
885        )>,
886        block_number: u64,
887    ) -> anyhow::Result<()> {
888        if proofs.is_empty() {
889            return Ok(());
890        }
891
892        let name = State::state_type();
893        let block_number = block_number as i64;
894
895        let (mut all_nodes, all_hashes) = collect_nodes_from_proofs(&proofs)?;
896        let hashes: Vec<Vec<u8>> = all_hashes.into_iter().collect();
897
898        #[cfg(not(feature = "embedded-db"))]
899        let nodes_hash_ids: HashMap<Vec<u8>, i32> = batch_insert_hashes(hashes, self).await?;
900
901        #[cfg(feature = "embedded-db")]
902        let nodes_hash_ids: HashMap<Vec<u8>, i32> = {
903            let mut hash_ids: HashMap<Vec<u8>, i32> = HashMap::with_capacity(hashes.len());
904            for hash_chunk in hashes.chunks(20) {
905                let (query, sql) = build_hash_batch_insert(hash_chunk)?;
906                let chunk_ids: HashMap<Vec<u8>, i32> = query
907                    .query_as(&sql)
908                    .fetch(self.as_mut())
909                    .try_collect()
910                    .await?;
911                hash_ids.extend(chunk_ids);
912            }
913            hash_ids
914        };
915
916        for (node, children, hash) in &mut all_nodes {
917            node.created = block_number;
918            node.hash_id = *nodes_hash_ids.get(&*hash).ok_or(QueryError::Error {
919                message: "Missing node hash".to_string(),
920            })?;
921
922            if let Some(children) = children {
923                let children_hashes = children
924                    .iter()
925                    .map(|c| nodes_hash_ids.get(c).copied())
926                    .collect::<Option<Vec<i32>>>()
927                    .ok_or(QueryError::Error {
928                        message: "Missing child hash".to_string(),
929                    })?;
930
931                node.children = Some(children_hashes.into());
932            }
933        }
934
935        Node::upsert(name, all_nodes.into_iter().map(|(n, ..)| n), self).await?;
936
937        Ok(())
938    }
939}
940
941#[async_trait]
942impl<Mode: TransactionMode> PrunedHeightStorage for Transaction<Mode> {
943    async fn load_pruned_height(&mut self) -> anyhow::Result<Option<u64>> {
944        let Some((height,)) =
945            query_as::<(i64,)>("SELECT last_height FROM pruned_height ORDER BY id DESC LIMIT 1")
946                .fetch_optional(self.as_mut())
947                .await?
948        else {
949            return Ok(None);
950        };
951        Ok(Some(height as u64))
952    }
953}
954
955#[derive(Clone, Debug)]
956pub(super) struct PoolMetrics {
957    open_transactions: Box<dyn Gauge>,
958    transaction_durations: Box<dyn Histogram>,
959    commits: Box<dyn Counter>,
960    reverts: Box<dyn Counter>,
961    drops: Box<dyn Counter>,
962}
963
964impl PoolMetrics {
965    pub(super) fn new(metrics: &(impl Metrics + ?Sized)) -> Self {
966        Self {
967            open_transactions: metrics.create_gauge("open_transactions".into(), None),
968            transaction_durations: metrics
969                .create_histogram("transaction_duration".into(), Some("s".into())),
970            commits: metrics.create_counter("committed_transactions".into(), None),
971            reverts: metrics.create_counter("reverted_transactions".into(), None),
972            drops: metrics.create_counter("dropped_transactions".into(), None),
973        }
974    }
975}
976
#[cfg(test)]
mod test {
    use super::*;
    use crate::data_source::{
        Transaction as _, VersionedDataSource,
        sql::testing::TmpDb,
        storage::{SqlStorage, StorageConnectionType},
    };

    /// Upserting more rows than `STATEMENT_MAX_PARAMETERS` in one call must store every row.
    #[tokio::test]
    #[test_log::test]
    async fn test_upsert_many_rows() {
        // Keep the temporary database bound for the whole test.
        let tmp_db = TmpDb::init().await;
        let storage = SqlStorage::connect(tmp_db.config(), StorageConnectionType::Sequencer)
            .await
            .unwrap();

        // Create a scratch table to upsert into.
        let mut setup_tx = storage.write().await.unwrap();
        query(
            "CREATE TABLE test (
                a INT PRIMARY KEY,
                b INT,
                c INT
            )",
        )
        .execute(setup_tx.as_mut())
        .await
        .unwrap();
        setup_tx.commit().await.unwrap();

        // Use a number of rows that makes up a non-integer number of chunks.
        let max_params = Transaction::STATEMENT_MAX_PARAMETERS;
        let num_rows = (2 * max_params + (max_params / 2)) as i32;
        let expected: Vec<(i32, i32, i32)> = (0..num_rows).map(|i| (i, i, i)).collect();

        let mut write_tx = storage.write().await.unwrap();
        write_tx
            .upsert("test", ["a", "b", "c"], ["a"], expected.clone())
            .await
            .unwrap();
        write_tx.commit().await.unwrap();

        // Read everything back in primary-key order and compare with what was written.
        let mut read_tx = storage.read().await.unwrap();
        let stored: Vec<(i32, i32, i32)> = query_as("SELECT * FROM test ORDER BY a")
            .fetch_all(read_tx.as_mut())
            .await
            .unwrap();
        assert_eq!(expected, stored);
    }
}