Skip to main content

hotshot_query_service/data_source/storage/
fs.rs

1// Copyright (c) 2022 Espresso Systems (espressosys.com)
2// This file is part of the HotShot Query Service library.
3//
4// This program is free software: you can redistribute it and/or modify it under the terms of the GNU
5// General Public License as published by the Free Software Foundation, either version 3 of the
6// License, or (at your option) any later version.
7// This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without
8// even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
9// General Public License for more details.
10// You should have received a copy of the GNU General Public License along with this program. If not,
11// see <https://www.gnu.org/licenses/>.
12
13#![cfg(feature = "file-system-data-source")]
14
15use std::{
16    collections::{
17        BTreeMap,
18        hash_map::{Entry, HashMap},
19    },
20    hash::Hash,
21    iter,
22    ops::{Bound, Deref, RangeBounds},
23    path::Path,
24};
25
26use async_lock::{RwLock, RwLockReadGuard, RwLockWriteGuard};
27use async_trait::async_trait;
28use atomic_store::{AtomicStore, AtomicStoreLoader, PersistenceError};
29use committable::Committable;
30use futures::future::Future;
31use hotshot_types::{
32    data::{VidCommitment, VidShare},
33    simple_certificate::CertificatePair,
34    traits::{block_contents::BlockHeader, node_implementation::NodeType},
35};
36use serde::{Serialize, de::DeserializeOwned};
37use snafu::OptionExt;
38
39use super::{
40    Aggregate, AggregatesStorage, AvailabilityStorage, NodeStorage, PayloadMetadata,
41    UpdateAggregatesStorage, UpdateAvailabilityStorage, VidCommonMetadata,
42    ledger_log::{Iter, LedgerLog},
43    pruning::{PruneStorage, PrunedHeightStorage, PrunerConfig},
44};
45use crate::{
46    Header, MissingSnafu, NotFoundSnafu, Payload, QueryError, QueryResult,
47    availability::{
48        Certificate2, NamespaceId,
49        data_source::{BlockId, LeafId},
50        query_data::{
51            BlockHash, BlockQueryData, LeafHash, LeafQueryData, PayloadQueryData, QueryableHeader,
52            QueryablePayload, TransactionHash, VidCommonQueryData,
53        },
54    },
55    data_source::{VersionedDataSource, update},
56    metrics::PrometheusMetrics,
57    node::{SyncStatusQueryData, TimeWindowQueryData, WindowStart},
58    status::HasMetrics,
59    types::HeightIndexed,
60};
61
62const CACHED_LEAVES_COUNT: usize = 100;
63const CACHED_BLOCKS_COUNT: usize = 100;
64const CACHED_VID_COMMON_COUNT: usize = 100;
65const CACHED_CERT2_COUNT: usize = 100;
66
67#[derive(custom_debug::Debug)]
68pub struct FileSystemStorageInner<Types>
69where
70    Types: NodeType,
71    Header<Types>: QueryableHeader<Types>,
72    Payload<Types>: QueryablePayload<Types>,
73{
74    index_by_leaf_hash: HashMap<LeafHash<Types>, u64>,
75    index_by_block_hash: HashMap<BlockHash<Types>, u64>,
76    index_by_payload_hash: HashMap<VidCommitment, u64>,
77    index_by_txn_hash: HashMap<TransactionHash<Types>, u64>,
78    index_by_time: BTreeMap<u64, Vec<u64>>,
79    num_transactions: usize,
80    payload_size: usize,
81    #[debug(skip)]
82    top_storage: Option<AtomicStore>,
83    leaf_storage: LedgerLog<LeafQueryData<Types>>,
84    block_storage: LedgerLog<BlockQueryData<Types>>,
85    vid_storage: LedgerLog<(VidCommonQueryData<Types>, Option<VidShare>)>,
86    latest_qc_chain: Option<[CertificatePair<Types>; 2]>,
87    cert2_storage: LedgerLog<Certificate2<Types>>,
88}
89
90impl<Types> FileSystemStorageInner<Types>
91where
92    Types: NodeType,
93    Header<Types>: QueryableHeader<Types>,
94    Payload<Types>: QueryablePayload<Types>,
95{
96    fn get_block_index(&self, id: BlockId<Types>) -> QueryResult<usize> {
97        match id {
98            BlockId::Number(n) => Ok(n),
99            BlockId::Hash(h) => {
100                Ok(*self.index_by_block_hash.get(&h).context(NotFoundSnafu)? as usize)
101            },
102            BlockId::PayloadHash(h) => {
103                Ok(*self.index_by_payload_hash.get(&h).context(NotFoundSnafu)? as usize)
104            },
105        }
106    }
107
108    fn get_block(&self, id: BlockId<Types>) -> QueryResult<BlockQueryData<Types>> {
109        self.block_storage
110            .iter()
111            .nth(self.get_block_index(id)?)
112            .context(NotFoundSnafu)?
113            .context(MissingSnafu)
114    }
115
116    fn get_header(&self, id: BlockId<Types>) -> QueryResult<Header<Types>> {
117        self.get_block(id).map(|block| block.header)
118    }
119
120    fn get_block_range<R>(&self, range: R) -> QueryResult<Vec<QueryResult<BlockQueryData<Types>>>>
121    where
122        R: RangeBounds<usize> + Send,
123    {
124        Ok(range_iter(self.block_storage.iter(), range).collect())
125    }
126}
127
128/// Storage for the APIs provided in this crate, backed by a remote PostgreSQL database.
129#[derive(Debug)]
130pub struct FileSystemStorage<Types: NodeType>
131where
132    Header<Types>: QueryableHeader<Types>,
133    Payload<Types>: QueryablePayload<Types>,
134{
135    inner: RwLock<FileSystemStorageInner<Types>>,
136    metrics: PrometheusMetrics,
137}
138
139impl<Types: NodeType> PrunerConfig for FileSystemStorage<Types>
140where
141    Header<Types>: QueryableHeader<Types>,
142    Payload<Types>: QueryablePayload<Types>,
143{
144}
145impl<Types: NodeType> PruneStorage for FileSystemStorage<Types>
146where
147    Header<Types>: QueryableHeader<Types>,
148    Payload<Types>: QueryablePayload<Types>,
149{
150    type Pruner = ();
151}
152
153impl<Types: NodeType> FileSystemStorage<Types>
154where
155    Payload<Types>: QueryablePayload<Types>,
156    Header<Types>: QueryableHeader<Types>,
157{
158    /// Create a new [FileSystemStorage] with storage at `path`.
159    ///
160    /// If there is already data at `path`, it will be archived.
161    ///
162    /// The [FileSystemStorage] will manage its own persistence synchronization.
163    pub async fn create(path: &Path) -> Result<Self, PersistenceError> {
164        let mut loader = AtomicStoreLoader::create(path, "hotshot_data_source")?;
165        loader.retain_archives(1);
166        let data_source = Self::create_with_store(&mut loader).await?;
167        data_source.inner.write().await.top_storage = Some(AtomicStore::open(loader)?);
168        Ok(data_source)
169    }
170
171    /// Open an existing [FileSystemStorage] from storage at `path`.
172    ///
173    /// If there is no data at `path`, a new store will be created.
174    ///
175    /// The [FileSystemStorage] will manage its own persistence synchronization.
176    pub async fn open(path: &Path) -> Result<Self, PersistenceError> {
177        let mut loader = AtomicStoreLoader::load(path, "hotshot_data_source")?;
178        loader.retain_archives(1);
179        let data_source = Self::open_with_store(&mut loader).await?;
180        data_source.inner.write().await.top_storage = Some(AtomicStore::open(loader)?);
181        Ok(data_source)
182    }
183
184    /// Create a new [FileSystemStorage] using a persistent storage loader.
185    ///
186    /// If there is existing data corresponding to the [FileSystemStorage] data structures, it will
187    /// be archived.
188    ///
189    /// The [FileSystemStorage] will register its persistent data structures with `loader`. The
190    /// caller is responsible for creating an [AtomicStore] from `loader` and managing
191    /// synchronization of the store.
192    pub async fn create_with_store(
193        loader: &mut AtomicStoreLoader,
194    ) -> Result<Self, PersistenceError> {
195        Ok(Self {
196            inner: RwLock::new(FileSystemStorageInner {
197                index_by_leaf_hash: Default::default(),
198                index_by_block_hash: Default::default(),
199                index_by_payload_hash: Default::default(),
200                index_by_txn_hash: Default::default(),
201                index_by_time: Default::default(),
202                num_transactions: 0,
203                payload_size: 0,
204                top_storage: None,
205                leaf_storage: LedgerLog::create(loader, "leaves", CACHED_LEAVES_COUNT)?,
206                block_storage: LedgerLog::create(loader, "blocks", CACHED_BLOCKS_COUNT)?,
207                vid_storage: LedgerLog::create(loader, "vid_common", CACHED_VID_COMMON_COUNT)?,
208                cert2_storage: LedgerLog::create(loader, "cert2", CACHED_CERT2_COUNT)?,
209                latest_qc_chain: None,
210            }),
211            metrics: Default::default(),
212        })
213    }
214
215    /// Open an existing [FileSystemStorage] using a persistent storage loader.
216    ///
217    /// If there is no existing data corresponding to the [FileSystemStorage] data structures, a new
218    /// store will be created.
219    ///
220    /// The [FileSystemStorage] will register its persistent data structures with `loader`. The
221    /// caller is responsible for creating an [AtomicStore] from `loader` and managing
222    /// synchronization of the store.
223    pub async fn open_with_store(loader: &mut AtomicStoreLoader) -> Result<Self, PersistenceError> {
224        let leaf_storage =
225            LedgerLog::<LeafQueryData<Types>>::open(loader, "leaves", CACHED_LEAVES_COUNT)?;
226        let block_storage =
227            LedgerLog::<BlockQueryData<Types>>::open(loader, "blocks", CACHED_BLOCKS_COUNT)?;
228        let vid_storage = LedgerLog::<(VidCommonQueryData<Types>, Option<VidShare>)>::open(
229            loader,
230            "vid_common",
231            CACHED_VID_COMMON_COUNT,
232        )?;
233        let cert2_storage =
234            LedgerLog::<Certificate2<Types>>::open(loader, "cert2", CACHED_CERT2_COUNT)?;
235
236        let mut index_by_block_hash = HashMap::new();
237        let mut index_by_payload_hash = HashMap::new();
238        let mut index_by_time = BTreeMap::<u64, Vec<u64>>::new();
239        let index_by_leaf_hash = leaf_storage
240            .iter()
241            .flatten()
242            .map(|leaf| {
243                update_index_by_hash(&mut index_by_block_hash, leaf.block_hash(), leaf.height());
244                update_index_by_hash(
245                    &mut index_by_payload_hash,
246                    leaf.payload_hash(),
247                    leaf.height(),
248                );
249                index_by_time
250                    .entry(leaf.header().timestamp())
251                    .or_default()
252                    .push(leaf.height());
253                (leaf.hash(), leaf.height())
254            })
255            .collect();
256
257        let mut index_by_txn_hash = HashMap::new();
258        let mut num_transactions = 0;
259        let mut payload_size = 0;
260        for block in block_storage.iter().flatten() {
261            num_transactions += block.len();
262            payload_size += block.size() as usize;
263
264            let height = block.height();
265            for (_, txn) in block.enumerate() {
266                update_index_by_hash(&mut index_by_txn_hash, txn.commit(), height);
267            }
268        }
269
270        Ok(Self {
271            inner: RwLock::new(FileSystemStorageInner {
272                index_by_leaf_hash,
273                index_by_block_hash,
274                index_by_payload_hash,
275                index_by_txn_hash,
276                index_by_time,
277                num_transactions,
278                payload_size,
279                leaf_storage,
280                block_storage,
281                vid_storage,
282                cert2_storage,
283                top_storage: None,
284                latest_qc_chain: None,
285            }),
286            metrics: Default::default(),
287        })
288    }
289
290    /// Advance the version of the persistent store without committing changes to persistent state.
291    pub async fn skip_version(&self) -> Result<(), PersistenceError> {
292        let mut inner = self.inner.write().await;
293        inner.leaf_storage.skip_version()?;
294        inner.block_storage.skip_version()?;
295        inner.vid_storage.skip_version()?;
296        inner.cert2_storage.skip_version()?;
297        if let Some(store) = &mut inner.top_storage {
298            store.commit_version()?;
299        }
300        Ok(())
301    }
302
303    /// Get the stored VID share for a given block, if one exists.
304    pub async fn get_vid_share(&self, block_id: BlockId<Types>) -> QueryResult<VidShare> {
305        let mut tx = self.read().await.map_err(|err| QueryError::Error {
306            message: err.to_string(),
307        })?;
308        let share = tx.vid_share(block_id).await?;
309        Ok(share)
310    }
311
312    /// Get the stored VID common data for a given block, if one exists.
313    pub async fn get_vid_common(
314        &self,
315        block_id: BlockId<Types>,
316    ) -> QueryResult<VidCommonQueryData<Types>> {
317        let mut tx = self.read().await.map_err(|err| QueryError::Error {
318            message: err.to_string(),
319        })?;
320        let share = tx.get_vid_common(block_id).await?;
321        Ok(share)
322    }
323
324    /// Get the stored VID common metadata for a given block, if one exists.
325    pub async fn get_vid_common_metadata(
326        &self,
327        block_id: BlockId<Types>,
328    ) -> QueryResult<VidCommonMetadata<Types>> {
329        let mut tx = self.read().await.map_err(|err| QueryError::Error {
330            message: err.to_string(),
331        })?;
332        let share = tx.get_vid_common_metadata(block_id).await?;
333        Ok(share)
334    }
335}
336
337pub trait Revert {
338    fn revert(&mut self);
339}
340
341impl<Types> Revert for RwLockWriteGuard<'_, FileSystemStorageInner<Types>>
342where
343    Types: NodeType,
344    Header<Types>: QueryableHeader<Types>,
345    Payload<Types>: QueryablePayload<Types>,
346{
347    fn revert(&mut self) {
348        self.leaf_storage.revert_version().unwrap();
349        self.block_storage.revert_version().unwrap();
350        self.vid_storage.revert_version().unwrap();
351        self.cert2_storage.revert_version().unwrap();
352    }
353}
354
355impl<Types> Revert for RwLockReadGuard<'_, FileSystemStorageInner<Types>>
356where
357    Types: NodeType,
358    Header<Types>: QueryableHeader<Types>,
359    Payload<Types>: QueryablePayload<Types>,
360{
361    fn revert(&mut self) {
362        // Nothing to revert for a read-only transaction.
363    }
364}
365
366#[derive(Debug)]
367pub struct Transaction<T: Revert> {
368    inner: T,
369}
370
371impl<T: Revert> Drop for Transaction<T> {
372    fn drop(&mut self) {
373        self.inner.revert();
374    }
375}
376impl<Types> update::Transaction for Transaction<RwLockWriteGuard<'_, FileSystemStorageInner<Types>>>
377where
378    Types: NodeType,
379    Header<Types>: QueryableHeader<Types>,
380    Payload<Types>: QueryablePayload<Types>,
381{
382    async fn commit(mut self) -> anyhow::Result<()> {
383        self.inner.leaf_storage.commit_version().await?;
384        self.inner.block_storage.commit_version().await?;
385        self.inner.vid_storage.commit_version().await?;
386        self.inner.cert2_storage.commit_version().await?;
387        if let Some(store) = &mut self.inner.top_storage {
388            store.commit_version()?;
389        }
390        Ok(())
391    }
392
393    fn revert(self) -> impl Future + Send {
394        // The revert is handled when `self` is dropped.
395        async move {}
396    }
397}
398
399impl<Types> update::Transaction for Transaction<RwLockReadGuard<'_, FileSystemStorageInner<Types>>>
400where
401    Types: NodeType,
402    Header<Types>: QueryableHeader<Types>,
403    Payload<Types>: QueryablePayload<Types>,
404{
405    async fn commit(self) -> anyhow::Result<()> {
406        // Nothing to commit for a read-only transaction.
407        Ok(())
408    }
409
410    fn revert(self) -> impl Future + Send {
411        // The revert is handled when `self` is dropped.
412        async move {}
413    }
414}
415
416impl<Types: NodeType> VersionedDataSource for FileSystemStorage<Types>
417where
418    Header<Types>: QueryableHeader<Types>,
419    Payload<Types>: QueryablePayload<Types>,
420{
421    type Transaction<'a>
422        = Transaction<RwLockWriteGuard<'a, FileSystemStorageInner<Types>>>
423    where
424        Self: 'a;
425    type ReadOnly<'a>
426        = Transaction<RwLockReadGuard<'a, FileSystemStorageInner<Types>>>
427    where
428        Self: 'a;
429
430    async fn write(&self) -> anyhow::Result<Self::Transaction<'_>> {
431        Ok(Transaction {
432            inner: self.inner.write().await,
433        })
434    }
435
436    async fn read(&self) -> anyhow::Result<Self::ReadOnly<'_>> {
437        Ok(Transaction {
438            inner: self.inner.read().await,
439        })
440    }
441}
442fn range_iter<T>(
443    mut iter: Iter<'_, T>,
444    range: impl RangeBounds<usize>,
445) -> impl '_ + Iterator<Item = QueryResult<T>>
446where
447    T: Clone + Serialize + DeserializeOwned,
448{
449    let start = range.start_bound().cloned();
450    let end = range.end_bound().cloned();
451
452    // Advance the underlying iterator to the start of the range.
453    let mut pos = match start {
454        Bound::Included(n) => {
455            if n > 0 {
456                iter.nth(n - 1);
457            }
458            n
459        },
460        Bound::Excluded(n) => {
461            iter.nth(n);
462            n + 1
463        },
464        Bound::Unbounded => 0,
465    };
466
467    iter::from_fn(move || {
468        // Check if we have reached the end of the range.
469        let reached_end = match end {
470            Bound::Included(n) => pos > n,
471            Bound::Excluded(n) => pos >= n,
472            Bound::Unbounded => false,
473        };
474        if reached_end {
475            return None;
476        }
477        let opt = iter.next()?;
478        pos += 1;
479        Some(opt.context(MissingSnafu))
480    })
481}
482
483#[async_trait]
484impl<Types, T> AvailabilityStorage<Types> for Transaction<T>
485where
486    Types: NodeType,
487    Payload<Types>: QueryablePayload<Types>,
488    Header<Types>: QueryableHeader<Types>,
489    T: Revert + Deref<Target = FileSystemStorageInner<Types>> + Send + Sync,
490{
491    async fn get_leaf(&mut self, id: LeafId<Types>) -> QueryResult<LeafQueryData<Types>> {
492        let n = match id {
493            LeafId::Number(n) => n,
494            LeafId::Hash(h) => *self
495                .inner
496                .index_by_leaf_hash
497                .get(&h)
498                .context(NotFoundSnafu)? as usize,
499        };
500        self.inner
501            .leaf_storage
502            .iter()
503            .nth(n)
504            .context(NotFoundSnafu)?
505            .context(MissingSnafu)
506    }
507
508    async fn get_block(&mut self, id: BlockId<Types>) -> QueryResult<BlockQueryData<Types>> {
509        self.inner.get_block(id)
510    }
511
512    async fn get_header(&mut self, id: BlockId<Types>) -> QueryResult<Header<Types>> {
513        self.inner.get_header(id)
514    }
515
516    async fn get_payload(&mut self, id: BlockId<Types>) -> QueryResult<PayloadQueryData<Types>> {
517        self.get_block(id).await.map(PayloadQueryData::from)
518    }
519
520    async fn get_payload_metadata(
521        &mut self,
522        id: BlockId<Types>,
523    ) -> QueryResult<PayloadMetadata<Types>> {
524        self.get_block(id).await.map(PayloadMetadata::from)
525    }
526
527    async fn get_vid_common(
528        &mut self,
529        id: BlockId<Types>,
530    ) -> QueryResult<VidCommonQueryData<Types>> {
531        Ok(self
532            .inner
533            .vid_storage
534            .iter()
535            .nth(self.inner.get_block_index(id)?)
536            .context(NotFoundSnafu)?
537            .context(MissingSnafu)?
538            .0)
539    }
540
541    async fn get_vid_common_metadata(
542        &mut self,
543        id: BlockId<Types>,
544    ) -> QueryResult<VidCommonMetadata<Types>> {
545        self.get_vid_common(id).await.map(VidCommonMetadata::from)
546    }
547
548    async fn get_leaf_range<R>(
549        &mut self,
550        range: R,
551    ) -> QueryResult<Vec<QueryResult<LeafQueryData<Types>>>>
552    where
553        R: RangeBounds<usize> + Send,
554    {
555        Ok(range_iter(self.inner.leaf_storage.iter(), range).collect())
556    }
557
558    async fn get_block_range<R>(
559        &mut self,
560        range: R,
561    ) -> QueryResult<Vec<QueryResult<BlockQueryData<Types>>>>
562    where
563        R: RangeBounds<usize> + Send,
564    {
565        self.inner.get_block_range(range)
566    }
567
568    async fn get_payload_range<R>(
569        &mut self,
570        range: R,
571    ) -> QueryResult<Vec<QueryResult<PayloadQueryData<Types>>>>
572    where
573        R: RangeBounds<usize> + Send,
574    {
575        Ok(range_iter(self.inner.block_storage.iter(), range)
576            .map(|res| res.map(PayloadQueryData::from))
577            .collect())
578    }
579
580    async fn get_payload_metadata_range<R>(
581        &mut self,
582        range: R,
583    ) -> QueryResult<Vec<QueryResult<PayloadMetadata<Types>>>>
584    where
585        R: RangeBounds<usize> + Send + 'static,
586    {
587        Ok(range_iter(self.inner.block_storage.iter(), range)
588            .map(|res| res.map(PayloadMetadata::from))
589            .collect())
590    }
591
592    async fn get_vid_common_range<R>(
593        &mut self,
594        range: R,
595    ) -> QueryResult<Vec<QueryResult<VidCommonQueryData<Types>>>>
596    where
597        R: RangeBounds<usize> + Send,
598    {
599        Ok(range_iter(self.inner.vid_storage.iter(), range)
600            .map(|res| res.map(|(common, _)| common))
601            .collect())
602    }
603
604    async fn get_vid_common_metadata_range<R>(
605        &mut self,
606        range: R,
607    ) -> QueryResult<Vec<QueryResult<VidCommonMetadata<Types>>>>
608    where
609        R: RangeBounds<usize> + Send,
610    {
611        Ok(range_iter(self.inner.vid_storage.iter(), range)
612            .map(|res| res.map(|(common, _)| common.into()))
613            .collect())
614    }
615
616    async fn get_block_with_transaction(
617        &mut self,
618        hash: TransactionHash<Types>,
619    ) -> QueryResult<BlockQueryData<Types>> {
620        let height = self
621            .inner
622            .index_by_txn_hash
623            .get(&hash)
624            .context(NotFoundSnafu)?;
625        self.inner.get_block((*height as usize).into())
626    }
627}
628
629impl<Types: NodeType> UpdateAvailabilityStorage<Types>
630    for Transaction<RwLockWriteGuard<'_, FileSystemStorageInner<Types>>>
631where
632    Payload<Types>: QueryablePayload<Types>,
633    Header<Types>: QueryableHeader<Types>,
634{
635    async fn insert_qc_chain(
636        &mut self,
637        height: u64,
638        qc_chain: Option<[CertificatePair<Types>; 2]>,
639    ) -> anyhow::Result<()> {
640        if height + 1 >= (self.inner.leaf_storage.iter().len() as u64) {
641            // If this QC chain is for the latest leaf we know about, store it so that we can prove
642            // to clients that the corresponding leaf is finalized. (If it is not the latest leaf,
643            // this is unnecessary, since we can prove it is an ancestor of some later, finalized
644            // leaf.)
645            if let Some(qc_chain) = qc_chain {
646                self.inner.latest_qc_chain = Some(qc_chain);
647            } else {
648                // Since we have a new latest leaf, we have to update latest QC chain even if we
649                // don't actually have a QC chain to store.
650                self.inner.latest_qc_chain = None;
651            }
652        }
653
654        Ok(())
655    }
656
657    async fn insert_cert2(
658        &mut self,
659        height: u64,
660        cert2: Certificate2<Types>,
661    ) -> anyhow::Result<()> {
662        self.inner.cert2_storage.insert(height as usize, cert2)?;
663        Ok(())
664    }
665
666    async fn insert_leaf_range<'a>(
667        &mut self,
668        leaves: impl Send + IntoIterator<Item = &'a LeafQueryData<Types>>,
669    ) -> anyhow::Result<()> {
670        for leaf in leaves {
671            if !self
672                .inner
673                .leaf_storage
674                .insert(leaf.height() as usize, leaf.clone())?
675            {
676                // The leaf was already present.
677                continue;
678            }
679            self.inner
680                .index_by_leaf_hash
681                .insert(leaf.hash(), leaf.height());
682            update_index_by_hash(
683                &mut self.inner.index_by_block_hash,
684                leaf.block_hash(),
685                leaf.height(),
686            );
687            update_index_by_hash(
688                &mut self.inner.index_by_payload_hash,
689                leaf.payload_hash(),
690                leaf.height(),
691            );
692            self.inner
693                .index_by_time
694                .entry(leaf.header().timestamp())
695                .or_default()
696                .push(leaf.height());
697        }
698
699        Ok(())
700    }
701
702    async fn insert_block_range<'a>(
703        &mut self,
704        blocks: impl Send + IntoIterator<IntoIter: Send, Item = &'a BlockQueryData<Types>>,
705    ) -> anyhow::Result<()> {
706        for block in blocks {
707            if !self
708                .inner
709                .block_storage
710                .insert(block.height() as usize, block.clone())?
711            {
712                // The block was already present.
713                continue;
714            }
715            self.inner.num_transactions += block.len();
716            self.inner.payload_size += block.size() as usize;
717            for (_, txn) in block.enumerate() {
718                update_index_by_hash(
719                    &mut self.inner.index_by_txn_hash,
720                    txn.commit(),
721                    block.height(),
722                );
723            }
724        }
725        Ok(())
726    }
727
728    async fn insert_vid_range<'a>(
729        &mut self,
730        vid: impl Send
731        + IntoIterator<
732            IntoIter: Send,
733            Item = (&'a VidCommonQueryData<Types>, Option<&'a VidShare>),
734        >,
735    ) -> anyhow::Result<()> {
736        for (common, share) in vid {
737            self.inner
738                .vid_storage
739                .insert(common.height() as usize, (common.clone(), share.cloned()))?;
740        }
741        Ok(())
742    }
743}
744
745/// Update an index mapping hashes of objects to their positions in the ledger.
746///
747/// This function will insert the mapping from `hash` to `pos` into `index`, _unless_ there is
748/// already an entry for `hash` at an earlier position in the ledger.
749fn update_index_by_hash<H: Eq + Hash, P: Ord>(index: &mut HashMap<H, P>, hash: H, pos: P) {
750    match index.entry(hash) {
751        Entry::Occupied(mut e) => {
752            if &pos < e.get() {
753                // Overwrite the existing entry if the new object was sequenced first.
754                e.insert(pos);
755            }
756        },
757        Entry::Vacant(e) => {
758            e.insert(pos);
759        },
760    }
761}
762
763#[async_trait]
764impl<Types, T> NodeStorage<Types> for Transaction<T>
765where
766    Types: NodeType,
767    Payload<Types>: QueryablePayload<Types>,
768    Header<Types>: QueryableHeader<Types>,
769    T: Revert + Deref<Target = FileSystemStorageInner<Types>> + Send + Sync,
770{
771    async fn block_height(&mut self) -> QueryResult<usize> {
772        Ok(self.inner.leaf_storage.iter().len())
773    }
774
775    async fn count_transactions_in_range(
776        &mut self,
777        range: impl RangeBounds<usize> + Send,
778        namespace: Option<NamespaceId<Types>>,
779    ) -> QueryResult<usize> {
780        if !matches!(range.start_bound(), Bound::Unbounded | Bound::Included(0))
781            || !matches!(range.end_bound(), Bound::Unbounded)
782        {
783            return Err(QueryError::Error {
784                message: "partial aggregates are not supported with file system backend".into(),
785            });
786        }
787
788        if namespace.is_some() {
789            return Err(QueryError::Error {
790                message: "file system does not support per-namespace stats".into(),
791            });
792        }
793
794        Ok(self.inner.num_transactions)
795    }
796
797    async fn payload_size_in_range(
798        &mut self,
799        range: impl RangeBounds<usize> + Send,
800        namespace: Option<NamespaceId<Types>>,
801    ) -> QueryResult<usize> {
802        if !matches!(range.start_bound(), Bound::Unbounded | Bound::Included(0))
803            || !matches!(range.end_bound(), Bound::Unbounded)
804        {
805            return Err(QueryError::Error {
806                message: "partial aggregates are not supported with file system backend".into(),
807            });
808        }
809
810        if namespace.is_some() {
811            return Err(QueryError::Error {
812                message: "file system does not support per-namespace stats".into(),
813            });
814        }
815
816        Ok(self.inner.payload_size)
817    }
818
819    async fn vid_share<ID>(&mut self, id: ID) -> QueryResult<VidShare>
820    where
821        ID: Into<BlockId<Types>> + Send + Sync,
822    {
823        self.inner
824            .vid_storage
825            .iter()
826            .nth(self.inner.get_block_index(id.into())?)
827            .context(NotFoundSnafu)?
828            .context(MissingSnafu)?
829            .1
830            .context(MissingSnafu)
831    }
832
833    async fn sync_status_for_range(
834        &mut self,
835        start: usize,
836        end: usize,
837    ) -> QueryResult<SyncStatusQueryData> {
838        Ok(SyncStatusQueryData {
839            leaves: self.inner.leaf_storage.sync_status(start, end),
840            blocks: self.inner.block_storage.sync_status(start, end),
841            vid_common: self.inner.vid_storage.sync_status(start, end),
842            pruned_height: None,
843        })
844    }
845
846    async fn get_header_window(
847        &mut self,
848        start: impl Into<WindowStart<Types>> + Send + Sync,
849        end: u64,
850        limit: usize,
851    ) -> QueryResult<TimeWindowQueryData<Header<Types>>> {
852        let first_block = match start.into() {
853            WindowStart::Height(h) => h,
854            WindowStart::Hash(h) => self.inner.get_header(h.into())?.block_number(),
855            WindowStart::Time(t) => {
856                // Find the minimum timestamp which is at least `t`, and all the blocks with
857                // that timestamp.
858                let blocks = self
859                    .inner
860                    .index_by_time
861                    .range(t..)
862                    .next()
863                    .context(NotFoundSnafu)?
864                    .1;
865                // Multiple blocks can have the same timestamp (when truncated to seconds);
866                // we want the first one. It is an invariant that any timestamp which has an
867                // entry in `index_by_time` has a non-empty list associated with it, so this
868                // indexing is safe.
869                blocks[0]
870            },
871        } as usize;
872
873        let mut res = TimeWindowQueryData::default();
874
875        // Include the block just before the start of the window, if there is one.
876        if first_block > 0 {
877            res.prev = Some(self.inner.get_header((first_block - 1).into())?);
878        }
879
880        // Add blocks to the window, starting from `first_block`, until we reach the end of
881        // the requested time window.
882        for block in self.inner.get_block_range(first_block..)? {
883            let header = block?.header().clone();
884            if header.timestamp() >= end {
885                res.next = Some(header);
886                break;
887            }
888            res.window.push(header);
889            if res.window.len() >= limit {
890                break;
891            }
892        }
893
894        Ok(res)
895    }
896
897    async fn latest_qc_chain(&mut self) -> QueryResult<Option<[CertificatePair<Types>; 2]>> {
898        Ok(self.inner.latest_qc_chain.clone())
899    }
900
901    async fn load_cert2(&mut self, height: u64) -> QueryResult<Option<Certificate2<Types>>> {
902        Ok(self
903            .inner
904            .cert2_storage
905            .iter()
906            .nth(height as usize)
907            .flatten())
908    }
909
910    async fn load_earliest_cert2(
911        &mut self,
912        height: u64,
913    ) -> QueryResult<Option<Certificate2<Types>>> {
914        Ok(self
915            .inner
916            .cert2_storage
917            .iter()
918            .skip(height as usize)
919            .flatten()
920            .next())
921    }
922}
923
924impl<Types, T: Revert + Send> AggregatesStorage<Types> for Transaction<T>
925where
926    Types: NodeType,
927    Header<Types>: QueryableHeader<Types>,
928{
929    async fn aggregates_height(&mut self) -> anyhow::Result<usize> {
930        Ok(0)
931    }
932
933    async fn load_prev_aggregate(&mut self) -> anyhow::Result<Option<Aggregate<Types>>> {
934        Ok(None)
935    }
936}
937
938impl<Types, T: Revert + Send> UpdateAggregatesStorage<Types> for Transaction<T>
939where
940    Types: NodeType,
941    Header<Types>: QueryableHeader<Types>,
942{
943    async fn update_aggregates(
944        &mut self,
945        _prev: Aggregate<Types>,
946        _blocks: &[PayloadMetadata<Types>],
947    ) -> anyhow::Result<Aggregate<Types>> {
948        Ok(Aggregate::default())
949    }
950}
951
952impl<T: Revert> PrunedHeightStorage for Transaction<T> {}
953
954impl<Types> HasMetrics for FileSystemStorage<Types>
955where
956    Types: NodeType,
957    Header<Types>: QueryableHeader<Types>,
958    Payload<Types>: QueryablePayload<Types>,
959{
960    fn metrics(&self) -> &PrometheusMetrics {
961        &self.metrics
962    }
963}