// hotshot_query_service/data_source/storage/fs.rs
1// Copyright (c) 2022 Espresso Systems (espressosys.com)
2// This file is part of the HotShot Query Service library.
3//
4// This program is free software: you can redistribute it and/or modify it under the terms of the GNU
5// General Public License as published by the Free Software Foundation, either version 3 of the
6// License, or (at your option) any later version.
7// This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without
8// even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
9// General Public License for more details.
10// You should have received a copy of the GNU General Public License along with this program. If not,
11// see <https://www.gnu.org/licenses/>.
12
13#![cfg(feature = "file-system-data-source")]
14
15use std::{
16    collections::{
17        BTreeMap,
18        hash_map::{Entry, HashMap},
19    },
20    hash::Hash,
21    iter,
22    ops::{Bound, Deref, RangeBounds},
23    path::Path,
24};
25
26use async_lock::{RwLock, RwLockReadGuard, RwLockWriteGuard};
27use async_trait::async_trait;
28use atomic_store::{AtomicStore, AtomicStoreLoader, PersistenceError};
29use committable::Committable;
30use futures::future::Future;
31use hotshot_types::{
32    data::{VidCommitment, VidShare},
33    simple_certificate::CertificatePair,
34    traits::{block_contents::BlockHeader, node_implementation::NodeType},
35};
36use serde::{Serialize, de::DeserializeOwned};
37use snafu::OptionExt;
38
39use super::{
40    Aggregate, AggregatesStorage, AvailabilityStorage, NodeStorage, PayloadMetadata,
41    UpdateAggregatesStorage, UpdateAvailabilityStorage, VidCommonMetadata,
42    ledger_log::{Iter, LedgerLog},
43    pruning::{PruneStorage, PrunedHeightStorage, PrunerConfig},
44};
45use crate::{
46    Header, MissingSnafu, NotFoundSnafu, Payload, QueryError, QueryResult,
47    availability::{
48        NamespaceId,
49        data_source::{BlockId, LeafId},
50        query_data::{
51            BlockHash, BlockQueryData, LeafHash, LeafQueryData, PayloadQueryData, QueryableHeader,
52            QueryablePayload, TransactionHash, VidCommonQueryData,
53        },
54    },
55    data_source::{VersionedDataSource, update},
56    metrics::PrometheusMetrics,
57    node::{SyncStatusQueryData, TimeWindowQueryData, WindowStart},
58    status::HasMetrics,
59    types::HeightIndexed,
60};
61
// Number of entries of each ledger log to keep cached in memory (passed to the
// [LedgerLog] constructors); older entries are loaded from disk on demand.
const CACHED_LEAVES_COUNT: usize = 100;
const CACHED_BLOCKS_COUNT: usize = 100;
const CACHED_VID_COMMON_COUNT: usize = 100;
65
/// The mutable state of a [FileSystemStorage].
///
/// The persistent state lives in the [LedgerLog]s; the `index_by_*` maps and
/// aggregate counters are in-memory only and are rebuilt from the logs on
/// startup (see [FileSystemStorage::open_with_store]).
#[derive(custom_debug::Debug)]
pub struct FileSystemStorageInner<Types>
where
    Types: NodeType,
    Header<Types>: QueryableHeader<Types>,
    Payload<Types>: QueryablePayload<Types>,
{
    // Maps from various identifiers to block height, used to resolve
    // hash-based lookups to positions in the ledger logs.
    index_by_leaf_hash: HashMap<LeafHash<Types>, u64>,
    index_by_block_hash: HashMap<BlockHash<Types>, u64>,
    index_by_payload_hash: HashMap<VidCommitment, u64>,
    index_by_txn_hash: HashMap<TransactionHash<Types>, u64>,
    // Timestamp -> heights of all blocks with that timestamp; several blocks
    // may share a timestamp, hence the `Vec`.
    index_by_time: BTreeMap<u64, Vec<u64>>,
    // Running totals over all stored blocks, maintained incrementally on
    // insert and used to answer full-range aggregate queries.
    num_transactions: usize,
    payload_size: usize,
    // Present only when this storage owns the [AtomicStore] (constructed via
    // `create`/`open` rather than the `*_with_store` variants).
    #[debug(skip)]
    top_storage: Option<AtomicStore>,
    leaf_storage: LedgerLog<LeafQueryData<Types>>,
    block_storage: LedgerLog<BlockQueryData<Types>>,
    vid_storage: LedgerLog<(VidCommonQueryData<Types>, Option<VidShare>)>,
    // QC chain finalizing the latest known leaf, if we have one; not persisted.
    latest_qc_chain: Option<[CertificatePair<Types>; 2]>,
}
87
88impl<Types> FileSystemStorageInner<Types>
89where
90    Types: NodeType,
91    Header<Types>: QueryableHeader<Types>,
92    Payload<Types>: QueryablePayload<Types>,
93{
94    fn get_block_index(&self, id: BlockId<Types>) -> QueryResult<usize> {
95        match id {
96            BlockId::Number(n) => Ok(n),
97            BlockId::Hash(h) => {
98                Ok(*self.index_by_block_hash.get(&h).context(NotFoundSnafu)? as usize)
99            },
100            BlockId::PayloadHash(h) => {
101                Ok(*self.index_by_payload_hash.get(&h).context(NotFoundSnafu)? as usize)
102            },
103        }
104    }
105
106    fn get_block(&self, id: BlockId<Types>) -> QueryResult<BlockQueryData<Types>> {
107        self.block_storage
108            .iter()
109            .nth(self.get_block_index(id)?)
110            .context(NotFoundSnafu)?
111            .context(MissingSnafu)
112    }
113
114    fn get_header(&self, id: BlockId<Types>) -> QueryResult<Header<Types>> {
115        self.get_block(id).map(|block| block.header)
116    }
117
118    fn get_block_range<R>(&self, range: R) -> QueryResult<Vec<QueryResult<BlockQueryData<Types>>>>
119    where
120        R: RangeBounds<usize> + Send,
121    {
122        Ok(range_iter(self.block_storage.iter(), range).collect())
123    }
124}
125
/// Storage for the APIs provided in this crate, backed by the local file system.
///
/// Persistent data is managed with [atomic_store]; in-memory indexes over that
/// data are kept in [FileSystemStorageInner] and rebuilt on startup.
/// (NOTE(review): the previous doc said "remote PostgreSQL database", which was
/// evidently copied from the SQL backend.)
#[derive(Debug)]
pub struct FileSystemStorage<Types: NodeType>
where
    Header<Types>: QueryableHeader<Types>,
    Payload<Types>: QueryablePayload<Types>,
{
    // All mutable state behind a single lock: writers get exclusive access,
    // readers may run concurrently.
    inner: RwLock<FileSystemStorageInner<Types>>,
    metrics: PrometheusMetrics,
}
136
// The file system backend does not support pruning, so the default (no-op)
// pruner configuration is used as-is.
impl<Types: NodeType> PrunerConfig for FileSystemStorage<Types>
where
    Header<Types>: QueryableHeader<Types>,
    Payload<Types>: QueryablePayload<Types>,
{
}
// Pruning is a no-op for the file system backend; the unit pruner does nothing.
impl<Types: NodeType> PruneStorage for FileSystemStorage<Types>
where
    Header<Types>: QueryableHeader<Types>,
    Payload<Types>: QueryablePayload<Types>,
{
    type Pruner = ();
}
150
impl<Types: NodeType> FileSystemStorage<Types>
where
    Payload<Types>: QueryablePayload<Types>,
    Header<Types>: QueryableHeader<Types>,
{
    /// Create a new [FileSystemStorage] with storage at `path`.
    ///
    /// If there is already data at `path`, it will be archived.
    ///
    /// The [FileSystemStorage] will manage its own persistence synchronization.
    pub async fn create(path: &Path) -> Result<Self, PersistenceError> {
        let mut loader = AtomicStoreLoader::create(path, "hotshot_data_source")?;
        // Keep at most one archived copy of pre-existing data.
        loader.retain_archives(1);
        let data_source = Self::create_with_store(&mut loader).await?;
        // We created the loader, so we also own the resulting [AtomicStore] and
        // are responsible for committing its versions (see `skip_version` and
        // the `commit` implementation of the write transaction).
        data_source.inner.write().await.top_storage = Some(AtomicStore::open(loader)?);
        Ok(data_source)
    }

    /// Open an existing [FileSystemStorage] from storage at `path`.
    ///
    /// If there is no data at `path`, a new store will be created.
    ///
    /// The [FileSystemStorage] will manage its own persistence synchronization.
    pub async fn open(path: &Path) -> Result<Self, PersistenceError> {
        let mut loader = AtomicStoreLoader::load(path, "hotshot_data_source")?;
        // Keep at most one archived copy of pre-existing data.
        loader.retain_archives(1);
        let data_source = Self::open_with_store(&mut loader).await?;
        // As in `create`, we own the [AtomicStore] and manage its versioning.
        data_source.inner.write().await.top_storage = Some(AtomicStore::open(loader)?);
        Ok(data_source)
    }

    /// Create a new [FileSystemStorage] using a persistent storage loader.
    ///
    /// If there is existing data corresponding to the [FileSystemStorage] data structures, it will
    /// be archived.
    ///
    /// The [FileSystemStorage] will register its persistent data structures with `loader`. The
    /// caller is responsible for creating an [AtomicStore] from `loader` and managing
    /// synchronization of the store.
    pub async fn create_with_store(
        loader: &mut AtomicStoreLoader,
    ) -> Result<Self, PersistenceError> {
        Ok(Self {
            inner: RwLock::new(FileSystemStorageInner {
                // A fresh store starts with empty indexes and zeroed counters;
                // they are maintained incrementally as objects are inserted.
                index_by_leaf_hash: Default::default(),
                index_by_block_hash: Default::default(),
                index_by_payload_hash: Default::default(),
                index_by_txn_hash: Default::default(),
                index_by_time: Default::default(),
                num_transactions: 0,
                payload_size: 0,
                // `top_storage` stays `None` here: with the `*_with_store`
                // constructors the caller owns the [AtomicStore].
                top_storage: None,
                leaf_storage: LedgerLog::create(loader, "leaves", CACHED_LEAVES_COUNT)?,
                block_storage: LedgerLog::create(loader, "blocks", CACHED_BLOCKS_COUNT)?,
                vid_storage: LedgerLog::create(loader, "vid_common", CACHED_VID_COMMON_COUNT)?,
                latest_qc_chain: None,
            }),
            metrics: Default::default(),
        })
    }

    /// Open an existing [FileSystemStorage] using a persistent storage loader.
    ///
    /// If there is no existing data corresponding to the [FileSystemStorage] data structures, a new
    /// store will be created.
    ///
    /// The [FileSystemStorage] will register its persistent data structures with `loader`. The
    /// caller is responsible for creating an [AtomicStore] from `loader` and managing
    /// synchronization of the store.
    pub async fn open_with_store(loader: &mut AtomicStoreLoader) -> Result<Self, PersistenceError> {
        let leaf_storage =
            LedgerLog::<LeafQueryData<Types>>::open(loader, "leaves", CACHED_LEAVES_COUNT)?;
        let block_storage =
            LedgerLog::<BlockQueryData<Types>>::open(loader, "blocks", CACHED_BLOCKS_COUNT)?;
        let vid_storage = LedgerLog::<(VidCommonQueryData<Types>, Option<VidShare>)>::open(
            loader,
            "vid_common",
            CACHED_VID_COMMON_COUNT,
        )?;

        // Rebuild the in-memory indexes by scanning the persisted leaves.
        // `flatten` skips slots whose object is missing from storage.
        let mut index_by_block_hash = HashMap::new();
        let mut index_by_payload_hash = HashMap::new();
        let mut index_by_time = BTreeMap::<u64, Vec<u64>>::new();
        let index_by_leaf_hash = leaf_storage
            .iter()
            .flatten()
            .map(|leaf| {
                update_index_by_hash(&mut index_by_block_hash, leaf.block_hash(), leaf.height());
                update_index_by_hash(
                    &mut index_by_payload_hash,
                    leaf.payload_hash(),
                    leaf.height(),
                );
                index_by_time
                    .entry(leaf.header().timestamp())
                    .or_default()
                    .push(leaf.height());
                (leaf.hash(), leaf.height())
            })
            .collect();

        // Rebuild the transaction index and the aggregate statistics from the
        // persisted blocks.
        let mut index_by_txn_hash = HashMap::new();
        let mut num_transactions = 0;
        let mut payload_size = 0;
        for block in block_storage.iter().flatten() {
            num_transactions += block.len();
            payload_size += block.size() as usize;

            let height = block.height();
            for (_, txn) in block.enumerate() {
                update_index_by_hash(&mut index_by_txn_hash, txn.commit(), height);
            }
        }

        Ok(Self {
            inner: RwLock::new(FileSystemStorageInner {
                index_by_leaf_hash,
                index_by_block_hash,
                index_by_payload_hash,
                index_by_txn_hash,
                index_by_time,
                num_transactions,
                payload_size,
                leaf_storage,
                block_storage,
                vid_storage,
                top_storage: None,
                // The latest QC chain is not persisted; it is repopulated as
                // new chains are inserted (see `insert_qc_chain`).
                latest_qc_chain: None,
            }),
            metrics: Default::default(),
        })
    }

    /// Advance the version of the persistent store without committing changes to persistent state.
    pub async fn skip_version(&self) -> Result<(), PersistenceError> {
        let mut inner = self.inner.write().await;
        inner.leaf_storage.skip_version()?;
        inner.block_storage.skip_version()?;
        inner.vid_storage.skip_version()?;
        // Only commit the top-level store if we own it (see `create`/`open`).
        if let Some(store) = &mut inner.top_storage {
            store.commit_version()?;
        }
        Ok(())
    }

    /// Get the stored VID share for a given block, if one exists.
    pub async fn get_vid_share(&self, block_id: BlockId<Types>) -> QueryResult<VidShare> {
        // Delegate to the read-only transaction implementation.
        let mut tx = self.read().await.map_err(|err| QueryError::Error {
            message: err.to_string(),
        })?;
        let share = tx.vid_share(block_id).await?;
        Ok(share)
    }

    /// Get the stored VID common data for a given block, if one exists.
    pub async fn get_vid_common(
        &self,
        block_id: BlockId<Types>,
    ) -> QueryResult<VidCommonQueryData<Types>> {
        let mut tx = self.read().await.map_err(|err| QueryError::Error {
            message: err.to_string(),
        })?;
        let share = tx.get_vid_common(block_id).await?;
        Ok(share)
    }

    /// Get the stored VID common metadata for a given block, if one exists.
    pub async fn get_vid_common_metadata(
        &self,
        block_id: BlockId<Types>,
    ) -> QueryResult<VidCommonMetadata<Types>> {
        let mut tx = self.read().await.map_err(|err| QueryError::Error {
            message: err.to_string(),
        })?;
        let share = tx.get_vid_common_metadata(block_id).await?;
        Ok(share)
    }
}
329
/// Roll back any uncommitted changes held by a transaction guard.
///
/// Implemented for both read and write lock guards so that [Transaction] can
/// be generic over either; reverting a read guard is a no-op.
pub trait Revert {
    fn revert(&mut self);
}
333
impl<Types> Revert for RwLockWriteGuard<'_, FileSystemStorageInner<Types>>
where
    Types: NodeType,
    Header<Types>: QueryableHeader<Types>,
    Payload<Types>: QueryablePayload<Types>,
{
    /// Discard all uncommitted changes in the underlying ledger logs.
    ///
    /// NOTE(review): these `unwrap`s panic if the underlying store fails to
    /// revert, and this runs from `Transaction::drop` — confirm that aborting
    /// on revert failure is the intended failure mode.
    fn revert(&mut self) {
        self.leaf_storage.revert_version().unwrap();
        self.block_storage.revert_version().unwrap();
        self.vid_storage.revert_version().unwrap();
    }
}
346
// Read-only guards hold no uncommitted state, so revert does nothing.
impl<Types> Revert for RwLockReadGuard<'_, FileSystemStorageInner<Types>>
where
    Types: NodeType,
    Header<Types>: QueryableHeader<Types>,
    Payload<Types>: QueryablePayload<Types>,
{
    fn revert(&mut self) {
        // Nothing to revert for a read-only transaction.
    }
}
357
/// A transaction over [FileSystemStorage], wrapping a read or write lock guard.
///
/// Holding the guard for the transaction's lifetime gives shared (read) or
/// exclusive (write) access to the storage until the transaction completes.
#[derive(Debug)]
pub struct Transaction<T: Revert> {
    inner: T,
}
362
impl<T: Revert> Drop for Transaction<T> {
    // When the transaction goes out of scope, revert whatever has not been
    // committed (a no-op for read-only transactions).
    fn drop(&mut self) {
        self.inner.revert();
    }
}
impl<Types> update::Transaction for Transaction<RwLockWriteGuard<'_, FileSystemStorageInner<Types>>>
where
    Types: NodeType,
    Header<Types>: QueryableHeader<Types>,
    Payload<Types>: QueryablePayload<Types>,
{
    /// Persist all pending changes in the ledger logs, advancing the top-level
    /// store version as well when this storage owns it.
    async fn commit(mut self) -> anyhow::Result<()> {
        self.inner.leaf_storage.commit_version().await?;
        self.inner.block_storage.commit_version().await?;
        self.inner.vid_storage.commit_version().await?;
        // `top_storage` is only present when the storage was created via
        // `create`/`open` (not the `*_with_store` constructors).
        if let Some(store) = &mut self.inner.top_storage {
            store.commit_version()?;
        }
        Ok(())
    }

    fn revert(self) -> impl Future + Send {
        // The revert is handled when `self` is dropped.
        async move {}
    }
}
389
// For read-only transactions both commit and revert are no-ops.
impl<Types> update::Transaction for Transaction<RwLockReadGuard<'_, FileSystemStorageInner<Types>>>
where
    Types: NodeType,
    Header<Types>: QueryableHeader<Types>,
    Payload<Types>: QueryablePayload<Types>,
{
    async fn commit(self) -> anyhow::Result<()> {
        // Nothing to commit for a read-only transaction.
        Ok(())
    }

    fn revert(self) -> impl Future + Send {
        // The revert is handled when `self` is dropped.
        async move {}
    }
}
406
impl<Types: NodeType> VersionedDataSource for FileSystemStorage<Types>
where
    Header<Types>: QueryableHeader<Types>,
    Payload<Types>: QueryablePayload<Types>,
{
    // Write transactions take the lock exclusively.
    type Transaction<'a>
        = Transaction<RwLockWriteGuard<'a, FileSystemStorageInner<Types>>>
    where
        Self: 'a;
    // Read-only transactions share the lock with other readers.
    type ReadOnly<'a>
        = Transaction<RwLockReadGuard<'a, FileSystemStorageInner<Types>>>
    where
        Self: 'a;

    /// Begin a read-write transaction; waits for all other transactions to end.
    async fn write(&self) -> anyhow::Result<Self::Transaction<'_>> {
        Ok(Transaction {
            inner: self.inner.write().await,
        })
    }

    /// Begin a read-only transaction; waits for any writer to finish.
    async fn read(&self) -> anyhow::Result<Self::ReadOnly<'_>> {
        Ok(Transaction {
            inner: self.inner.read().await,
        })
    }
}
433fn range_iter<T>(
434    mut iter: Iter<'_, T>,
435    range: impl RangeBounds<usize>,
436) -> impl '_ + Iterator<Item = QueryResult<T>>
437where
438    T: Clone + Serialize + DeserializeOwned,
439{
440    let start = range.start_bound().cloned();
441    let end = range.end_bound().cloned();
442
443    // Advance the underlying iterator to the start of the range.
444    let mut pos = match start {
445        Bound::Included(n) => {
446            if n > 0 {
447                iter.nth(n - 1);
448            }
449            n
450        },
451        Bound::Excluded(n) => {
452            iter.nth(n);
453            n + 1
454        },
455        Bound::Unbounded => 0,
456    };
457
458    iter::from_fn(move || {
459        // Check if we have reached the end of the range.
460        let reached_end = match end {
461            Bound::Included(n) => pos > n,
462            Bound::Excluded(n) => pos >= n,
463            Bound::Unbounded => false,
464        };
465        if reached_end {
466            return None;
467        }
468        let opt = iter.next()?;
469        pos += 1;
470        Some(opt.context(MissingSnafu))
471    })
472}
473
#[async_trait]
impl<Types, T> AvailabilityStorage<Types> for Transaction<T>
where
    Types: NodeType,
    Payload<Types>: QueryablePayload<Types>,
    Header<Types>: QueryableHeader<Types>,
    T: Revert + Deref<Target = FileSystemStorageInner<Types>> + Send + Sync,
{
    /// Look up a single leaf by height or hash.
    async fn get_leaf(&mut self, id: LeafId<Types>) -> QueryResult<LeafQueryData<Types>> {
        // Resolve hashes to heights via the in-memory index.
        let n = match id {
            LeafId::Number(n) => n,
            LeafId::Hash(h) => *self
                .inner
                .index_by_leaf_hash
                .get(&h)
                .context(NotFoundSnafu)? as usize,
        };
        // `NotFound` if the position is beyond the end of the log; `Missing`
        // if the slot exists but the leaf has not been stored.
        self.inner
            .leaf_storage
            .iter()
            .nth(n)
            .context(NotFoundSnafu)?
            .context(MissingSnafu)
    }

    /// Look up a single block by identifier.
    async fn get_block(&mut self, id: BlockId<Types>) -> QueryResult<BlockQueryData<Types>> {
        self.inner.get_block(id)
    }

    /// Look up a single block header by identifier.
    async fn get_header(&mut self, id: BlockId<Types>) -> QueryResult<Header<Types>> {
        self.inner.get_header(id)
    }

    /// Look up a block's payload; derived from the full block.
    async fn get_payload(&mut self, id: BlockId<Types>) -> QueryResult<PayloadQueryData<Types>> {
        self.get_block(id).await.map(PayloadQueryData::from)
    }

    /// Look up a block's payload metadata; derived from the full block.
    async fn get_payload_metadata(
        &mut self,
        id: BlockId<Types>,
    ) -> QueryResult<PayloadMetadata<Types>> {
        self.get_block(id).await.map(PayloadMetadata::from)
    }

    /// Look up VID common data for a block, dropping any stored share.
    async fn get_vid_common(
        &mut self,
        id: BlockId<Types>,
    ) -> QueryResult<VidCommonQueryData<Types>> {
        Ok(self
            .inner
            .vid_storage
            .iter()
            .nth(self.inner.get_block_index(id)?)
            .context(NotFoundSnafu)?
            .context(MissingSnafu)?
            .0)
    }

    /// Look up VID common metadata for a block; derived from the common data.
    async fn get_vid_common_metadata(
        &mut self,
        id: BlockId<Types>,
    ) -> QueryResult<VidCommonMetadata<Types>> {
        self.get_vid_common(id).await.map(VidCommonMetadata::from)
    }

    /// Fetch a range of leaves; each position yields its own `QueryResult`.
    async fn get_leaf_range<R>(
        &mut self,
        range: R,
    ) -> QueryResult<Vec<QueryResult<LeafQueryData<Types>>>>
    where
        R: RangeBounds<usize> + Send,
    {
        Ok(range_iter(self.inner.leaf_storage.iter(), range).collect())
    }

    /// Fetch a range of blocks; each position yields its own `QueryResult`.
    async fn get_block_range<R>(
        &mut self,
        range: R,
    ) -> QueryResult<Vec<QueryResult<BlockQueryData<Types>>>>
    where
        R: RangeBounds<usize> + Send,
    {
        self.inner.get_block_range(range)
    }

    /// Fetch a range of payloads, derived from the stored blocks.
    async fn get_payload_range<R>(
        &mut self,
        range: R,
    ) -> QueryResult<Vec<QueryResult<PayloadQueryData<Types>>>>
    where
        R: RangeBounds<usize> + Send,
    {
        Ok(range_iter(self.inner.block_storage.iter(), range)
            .map(|res| res.map(PayloadQueryData::from))
            .collect())
    }

    /// Fetch a range of payload metadata, derived from the stored blocks.
    async fn get_payload_metadata_range<R>(
        &mut self,
        range: R,
    ) -> QueryResult<Vec<QueryResult<PayloadMetadata<Types>>>>
    where
        R: RangeBounds<usize> + Send + 'static,
    {
        Ok(range_iter(self.inner.block_storage.iter(), range)
            .map(|res| res.map(PayloadMetadata::from))
            .collect())
    }

    /// Fetch a range of VID common data, dropping any stored shares.
    async fn get_vid_common_range<R>(
        &mut self,
        range: R,
    ) -> QueryResult<Vec<QueryResult<VidCommonQueryData<Types>>>>
    where
        R: RangeBounds<usize> + Send,
    {
        Ok(range_iter(self.inner.vid_storage.iter(), range)
            .map(|res| res.map(|(common, _)| common))
            .collect())
    }

    /// Fetch a range of VID common metadata, derived from the common data.
    async fn get_vid_common_metadata_range<R>(
        &mut self,
        range: R,
    ) -> QueryResult<Vec<QueryResult<VidCommonMetadata<Types>>>>
    where
        R: RangeBounds<usize> + Send,
    {
        Ok(range_iter(self.inner.vid_storage.iter(), range)
            .map(|res| res.map(|(common, _)| common.into()))
            .collect())
    }

    /// Find the block containing the transaction with the given hash, using
    /// the in-memory transaction index.
    async fn get_block_with_transaction(
        &mut self,
        hash: TransactionHash<Types>,
    ) -> QueryResult<BlockQueryData<Types>> {
        let height = self
            .inner
            .index_by_txn_hash
            .get(&hash)
            .context(NotFoundSnafu)?;
        self.inner.get_block((*height as usize).into())
    }

    async fn first_available_leaf(&mut self, from: u64) -> QueryResult<LeafQueryData<Types>> {
        // The file system backend doesn't index by whether a leaf is present, so we can't
        // efficiently seek to the first leaf with height >= `from`. Our best effort is to return
        // `from` itself if we can, or fail.
        self.get_leaf((from as usize).into()).await
    }
}
626
627impl<Types: NodeType> UpdateAvailabilityStorage<Types>
628    for Transaction<RwLockWriteGuard<'_, FileSystemStorageInner<Types>>>
629where
630    Payload<Types>: QueryablePayload<Types>,
631    Header<Types>: QueryableHeader<Types>,
632{
633    async fn insert_qc_chain(
634        &mut self,
635        height: u64,
636        qc_chain: Option<[CertificatePair<Types>; 2]>,
637    ) -> anyhow::Result<()> {
638        if height + 1 >= (self.inner.leaf_storage.iter().len() as u64) {
639            // If this QC chain is for the latest leaf we know about, store it so that we can prove
640            // to clients that the corresponding leaf is finalized. (If it is not the latest leaf,
641            // this is unnecessary, since we can prove it is an ancestor of some later, finalized
642            // leaf.)
643            if let Some(qc_chain) = qc_chain {
644                self.inner.latest_qc_chain = Some(qc_chain);
645            } else {
646                // Since we have a new latest leaf, we have to update latest QC chain even if we
647                // don't actually have a QC chain to store.
648                self.inner.latest_qc_chain = None;
649            }
650        }
651
652        Ok(())
653    }
654
655    async fn insert_leaf_range<'a>(
656        &mut self,
657        leaves: impl Send + IntoIterator<Item = &'a LeafQueryData<Types>>,
658    ) -> anyhow::Result<()> {
659        for leaf in leaves {
660            if !self
661                .inner
662                .leaf_storage
663                .insert(leaf.height() as usize, leaf.clone())?
664            {
665                // The leaf was already present.
666                continue;
667            }
668            self.inner
669                .index_by_leaf_hash
670                .insert(leaf.hash(), leaf.height());
671            update_index_by_hash(
672                &mut self.inner.index_by_block_hash,
673                leaf.block_hash(),
674                leaf.height(),
675            );
676            update_index_by_hash(
677                &mut self.inner.index_by_payload_hash,
678                leaf.payload_hash(),
679                leaf.height(),
680            );
681            self.inner
682                .index_by_time
683                .entry(leaf.header().timestamp())
684                .or_default()
685                .push(leaf.height());
686        }
687
688        Ok(())
689    }
690
691    async fn insert_block_range<'a>(
692        &mut self,
693        blocks: impl Send + IntoIterator<IntoIter: Send, Item = &'a BlockQueryData<Types>>,
694    ) -> anyhow::Result<()> {
695        for block in blocks {
696            if !self
697                .inner
698                .block_storage
699                .insert(block.height() as usize, block.clone())?
700            {
701                // The block was already present.
702                continue;
703            }
704            self.inner.num_transactions += block.len();
705            self.inner.payload_size += block.size() as usize;
706            for (_, txn) in block.enumerate() {
707                update_index_by_hash(
708                    &mut self.inner.index_by_txn_hash,
709                    txn.commit(),
710                    block.height(),
711                );
712            }
713        }
714        Ok(())
715    }
716
717    async fn insert_vid_range<'a>(
718        &mut self,
719        vid: impl Send
720        + IntoIterator<
721            IntoIter: Send,
722            Item = (&'a VidCommonQueryData<Types>, Option<&'a VidShare>),
723        >,
724    ) -> anyhow::Result<()> {
725        for (common, share) in vid {
726            self.inner
727                .vid_storage
728                .insert(common.height() as usize, (common.clone(), share.cloned()))?;
729        }
730        Ok(())
731    }
732}
733
/// Update an index mapping hashes of objects to their positions in the ledger.
///
/// Inserts the mapping from `hash` to `pos` into `index`, keeping the earlier
/// position when `hash` is already mapped: the index always points to the
/// first occurrence of an object in the ledger.
fn update_index_by_hash<H: Eq + Hash, P: Ord>(index: &mut HashMap<H, P>, hash: H, pos: P) {
    match index.entry(hash) {
        Entry::Vacant(slot) => {
            // First time this hash has been seen; record its position.
            slot.insert(pos);
        },
        Entry::Occupied(slot) => {
            // Keep whichever position was sequenced first.
            let existing = slot.into_mut();
            if pos < *existing {
                *existing = pos;
            }
        },
    }
}
751
#[async_trait]
impl<Types, T> NodeStorage<Types> for Transaction<T>
where
    Types: NodeType,
    Payload<Types>: QueryablePayload<Types>,
    Header<Types>: QueryableHeader<Types>,
    T: Revert + Deref<Target = FileSystemStorageInner<Types>> + Send + Sync,
{
    /// The block height is the length of the leaf log.
    async fn block_height(&mut self) -> QueryResult<usize> {
        Ok(self.inner.leaf_storage.iter().len())
    }

    /// Count transactions. Only a single running total is maintained, so only
    /// the full (unbounded) range without a namespace filter can be answered.
    async fn count_transactions_in_range(
        &mut self,
        range: impl RangeBounds<usize> + Send,
        namespace: Option<NamespaceId<Types>>,
    ) -> QueryResult<usize> {
        if !matches!(range.start_bound(), Bound::Unbounded | Bound::Included(0))
            || !matches!(range.end_bound(), Bound::Unbounded)
        {
            return Err(QueryError::Error {
                message: "partial aggregates are not supported with file system backend".into(),
            });
        }

        if namespace.is_some() {
            return Err(QueryError::Error {
                message: "file system does not support per-namespace stats".into(),
            });
        }

        Ok(self.inner.num_transactions)
    }

    /// Total payload size. Same restrictions as `count_transactions_in_range`:
    /// only the full range, without a namespace filter.
    async fn payload_size_in_range(
        &mut self,
        range: impl RangeBounds<usize> + Send,
        namespace: Option<NamespaceId<Types>>,
    ) -> QueryResult<usize> {
        if !matches!(range.start_bound(), Bound::Unbounded | Bound::Included(0))
            || !matches!(range.end_bound(), Bound::Unbounded)
        {
            return Err(QueryError::Error {
                message: "partial aggregates are not supported with file system backend".into(),
            });
        }

        if namespace.is_some() {
            return Err(QueryError::Error {
                message: "file system does not support per-namespace stats".into(),
            });
        }

        Ok(self.inner.payload_size)
    }

    /// Fetch the VID share for a block; `Missing` when VID data exists for the
    /// block but no share was stored with it.
    async fn vid_share<ID>(&mut self, id: ID) -> QueryResult<VidShare>
    where
        ID: Into<BlockId<Types>> + Send + Sync,
    {
        self.inner
            .vid_storage
            .iter()
            .nth(self.inner.get_block_index(id.into())?)
            .context(NotFoundSnafu)?
            .context(MissingSnafu)?
            .1
            .context(MissingSnafu)
    }

    /// Report how many objects of each kind are stored in `[start, end)`.
    /// The file system backend never prunes, so `pruned_height` is `None`.
    async fn sync_status_for_range(
        &mut self,
        start: usize,
        end: usize,
    ) -> QueryResult<SyncStatusQueryData> {
        Ok(SyncStatusQueryData {
            leaves: self.inner.leaf_storage.sync_status(start, end),
            blocks: self.inner.block_storage.sync_status(start, end),
            vid_common: self.inner.vid_storage.sync_status(start, end),
            pruned_height: None,
        })
    }

    /// Fetch headers in a time window `[start, end)`, capped at `limit`
    /// headers, including the header just before the window (`prev`) and just
    /// after it (`next`) when available.
    async fn get_header_window(
        &mut self,
        start: impl Into<WindowStart<Types>> + Send + Sync,
        end: u64,
        limit: usize,
    ) -> QueryResult<TimeWindowQueryData<Header<Types>>> {
        let first_block = match start.into() {
            WindowStart::Height(h) => h,
            WindowStart::Hash(h) => self.inner.get_header(h.into())?.block_number(),
            WindowStart::Time(t) => {
                // Find the minimum timestamp which is at least `t`, and all the blocks with
                // that timestamp.
                let blocks = self
                    .inner
                    .index_by_time
                    .range(t..)
                    .next()
                    .context(NotFoundSnafu)?
                    .1;
                // Multiple blocks can have the same timestamp (when truncated to seconds);
                // we want the first one. It is an invariant that any timestamp which has an
                // entry in `index_by_time` has a non-empty list associated with it, so this
                // indexing is safe.
                blocks[0]
            },
        } as usize;

        let mut res = TimeWindowQueryData::default();

        // Include the block just before the start of the window, if there is one.
        if first_block > 0 {
            res.prev = Some(self.inner.get_header((first_block - 1).into())?);
        }

        // Add blocks to the window, starting from `first_block`, until we reach the end of
        // the requested time window.
        for block in self.inner.get_block_range(first_block..)? {
            let header = block?.header().clone();
            if header.timestamp() >= end {
                // This header is past the window; report it as `next` and stop.
                res.next = Some(header);
                break;
            }
            res.window.push(header);
            if res.window.len() >= limit {
                break;
            }
        }

        Ok(res)
    }

    /// Return the QC chain finalizing the latest known leaf, if stored.
    async fn latest_qc_chain(&mut self) -> QueryResult<Option<[CertificatePair<Types>; 2]>> {
        Ok(self.inner.latest_qc_chain.clone())
    }
}
890
impl<Types, T: Revert + Send> AggregatesStorage<Types> for Transaction<T>
where
    Types: NodeType,
    Header<Types>: QueryableHeader<Types>,
{
    /// The file system backend does not compute incremental aggregates, so
    /// the aggregates height is always 0.
    async fn aggregates_height(&mut self) -> anyhow::Result<usize> {
        Ok(0)
    }

    /// No previous aggregate is ever persisted by this backend.
    async fn load_prev_aggregate(&mut self) -> anyhow::Result<Option<Aggregate<Types>>> {
        Ok(None)
    }
}
904
impl<Types, T: Revert + Send> UpdateAggregatesStorage<Types> for Transaction<T>
where
    Types: NodeType,
    Header<Types>: QueryableHeader<Types>,
{
    /// Aggregate updates are a no-op for this backend: the inputs are ignored
    /// and a default (empty) aggregate is returned.
    async fn update_aggregates(
        &mut self,
        _prev: Aggregate<Types>,
        _blocks: &[PayloadMetadata<Types>],
    ) -> anyhow::Result<Aggregate<Types>> {
        Ok(Aggregate::default())
    }
}
918
// The file system backend never prunes, so the trait's default implementation
// (no stored pruned height) is used unchanged.
impl<T: Revert> PrunedHeightStorage for Transaction<T> {}
920
impl<Types> HasMetrics for FileSystemStorage<Types>
where
    Types: NodeType,
    Header<Types>: QueryableHeader<Types>,
    Payload<Types>: QueryablePayload<Types>,
{
    /// Expose the Prometheus metrics collected by this storage instance.
    fn metrics(&self) -> &PrometheusMetrics {
        &self.metrics
    }
}