hotshot_query_service/data_source/storage/sql/transaction.rs

1// Copyright (c) 2022 Espresso Systems (espressosys.com)
2// This file is part of the HotShot Query Service library.
3//
4// This program is free software: you can redistribute it and/or modify it under the terms of the GNU
5// General Public License as published by the Free Software Foundation, either version 3 of the
6// License, or (at your option) any later version.
7// This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without
8// even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
9// General Public License for more details.
10// You should have received a copy of the GNU General Public License along with this program. If not,
11// see <https://www.gnu.org/licenses/>.
12
13//! SQL transactions
14//!
15//! A transaction encapsulates all the mutable functionality provided by the SQL database, and
16//! allows for mutable operations to be combined into complex updates that affect the main database
17//! atomically. A transaction also provides all the immutable query functionality of a regular
18//! database connection, so that the updated state of the database can be queried midway through a
19//! transaction.
20
21use std::{collections::HashMap, marker::PhantomData, time::Instant};
22
23use anyhow::{Context, bail};
24use async_trait::async_trait;
25use committable::Committable;
26use derive_more::{Deref, DerefMut};
27use futures::future::Future;
28#[cfg(feature = "embedded-db")]
29use futures::stream::TryStreamExt;
30use hotshot_types::{
31    data::VidShare,
32    simple_certificate::CertificatePair,
33    traits::{
34        EncodeBytes,
35        block_contents::BlockHeader,
36        metrics::{Counter, Gauge, Histogram, Metrics},
37        node_implementation::NodeType,
38    },
39};
40use itertools::Itertools;
41use jf_merkle_tree_compat::prelude::MerkleProof;
42pub use sqlx::Executor;
43use sqlx::{Encode, Execute, FromRow, QueryBuilder, Type, pool::Pool, query_builder::Separated};
44use tracing::instrument;
45
46#[cfg(not(feature = "embedded-db"))]
47use super::queries::state::batch_insert_hashes;
48#[cfg(feature = "embedded-db")]
49use super::queries::state::build_hash_batch_insert;
50use super::{
51    Database, Db,
52    queries::{
53        self,
54        state::{Node, collect_nodes_from_proofs},
55    },
56};
57use crate::{
58    Header, Payload, QueryError, QueryResult,
59    availability::{
60        BlockQueryData, LeafQueryData, QueryableHeader, QueryablePayload, VidCommonQueryData,
61    },
62    data_source::{
63        storage::{NodeStorage, UpdateAvailabilityStorage, pruning::PrunedHeightStorage},
64        update,
65    },
66    merklized_state::{MerklizedState, UpdateStateData},
67    types::HeightIndexed,
68};
69
70pub type Query<'q> = sqlx::query::Query<'q, Db, <Db as Database>::Arguments<'q>>;
71pub type QueryAs<'q, T> = sqlx::query::QueryAs<'q, Db, T, <Db as Database>::Arguments<'q>>;
72
/// Build an unprepared [`Query`] from a SQL string; parameters are bound later.
pub fn query(sql: &str) -> Query<'_> {
    sqlx::query(sql)
}
76
/// Build a [`QueryAs`] from a SQL string, decoding result rows into `T`.
pub fn query_as<'q, T>(sql: &'q str) -> QueryAs<'q, T>
where
    T: for<'r> FromRow<'r, <Db as Database>::Row>,
{
    sqlx::query_as(sql)
}
83
/// Marker type indicating a transaction with read-write access to the database.
///
/// Used as the `Mode` parameter of [`Transaction`]; see [`TransactionMode`].
#[derive(Clone, Copy, Debug, Default)]
pub struct Write;
87
/// Marker type indicating a transaction with read-only access to the database.
///
/// Used as the `Mode` parameter of [`Transaction`]; see [`TransactionMode`].
#[derive(Clone, Copy, Debug, Default)]
pub struct Read;
91
/// Trait for marker types indicating what type of access a transaction has to the database.
pub trait TransactionMode: Send + Sync {
    /// Initialize a freshly opened transaction for this access mode, e.g. by
    /// issuing backend-specific `SET TRANSACTION` (or equivalent) statements.
    fn begin(
        conn: &mut <Db as Database>::Connection,
    ) -> impl Future<Output = anyhow::Result<()>> + Send;

    /// Human-readable name of this mode, used in log messages.
    fn display() -> &'static str;
}
100
impl TransactionMode for Write {
    /// Begin a read-write transaction, ensuring write access is acquired up front.
    #[allow(unused_variables)]
    async fn begin(conn: &mut <Db as Database>::Connection) -> anyhow::Result<()> {
        // SQLite automatically sets the read/write mode of a transactions based on the statements
        // in it. However, there is still a good reason to explicitly enable write mode right from
        // the start: if a transaction first executes a read statement and then a write statement,
        // it will be upgraded from a read transaction to a write transaction. Because this involves
        // obtaining a different kind of lock while already holding one, it can cause a deadlock,
        // e.g.:
        // * Transaction A executes a read statement, obtaining a read lock
        // * Transaction B executes a write statement and begins waiting for a write lock
        // * Transaction A executes a write statement and begins waiting for a write lock
        //
        // Transaction A can never obtain its write lock because it must first wait for transaction
        // B to get a write lock, which cannot happen because B is in turn waiting for A to release
        // its read lock.
        //
        // This type of deadlock cannot happen if transaction A immediately starts as a write, since
        // it will then only ever try to acquire one type of lock (a write lock). By working with
        // this restriction (transactions are either readers or writers, but never upgradable), we
        // avoid deadlock, we more closely imitate the concurrency semantics of postgres, and we
        // take advantage of the SQLite busy timeout, which may allow a transaction to acquire a
        // lock and succeed (after a small delay), even when there was a conflicting transaction in
        // progress. Whereas a deadlock is always an automatic rollback.
        //
        // The proper way to begin a write transaction in SQLite is with `BEGIN IMMEDIATE`. However,
        // sqlx does not expose any way to customize the `BEGIN` statement that starts a
        // transaction. A serviceable workaround is to perform some write statement before performing
        // any read statement, ensuring that the first lock we acquire is exclusive. A write
        // statement that has no actual effect on the database is suitable for this purpose, hence
        // the `WHERE false`.
        #[cfg(feature = "embedded-db")]
        conn.execute("UPDATE pruned_height SET id = id WHERE false")
            .await?;

        // With Postgres things are much more straightforward: set the desired (SERIALIZABLE)
        // isolation level immediately after opening the transaction; Postgres transactions are
        // read-write by default.
        #[cfg(not(feature = "embedded-db"))]
        conn.execute("SET TRANSACTION ISOLATION LEVEL SERIALIZABLE")
            .await?;

        Ok(())
    }

    fn display() -> &'static str {
        "write"
    }
}
149
impl TransactionMode for Read {
    /// Begin a read-only transaction with the strongest available consistency.
    #[allow(unused_variables)]
    async fn begin(conn: &mut <Db as Database>::Connection) -> anyhow::Result<()> {
        // With Postgres, we explicitly set the transaction mode to specify that we want the
        // strongest possible consistency semantics in case of competing transactions
        // (SERIALIZABLE), and we want to wait until this is possible rather than failing
        // (DEFERRABLE).
        //
        // With SQLite, there is nothing to be done here, as SQLite automatically starts
        // transactions in read-only mode, and always has serializable concurrency unless we
        // explicitly opt in to dirty reads with a pragma.
        #[cfg(not(feature = "embedded-db"))]
        conn.execute("SET TRANSACTION ISOLATION LEVEL SERIALIZABLE, READ ONLY, DEFERRABLE")
            .await?;

        Ok(())
    }

    fn display() -> &'static str {
        "read-only"
    }
}
172
/// The way a transaction ended, for metrics reporting.
#[derive(Clone, Copy, Debug)]
enum CloseType {
    /// Successfully committed.
    Commit,
    /// Explicitly rolled back.
    Revert,
    /// Dropped without an explicit commit or revert.
    Drop,
}
179
/// Tracks a transaction's lifetime and reports metrics when it closes.
#[derive(Debug)]
struct TransactionMetricsGuard<Mode> {
    // When the transaction was opened; used to measure its duration on close.
    started_at: Instant,
    // Handles to the pool-wide metrics this guard updates.
    metrics: PoolMetrics,
    // How the transaction ended; remains `Drop` unless `set_closed` is called.
    close_type: CloseType,
    _mode: PhantomData<Mode>,
}
187
188impl<Mode: TransactionMode> TransactionMetricsGuard<Mode> {
189    fn begin(metrics: PoolMetrics) -> Self {
190        let started_at = Instant::now();
191        tracing::trace!(mode = Mode::display(), ?started_at, "begin");
192        metrics.open_transactions.update(1);
193
194        Self {
195            started_at,
196            metrics,
197            close_type: CloseType::Drop,
198            _mode: Default::default(),
199        }
200    }
201
202    fn set_closed(&mut self, t: CloseType) {
203        self.close_type = t;
204    }
205}
206
207impl<Mode> Drop for TransactionMetricsGuard<Mode> {
208    fn drop(&mut self) {
209        self.metrics
210            .transaction_durations
211            .add_point((self.started_at.elapsed().as_millis() as f64) / 1000.);
212        self.metrics.open_transactions.update(-1);
213        match self.close_type {
214            CloseType::Commit => self.metrics.commits.add(1),
215            CloseType::Revert => self.metrics.reverts.add(1),
216            CloseType::Drop => self.metrics.drops.add(1),
217        }
218        tracing::trace!(started_at = ?self.started_at, reason = ?self.close_type, "close");
219    }
220}
221
/// An atomic SQL transaction.
///
/// Derefs to the underlying [`sqlx::Transaction`], so sqlx executor methods
/// are available directly. The attached metrics guard reports the
/// transaction's duration and outcome when it is closed or dropped.
#[derive(Debug, Deref, DerefMut)]
pub struct Transaction<Mode> {
    #[deref]
    #[deref_mut]
    inner: sqlx::Transaction<'static, Db>,
    metrics: TransactionMetricsGuard<Mode>,
}
230
impl<Mode: TransactionMode> Transaction<Mode> {
    /// Open a new transaction on a connection from `pool`.
    ///
    /// The metrics guard is started before the mode-specific setup so that a
    /// failure in `Mode::begin` is still recorded (as a dropped transaction).
    pub(super) async fn new(pool: &Pool<Db>, metrics: PoolMetrics) -> anyhow::Result<Self> {
        let mut inner = pool.begin().await?;
        let metrics = TransactionMetricsGuard::begin(metrics);
        Mode::begin(inner.as_mut()).await?;
        Ok(Self { inner, metrics })
    }
}
239
240impl<Mode: TransactionMode> update::Transaction for Transaction<Mode> {
241    async fn commit(mut self) -> anyhow::Result<()> {
242        self.inner.commit().await?;
243        self.metrics.set_closed(CloseType::Commit);
244        Ok(())
245    }
246    fn revert(mut self) -> impl Future + Send {
247        async move {
248            self.inner.rollback().await.unwrap();
249            self.metrics.set_closed(CloseType::Revert);
250        }
251    }
252}
253
254/// A collection of parameters which can be bound to a SQL query.
255///
256/// This trait allows us to carry around hetergenous lists of parameters (e.g. tuples) and bind them
257/// to a query at the last moment before executing. This means we can manipulate the parameters
258/// independently of the query before executing it. For example, by requiring a trait bound of
259/// `Params<'p> + Clone`, we get a list (or tuple) of parameters which can be cloned and then bound
260/// to a query, which allows us to keep a copy of the parameters around in order to retry the query
261/// if it fails.
262///
263/// # Lifetimes
264///
265/// A SQL [`Query`] with lifetime `'q` borrows from both it's SQL statement (`&'q str`) and its
266/// parameters (bound via `bind<'q>`). Sometimes, though, it is necessary for the statement and its
267/// parameters to have different (but overlapping) lifetimes. For example, the parameters might be
268/// passed in and owned by the caller, while the query string is constructed in the callee and its
269/// lifetime is limited to the callee scope. (See for example the [`upsert`](Transaction::upsert)
270/// function which does exactly this.)
271///
272/// We could rectify this situation with a trait bound like `P: for<'q> Params<'q>`, meaning `P`
273/// must be bindable to a query with a lifetime chosen by the callee. However, when `P` is an
274/// associated type, such as an element of an iterator, as in
275/// `<I as IntoIter>::Item: for<'q> Params<'q>`, [a current limitation](https://blog.rust-lang.org/2022/10/28/gats-stabilization.html#implied-static-requirement-from-higher-ranked-trait-bounds.)
276/// in the Rust compiler then requires `P: 'static`, which we don't necessarily want: the caller
277/// should be able to pass in a reference to avoid expensive cloning.
278///
279/// So, instead, we work around this by making it explicit in the [`Params`] trait that the lifetime
280/// of the query we're binding to (`'q`) may be different than the lifetime of the parameters (`'p`)
281/// as long as the parameters outlive the duration of the query (the `'p: 'q`) bound on the
282/// [`bind`](Self::bind) function.
pub trait Params<'p> {
    /// Bind these parameters, in order, to the given separated query builder.
    ///
    /// Returns the builder to allow chaining. The parameters (`'p`) must
    /// outlive the builder they are bound to (`'r`).
    fn bind<'q, 'r>(
        self,
        q: &'q mut Separated<'r, 'p, Db, &'static str>,
    ) -> &'q mut Separated<'r, 'p, Db, &'static str>
    where
        'p: 'r;
}
291
/// A collection of parameters with a statically known length.
///
/// This is a simple trick for enforcing at compile time that a list of parameters has a certain
/// length, such as matching the length of a list of column names. This can prevent easy mistakes
/// like leaving out a parameter. It is implemented for tuples up to length 8.
pub trait FixedLengthParams<'p, const N: usize>: Params<'p> {}
298
// Implements `Params` and `FixedLengthParams<N>` for a tuple of `N` bindable
// values. Each tuple element is pushed as a separate bind parameter, in order.
macro_rules! impl_tuple_params {
    ($n:literal, ($($t:ident,)+)) => {
        impl<'p,  $($t),+> Params<'p> for ($($t,)+)
        where $(
            $t: 'p +  Encode<'p, Db> + Type<Db>
        ),+{
            fn bind<'q, 'r>(self, q: &'q mut Separated<'r, 'p, Db, &'static str>) ->   &'q mut Separated<'r, 'p, Db, &'static str>
            where 'p: 'r,
            {
                #[allow(non_snake_case)]
                let ($($t,)+) = self;
                q $(
                    .push_bind($t)
                )+

            }
        }

        impl<'p, $($t),+> FixedLengthParams<'p, $n> for ($($t,)+)
        where $(
            $t: 'p + for<'q> Encode<'q, Db> + Type<Db>
        ),+ {
        }
    };
}

impl_tuple_params!(1, (T,));
impl_tuple_params!(2, (T1, T2,));
impl_tuple_params!(3, (T1, T2, T3,));
impl_tuple_params!(4, (T1, T2, T3, T4,));
impl_tuple_params!(5, (T1, T2, T3, T4, T5,));
impl_tuple_params!(6, (T1, T2, T3, T4, T5, T6,));
impl_tuple_params!(7, (T1, T2, T3, T4, T5, T6, T7,));
impl_tuple_params!(8, (T1, T2, T3, T4, T5, T6, T7, T8,));
333
334pub fn build_where_in<'a, I>(
335    query: &'a str,
336    column: &'a str,
337    values: I,
338) -> QueryResult<(queries::QueryBuilder<'a>, String)>
339where
340    I: IntoIterator,
341    I::Item: 'a + Encode<'a, Db> + Type<Db>,
342{
343    let mut builder = queries::QueryBuilder::default();
344    let params = values
345        .into_iter()
346        .map(|v| Ok(format!("{} ", builder.bind(v)?)))
347        .collect::<QueryResult<Vec<String>>>()?;
348
349    if params.is_empty() {
350        return Err(QueryError::Error {
351            message: "failed to build WHERE IN query. No parameter found ".to_string(),
352        });
353    }
354
355    let sql = format!(
356        "{query} where {column} IN ({}) ",
357        params.into_iter().join(",")
358    );
359
360    Ok((builder, sql))
361}
362
/// Low-level, general database queries and mutation.
impl Transaction<Write> {
    /// Insert rows into `table`, updating existing rows on primary-key conflict.
    ///
    /// * `columns` names every column being written (quoted in the generated SQL).
    /// * `pk` lists the conflict-target columns (the primary key).
    /// * Each element of `rows` binds exactly `N` parameters, matching `columns`.
    ///
    /// On conflict, every column is overwritten with the incoming value
    /// (`excluded.<col>`). Upserting zero rows is a warned no-op. Returns an
    /// error if the database reports a different number of modified rows than
    /// were supplied.
    pub async fn upsert<'p, const N: usize, R>(
        &mut self,
        table: &str,
        columns: [&str; N],
        pk: impl IntoIterator<Item = &str>,
        rows: R,
    ) -> anyhow::Result<()>
    where
        R: IntoIterator,
        R::Item: 'p + FixedLengthParams<'p, N>,
    {
        // SET clause: overwrite each column with the conflicting row's value.
        let set_columns = columns
            .iter()
            .map(|col| format!("{col} = excluded.{col}"))
            .join(",");

        let columns_str = columns.iter().map(|col| format!("\"{col}\"")).join(",");

        let pk = pk.into_iter().join(",");

        let rows: Vec<_> = rows.into_iter().collect();
        let num_rows = rows.len();

        if num_rows == 0 {
            tracing::warn!("trying to upsert 0 rows into {table}, this has no effect");
            return Ok(());
        }

        let mut query_builder =
            QueryBuilder::new(format!("INSERT INTO \"{table}\" ({columns_str}) "));
        query_builder.push_values(rows, |mut b, row| {
            row.bind(&mut b);
        });
        query_builder.push(format!(" ON CONFLICT ({pk}) DO UPDATE SET {set_columns}"));

        let query = query_builder.build();
        let statement = query.sql();

        let res = self.execute(query).await.inspect_err(|err| {
            tracing::error!(statement, "error in statement execution: {err:#}");
        })?;
        // Sanity check: every supplied row should have been inserted or updated.
        let rows_modified = res.rows_affected() as usize;
        if rows_modified != num_rows {
            let error = format!(
                "unexpected number of rows modified: expected {num_rows}, got {rows_modified}. \
                 query: {statement}"
            );
            tracing::error!(error);
            bail!(error);
        }
        Ok(())
    }
}
418
/// Query service specific mutations.
impl Transaction<Write> {
    /// Delete a batch of data for pruning.
    ///
    /// Garbage collects payloads and VID common data that are only referenced
    /// by headers at or below `height`, then deletes the dependent
    /// transactions and leaves before finally deleting the headers themselves.
    #[instrument(skip(self))]
    pub(super) async fn delete_batch(&mut self, height: u64) -> anyhow::Result<()> {
        // Delete payloads which are only referenced by the headers we're going to delete.
        let res = query(
            "WITH to_delete AS (
                SELECT h.payload_hash, h.ns_table FROM header AS h
                 WHERE (h.payload_hash, h.ns_table) IN (
                    SELECT range.payload_hash, range.ns_table
                      FROM header AS range
                     WHERE range.height <= $1
                 )
                GROUP BY h.payload_hash, h.ns_table
                HAVING count(*) <= 1
            )
            DELETE FROM payload AS p
             WHERE (p.hash, p.ns_table) IN (SELECT * FROM to_delete)",
        )
        .bind(height as i64)
        .execute(self.as_mut())
        .await
        .context("deleting payloads")?;
        tracing::debug!(
            rows_affected = res.rows_affected(),
            "garbage collected payloads"
        );

        // Delete VID common which are only referenced by the headers we're going to delete.
        let res = query(
            "WITH to_delete AS (
                SELECT h.payload_hash FROM header AS h
                 WHERE h.payload_hash IN (
                    SELECT range.payload_hash
                      FROM header AS range
                     WHERE range.height <= $1
                 )
                GROUP BY h.payload_hash
                HAVING count(*) <= 1
            )
            DELETE FROM vid_common AS v
             WHERE v.hash IN (SELECT * FROM to_delete)",
        )
        .bind(height as i64)
        .execute(self.as_mut())
        .await
        .context("deleting VID common")?;
        tracing::debug!(
            rows_affected = res.rows_affected(),
            "garbage collected VID common"
        );

        // Delete dependent tables individually before deleting headers.
        let res = query("DELETE FROM transactions WHERE block_height <= $1")
            .bind(height as i64)
            .execute(self.as_mut())
            .await
            .context("deleting transactions")?;
        tracing::debug!(rows_affected = res.rows_affected(), "pruned transactions");

        let res = query("DELETE FROM leaf2 WHERE height <= $1")
            .bind(height as i64)
            .execute(self.as_mut())
            .await
            .context("deleting leaf2")?;
        tracing::debug!(rows_affected = res.rows_affected(), "pruned leaf2");

        let res = query("DELETE FROM header WHERE height <= $1")
            .bind(height as i64)
            .execute(self.as_mut())
            .await
            .context("deleting headers")?;
        tracing::debug!(rows_affected = res.rows_affected(), "pruned headers");

        Ok(())
    }

    /// Prune merklized state tables.
    ///
    /// Only deletes nodes having `created <= height` that are not the newest node at their position.
    #[instrument(skip(self))]
    pub(super) async fn delete_state_batch(
        &mut self,
        state_tables: Vec<String>,
        height: u64,
    ) -> anyhow::Result<()> {
        for state_table in state_tables {
            // A node is deletable if a newer node within the pruned range exists
            // at the same path, making this one stale.
            self.execute(
                query(&format!(
                    "
                DELETE FROM {state_table}
                WHERE {state_table}.created <= $1
                  AND EXISTS (
                    SELECT 1 FROM {state_table} AS t2
                    WHERE t2.path = {state_table}.path
                      AND t2.created > {state_table}.created
                      AND t2.created <= $1
                  )"
                ))
                .bind(height as i64),
            )
            .await?;
        }

        Ok(())
    }

    /// Record the height of the latest pruned header.
    pub(crate) async fn save_pruned_height(&mut self, height: u64) -> anyhow::Result<()> {
        // id is set to 1 so that there is only one row in the table.
        // height is updated if the row already exists.
        self.upsert(
            "pruned_height",
            ["id", "last_height"],
            ["id"],
            [(1i32, height as i64)],
        )
        .await
    }
}
540
impl<Types> UpdateAvailabilityStorage<Types> for Transaction<Write>
where
    Types: NodeType,
    Payload<Types>: QueryablePayload<Types>,
    Header<Types>: QueryableHeader<Types>,
{
    /// Store the QC chain for the leaf at `height`, but only if that leaf is
    /// at least as new as the latest block we know about.
    async fn insert_qc_chain(
        &mut self,
        height: u64,
        qc_chain: Option<[CertificatePair<Types>; 2]>,
    ) -> anyhow::Result<()> {
        let block_height = NodeStorage::<Types>::block_height(self).await? as u64;
        if height + 1 >= block_height {
            // If this QC chain is for the latest leaf we know about, store it so that we can prove
            // to clients that the corresponding leaf is finalized. (If it is not the latest leaf,
            // this is unnecessary, since we can prove it is an ancestor of some later, finalized
            // leaf.)
            let qcs = serde_json::to_value(&qc_chain)?;
            // The table holds a single row (id = 1) which is overwritten each time.
            self.upsert("latest_qc_chain", ["id", "qcs"], ["id"], [(1i32, qcs)])
                .await
                .context("inserting QC chain")?;
        }

        Ok(())
    }

    /// Insert a batch of leaves, populating the `header` and `leaf2` tables
    /// with metadata extracted from each leaf.
    async fn insert_leaf_range<'a>(
        &mut self,
        leaves: impl Send + IntoIterator<IntoIter: Send, Item = &'a LeafQueryData<Types>>,
    ) -> anyhow::Result<()> {
        let leaves = leaves.into_iter();

        // Ignore leaves below the pruned height.
        let pruned_height = self.load_pruned_height().await?;
        let leaves = leaves.skip_while(|leaf| pruned_height.is_some_and(|h| leaf.height() <= h));

        // While we don't necessarily have the full block for these leaves yet, we can initialize
        // the header and leaf tables with block metadata taken from the leaves.
        let (header_rows, leaf_rows): (Vec<_>, Vec<_>) = leaves
            .map(|leaf| {
                let header_json = serde_json::to_value(leaf.leaf().block_header())
                    .context("failed to serialize header")?;
                let header_row = (
                    leaf.height() as i64,
                    leaf.block_hash().to_string(),
                    leaf.leaf().block_header().payload_commitment().to_string(),
                    leaf.leaf().block_header().ns_table(),
                    header_json,
                    leaf.leaf().block_header().timestamp() as i64,
                );

                let leaf_json =
                    serde_json::to_value(leaf.leaf()).context("failed to serialize leaf")?;
                let qc_json = serde_json::to_value(leaf.qc()).context("failed to serialize QC")?;
                let leaf_row = (
                    leaf.height() as i64,
                    leaf.hash().to_string(),
                    leaf.block_hash().to_string(),
                    leaf_json,
                    qc_json,
                );

                anyhow::Ok((header_row, leaf_row))
            })
            .process_results(|iter| iter.unzip())?;

        self.upsert(
            "header",
            [
                "height",
                "hash",
                "payload_hash",
                "ns_table",
                "data",
                "timestamp",
            ],
            ["height"],
            header_rows,
        )
        .await
        .context("inserting headers")?;

        // Insert the leaves themselves, which reference the header rows we created.
        self.upsert(
            "leaf2",
            ["height", "hash", "block_hash", "leaf", "qc"],
            ["height"],
            leaf_rows,
        )
        .await
        .context("inserting leaves")?;

        Ok(())
    }

    /// Insert a batch of blocks, populating the `payload` table and indexing
    /// each block's transactions into the `transactions` table.
    async fn insert_block_range<'a>(
        &mut self,
        blocks: impl Send + IntoIterator<IntoIter: Send, Item = &'a BlockQueryData<Types>>,
    ) -> anyhow::Result<()> {
        let blocks = blocks.into_iter();

        // Ignore blocks below the pruned height.
        let pruned_height = self.load_pruned_height().await?;
        let blocks = blocks.skip_while(|block| pruned_height.is_some_and(|h| block.height() <= h));

        let (payload_rows, tx_rows): (Vec<_>, Vec<_>) = blocks
            .map(|block| {
                let payload_row = (
                    block.payload_hash().to_string(),
                    block.header().ns_table(),
                    block.size() as i32,
                    block.num_transactions() as i32,
                    block.payload.encode().as_ref().to_vec(),
                );

                // One index row per transaction in the block, keyed by
                // (block_height, ns_id, position).
                let tx_rows = block.enumerate().map(|(txn_ix, txn)| {
                    let ns_id = block.header().namespace_id(&txn_ix.ns_index).unwrap();
                    (
                        txn.commit().to_string(),
                        block.height() as i64,
                        txn_ix.ns_index.into(),
                        ns_id.into(),
                        txn_ix.position as i64,
                    )
                });

                (payload_row, tx_rows)
            })
            .unzip();
        let tx_rows = tx_rows.into_iter().flatten().collect::<Vec<_>>();

        self.upsert(
            "payload",
            ["hash", "ns_table", "size", "num_transactions", "data"],
            ["hash", "ns_table"],
            payload_rows,
        )
        .await
        .context("inserting payloads")?;

        // Index the transactions and namespaces in the block.
        if !tx_rows.is_empty() {
            self.upsert(
                "transactions",
                ["hash", "block_height", "ns_index", "ns_id", "position"],
                ["block_height", "ns_id", "position"],
                tx_rows,
            )
            .await
            .context("inserting transactions")?;
        }

        Ok(())
    }

    /// Insert a batch of VID common data and, where available, VID shares.
    async fn insert_vid_range<'a>(
        &mut self,
        vid: impl Send
        + IntoIterator<
            IntoIter: Send,
            Item = (&'a VidCommonQueryData<Types>, Option<&'a VidShare>),
        >,
    ) -> anyhow::Result<()> {
        let vid = vid.into_iter();

        // Ignore objects below the pruned height.
        let pruned_height = self.load_pruned_height().await?;
        let vid = vid.skip_while(|(common, _)| pruned_height.is_some_and(|h| common.height() <= h));

        let (common_rows, share_rows): (Vec<_>, Vec<_>) = vid
            .map(|(common, share)| {
                let common_data = bincode::serialize(common.common())
                    .context("failed to serialize VID common data")?;
                let common_row = (common.payload_hash().to_string(), common_data);

                let share_row = if let Some(share) = share {
                    let share_data =
                        bincode::serialize(&share).context("failed to serialize VID share")?;
                    Some((common.height() as i64, share_data))
                } else {
                    None
                };

                anyhow::Ok((common_row, share_row))
            })
            .process_results(|iter| iter.unzip())?;
        let share_rows = share_rows.into_iter().flatten().collect::<Vec<_>>();

        self.upsert("vid_common", ["hash", "data"], ["hash"], common_rows)
            .await
            .context("inserting VID common")?;

        // Shares are stored inline in the header table; update all matching
        // header rows in a single statement.
        if !share_rows.is_empty() {
            let mut q = QueryBuilder::new("WITH rows (height, share) AS (");
            q.push_values(share_rows, |mut q, (height, share)| {
                q.push_bind(height).push_bind(share);
            });
            q.push(
                ") UPDATE header SET vid_share = rows.share
                FROM rows
                WHERE header.height = rows.height",
            );
            q.build()
                .execute(self.as_mut())
                .await
                .context("inserting VID shares")?;
        }

        Ok(())
    }
}
752
#[async_trait]
impl<Types: NodeType, State: MerklizedState<Types, ARITY>, const ARITY: usize>
    UpdateStateData<Types, State, ARITY> for Transaction<Write>
{
    /// Record the block height up to which merklized state has been stored.
    async fn set_last_state_height(&mut self, height: usize) -> anyhow::Result<()> {
        // id is fixed at 1 so the table holds a single row, updated in place.
        self.upsert(
            "last_merklized_state_height",
            ["id", "height"],
            ["id"],
            [(1i32, height as i64)],
        )
        .await?;

        Ok(())
    }

    /// Insert the nodes along a single Merkle proof; delegates to the batch path.
    async fn insert_merkle_nodes(
        &mut self,
        proof: MerkleProof<State::Entry, State::Key, State::T, ARITY>,
        traversal_path: Vec<usize>,
        block_number: u64,
    ) -> anyhow::Result<()> {
        let proofs = vec![(proof, traversal_path)];
        UpdateStateData::<Types, State, ARITY>::insert_merkle_nodes_batch(
            self,
            proofs,
            block_number,
        )
        .await
    }

    /// Insert the nodes from a batch of Merkle proofs for `block_number`.
    ///
    /// Node hashes are first inserted (or looked up) in the hash table to get
    /// integer ids; each node and its children are then rewritten in terms of
    /// those ids before being upserted into the state table.
    async fn insert_merkle_nodes_batch(
        &mut self,
        proofs: Vec<(
            MerkleProof<State::Entry, State::Key, State::T, ARITY>,
            Vec<usize>,
        )>,
        block_number: u64,
    ) -> anyhow::Result<()> {
        if proofs.is_empty() {
            return Ok(());
        }

        let name = State::state_type();
        let block_number = block_number as i64;

        let (mut all_nodes, all_hashes) = collect_nodes_from_proofs(&proofs)?;
        let hashes: Vec<Vec<u8>> = all_hashes.into_iter().collect();

        // Postgres: insert all hashes in one batched statement.
        #[cfg(not(feature = "embedded-db"))]
        let nodes_hash_ids: HashMap<Vec<u8>, i32> = batch_insert_hashes(hashes, self).await?;

        // SQLite: insert hashes in chunks to stay within bind-parameter limits.
        #[cfg(feature = "embedded-db")]
        let nodes_hash_ids: HashMap<Vec<u8>, i32> = {
            let mut hash_ids: HashMap<Vec<u8>, i32> = HashMap::with_capacity(hashes.len());
            for hash_chunk in hashes.chunks(20) {
                let (query, sql) = build_hash_batch_insert(hash_chunk)?;
                let chunk_ids: HashMap<Vec<u8>, i32> = query
                    .query_as(&sql)
                    .fetch(self.as_mut())
                    .try_collect()
                    .await?;
                hash_ids.extend(chunk_ids);
            }
            hash_ids
        };

        // Rewrite each node's own hash and child hashes as hash-table ids.
        for (node, children, hash) in &mut all_nodes {
            node.created = block_number;
            node.hash_id = *nodes_hash_ids.get(&*hash).ok_or(QueryError::Error {
                message: "Missing node hash".to_string(),
            })?;

            if let Some(children) = children {
                let children_hashes = children
                    .iter()
                    .map(|c| nodes_hash_ids.get(c).copied())
                    .collect::<Option<Vec<i32>>>()
                    .ok_or(QueryError::Error {
                        message: "Missing child hash".to_string(),
                    })?;

                node.children = Some(children_hashes.into());
            }
        }

        Node::upsert(name, all_nodes.into_iter().map(|(n, ..)| n), self).await?;

        Ok(())
    }
}
844
845#[async_trait]
846impl<Mode: TransactionMode> PrunedHeightStorage for Transaction<Mode> {
847    async fn load_pruned_height(&mut self) -> anyhow::Result<Option<u64>> {
848        let Some((height,)) =
849            query_as::<(i64,)>("SELECT last_height FROM pruned_height ORDER BY id DESC LIMIT 1")
850                .fetch_optional(self.as_mut())
851                .await?
852        else {
853            return Ok(None);
854        };
855        Ok(Some(height as u64))
856    }
857}
858
/// Metrics tracking transaction usage of a connection pool.
#[derive(Clone, Debug)]
pub(super) struct PoolMetrics {
    // Number of transactions currently open.
    open_transactions: Box<dyn Gauge>,
    // Distribution of transaction durations, in seconds.
    transaction_durations: Box<dyn Histogram>,
    // Counts of transactions by outcome (see `CloseType`).
    commits: Box<dyn Counter>,
    reverts: Box<dyn Counter>,
    drops: Box<dyn Counter>,
}
867
impl PoolMetrics {
    /// Register pool transaction metrics with the given metrics collector.
    pub(super) fn new(metrics: &(impl Metrics + ?Sized)) -> Self {
        Self {
            open_transactions: metrics.create_gauge("open_transactions".into(), None),
            transaction_durations: metrics
                .create_histogram("transaction_duration".into(), Some("s".into())),
            commits: metrics.create_counter("committed_transactions".into(), None),
            reverts: metrics.create_counter("reverted_transactions".into(), None),
            drops: metrics.create_counter("dropped_transactions".into(), None),
        }
    }
}