1#![cfg(feature = "file-system-data-source")]
14
15use std::{
16 collections::{
17 BTreeMap,
18 hash_map::{Entry, HashMap},
19 },
20 hash::Hash,
21 iter,
22 ops::{Bound, Deref, RangeBounds},
23 path::Path,
24};
25
26use async_lock::{RwLock, RwLockReadGuard, RwLockWriteGuard};
27use async_trait::async_trait;
28use atomic_store::{AtomicStore, AtomicStoreLoader, PersistenceError};
29use committable::Committable;
30use futures::future::Future;
31use hotshot_types::{
32 data::{VidCommitment, VidShare},
33 simple_certificate::CertificatePair,
34 traits::{block_contents::BlockHeader, node_implementation::NodeType},
35};
36use serde::{Serialize, de::DeserializeOwned};
37use snafu::OptionExt;
38
39use super::{
40 Aggregate, AggregatesStorage, AvailabilityStorage, NodeStorage, PayloadMetadata,
41 UpdateAggregatesStorage, UpdateAvailabilityStorage, VidCommonMetadata,
42 ledger_log::{Iter, LedgerLog},
43 pruning::{PruneStorage, PrunedHeightStorage, PrunerConfig},
44};
45use crate::{
46 Header, MissingSnafu, NotFoundSnafu, Payload, QueryError, QueryResult,
47 availability::{
48 NamespaceId,
49 data_source::{BlockId, LeafId},
50 query_data::{
51 BlockHash, BlockQueryData, LeafHash, LeafQueryData, PayloadQueryData, QueryableHeader,
52 QueryablePayload, TransactionHash, VidCommonQueryData,
53 },
54 },
55 data_source::{VersionedDataSource, update},
56 metrics::PrometheusMetrics,
57 node::{SyncStatusQueryData, TimeWindowQueryData, WindowStart},
58 status::HasMetrics,
59 types::HeightIndexed,
60};
61
62const CACHED_LEAVES_COUNT: usize = 100;
63const CACHED_BLOCKS_COUNT: usize = 100;
64const CACHED_VID_COMMON_COUNT: usize = 100;
65
66#[derive(custom_debug::Debug)]
67pub struct FileSystemStorageInner<Types>
68where
69 Types: NodeType,
70 Header<Types>: QueryableHeader<Types>,
71 Payload<Types>: QueryablePayload<Types>,
72{
73 index_by_leaf_hash: HashMap<LeafHash<Types>, u64>,
74 index_by_block_hash: HashMap<BlockHash<Types>, u64>,
75 index_by_payload_hash: HashMap<VidCommitment, u64>,
76 index_by_txn_hash: HashMap<TransactionHash<Types>, u64>,
77 index_by_time: BTreeMap<u64, Vec<u64>>,
78 num_transactions: usize,
79 payload_size: usize,
80 #[debug(skip)]
81 top_storage: Option<AtomicStore>,
82 leaf_storage: LedgerLog<LeafQueryData<Types>>,
83 block_storage: LedgerLog<BlockQueryData<Types>>,
84 vid_storage: LedgerLog<(VidCommonQueryData<Types>, Option<VidShare>)>,
85 latest_qc_chain: Option<[CertificatePair<Types>; 2]>,
86}
87
88impl<Types> FileSystemStorageInner<Types>
89where
90 Types: NodeType,
91 Header<Types>: QueryableHeader<Types>,
92 Payload<Types>: QueryablePayload<Types>,
93{
94 fn get_block_index(&self, id: BlockId<Types>) -> QueryResult<usize> {
95 match id {
96 BlockId::Number(n) => Ok(n),
97 BlockId::Hash(h) => {
98 Ok(*self.index_by_block_hash.get(&h).context(NotFoundSnafu)? as usize)
99 },
100 BlockId::PayloadHash(h) => {
101 Ok(*self.index_by_payload_hash.get(&h).context(NotFoundSnafu)? as usize)
102 },
103 }
104 }
105
106 fn get_block(&self, id: BlockId<Types>) -> QueryResult<BlockQueryData<Types>> {
107 self.block_storage
108 .iter()
109 .nth(self.get_block_index(id)?)
110 .context(NotFoundSnafu)?
111 .context(MissingSnafu)
112 }
113
114 fn get_header(&self, id: BlockId<Types>) -> QueryResult<Header<Types>> {
115 self.get_block(id).map(|block| block.header)
116 }
117
118 fn get_block_range<R>(&self, range: R) -> QueryResult<Vec<QueryResult<BlockQueryData<Types>>>>
119 where
120 R: RangeBounds<usize> + Send,
121 {
122 Ok(range_iter(self.block_storage.iter(), range).collect())
123 }
124}
125
126#[derive(Debug)]
128pub struct FileSystemStorage<Types: NodeType>
129where
130 Header<Types>: QueryableHeader<Types>,
131 Payload<Types>: QueryablePayload<Types>,
132{
133 inner: RwLock<FileSystemStorageInner<Types>>,
134 metrics: PrometheusMetrics,
135}
136
137impl<Types: NodeType> PrunerConfig for FileSystemStorage<Types>
138where
139 Header<Types>: QueryableHeader<Types>,
140 Payload<Types>: QueryablePayload<Types>,
141{
142}
143impl<Types: NodeType> PruneStorage for FileSystemStorage<Types>
144where
145 Header<Types>: QueryableHeader<Types>,
146 Payload<Types>: QueryablePayload<Types>,
147{
148 type Pruner = ();
149}
150
151impl<Types: NodeType> FileSystemStorage<Types>
152where
153 Payload<Types>: QueryablePayload<Types>,
154 Header<Types>: QueryableHeader<Types>,
155{
156 pub async fn create(path: &Path) -> Result<Self, PersistenceError> {
162 let mut loader = AtomicStoreLoader::create(path, "hotshot_data_source")?;
163 loader.retain_archives(1);
164 let data_source = Self::create_with_store(&mut loader).await?;
165 data_source.inner.write().await.top_storage = Some(AtomicStore::open(loader)?);
166 Ok(data_source)
167 }
168
169 pub async fn open(path: &Path) -> Result<Self, PersistenceError> {
175 let mut loader = AtomicStoreLoader::load(path, "hotshot_data_source")?;
176 loader.retain_archives(1);
177 let data_source = Self::open_with_store(&mut loader).await?;
178 data_source.inner.write().await.top_storage = Some(AtomicStore::open(loader)?);
179 Ok(data_source)
180 }
181
182 pub async fn create_with_store(
191 loader: &mut AtomicStoreLoader,
192 ) -> Result<Self, PersistenceError> {
193 Ok(Self {
194 inner: RwLock::new(FileSystemStorageInner {
195 index_by_leaf_hash: Default::default(),
196 index_by_block_hash: Default::default(),
197 index_by_payload_hash: Default::default(),
198 index_by_txn_hash: Default::default(),
199 index_by_time: Default::default(),
200 num_transactions: 0,
201 payload_size: 0,
202 top_storage: None,
203 leaf_storage: LedgerLog::create(loader, "leaves", CACHED_LEAVES_COUNT)?,
204 block_storage: LedgerLog::create(loader, "blocks", CACHED_BLOCKS_COUNT)?,
205 vid_storage: LedgerLog::create(loader, "vid_common", CACHED_VID_COMMON_COUNT)?,
206 latest_qc_chain: None,
207 }),
208 metrics: Default::default(),
209 })
210 }
211
212 pub async fn open_with_store(loader: &mut AtomicStoreLoader) -> Result<Self, PersistenceError> {
221 let leaf_storage =
222 LedgerLog::<LeafQueryData<Types>>::open(loader, "leaves", CACHED_LEAVES_COUNT)?;
223 let block_storage =
224 LedgerLog::<BlockQueryData<Types>>::open(loader, "blocks", CACHED_BLOCKS_COUNT)?;
225 let vid_storage = LedgerLog::<(VidCommonQueryData<Types>, Option<VidShare>)>::open(
226 loader,
227 "vid_common",
228 CACHED_VID_COMMON_COUNT,
229 )?;
230
231 let mut index_by_block_hash = HashMap::new();
232 let mut index_by_payload_hash = HashMap::new();
233 let mut index_by_time = BTreeMap::<u64, Vec<u64>>::new();
234 let index_by_leaf_hash = leaf_storage
235 .iter()
236 .flatten()
237 .map(|leaf| {
238 update_index_by_hash(&mut index_by_block_hash, leaf.block_hash(), leaf.height());
239 update_index_by_hash(
240 &mut index_by_payload_hash,
241 leaf.payload_hash(),
242 leaf.height(),
243 );
244 index_by_time
245 .entry(leaf.header().timestamp())
246 .or_default()
247 .push(leaf.height());
248 (leaf.hash(), leaf.height())
249 })
250 .collect();
251
252 let mut index_by_txn_hash = HashMap::new();
253 let mut num_transactions = 0;
254 let mut payload_size = 0;
255 for block in block_storage.iter().flatten() {
256 num_transactions += block.len();
257 payload_size += block.size() as usize;
258
259 let height = block.height();
260 for (_, txn) in block.enumerate() {
261 update_index_by_hash(&mut index_by_txn_hash, txn.commit(), height);
262 }
263 }
264
265 Ok(Self {
266 inner: RwLock::new(FileSystemStorageInner {
267 index_by_leaf_hash,
268 index_by_block_hash,
269 index_by_payload_hash,
270 index_by_txn_hash,
271 index_by_time,
272 num_transactions,
273 payload_size,
274 leaf_storage,
275 block_storage,
276 vid_storage,
277 top_storage: None,
278 latest_qc_chain: None,
279 }),
280 metrics: Default::default(),
281 })
282 }
283
284 pub async fn skip_version(&self) -> Result<(), PersistenceError> {
286 let mut inner = self.inner.write().await;
287 inner.leaf_storage.skip_version()?;
288 inner.block_storage.skip_version()?;
289 inner.vid_storage.skip_version()?;
290 if let Some(store) = &mut inner.top_storage {
291 store.commit_version()?;
292 }
293 Ok(())
294 }
295
296 pub async fn get_vid_share(&self, block_id: BlockId<Types>) -> QueryResult<VidShare> {
298 let mut tx = self.read().await.map_err(|err| QueryError::Error {
299 message: err.to_string(),
300 })?;
301 let share = tx.vid_share(block_id).await?;
302 Ok(share)
303 }
304
305 pub async fn get_vid_common(
307 &self,
308 block_id: BlockId<Types>,
309 ) -> QueryResult<VidCommonQueryData<Types>> {
310 let mut tx = self.read().await.map_err(|err| QueryError::Error {
311 message: err.to_string(),
312 })?;
313 let share = tx.get_vid_common(block_id).await?;
314 Ok(share)
315 }
316
317 pub async fn get_vid_common_metadata(
319 &self,
320 block_id: BlockId<Types>,
321 ) -> QueryResult<VidCommonMetadata<Types>> {
322 let mut tx = self.read().await.map_err(|err| QueryError::Error {
323 message: err.to_string(),
324 })?;
325 let share = tx.get_vid_common_metadata(block_id).await?;
326 Ok(share)
327 }
328}
329
330pub trait Revert {
331 fn revert(&mut self);
332}
333
334impl<Types> Revert for RwLockWriteGuard<'_, FileSystemStorageInner<Types>>
335where
336 Types: NodeType,
337 Header<Types>: QueryableHeader<Types>,
338 Payload<Types>: QueryablePayload<Types>,
339{
340 fn revert(&mut self) {
341 self.leaf_storage.revert_version().unwrap();
342 self.block_storage.revert_version().unwrap();
343 self.vid_storage.revert_version().unwrap();
344 }
345}
346
347impl<Types> Revert for RwLockReadGuard<'_, FileSystemStorageInner<Types>>
348where
349 Types: NodeType,
350 Header<Types>: QueryableHeader<Types>,
351 Payload<Types>: QueryablePayload<Types>,
352{
353 fn revert(&mut self) {
354 }
356}
357
358#[derive(Debug)]
359pub struct Transaction<T: Revert> {
360 inner: T,
361}
362
363impl<T: Revert> Drop for Transaction<T> {
364 fn drop(&mut self) {
365 self.inner.revert();
366 }
367}
368impl<Types> update::Transaction for Transaction<RwLockWriteGuard<'_, FileSystemStorageInner<Types>>>
369where
370 Types: NodeType,
371 Header<Types>: QueryableHeader<Types>,
372 Payload<Types>: QueryablePayload<Types>,
373{
374 async fn commit(mut self) -> anyhow::Result<()> {
375 self.inner.leaf_storage.commit_version().await?;
376 self.inner.block_storage.commit_version().await?;
377 self.inner.vid_storage.commit_version().await?;
378 if let Some(store) = &mut self.inner.top_storage {
379 store.commit_version()?;
380 }
381 Ok(())
382 }
383
384 fn revert(self) -> impl Future + Send {
385 async move {}
387 }
388}
389
390impl<Types> update::Transaction for Transaction<RwLockReadGuard<'_, FileSystemStorageInner<Types>>>
391where
392 Types: NodeType,
393 Header<Types>: QueryableHeader<Types>,
394 Payload<Types>: QueryablePayload<Types>,
395{
396 async fn commit(self) -> anyhow::Result<()> {
397 Ok(())
399 }
400
401 fn revert(self) -> impl Future + Send {
402 async move {}
404 }
405}
406
407impl<Types: NodeType> VersionedDataSource for FileSystemStorage<Types>
408where
409 Header<Types>: QueryableHeader<Types>,
410 Payload<Types>: QueryablePayload<Types>,
411{
412 type Transaction<'a>
413 = Transaction<RwLockWriteGuard<'a, FileSystemStorageInner<Types>>>
414 where
415 Self: 'a;
416 type ReadOnly<'a>
417 = Transaction<RwLockReadGuard<'a, FileSystemStorageInner<Types>>>
418 where
419 Self: 'a;
420
421 async fn write(&self) -> anyhow::Result<Self::Transaction<'_>> {
422 Ok(Transaction {
423 inner: self.inner.write().await,
424 })
425 }
426
427 async fn read(&self) -> anyhow::Result<Self::ReadOnly<'_>> {
428 Ok(Transaction {
429 inner: self.inner.read().await,
430 })
431 }
432}
433fn range_iter<T>(
434 mut iter: Iter<'_, T>,
435 range: impl RangeBounds<usize>,
436) -> impl '_ + Iterator<Item = QueryResult<T>>
437where
438 T: Clone + Serialize + DeserializeOwned,
439{
440 let start = range.start_bound().cloned();
441 let end = range.end_bound().cloned();
442
443 let mut pos = match start {
445 Bound::Included(n) => {
446 if n > 0 {
447 iter.nth(n - 1);
448 }
449 n
450 },
451 Bound::Excluded(n) => {
452 iter.nth(n);
453 n + 1
454 },
455 Bound::Unbounded => 0,
456 };
457
458 iter::from_fn(move || {
459 let reached_end = match end {
461 Bound::Included(n) => pos > n,
462 Bound::Excluded(n) => pos >= n,
463 Bound::Unbounded => false,
464 };
465 if reached_end {
466 return None;
467 }
468 let opt = iter.next()?;
469 pos += 1;
470 Some(opt.context(MissingSnafu))
471 })
472}
473
474#[async_trait]
475impl<Types, T> AvailabilityStorage<Types> for Transaction<T>
476where
477 Types: NodeType,
478 Payload<Types>: QueryablePayload<Types>,
479 Header<Types>: QueryableHeader<Types>,
480 T: Revert + Deref<Target = FileSystemStorageInner<Types>> + Send + Sync,
481{
482 async fn get_leaf(&mut self, id: LeafId<Types>) -> QueryResult<LeafQueryData<Types>> {
483 let n = match id {
484 LeafId::Number(n) => n,
485 LeafId::Hash(h) => *self
486 .inner
487 .index_by_leaf_hash
488 .get(&h)
489 .context(NotFoundSnafu)? as usize,
490 };
491 self.inner
492 .leaf_storage
493 .iter()
494 .nth(n)
495 .context(NotFoundSnafu)?
496 .context(MissingSnafu)
497 }
498
499 async fn get_block(&mut self, id: BlockId<Types>) -> QueryResult<BlockQueryData<Types>> {
500 self.inner.get_block(id)
501 }
502
503 async fn get_header(&mut self, id: BlockId<Types>) -> QueryResult<Header<Types>> {
504 self.inner.get_header(id)
505 }
506
507 async fn get_payload(&mut self, id: BlockId<Types>) -> QueryResult<PayloadQueryData<Types>> {
508 self.get_block(id).await.map(PayloadQueryData::from)
509 }
510
511 async fn get_payload_metadata(
512 &mut self,
513 id: BlockId<Types>,
514 ) -> QueryResult<PayloadMetadata<Types>> {
515 self.get_block(id).await.map(PayloadMetadata::from)
516 }
517
518 async fn get_vid_common(
519 &mut self,
520 id: BlockId<Types>,
521 ) -> QueryResult<VidCommonQueryData<Types>> {
522 Ok(self
523 .inner
524 .vid_storage
525 .iter()
526 .nth(self.inner.get_block_index(id)?)
527 .context(NotFoundSnafu)?
528 .context(MissingSnafu)?
529 .0)
530 }
531
532 async fn get_vid_common_metadata(
533 &mut self,
534 id: BlockId<Types>,
535 ) -> QueryResult<VidCommonMetadata<Types>> {
536 self.get_vid_common(id).await.map(VidCommonMetadata::from)
537 }
538
539 async fn get_leaf_range<R>(
540 &mut self,
541 range: R,
542 ) -> QueryResult<Vec<QueryResult<LeafQueryData<Types>>>>
543 where
544 R: RangeBounds<usize> + Send,
545 {
546 Ok(range_iter(self.inner.leaf_storage.iter(), range).collect())
547 }
548
549 async fn get_block_range<R>(
550 &mut self,
551 range: R,
552 ) -> QueryResult<Vec<QueryResult<BlockQueryData<Types>>>>
553 where
554 R: RangeBounds<usize> + Send,
555 {
556 self.inner.get_block_range(range)
557 }
558
559 async fn get_payload_range<R>(
560 &mut self,
561 range: R,
562 ) -> QueryResult<Vec<QueryResult<PayloadQueryData<Types>>>>
563 where
564 R: RangeBounds<usize> + Send,
565 {
566 Ok(range_iter(self.inner.block_storage.iter(), range)
567 .map(|res| res.map(PayloadQueryData::from))
568 .collect())
569 }
570
571 async fn get_payload_metadata_range<R>(
572 &mut self,
573 range: R,
574 ) -> QueryResult<Vec<QueryResult<PayloadMetadata<Types>>>>
575 where
576 R: RangeBounds<usize> + Send + 'static,
577 {
578 Ok(range_iter(self.inner.block_storage.iter(), range)
579 .map(|res| res.map(PayloadMetadata::from))
580 .collect())
581 }
582
583 async fn get_vid_common_range<R>(
584 &mut self,
585 range: R,
586 ) -> QueryResult<Vec<QueryResult<VidCommonQueryData<Types>>>>
587 where
588 R: RangeBounds<usize> + Send,
589 {
590 Ok(range_iter(self.inner.vid_storage.iter(), range)
591 .map(|res| res.map(|(common, _)| common))
592 .collect())
593 }
594
595 async fn get_vid_common_metadata_range<R>(
596 &mut self,
597 range: R,
598 ) -> QueryResult<Vec<QueryResult<VidCommonMetadata<Types>>>>
599 where
600 R: RangeBounds<usize> + Send,
601 {
602 Ok(range_iter(self.inner.vid_storage.iter(), range)
603 .map(|res| res.map(|(common, _)| common.into()))
604 .collect())
605 }
606
607 async fn get_block_with_transaction(
608 &mut self,
609 hash: TransactionHash<Types>,
610 ) -> QueryResult<BlockQueryData<Types>> {
611 let height = self
612 .inner
613 .index_by_txn_hash
614 .get(&hash)
615 .context(NotFoundSnafu)?;
616 self.inner.get_block((*height as usize).into())
617 }
618
619 async fn first_available_leaf(&mut self, from: u64) -> QueryResult<LeafQueryData<Types>> {
620 self.get_leaf((from as usize).into()).await
624 }
625}
626
627impl<Types: NodeType> UpdateAvailabilityStorage<Types>
628 for Transaction<RwLockWriteGuard<'_, FileSystemStorageInner<Types>>>
629where
630 Payload<Types>: QueryablePayload<Types>,
631 Header<Types>: QueryableHeader<Types>,
632{
633 async fn insert_qc_chain(
634 &mut self,
635 height: u64,
636 qc_chain: Option<[CertificatePair<Types>; 2]>,
637 ) -> anyhow::Result<()> {
638 if height + 1 >= (self.inner.leaf_storage.iter().len() as u64) {
639 if let Some(qc_chain) = qc_chain {
644 self.inner.latest_qc_chain = Some(qc_chain);
645 } else {
646 self.inner.latest_qc_chain = None;
649 }
650 }
651
652 Ok(())
653 }
654
655 async fn insert_leaf_range<'a>(
656 &mut self,
657 leaves: impl Send + IntoIterator<Item = &'a LeafQueryData<Types>>,
658 ) -> anyhow::Result<()> {
659 for leaf in leaves {
660 if !self
661 .inner
662 .leaf_storage
663 .insert(leaf.height() as usize, leaf.clone())?
664 {
665 continue;
667 }
668 self.inner
669 .index_by_leaf_hash
670 .insert(leaf.hash(), leaf.height());
671 update_index_by_hash(
672 &mut self.inner.index_by_block_hash,
673 leaf.block_hash(),
674 leaf.height(),
675 );
676 update_index_by_hash(
677 &mut self.inner.index_by_payload_hash,
678 leaf.payload_hash(),
679 leaf.height(),
680 );
681 self.inner
682 .index_by_time
683 .entry(leaf.header().timestamp())
684 .or_default()
685 .push(leaf.height());
686 }
687
688 Ok(())
689 }
690
691 async fn insert_block_range<'a>(
692 &mut self,
693 blocks: impl Send + IntoIterator<IntoIter: Send, Item = &'a BlockQueryData<Types>>,
694 ) -> anyhow::Result<()> {
695 for block in blocks {
696 if !self
697 .inner
698 .block_storage
699 .insert(block.height() as usize, block.clone())?
700 {
701 continue;
703 }
704 self.inner.num_transactions += block.len();
705 self.inner.payload_size += block.size() as usize;
706 for (_, txn) in block.enumerate() {
707 update_index_by_hash(
708 &mut self.inner.index_by_txn_hash,
709 txn.commit(),
710 block.height(),
711 );
712 }
713 }
714 Ok(())
715 }
716
717 async fn insert_vid_range<'a>(
718 &mut self,
719 vid: impl Send
720 + IntoIterator<
721 IntoIter: Send,
722 Item = (&'a VidCommonQueryData<Types>, Option<&'a VidShare>),
723 >,
724 ) -> anyhow::Result<()> {
725 for (common, share) in vid {
726 self.inner
727 .vid_storage
728 .insert(common.height() as usize, (common.clone(), share.cloned()))?;
729 }
730 Ok(())
731 }
732}
733
734fn update_index_by_hash<H: Eq + Hash, P: Ord>(index: &mut HashMap<H, P>, hash: H, pos: P) {
739 match index.entry(hash) {
740 Entry::Occupied(mut e) => {
741 if &pos < e.get() {
742 e.insert(pos);
744 }
745 },
746 Entry::Vacant(e) => {
747 e.insert(pos);
748 },
749 }
750}
751
752#[async_trait]
753impl<Types, T> NodeStorage<Types> for Transaction<T>
754where
755 Types: NodeType,
756 Payload<Types>: QueryablePayload<Types>,
757 Header<Types>: QueryableHeader<Types>,
758 T: Revert + Deref<Target = FileSystemStorageInner<Types>> + Send + Sync,
759{
760 async fn block_height(&mut self) -> QueryResult<usize> {
761 Ok(self.inner.leaf_storage.iter().len())
762 }
763
764 async fn count_transactions_in_range(
765 &mut self,
766 range: impl RangeBounds<usize> + Send,
767 namespace: Option<NamespaceId<Types>>,
768 ) -> QueryResult<usize> {
769 if !matches!(range.start_bound(), Bound::Unbounded | Bound::Included(0))
770 || !matches!(range.end_bound(), Bound::Unbounded)
771 {
772 return Err(QueryError::Error {
773 message: "partial aggregates are not supported with file system backend".into(),
774 });
775 }
776
777 if namespace.is_some() {
778 return Err(QueryError::Error {
779 message: "file system does not support per-namespace stats".into(),
780 });
781 }
782
783 Ok(self.inner.num_transactions)
784 }
785
786 async fn payload_size_in_range(
787 &mut self,
788 range: impl RangeBounds<usize> + Send,
789 namespace: Option<NamespaceId<Types>>,
790 ) -> QueryResult<usize> {
791 if !matches!(range.start_bound(), Bound::Unbounded | Bound::Included(0))
792 || !matches!(range.end_bound(), Bound::Unbounded)
793 {
794 return Err(QueryError::Error {
795 message: "partial aggregates are not supported with file system backend".into(),
796 });
797 }
798
799 if namespace.is_some() {
800 return Err(QueryError::Error {
801 message: "file system does not support per-namespace stats".into(),
802 });
803 }
804
805 Ok(self.inner.payload_size)
806 }
807
808 async fn vid_share<ID>(&mut self, id: ID) -> QueryResult<VidShare>
809 where
810 ID: Into<BlockId<Types>> + Send + Sync,
811 {
812 self.inner
813 .vid_storage
814 .iter()
815 .nth(self.inner.get_block_index(id.into())?)
816 .context(NotFoundSnafu)?
817 .context(MissingSnafu)?
818 .1
819 .context(MissingSnafu)
820 }
821
822 async fn sync_status_for_range(
823 &mut self,
824 start: usize,
825 end: usize,
826 ) -> QueryResult<SyncStatusQueryData> {
827 Ok(SyncStatusQueryData {
828 leaves: self.inner.leaf_storage.sync_status(start, end),
829 blocks: self.inner.block_storage.sync_status(start, end),
830 vid_common: self.inner.vid_storage.sync_status(start, end),
831 pruned_height: None,
832 })
833 }
834
835 async fn get_header_window(
836 &mut self,
837 start: impl Into<WindowStart<Types>> + Send + Sync,
838 end: u64,
839 limit: usize,
840 ) -> QueryResult<TimeWindowQueryData<Header<Types>>> {
841 let first_block = match start.into() {
842 WindowStart::Height(h) => h,
843 WindowStart::Hash(h) => self.inner.get_header(h.into())?.block_number(),
844 WindowStart::Time(t) => {
845 let blocks = self
848 .inner
849 .index_by_time
850 .range(t..)
851 .next()
852 .context(NotFoundSnafu)?
853 .1;
854 blocks[0]
859 },
860 } as usize;
861
862 let mut res = TimeWindowQueryData::default();
863
864 if first_block > 0 {
866 res.prev = Some(self.inner.get_header((first_block - 1).into())?);
867 }
868
869 for block in self.inner.get_block_range(first_block..)? {
872 let header = block?.header().clone();
873 if header.timestamp() >= end {
874 res.next = Some(header);
875 break;
876 }
877 res.window.push(header);
878 if res.window.len() >= limit {
879 break;
880 }
881 }
882
883 Ok(res)
884 }
885
886 async fn latest_qc_chain(&mut self) -> QueryResult<Option<[CertificatePair<Types>; 2]>> {
887 Ok(self.inner.latest_qc_chain.clone())
888 }
889}
890
891impl<Types, T: Revert + Send> AggregatesStorage<Types> for Transaction<T>
892where
893 Types: NodeType,
894 Header<Types>: QueryableHeader<Types>,
895{
896 async fn aggregates_height(&mut self) -> anyhow::Result<usize> {
897 Ok(0)
898 }
899
900 async fn load_prev_aggregate(&mut self) -> anyhow::Result<Option<Aggregate<Types>>> {
901 Ok(None)
902 }
903}
904
905impl<Types, T: Revert + Send> UpdateAggregatesStorage<Types> for Transaction<T>
906where
907 Types: NodeType,
908 Header<Types>: QueryableHeader<Types>,
909{
910 async fn update_aggregates(
911 &mut self,
912 _prev: Aggregate<Types>,
913 _blocks: &[PayloadMetadata<Types>],
914 ) -> anyhow::Result<Aggregate<Types>> {
915 Ok(Aggregate::default())
916 }
917}
918
919impl<T: Revert> PrunedHeightStorage for Transaction<T> {}
920
921impl<Types> HasMetrics for FileSystemStorage<Types>
922where
923 Types: NodeType,
924 Header<Types>: QueryableHeader<Types>,
925 Payload<Types>: QueryablePayload<Types>,
926{
927 fn metrics(&self) -> &PrometheusMetrics {
928 &self.metrics
929 }
930}