1use std::{collections::HashMap, marker::PhantomData, time::Instant};
22
23use anyhow::{Context, bail};
24use async_trait::async_trait;
25use committable::Committable;
26use derive_more::{Deref, DerefMut};
27use futures::future::Future;
28#[cfg(feature = "embedded-db")]
29use futures::stream::TryStreamExt;
30use hotshot_types::{
31 data::VidShare,
32 simple_certificate::CertificatePair,
33 traits::{
34 EncodeBytes,
35 block_contents::BlockHeader,
36 metrics::{Counter, Gauge, Histogram, Metrics},
37 node_implementation::NodeType,
38 },
39};
40use itertools::Itertools;
41use jf_merkle_tree_compat::prelude::MerkleProof;
42pub use sqlx::Executor;
43use sqlx::{Encode, Execute, FromRow, QueryBuilder, Type, pool::Pool, query_builder::Separated};
44use tracing::instrument;
45
46#[cfg(not(feature = "embedded-db"))]
47use super::queries::state::batch_insert_hashes;
48#[cfg(feature = "embedded-db")]
49use super::queries::state::build_hash_batch_insert;
50use super::{
51 Database, Db,
52 queries::{
53 self,
54 state::{Node, collect_nodes_from_proofs},
55 },
56};
57use crate::{
58 Header, Payload, QueryError, QueryResult,
59 availability::{
60 BlockQueryData, LeafQueryData, QueryableHeader, QueryablePayload, VidCommonQueryData,
61 },
62 data_source::{
63 storage::{NodeStorage, UpdateAvailabilityStorage, pruning::PrunedHeightStorage},
64 update,
65 },
66 merklized_state::{MerklizedState, UpdateStateData},
67 types::HeightIndexed,
68};
69
/// A `sqlx` query with unbound output, parameterized over the database backend
/// (`Db`) selected at compile time (Postgres or, with `embedded-db`, SQLite).
pub type Query<'q> = sqlx::query::Query<'q, Db, <Db as Database>::Arguments<'q>>;
/// A `sqlx` query whose rows are decoded into `T`, for the selected backend.
pub type QueryAs<'q, T> = sqlx::query::QueryAs<'q, Db, T, <Db as Database>::Arguments<'q>>;
72
/// Construct an untyped SQL query for the configured database backend.
pub fn query(sql: &str) -> Query<'_> {
    sqlx::query(sql)
}
76
/// Construct a SQL query whose result rows are decoded into `T`.
pub fn query_as<'q, T>(sql: &'q str) -> QueryAs<'q, T>
where
    T: for<'r> FromRow<'r, <Db as Database>::Row>,
{
    sqlx::query_as(sql)
}
83
/// Marker type selecting read-write transaction semantics (see [`TransactionMode`]).
#[derive(Clone, Copy, Debug, Default)]
pub struct Write;
87
/// Marker type selecting read-only transaction semantics (see [`TransactionMode`]).
#[derive(Clone, Copy, Debug, Default)]
pub struct Read;
91
/// The access mode of a database transaction, either read-only or read-write.
///
/// Implemented by the [`Read`] and [`Write`] marker types.
pub trait TransactionMode: Send + Sync {
    /// Run any mode-specific setup statements immediately after the
    /// transaction has been opened on `conn`.
    fn begin(
        conn: &mut <Db as Database>::Connection,
    ) -> impl Future<Output = anyhow::Result<()>> + Send;

    /// Human-readable name of this mode, used in trace logging.
    fn display() -> &'static str;
}
100
impl TransactionMode for Write {
    #[allow(unused_variables)]
    async fn begin(conn: &mut <Db as Database>::Connection) -> anyhow::Result<()> {
        // SQLite: issue a no-op write (the WHERE clause is always false, so no
        // rows change) — presumably to promote the transaction to a write
        // transaction and take the write lock up front. TODO confirm intent.
        #[cfg(feature = "embedded-db")]
        conn.execute("UPDATE pruned_height SET id = id WHERE false")
            .await?;

        // Postgres: use the strongest isolation level for writers.
        #[cfg(not(feature = "embedded-db"))]
        conn.execute("SET TRANSACTION ISOLATION LEVEL SERIALIZABLE")
            .await?;

        Ok(())
    }

    fn display() -> &'static str {
        "write"
    }
}
149
impl TransactionMode for Read {
    #[allow(unused_variables)]
    async fn begin(conn: &mut <Db as Database>::Connection) -> anyhow::Result<()> {
        // Postgres: a serializable, read-only, deferrable transaction sees a
        // consistent snapshot without risking serialization failures.
        // SQLite (embedded-db) needs no read-mode setup here.
        #[cfg(not(feature = "embedded-db"))]
        conn.execute("SET TRANSACTION ISOLATION LEVEL SERIALIZABLE, READ ONLY, DEFERRABLE")
            .await?;

        Ok(())
    }

    fn display() -> &'static str {
        "read-only"
    }
}
172
/// The reason a transaction was closed, recorded for metrics.
#[derive(Clone, Copy, Debug)]
enum CloseType {
    /// Closed by a successful commit.
    Commit,
    /// Closed by an explicit rollback.
    Revert,
    /// Dropped without commit or revert (e.g. on error or early return).
    Drop,
}
179
/// RAII guard which tracks a transaction's lifetime in pool metrics.
///
/// Created when a transaction begins; on drop it records the transaction's
/// duration and how it was closed (commit, revert, or drop).
#[derive(Debug)]
struct TransactionMetricsGuard<Mode> {
    // When the transaction was opened; used to compute duration on drop.
    started_at: Instant,
    metrics: PoolMetrics,
    // Defaults to `Drop` until `set_closed` records a more specific reason.
    close_type: CloseType,
    _mode: PhantomData<Mode>,
}
187
188impl<Mode: TransactionMode> TransactionMetricsGuard<Mode> {
189 fn begin(metrics: PoolMetrics) -> Self {
190 let started_at = Instant::now();
191 tracing::trace!(mode = Mode::display(), ?started_at, "begin");
192 metrics.open_transactions.update(1);
193
194 Self {
195 started_at,
196 metrics,
197 close_type: CloseType::Drop,
198 _mode: Default::default(),
199 }
200 }
201
202 fn set_closed(&mut self, t: CloseType) {
203 self.close_type = t;
204 }
205}
206
impl<Mode> Drop for TransactionMetricsGuard<Mode> {
    fn drop(&mut self) {
        // Record the transaction's duration in seconds.
        self.metrics
            .transaction_durations
            .add_point((self.started_at.elapsed().as_millis() as f64) / 1000.);
        self.metrics.open_transactions.update(-1);
        // Count the close under the appropriate reason counter.
        match self.close_type {
            CloseType::Commit => self.metrics.commits.add(1),
            CloseType::Revert => self.metrics.reverts.add(1),
            CloseType::Drop => self.metrics.drops.add(1),
        }
        tracing::trace!(started_at = ?self.started_at, reason = ?self.close_type, "close");
    }
}
221
/// A database transaction in mode `Mode` ([`Read`] or [`Write`]).
///
/// Dereferences to the underlying [`sqlx::Transaction`]; metrics on the
/// transaction's lifetime are recorded automatically via the attached guard.
#[derive(Debug, Deref, DerefMut)]
pub struct Transaction<Mode> {
    #[deref]
    #[deref_mut]
    inner: sqlx::Transaction<'static, Db>,
    metrics: TransactionMetricsGuard<Mode>,
}
230
impl<Mode: TransactionMode> Transaction<Mode> {
    /// Open a new transaction from `pool`, running mode-specific setup and
    /// starting metrics tracking for its lifetime.
    pub(super) async fn new(pool: &Pool<Db>, metrics: PoolMetrics) -> anyhow::Result<Self> {
        let mut inner = pool.begin().await?;
        // Begin metrics tracking before mode setup so that a failed setup is
        // still accounted for as an opened-then-dropped transaction.
        let metrics = TransactionMetricsGuard::begin(metrics);
        Mode::begin(inner.as_mut()).await?;
        Ok(Self { inner, metrics })
    }
}
239
impl<Mode: TransactionMode> update::Transaction for Transaction<Mode> {
    /// Commit the transaction and record the close reason for metrics.
    async fn commit(mut self) -> anyhow::Result<()> {
        self.inner.commit().await?;
        self.metrics.set_closed(CloseType::Commit);
        Ok(())
    }
    /// Roll back the transaction and record the close reason for metrics.
    ///
    /// NOTE(review): this panics if rollback fails (`unwrap`); consider
    /// whether the error should be surfaced to the caller instead.
    fn revert(mut self) -> impl Future + Send {
        async move {
            self.inner.rollback().await.unwrap();
            self.metrics.set_closed(CloseType::Revert);
        }
    }
}
253
/// A set of values which can be bound as parameters of a SQL statement.
///
/// Implemented for tuples of encodable values via [`impl_tuple_params`].
pub trait Params<'p> {
    /// Bind each value in `self`, in order, onto the separated builder `q`,
    /// returning the builder for further chaining.
    fn bind<'q, 'r>(
        self,
        q: &'q mut Separated<'r, 'p, Db, &'static str>,
    ) -> &'q mut Separated<'r, 'p, Db, &'static str>
    where
        'p: 'r;
}
291
/// A [`Params`] implementation with a statically known count `N` of bound
/// values, matching the column count of a fixed-width insert.
pub trait FixedLengthParams<'p, const N: usize>: Params<'p> {}
298
/// Implement [`Params`] and [`FixedLengthParams`] for a tuple of `$n`
/// encodable values, binding each element in order.
macro_rules! impl_tuple_params {
    ($n:literal, ($($t:ident,)+)) => {
        impl<'p, $($t),+> Params<'p> for ($($t,)+)
        where $(
            $t: 'p + Encode<'p, Db> + Type<Db>
        ),+{
            fn bind<'q, 'r>(self, q: &'q mut Separated<'r, 'p, Db, &'static str>) -> &'q mut Separated<'r, 'p, Db, &'static str>
            where 'p: 'r,
            {
                // Destructure the tuple into one variable per element; the
                // type idents double as (non-snake-case) binding names.
                #[allow(non_snake_case)]
                let ($($t,)+) = self;
                // Chain one `push_bind` per element, returning the builder.
                q $(
                    .push_bind($t)
                )+

            }
        }

        impl<'p, $($t),+> FixedLengthParams<'p, $n> for ($($t,)+)
        where $(
            $t: 'p + for<'q> Encode<'q, Db> + Type<Db>
        ),+ {
        }
    };
}
324
// Implement `Params`/`FixedLengthParams` for tuples of 1 through 8 values,
// covering every column width used by the tables in this module.
impl_tuple_params!(1, (T,));
impl_tuple_params!(2, (T1, T2,));
impl_tuple_params!(3, (T1, T2, T3,));
impl_tuple_params!(4, (T1, T2, T3, T4,));
impl_tuple_params!(5, (T1, T2, T3, T4, T5,));
impl_tuple_params!(6, (T1, T2, T3, T4, T5, T6,));
impl_tuple_params!(7, (T1, T2, T3, T4, T5, T6, T7,));
impl_tuple_params!(8, (T1, T2, T3, T4, T5, T6, T7, T8,));
333
334pub fn build_where_in<'a, I>(
335 query: &'a str,
336 column: &'a str,
337 values: I,
338) -> QueryResult<(queries::QueryBuilder<'a>, String)>
339where
340 I: IntoIterator,
341 I::Item: 'a + Encode<'a, Db> + Type<Db>,
342{
343 let mut builder = queries::QueryBuilder::default();
344 let params = values
345 .into_iter()
346 .map(|v| Ok(format!("{} ", builder.bind(v)?)))
347 .collect::<QueryResult<Vec<String>>>()?;
348
349 if params.is_empty() {
350 return Err(QueryError::Error {
351 message: "failed to build WHERE IN query. No parameter found ".to_string(),
352 });
353 }
354
355 let sql = format!(
356 "{query} where {column} IN ({}) ",
357 params.into_iter().join(",")
358 );
359
360 Ok((builder, sql))
361}
362
impl Transaction<Write> {
    /// Insert `rows` into `table`, updating all columns on conflict with the
    /// primary key `pk` (an "upsert").
    ///
    /// `columns` names the `N` columns of `table` matching the arity of each
    /// row; `pk` lists the conflict-target column(s). Upserting zero rows is a
    /// no-op (logged as a warning). Fails if the statement errors or if the
    /// number of rows reported modified differs from the number supplied.
    pub async fn upsert<'p, const N: usize, R>(
        &mut self,
        table: &str,
        columns: [&str; N],
        pk: impl IntoIterator<Item = &str>,
        rows: R,
    ) -> anyhow::Result<()>
    where
        R: IntoIterator,
        R::Item: 'p + FixedLengthParams<'p, N>,
    {
        // "col = excluded.col" assignments for the ON CONFLICT update.
        let set_columns = columns
            .iter()
            .map(|col| format!("{col} = excluded.{col}"))
            .join(",");

        // Quote column names so reserved words are usable as columns.
        let columns_str = columns.iter().map(|col| format!("\"{col}\"")).join(",");

        let pk = pk.into_iter().join(",");

        let rows: Vec<_> = rows.into_iter().collect();
        let num_rows = rows.len();

        if num_rows == 0 {
            tracing::warn!("trying to upsert 0 rows into {table}, this has no effect");
            return Ok(());
        }

        // Build a single multi-row VALUES insert with placeholder bindings.
        let mut query_builder =
            QueryBuilder::new(format!("INSERT INTO \"{table}\" ({columns_str}) "));
        query_builder.push_values(rows, |mut b, row| {
            row.bind(&mut b);
        });
        query_builder.push(format!(" ON CONFLICT ({pk}) DO UPDATE SET {set_columns}"));

        let query = query_builder.build();
        let statement = query.sql();

        let res = self.execute(query).await.inspect_err(|err| {
            tracing::error!(statement, "error in statement execution: {err:#}");
        })?;
        // Sanity check: every supplied row should have been inserted or
        // updated; a mismatch indicates a logic error (e.g. duplicate keys
        // within the batch).
        let rows_modified = res.rows_affected() as usize;
        if rows_modified != num_rows {
            let error = format!(
                "unexpected number of rows modified: expected {num_rows}, got {rows_modified}. \
                 query: {statement}"
            );
            tracing::error!(error);
            bail!(error);
        }
        Ok(())
    }
}
418
impl Transaction<Write> {
    /// Garbage collect availability data for all blocks at or below `height`.
    ///
    /// Deletes, in order: payloads, VID common data, transactions, leaves,
    /// and headers. Payload and VID common rows are content-addressed and may
    /// be shared by multiple headers, so they are only removed when no header
    /// outside the pruned range still references them.
    #[instrument(skip(self))]
    pub(super) async fn delete_batch(&mut self, height: u64) -> anyhow::Result<()> {
        // Delete payloads whose (payload_hash, ns_table) is referenced by some
        // header at or below `height` and by at most one header overall
        // (HAVING count(*) <= 1), i.e. the last remaining reference is being
        // pruned.
        let res = query(
            "WITH to_delete AS (
                SELECT h.payload_hash, h.ns_table FROM header AS h
                WHERE (h.payload_hash, h.ns_table) IN (
                    SELECT range.payload_hash, range.ns_table
                    FROM header AS range
                    WHERE range.height <= $1
                )
                GROUP BY h.payload_hash, h.ns_table
                HAVING count(*) <= 1
            )
            DELETE FROM payload AS p
            WHERE (p.hash, p.ns_table) IN (SELECT * FROM to_delete)",
        )
        .bind(height as i64)
        .execute(self.as_mut())
        .await
        .context("deleting payloads")?;
        tracing::debug!(
            rows_affected = res.rows_affected(),
            "garbage collected payloads"
        );

        // Same last-reference logic for VID common data, keyed by payload
        // hash alone.
        let res = query(
            "WITH to_delete AS (
                SELECT h.payload_hash FROM header AS h
                WHERE h.payload_hash IN (
                    SELECT range.payload_hash
                    FROM header AS range
                    WHERE range.height <= $1
                )
                GROUP BY h.payload_hash
                HAVING count(*) <= 1
            )
            DELETE FROM vid_common AS v
            WHERE v.hash IN (SELECT * FROM to_delete)",
        )
        .bind(height as i64)
        .execute(self.as_mut())
        .await
        .context("deleting VID common")?;
        tracing::debug!(
            rows_affected = res.rows_affected(),
            "garbage collected VID common"
        );

        // Transactions, leaves, and headers are keyed by height, so a simple
        // range delete suffices.
        let res = query("DELETE FROM transactions WHERE block_height <= $1")
            .bind(height as i64)
            .execute(self.as_mut())
            .await
            .context("deleting transactions")?;
        tracing::debug!(rows_affected = res.rows_affected(), "pruned transactions");

        let res = query("DELETE FROM leaf2 WHERE height <= $1")
            .bind(height as i64)
            .execute(self.as_mut())
            .await
            .context("deleting leaf2")?;
        tracing::debug!(rows_affected = res.rows_affected(), "pruned leaf2");

        // Headers are deleted last, after everything that references them.
        let res = query("DELETE FROM header WHERE height <= $1")
            .bind(height as i64)
            .execute(self.as_mut())
            .await
            .context("deleting headers")?;
        tracing::debug!(rows_affected = res.rows_affected(), "pruned headers");

        Ok(())
    }

    /// Prune merklized-state rows created at or below `height` from each
    /// table in `state_tables`, keeping for every path only the newest entry
    /// within the pruned range (so historical state at the pruning boundary
    /// remains reconstructible).
    #[instrument(skip(self))]
    pub(super) async fn delete_state_batch(
        &mut self,
        state_tables: Vec<String>,
        height: u64,
    ) -> anyhow::Result<()> {
        for state_table in state_tables {
            // Delete a row only if a newer row for the same path exists that
            // is itself still within the pruned range.
            self.execute(
                query(&format!(
                    "
                    DELETE FROM {state_table}
                    WHERE {state_table}.created <= $1
                    AND EXISTS (
                        SELECT 1 FROM {state_table} AS t2
                        WHERE t2.path = {state_table}.path
                        AND t2.created > {state_table}.created
                        AND t2.created <= $1
                    )"
                ))
                .bind(height as i64),
            )
            .await?;
        }

        Ok(())
    }

    /// Record `height` as the latest pruned height (single-row table keyed by
    /// a constant id).
    pub(crate) async fn save_pruned_height(&mut self, height: u64) -> anyhow::Result<()> {
        self.upsert(
            "pruned_height",
            ["id", "last_height"],
            ["id"],
            [(1i32, height as i64)],
        )
        .await
    }
}
540
impl<Types> UpdateAvailabilityStorage<Types> for Transaction<Write>
where
    Types: NodeType,
    Payload<Types>: QueryablePayload<Types>,
    Header<Types>: QueryableHeader<Types>,
{
    /// Store the latest QC chain, but only if it is at least as recent as the
    /// current block height (stale chains are silently ignored).
    async fn insert_qc_chain(
        &mut self,
        height: u64,
        qc_chain: Option<[CertificatePair<Types>; 2]>,
    ) -> anyhow::Result<()> {
        let block_height = NodeStorage::<Types>::block_height(self).await? as u64;
        if height + 1 >= block_height {
            let qcs = serde_json::to_value(&qc_chain)?;
            // Single-row table keyed by a constant id: always overwrite.
            self.upsert("latest_qc_chain", ["id", "qcs"], ["id"], [(1i32, qcs)])
                .await
                .context("inserting QC chain")?;
        }

        Ok(())
    }

    /// Insert a batch of leaves along with their headers, skipping any leaves
    /// at or below the pruned height.
    async fn insert_leaf_range<'a>(
        &mut self,
        leaves: impl Send + IntoIterator<IntoIter: Send, Item = &'a LeafQueryData<Types>>,
    ) -> anyhow::Result<()> {
        let leaves = leaves.into_iter();

        // Never re-insert data that has already been pruned. NOTE(review):
        // `skip_while` assumes the input is ordered by ascending height —
        // unordered input could skip fewer leaves than intended; confirm
        // callers always pass sorted ranges.
        let pruned_height = self.load_pruned_height().await?;
        let leaves = leaves.skip_while(|leaf| pruned_height.is_some_and(|h| leaf.height() <= h));

        // Serialize each leaf into one header row and one leaf row; the first
        // serialization failure aborts the whole batch.
        let (header_rows, leaf_rows): (Vec<_>, Vec<_>) = leaves
            .map(|leaf| {
                let header_json = serde_json::to_value(leaf.leaf().block_header())
                    .context("failed to serialize header")?;
                let header_row = (
                    leaf.height() as i64,
                    leaf.block_hash().to_string(),
                    leaf.leaf().block_header().payload_commitment().to_string(),
                    leaf.leaf().block_header().ns_table(),
                    header_json,
                    leaf.leaf().block_header().timestamp() as i64,
                );

                let leaf_json =
                    serde_json::to_value(leaf.leaf()).context("failed to serialize leaf")?;
                let qc_json = serde_json::to_value(leaf.qc()).context("failed to serialize QC")?;
                let leaf_row = (
                    leaf.height() as i64,
                    leaf.hash().to_string(),
                    leaf.block_hash().to_string(),
                    leaf_json,
                    qc_json,
                );

                anyhow::Ok((header_row, leaf_row))
            })
            .process_results(|iter| iter.unzip())?;

        // Headers first: the leaf2 table references header data by height.
        self.upsert(
            "header",
            [
                "height",
                "hash",
                "payload_hash",
                "ns_table",
                "data",
                "timestamp",
            ],
            ["height"],
            header_rows,
        )
        .await
        .context("inserting headers")?;

        self.upsert(
            "leaf2",
            ["height", "hash", "block_hash", "leaf", "qc"],
            ["height"],
            leaf_rows,
        )
        .await
        .context("inserting leaves")?;

        Ok(())
    }

    /// Insert a batch of block payloads and their transactions, skipping any
    /// blocks at or below the pruned height.
    async fn insert_block_range<'a>(
        &mut self,
        blocks: impl Send + IntoIterator<IntoIter: Send, Item = &'a BlockQueryData<Types>>,
    ) -> anyhow::Result<()> {
        let blocks = blocks.into_iter();

        // Never re-insert data that has already been pruned (assumes blocks
        // arrive in ascending height order, as with leaves above).
        let pruned_height = self.load_pruned_height().await?;
        let blocks = blocks.skip_while(|block| pruned_height.is_some_and(|h| block.height() <= h));

        let (payload_rows, tx_rows): (Vec<_>, Vec<_>) = blocks
            .map(|block| {
                let payload_row = (
                    block.payload_hash().to_string(),
                    block.header().ns_table(),
                    block.size() as i32,
                    block.num_transactions() as i32,
                    block.payload.encode().as_ref().to_vec(),
                );

                // One row per transaction in the block, keyed by
                // (block_height, ns_id, position).
                let tx_rows = block.enumerate().map(|(txn_ix, txn)| {
                    // NOTE(review): `unwrap` assumes every enumerated
                    // transaction index has a valid namespace in the header;
                    // a malformed block would panic here.
                    let ns_id = block.header().namespace_id(&txn_ix.ns_index).unwrap();
                    (
                        txn.commit().to_string(),
                        block.height() as i64,
                        txn_ix.ns_index.into(),
                        ns_id.into(),
                        txn_ix.position as i64,
                    )
                });

                (payload_row, tx_rows)
            })
            .unzip();
        let tx_rows = tx_rows.into_iter().flatten().collect::<Vec<_>>();

        self.upsert(
            "payload",
            ["hash", "ns_table", "size", "num_transactions", "data"],
            ["hash", "ns_table"],
            payload_rows,
        )
        .await
        .context("inserting payloads")?;

        // Empty blocks produce no transaction rows; upserting an empty batch
        // is a no-op anyway, but skipping it avoids the warning log.
        if !tx_rows.is_empty() {
            self.upsert(
                "transactions",
                ["hash", "block_height", "ns_index", "ns_id", "position"],
                ["block_height", "ns_id", "position"],
                tx_rows,
            )
            .await
            .context("inserting transactions")?;
        }

        Ok(())
    }

    /// Insert a batch of VID common data and (optionally) the corresponding
    /// shares, skipping entries at or below the pruned height.
    async fn insert_vid_range<'a>(
        &mut self,
        vid: impl Send
        + IntoIterator<
            IntoIter: Send,
            Item = (&'a VidCommonQueryData<Types>, Option<&'a VidShare>),
        >,
    ) -> anyhow::Result<()> {
        let vid = vid.into_iter();

        // Never re-insert data that has already been pruned.
        let pruned_height = self.load_pruned_height().await?;
        let vid = vid.skip_while(|(common, _)| pruned_height.is_some_and(|h| common.height() <= h));

        let (common_rows, share_rows): (Vec<_>, Vec<_>) = vid
            .map(|(common, share)| {
                let common_data = bincode::serialize(common.common())
                    .context("failed to serialize VID common data")?;
                let common_row = (common.payload_hash().to_string(), common_data);

                // Shares are optional; only present entries become rows.
                let share_row = if let Some(share) = share {
                    let share_data =
                        bincode::serialize(&share).context("failed to serialize VID share")?;
                    Some((common.height() as i64, share_data))
                } else {
                    None
                };

                anyhow::Ok((common_row, share_row))
            })
            .process_results(|iter| iter.unzip())?;
        let share_rows = share_rows.into_iter().flatten().collect::<Vec<_>>();

        self.upsert("vid_common", ["hash", "data"], ["hash"], common_rows)
            .await
            .context("inserting VID common")?;

        // Shares live on the header rows, so they are applied as a batched
        // UPDATE joined against a VALUES list rather than an upsert.
        if !share_rows.is_empty() {
            let mut q = QueryBuilder::new("WITH rows (height, share) AS (");
            q.push_values(share_rows, |mut q, (height, share)| {
                q.push_bind(height).push_bind(share);
            });
            q.push(
                ") UPDATE header SET vid_share = rows.share
                FROM rows
                WHERE header.height = rows.height",
            );
            q.build()
                .execute(self.as_mut())
                .await
                .context("inserting VID shares")?;
        }

        Ok(())
    }
}
752
#[async_trait]
impl<Types: NodeType, State: MerklizedState<Types, ARITY>, const ARITY: usize>
    UpdateStateData<Types, State, ARITY> for Transaction<Write>
{
    /// Record the latest block height for which merklized state has been
    /// stored (single-row table keyed by a constant id).
    async fn set_last_state_height(&mut self, height: usize) -> anyhow::Result<()> {
        self.upsert(
            "last_merklized_state_height",
            ["id", "height"],
            ["id"],
            [(1i32, height as i64)],
        )
        .await?;

        Ok(())
    }

    /// Insert the nodes along a single Merkle proof path; delegates to the
    /// batch variant with a one-element batch.
    async fn insert_merkle_nodes(
        &mut self,
        proof: MerkleProof<State::Entry, State::Key, State::T, ARITY>,
        traversal_path: Vec<usize>,
        block_number: u64,
    ) -> anyhow::Result<()> {
        let proofs = vec![(proof, traversal_path)];
        UpdateStateData::<Types, State, ARITY>::insert_merkle_nodes_batch(
            self,
            proofs,
            block_number,
        )
        .await
    }

    /// Insert the nodes from a batch of Merkle proofs for `block_number`.
    ///
    /// Hashes are deduplicated into a shared hash table first; nodes are then
    /// rewritten to reference hashes by id before being upserted into the
    /// state-specific node table.
    async fn insert_merkle_nodes_batch(
        &mut self,
        proofs: Vec<(
            MerkleProof<State::Entry, State::Key, State::T, ARITY>,
            Vec<usize>,
        )>,
        block_number: u64,
    ) -> anyhow::Result<()> {
        if proofs.is_empty() {
            return Ok(());
        }

        let name = State::state_type();
        let block_number = block_number as i64;

        let (mut all_nodes, all_hashes) = collect_nodes_from_proofs(&proofs)?;
        let hashes: Vec<Vec<u8>> = all_hashes.into_iter().collect();

        // Postgres: insert all hashes in one statement, returning their ids.
        #[cfg(not(feature = "embedded-db"))]
        let nodes_hash_ids: HashMap<Vec<u8>, i32> = batch_insert_hashes(hashes, self).await?;

        // SQLite: insert in chunks of 20 — presumably to stay within the
        // backend's bound-parameter limits; TODO confirm the chunk size
        // rationale.
        #[cfg(feature = "embedded-db")]
        let nodes_hash_ids: HashMap<Vec<u8>, i32> = {
            let mut hash_ids: HashMap<Vec<u8>, i32> = HashMap::with_capacity(hashes.len());
            for hash_chunk in hashes.chunks(20) {
                let (query, sql) = build_hash_batch_insert(hash_chunk)?;
                let chunk_ids: HashMap<Vec<u8>, i32> = query
                    .query_as(&sql)
                    .fetch(self.as_mut())
                    .try_collect()
                    .await?;
                hash_ids.extend(chunk_ids);
            }
            hash_ids
        };

        // Rewrite each node to use the assigned hash ids (for itself and,
        // where present, for its children) and stamp the creation height.
        for (node, children, hash) in &mut all_nodes {
            node.created = block_number;
            node.hash_id = *nodes_hash_ids.get(&*hash).ok_or(QueryError::Error {
                message: "Missing node hash".to_string(),
            })?;

            if let Some(children) = children {
                let children_hashes = children
                    .iter()
                    .map(|c| nodes_hash_ids.get(c).copied())
                    .collect::<Option<Vec<i32>>>()
                    .ok_or(QueryError::Error {
                        message: "Missing child hash".to_string(),
                    })?;

                node.children = Some(children_hashes.into());
            }
        }

        Node::upsert(name, all_nodes.into_iter().map(|(n, ..)| n), self).await?;

        Ok(())
    }
}
844
845#[async_trait]
846impl<Mode: TransactionMode> PrunedHeightStorage for Transaction<Mode> {
847 async fn load_pruned_height(&mut self) -> anyhow::Result<Option<u64>> {
848 let Some((height,)) =
849 query_as::<(i64,)>("SELECT last_height FROM pruned_height ORDER BY id DESC LIMIT 1")
850 .fetch_optional(self.as_mut())
851 .await?
852 else {
853 return Ok(None);
854 };
855 Ok(Some(height as u64))
856 }
857}
858
/// Metrics tracked per connection pool: how many transactions are open, how
/// long they take, and how each one was closed.
#[derive(Clone, Debug)]
pub(super) struct PoolMetrics {
    open_transactions: Box<dyn Gauge>,
    // Transaction durations in seconds.
    transaction_durations: Box<dyn Histogram>,
    commits: Box<dyn Counter>,
    reverts: Box<dyn Counter>,
    drops: Box<dyn Counter>,
}
867
impl PoolMetrics {
    /// Register the pool's gauges, histogram, and counters with `metrics`.
    pub(super) fn new(metrics: &(impl Metrics + ?Sized)) -> Self {
        Self {
            open_transactions: metrics.create_gauge("open_transactions".into(), None),
            // Durations are reported in seconds (unit label "s").
            transaction_durations: metrics
                .create_histogram("transaction_duration".into(), Some("s".into())),
            commits: metrics.create_counter("committed_transactions".into(), None),
            reverts: metrics.create_counter("reverted_transactions".into(), None),
            drops: metrics.create_counter("dropped_transactions".into(), None),
        }
    }
}
879}