1use std::{collections::HashMap, marker::PhantomData, time::Instant};
22
23use anyhow::{Context, bail};
24use async_trait::async_trait;
25use committable::Committable;
26use derive_more::{Deref, DerefMut};
27use futures::future::Future;
28#[cfg(feature = "embedded-db")]
29use futures::stream::TryStreamExt;
30use hotshot_types::{
31 data::VidShare,
32 simple_certificate::CertificatePair,
33 traits::{
34 EncodeBytes,
35 block_contents::BlockHeader,
36 metrics::{Counter, Gauge, Histogram, Metrics},
37 node_implementation::NodeType,
38 },
39};
40use itertools::Itertools;
41use jf_merkle_tree_compat::prelude::MerkleProof;
42pub use sqlx::Executor;
43use sqlx::{Encode, Execute, FromRow, QueryBuilder, Type, pool::Pool, query_builder::Separated};
44use tracing::instrument;
45
46#[cfg(not(feature = "embedded-db"))]
47use super::queries::state::batch_insert_hashes;
48#[cfg(feature = "embedded-db")]
49use super::queries::state::build_hash_batch_insert;
50use super::{
51 Database, Db,
52 queries::{
53 self,
54 state::{Node, collect_nodes_from_proofs},
55 },
56};
57use crate::{
58 Header, Payload, QueryError, QueryResult,
59 availability::{
60 BlockQueryData, Certificate2, LeafQueryData, QueryableHeader, QueryablePayload,
61 VidCommonQueryData,
62 },
63 data_source::{
64 storage::{NodeStorage, UpdateAvailabilityStorage, pruning::PrunedHeightStorage},
65 update,
66 },
67 merklized_state::{MerklizedState, UpdateStateData},
68 types::HeightIndexed,
69};
70
/// Process-wide flag read when a read-only transaction is started.
///
/// While it is `true`, read-only transactions are configured without the `DEFERRABLE` option.
/// It starts out `false`.
#[cfg(not(feature = "embedded-db"))]
static NO_DEFERRABLE_ON_READ: std::sync::atomic::AtomicBool =
    std::sync::atomic::AtomicBool::new(false);

/// Set the `NO_DEFERRABLE_ON_READ` flag.
///
/// Passing `true` makes subsequently started read-only transactions omit `DEFERRABLE`; passing
/// `false` restores the default.
#[cfg(not(feature = "embedded-db"))]
pub fn set_no_deferrable_on_read(value: bool) {
    use std::sync::atomic::Ordering;

    NO_DEFERRABLE_ON_READ.store(value, Ordering::Relaxed);
}
88
89pub type Query<'q> = sqlx::query::Query<'q, Db, <Db as Database>::Arguments<'q>>;
90pub type QueryAs<'q, T> = sqlx::query::QueryAs<'q, Db, T, <Db as Database>::Arguments<'q>>;
91
92pub fn query(sql: &str) -> Query<'_> {
93 sqlx::query(sql)
94}
95
96pub fn query_as<'q, T>(sql: &'q str) -> QueryAs<'q, T>
97where
98 T: for<'r> FromRow<'r, <Db as Database>::Row>,
99{
100 sqlx::query_as(sql)
101}
102
/// Marker type selecting the "write" transaction mode (see its [`TransactionMode`] impl).
#[derive(Debug, Default, Clone, Copy)]
pub struct Write;
106
/// Marker type selecting the "read-only" transaction mode (see its [`TransactionMode`] impl).
#[derive(Debug, Default, Clone, Copy)]
pub struct Read;
110
/// Marker type selecting the "prune" transaction mode (see its [`TransactionMode`] impl).
#[derive(Debug, Default, Clone, Copy)]
pub struct Prune;
117
118pub trait TransactionMode: Send + Sync {
120 fn begin(
121 conn: &mut <Db as Database>::Connection,
122 ) -> impl Future<Output = anyhow::Result<()>> + Send;
123
124 fn display() -> &'static str;
125}
126
127impl TransactionMode for Write {
128 #[allow(unused_variables)]
129 async fn begin(conn: &mut <Db as Database>::Connection) -> anyhow::Result<()> {
130 #[cfg(feature = "embedded-db")]
159 conn.execute("UPDATE pruned_height SET id = id WHERE false")
160 .await?;
161
162 #[cfg(not(feature = "embedded-db"))]
165 conn.execute("SET TRANSACTION ISOLATION LEVEL SERIALIZABLE")
166 .await?;
167
168 Ok(())
169 }
170
171 fn display() -> &'static str {
172 "write"
173 }
174}
175
176impl TransactionMode for Prune {
177 #[allow(unused_variables)]
178 async fn begin(conn: &mut <Db as Database>::Connection) -> anyhow::Result<()> {
179 #[cfg(feature = "embedded-db")]
181 conn.execute("UPDATE pruned_height SET id = id WHERE false")
182 .await?;
183
184 #[cfg(not(feature = "embedded-db"))]
188 conn.execute("SET TRANSACTION ISOLATION LEVEL READ COMMITTED")
189 .await?;
190
191 Ok(())
192 }
193
194 fn display() -> &'static str {
195 "prune"
196 }
197}
198
199impl TransactionMode for Read {
200 #[allow(unused_variables)]
201 async fn begin(conn: &mut <Db as Database>::Connection) -> anyhow::Result<()> {
202 #[cfg(not(feature = "embedded-db"))]
216 {
217 let sql = if NO_DEFERRABLE_ON_READ.load(std::sync::atomic::Ordering::Relaxed) {
218 "SET TRANSACTION ISOLATION LEVEL SERIALIZABLE, READ ONLY"
219 } else {
220 "SET TRANSACTION ISOLATION LEVEL SERIALIZABLE, READ ONLY, DEFERRABLE"
221 };
222 conn.execute(sql).await?;
223 }
224
225 Ok(())
226 }
227
228 fn display() -> &'static str {
229 "read-only"
230 }
231}
232
/// How a transaction ended; recorded by the metrics guard when it is dropped.
#[derive(Debug, Clone, Copy)]
enum CloseType {
    /// The transaction was committed.
    Commit,
    /// The transaction was explicitly reverted.
    Revert,
    /// The transaction was dropped without being committed or reverted.
    Drop,
}
239
240#[derive(Debug)]
241struct TransactionMetricsGuard<Mode> {
242 started_at: Instant,
243 metrics: PoolMetrics,
244 close_type: CloseType,
245 _mode: PhantomData<Mode>,
246}
247
248impl<Mode: TransactionMode> TransactionMetricsGuard<Mode> {
249 fn begin(metrics: PoolMetrics) -> Self {
250 let started_at = Instant::now();
251 tracing::trace!(mode = Mode::display(), ?started_at, "begin");
252 metrics.open_transactions.update(1);
253
254 Self {
255 started_at,
256 metrics,
257 close_type: CloseType::Drop,
258 _mode: Default::default(),
259 }
260 }
261
262 fn set_closed(&mut self, t: CloseType) {
263 self.close_type = t;
264 }
265}
266
267impl<Mode> Drop for TransactionMetricsGuard<Mode> {
268 fn drop(&mut self) {
269 self.metrics
270 .transaction_durations
271 .add_point((self.started_at.elapsed().as_millis() as f64) / 1000.);
272 self.metrics.open_transactions.update(-1);
273 match self.close_type {
274 CloseType::Commit => self.metrics.commits.add(1),
275 CloseType::Revert => self.metrics.reverts.add(1),
276 CloseType::Drop => self.metrics.drops.add(1),
277 }
278 tracing::trace!(started_at = ?self.started_at, reason = ?self.close_type, "close");
279 }
280}
281
282#[derive(Debug, Deref, DerefMut)]
284pub struct Transaction<Mode> {
285 #[deref]
286 #[deref_mut]
287 inner: sqlx::Transaction<'static, Db>,
288 metrics: TransactionMetricsGuard<Mode>,
289}
290
291impl<Mode: TransactionMode> Transaction<Mode> {
292 pub(super) async fn new(pool: &Pool<Db>, metrics: PoolMetrics) -> anyhow::Result<Self> {
293 let mut inner = pool.begin().await?;
294 let metrics = TransactionMetricsGuard::begin(metrics);
295 Mode::begin(inner.as_mut()).await?;
296 Ok(Self { inner, metrics })
297 }
298}
299
300impl<Mode: TransactionMode> update::Transaction for Transaction<Mode> {
301 async fn commit(mut self) -> anyhow::Result<()> {
302 self.inner.commit().await?;
303 self.metrics.set_closed(CloseType::Commit);
304 Ok(())
305 }
306 fn revert(mut self) -> impl Future + Send {
307 async move {
308 self.inner.rollback().await.unwrap();
309 self.metrics.set_closed(CloseType::Revert);
310 }
311 }
312}
313
314pub trait Params<'p> {
344 fn bind<'q, 'r>(
345 self,
346 q: &'q mut Separated<'r, 'p, Db, &'static str>,
347 ) -> &'q mut Separated<'r, 'p, Db, &'static str>
348 where
349 'p: 'r;
350}
351
352pub trait FixedLengthParams<'p, const N: usize>: Params<'p> {}
358
359macro_rules! impl_tuple_params {
360 ($n:literal, ($($t:ident,)+)) => {
361 impl<'p, $($t),+> Params<'p> for ($($t,)+)
362 where $(
363 $t: 'p + Encode<'p, Db> + Type<Db>
364 ),+{
365 fn bind<'q, 'r>(self, q: &'q mut Separated<'r, 'p, Db, &'static str>) -> &'q mut Separated<'r, 'p, Db, &'static str>
366 where 'p: 'r,
367 {
368 #[allow(non_snake_case)]
369 let ($($t,)+) = self;
370 q $(
371 .push_bind($t)
372 )+
373
374 }
375 }
376
377 impl<'p, $($t),+> FixedLengthParams<'p, $n> for ($($t,)+)
378 where $(
379 $t: 'p + for<'q> Encode<'q, Db> + Type<Db>
380 ),+ {
381 }
382 };
383}
384
385impl_tuple_params!(1, (T,));
386impl_tuple_params!(2, (T1, T2,));
387impl_tuple_params!(3, (T1, T2, T3,));
388impl_tuple_params!(4, (T1, T2, T3, T4,));
389impl_tuple_params!(5, (T1, T2, T3, T4, T5,));
390impl_tuple_params!(6, (T1, T2, T3, T4, T5, T6,));
391impl_tuple_params!(7, (T1, T2, T3, T4, T5, T6, T7,));
392impl_tuple_params!(8, (T1, T2, T3, T4, T5, T6, T7, T8,));
393
394pub fn build_where_in<'a, I>(
395 query: &'a str,
396 column: &'a str,
397 values: I,
398) -> QueryResult<(queries::QueryBuilder<'a>, String)>
399where
400 I: IntoIterator,
401 I::Item: 'a + Encode<'a, Db> + Type<Db>,
402{
403 let mut builder = queries::QueryBuilder::default();
404 let params = values
405 .into_iter()
406 .map(|v| Ok(format!("{} ", builder.bind(v)?)))
407 .collect::<QueryResult<Vec<String>>>()?;
408
409 if params.is_empty() {
410 return Err(QueryError::Error {
411 message: "failed to build WHERE IN query. No parameter found ".to_string(),
412 });
413 }
414
415 let sql = format!(
416 "{query} where {column} IN ({}) ",
417 params.into_iter().join(",")
418 );
419
420 Ok((builder, sql))
421}
422
423impl Transaction<Write> {
425 const STATEMENT_MAX_PARAMETERS: usize = 30_000;
430
431 pub async fn upsert<'p, const N: usize, R>(
432 &mut self,
433 table: &str,
434 columns: [&str; N],
435 pk: impl IntoIterator<Item = &str>,
436 rows: R,
437 ) -> anyhow::Result<()>
438 where
439 R: IntoIterator,
440 R::Item: 'p + FixedLengthParams<'p, N>,
441 {
442 let set_columns = columns
443 .iter()
444 .map(|col| format!("{col} = excluded.{col}"))
445 .join(",");
446
447 let columns_str = columns.iter().map(|col| format!("\"{col}\"")).join(",");
448
449 let pk = pk.into_iter().join(",");
450
451 let rows: Vec<_> = rows.into_iter().collect();
452 let num_rows = rows.len();
453
454 if num_rows == 0 {
455 tracing::warn!("trying to upsert 0 rows into {table}, this has no effect");
456 return Ok(());
457 }
458
459 let rows_per_chunk = Self::STATEMENT_MAX_PARAMETERS / N;
462 let mut rows = rows.into_iter();
463 loop {
464 let chunk = rows.by_ref().take(rows_per_chunk).collect::<Vec<_>>();
465 if chunk.is_empty() {
466 break;
467 }
468 let num_rows = chunk.len();
469 tracing::debug!(num_rows, "upsert chunk");
470
471 let mut query_builder =
472 QueryBuilder::new(format!("INSERT INTO \"{table}\" ({columns_str}) "));
473 query_builder.push_values(chunk, |mut b, row| {
474 row.bind(&mut b);
475 });
476 query_builder.push(format!(" ON CONFLICT ({pk}) DO UPDATE SET {set_columns}"));
477
478 let query = query_builder.build();
479 let statement = query.sql();
480
481 let res = self.execute(query).await.inspect_err(|err| {
482 tracing::error!(statement, "error in statement execution: {err:#}");
483 })?;
484 let rows_modified = res.rows_affected() as usize;
485 if rows_modified != num_rows {
486 let error = format!(
487 "unexpected number of rows modified: expected {num_rows}, got \
488 {rows_modified}. query: {statement}"
489 );
490 tracing::error!(error);
491 bail!(error);
492 }
493 }
494 Ok(())
495 }
496}
497
498impl Transaction<Prune> {
500 #[instrument(skip(self))]
508 pub(super) async fn delete_batch(&mut self, height: u64) -> anyhow::Result<()> {
509 let res = query("DELETE FROM transactions WHERE block_height <= $1")
510 .bind(height as i64)
511 .execute(self.as_mut())
512 .await
513 .context("deleting transactions")?;
514 tracing::debug!(rows_affected = res.rows_affected(), "pruned transactions");
515
516 let res = query("DELETE FROM leaf2 WHERE height <= $1")
517 .bind(height as i64)
518 .execute(self.as_mut())
519 .await
520 .context("deleting leaf2")?;
521 tracing::debug!(rows_affected = res.rows_affected(), "pruned leaf2");
522
523 let res = query("DELETE FROM header WHERE height <= $1")
524 .bind(height as i64)
525 .execute(self.as_mut())
526 .await
527 .context("deleting headers")?;
528 tracing::debug!(rows_affected = res.rows_affected(), "pruned headers");
529
530 let res = query(
531 "DELETE FROM payload AS p
532 WHERE NOT EXISTS (
533 SELECT 1 FROM header AS h
534 WHERE h.payload_hash = p.hash AND h.ns_table = p.ns_table
535 )",
536 )
537 .execute(self.as_mut())
538 .await
539 .context("garbage collecting payloads")?;
540 tracing::debug!(
541 rows_affected = res.rows_affected(),
542 "garbage collected payloads"
543 );
544
545 let res = query(
546 "DELETE FROM vid_common AS v
547 WHERE NOT EXISTS (
548 SELECT 1 FROM header AS h
549 WHERE h.payload_hash = v.hash
550 )",
551 )
552 .execute(self.as_mut())
553 .await
554 .context("garbage collecting VID common")?;
555 tracing::debug!(
556 rows_affected = res.rows_affected(),
557 "garbage collected VID common"
558 );
559
560 Ok(())
561 }
562
563 #[instrument(skip(self))]
567 pub(super) async fn delete_state_batch(
568 &mut self,
569 state_tables: Vec<String>,
570 height: u64,
571 ) -> anyhow::Result<()> {
572 for state_table in state_tables {
573 self.execute(
574 query(&format!(
575 "
576 DELETE FROM {state_table}
577 WHERE {state_table}.created <= $1
578 AND EXISTS (
579 SELECT 1 FROM {state_table} AS t2
580 WHERE t2.path = {state_table}.path
581 AND t2.created > {state_table}.created
582 AND t2.created <= $1
583 )"
584 ))
585 .bind(height as i64),
586 )
587 .await?;
588 }
589
590 Ok(())
591 }
592}
593
594impl Transaction<Write> {
596 pub(crate) async fn save_pruned_height(&mut self, height: u64) -> anyhow::Result<()> {
598 self.upsert(
601 "pruned_height",
602 ["id", "last_height"],
603 ["id"],
604 [(1i32, height as i64)],
605 )
606 .await
607 }
608}
609
610impl<Types> UpdateAvailabilityStorage<Types> for Transaction<Write>
611where
612 Types: NodeType,
613 Payload<Types>: QueryablePayload<Types>,
614 Header<Types>: QueryableHeader<Types>,
615{
616 async fn insert_qc_chain(
617 &mut self,
618 height: u64,
619 qc_chain: Option<[CertificatePair<Types>; 2]>,
620 ) -> anyhow::Result<()> {
621 let block_height = NodeStorage::<Types>::block_height(self).await? as u64;
622 if height + 1 >= block_height {
623 let qcs = serde_json::to_value(&qc_chain)?;
628 self.upsert("latest_qc_chain", ["id", "qcs"], ["id"], [(1i32, qcs)])
629 .await
630 .context("inserting QC chain")?;
631 }
632
633 Ok(())
634 }
635
636 async fn insert_cert2(
637 &mut self,
638 height: u64,
639 cert2: Certificate2<Types>,
640 ) -> anyhow::Result<()> {
641 let cert2_json = serde_json::to_value(&cert2)?;
642 self.upsert(
643 "cert2",
644 ["height", "data"],
645 ["height"],
646 [(height as i64, cert2_json)],
647 )
648 .await
649 .context("inserting cert2")?;
650 Ok(())
651 }
652
653 async fn insert_leaf_range<'a>(
654 &mut self,
655 leaves: impl Send + IntoIterator<IntoIter: Send, Item = &'a LeafQueryData<Types>>,
656 ) -> anyhow::Result<()> {
657 let leaves = leaves.into_iter();
658
659 let pruned_height = self.load_pruned_height().await?;
661 let leaves = leaves.skip_while(|leaf| pruned_height.is_some_and(|h| leaf.height() <= h));
662
663 let (header_rows, leaf_rows): (Vec<_>, Vec<_>) = leaves
666 .map(|leaf| {
667 let header_json = serde_json::to_value(leaf.leaf().block_header())
668 .context("failed to serialize header")?;
669 let header_row = (
670 leaf.height() as i64,
671 leaf.block_hash().to_string(),
672 leaf.leaf().block_header().payload_commitment().to_string(),
673 leaf.leaf().block_header().ns_table(),
674 header_json,
675 leaf.leaf().block_header().timestamp() as i64,
676 );
677
678 let leaf_json =
679 serde_json::to_value(leaf.leaf()).context("failed to serialize leaf")?;
680 let qc_json = serde_json::to_value(leaf.qc()).context("failed to serialize QC")?;
681 let leaf_row = (
682 leaf.height() as i64,
683 leaf.hash().to_string(),
684 leaf.block_hash().to_string(),
685 leaf_json,
686 qc_json,
687 );
688
689 anyhow::Ok((header_row, leaf_row))
690 })
691 .process_results(|iter| iter.unzip())?;
692
693 self.upsert(
694 "header",
695 [
696 "height",
697 "hash",
698 "payload_hash",
699 "ns_table",
700 "data",
701 "timestamp",
702 ],
703 ["height"],
704 header_rows,
705 )
706 .await
707 .context("inserting headers")?;
708
709 self.upsert(
711 "leaf2",
712 ["height", "hash", "block_hash", "leaf", "qc"],
713 ["height"],
714 leaf_rows,
715 )
716 .await
717 .context("inserting leaves")?;
718
719 Ok(())
720 }
721
722 async fn insert_block_range<'a>(
723 &mut self,
724 blocks: impl Send + IntoIterator<IntoIter: Send, Item = &'a BlockQueryData<Types>>,
725 ) -> anyhow::Result<()> {
726 let blocks = blocks.into_iter();
727
728 let pruned_height = self.load_pruned_height().await?;
730 let blocks = blocks.skip_while(|block| pruned_height.is_some_and(|h| block.height() <= h));
731
732 let (payload_rows, tx_rows): (Vec<_>, Vec<_>) = blocks
733 .map(|block| {
734 let payload_row = (
735 block.payload_hash().to_string(),
736 block.header().ns_table(),
737 block.size() as i32,
738 block.num_transactions() as i32,
739 block.payload.encode().as_ref().to_vec(),
740 );
741
742 let tx_rows = block.enumerate().map(|(txn_ix, txn)| {
743 let ns_id = block.header().namespace_id(&txn_ix.ns_index).unwrap();
744 (
745 txn.commit().to_string(),
746 block.height() as i64,
747 txn_ix.ns_index.into(),
748 ns_id.into(),
749 txn_ix.position as i64,
750 )
751 });
752
753 (payload_row, tx_rows)
754 })
755 .unzip();
756 let tx_rows = tx_rows.into_iter().flatten().collect::<Vec<_>>();
757
758 let payload_rows = payload_rows
761 .into_iter()
762 .unique_by(|(hash, ns_table, ..)| (hash.clone(), ns_table.clone()));
763
764 self.upsert(
765 "payload",
766 ["hash", "ns_table", "size", "num_transactions", "data"],
767 ["hash", "ns_table"],
768 payload_rows,
769 )
770 .await
771 .context("inserting payloads")?;
772
773 if !tx_rows.is_empty() {
775 self.upsert(
776 "transactions",
777 ["hash", "block_height", "ns_index", "ns_id", "position"],
778 ["block_height", "ns_id", "position"],
779 tx_rows,
780 )
781 .await
782 .context("inserting transactions")?;
783 }
784
785 Ok(())
786 }
787
788 async fn insert_vid_range<'a>(
789 &mut self,
790 vid: impl Send
791 + IntoIterator<
792 IntoIter: Send,
793 Item = (&'a VidCommonQueryData<Types>, Option<&'a VidShare>),
794 >,
795 ) -> anyhow::Result<()> {
796 let vid = vid.into_iter();
797
798 let pruned_height = self.load_pruned_height().await?;
800 let vid = vid.skip_while(|(common, _)| pruned_height.is_some_and(|h| common.height() <= h));
801
802 let (common_rows, share_rows): (Vec<_>, Vec<_>) = vid
803 .map(|(common, share)| {
804 let common_data = bincode::serialize(common.common())
805 .context("failed to serialize VID common data")?;
806 let common_row = (common.payload_hash().to_string(), common_data);
807
808 let share_row = if let Some(share) = share {
809 let share_data =
810 bincode::serialize(&share).context("failed to serialize VID share")?;
811 Some((common.height() as i64, share_data))
812 } else {
813 None
814 };
815
816 anyhow::Ok((common_row, share_row))
817 })
818 .process_results(|iter| iter.unzip())?;
819 let share_rows = share_rows.into_iter().flatten().collect::<Vec<_>>();
820
821 let common_rows = common_rows.into_iter().unique_by(|(hash, ..)| hash.clone());
824
825 self.upsert("vid_common", ["hash", "data"], ["hash"], common_rows)
826 .await
827 .context("inserting VID common")?;
828
829 if !share_rows.is_empty() {
830 let mut q = QueryBuilder::new("WITH rows (height, share) AS (");
831 q.push_values(share_rows, |mut q, (height, share)| {
832 q.push_bind(height).push_bind(share);
833 });
834 q.push(
835 ") UPDATE header SET vid_share = rows.share
836 FROM rows
837 WHERE header.height = rows.height",
838 );
839 q.build()
840 .execute(self.as_mut())
841 .await
842 .context("inserting VID shares")?;
843 }
844
845 Ok(())
846 }
847}
848
849#[async_trait]
850impl<Types: NodeType, State: MerklizedState<Types, ARITY>, const ARITY: usize>
851 UpdateStateData<Types, State, ARITY> for Transaction<Write>
852{
853 async fn set_last_state_height(&mut self, height: usize) -> anyhow::Result<()> {
854 self.upsert(
855 "last_merklized_state_height",
856 ["id", "height"],
857 ["id"],
858 [(1i32, height as i64)],
859 )
860 .await?;
861
862 Ok(())
863 }
864
865 async fn insert_merkle_nodes(
866 &mut self,
867 proof: MerkleProof<State::Entry, State::Key, State::T, ARITY>,
868 traversal_path: Vec<usize>,
869 block_number: u64,
870 ) -> anyhow::Result<()> {
871 let proofs = vec![(proof, traversal_path)];
872 UpdateStateData::<Types, State, ARITY>::insert_merkle_nodes_batch(
873 self,
874 proofs,
875 block_number,
876 )
877 .await
878 }
879
880 async fn insert_merkle_nodes_batch(
881 &mut self,
882 proofs: Vec<(
883 MerkleProof<State::Entry, State::Key, State::T, ARITY>,
884 Vec<usize>,
885 )>,
886 block_number: u64,
887 ) -> anyhow::Result<()> {
888 if proofs.is_empty() {
889 return Ok(());
890 }
891
892 let name = State::state_type();
893 let block_number = block_number as i64;
894
895 let (mut all_nodes, all_hashes) = collect_nodes_from_proofs(&proofs)?;
896 let hashes: Vec<Vec<u8>> = all_hashes.into_iter().collect();
897
898 #[cfg(not(feature = "embedded-db"))]
899 let nodes_hash_ids: HashMap<Vec<u8>, i32> = batch_insert_hashes(hashes, self).await?;
900
901 #[cfg(feature = "embedded-db")]
902 let nodes_hash_ids: HashMap<Vec<u8>, i32> = {
903 let mut hash_ids: HashMap<Vec<u8>, i32> = HashMap::with_capacity(hashes.len());
904 for hash_chunk in hashes.chunks(20) {
905 let (query, sql) = build_hash_batch_insert(hash_chunk)?;
906 let chunk_ids: HashMap<Vec<u8>, i32> = query
907 .query_as(&sql)
908 .fetch(self.as_mut())
909 .try_collect()
910 .await?;
911 hash_ids.extend(chunk_ids);
912 }
913 hash_ids
914 };
915
916 for (node, children, hash) in &mut all_nodes {
917 node.created = block_number;
918 node.hash_id = *nodes_hash_ids.get(&*hash).ok_or(QueryError::Error {
919 message: "Missing node hash".to_string(),
920 })?;
921
922 if let Some(children) = children {
923 let children_hashes = children
924 .iter()
925 .map(|c| nodes_hash_ids.get(c).copied())
926 .collect::<Option<Vec<i32>>>()
927 .ok_or(QueryError::Error {
928 message: "Missing child hash".to_string(),
929 })?;
930
931 node.children = Some(children_hashes.into());
932 }
933 }
934
935 Node::upsert(name, all_nodes.into_iter().map(|(n, ..)| n), self).await?;
936
937 Ok(())
938 }
939}
940
941#[async_trait]
942impl<Mode: TransactionMode> PrunedHeightStorage for Transaction<Mode> {
943 async fn load_pruned_height(&mut self) -> anyhow::Result<Option<u64>> {
944 let Some((height,)) =
945 query_as::<(i64,)>("SELECT last_height FROM pruned_height ORDER BY id DESC LIMIT 1")
946 .fetch_optional(self.as_mut())
947 .await?
948 else {
949 return Ok(None);
950 };
951 Ok(Some(height as u64))
952 }
953}
954
955#[derive(Clone, Debug)]
956pub(super) struct PoolMetrics {
957 open_transactions: Box<dyn Gauge>,
958 transaction_durations: Box<dyn Histogram>,
959 commits: Box<dyn Counter>,
960 reverts: Box<dyn Counter>,
961 drops: Box<dyn Counter>,
962}
963
964impl PoolMetrics {
965 pub(super) fn new(metrics: &(impl Metrics + ?Sized)) -> Self {
966 Self {
967 open_transactions: metrics.create_gauge("open_transactions".into(), None),
968 transaction_durations: metrics
969 .create_histogram("transaction_duration".into(), Some("s".into())),
970 commits: metrics.create_counter("committed_transactions".into(), None),
971 reverts: metrics.create_counter("reverted_transactions".into(), None),
972 drops: metrics.create_counter("dropped_transactions".into(), None),
973 }
974 }
975}
976
#[cfg(test)]
mod test {
    use super::*;
    use crate::data_source::{
        Transaction as _, VersionedDataSource,
        sql::testing::TmpDb,
        storage::{SqlStorage, StorageConnectionType},
    };

    /// Upsert more rows than fit into a single statement and check that all of them are stored.
    #[tokio::test]
    #[test_log::test]
    async fn test_upsert_many_rows() {
        let db = TmpDb::init().await;
        let storage = SqlStorage::connect(db.config(), StorageConnectionType::Sequencer)
            .await
            .unwrap();

        // Set up a scratch table to upsert into.
        let mut setup_tx = storage.write().await.unwrap();
        query(
            "CREATE TABLE test (
                a INT PRIMARY KEY,
                b INT,
                c INT
            )",
        )
        .execute(setup_tx.as_mut())
        .await
        .unwrap();
        setup_tx.commit().await.unwrap();

        // Two and a half times the parameter budget worth of rows, so that the upsert needs
        // several chunks, the last of them partial.
        let max_params = Transaction::<Write>::STATEMENT_MAX_PARAMETERS;
        let n = (2 * max_params + (max_params / 2)) as i32;
        let rows = (0..n).map(|i| (i, i, i)).collect::<Vec<_>>();

        let mut write_tx = storage.write().await.unwrap();
        write_tx
            .upsert("test", ["a", "b", "c"], ["a"], rows.clone())
            .await
            .unwrap();
        write_tx.commit().await.unwrap();

        let mut read_tx = storage.read().await.unwrap();
        let stored: Vec<(i32, i32, i32)> = query_as("SELECT * FROM test ORDER BY a")
            .fetch_all(read_tx.as_mut())
            .await
            .unwrap();
        assert_eq!(rows, stored);
    }
}