1#![cfg(feature = "file-system-data-source")]
14
15use std::{
16 collections::{
17 BTreeMap,
18 hash_map::{Entry, HashMap},
19 },
20 hash::Hash,
21 iter,
22 ops::{Bound, Deref, RangeBounds},
23 path::Path,
24};
25
26use async_lock::{RwLock, RwLockReadGuard, RwLockWriteGuard};
27use async_trait::async_trait;
28use atomic_store::{AtomicStore, AtomicStoreLoader, PersistenceError};
29use committable::Committable;
30use futures::future::Future;
31use hotshot_types::{
32 data::{VidCommitment, VidShare},
33 simple_certificate::CertificatePair,
34 traits::{block_contents::BlockHeader, node_implementation::NodeType},
35};
36use serde::{Serialize, de::DeserializeOwned};
37use snafu::OptionExt;
38
39use super::{
40 Aggregate, AggregatesStorage, AvailabilityStorage, NodeStorage, PayloadMetadata,
41 UpdateAggregatesStorage, UpdateAvailabilityStorage, VidCommonMetadata,
42 ledger_log::{Iter, LedgerLog},
43 pruning::{PruneStorage, PrunedHeightStorage, PrunerConfig},
44};
45use crate::{
46 Header, MissingSnafu, NotFoundSnafu, Payload, QueryError, QueryResult,
47 availability::{
48 Certificate2, NamespaceId,
49 data_source::{BlockId, LeafId},
50 query_data::{
51 BlockHash, BlockQueryData, LeafHash, LeafQueryData, PayloadQueryData, QueryableHeader,
52 QueryablePayload, TransactionHash, VidCommonQueryData,
53 },
54 },
55 data_source::{VersionedDataSource, update},
56 metrics::PrometheusMetrics,
57 node::{SyncStatusQueryData, TimeWindowQueryData, WindowStart},
58 status::HasMetrics,
59 types::HeightIndexed,
60};
61
62const CACHED_LEAVES_COUNT: usize = 100;
63const CACHED_BLOCKS_COUNT: usize = 100;
64const CACHED_VID_COMMON_COUNT: usize = 100;
65const CACHED_CERT2_COUNT: usize = 100;
66
67#[derive(custom_debug::Debug)]
68pub struct FileSystemStorageInner<Types>
69where
70 Types: NodeType,
71 Header<Types>: QueryableHeader<Types>,
72 Payload<Types>: QueryablePayload<Types>,
73{
74 index_by_leaf_hash: HashMap<LeafHash<Types>, u64>,
75 index_by_block_hash: HashMap<BlockHash<Types>, u64>,
76 index_by_payload_hash: HashMap<VidCommitment, u64>,
77 index_by_txn_hash: HashMap<TransactionHash<Types>, u64>,
78 index_by_time: BTreeMap<u64, Vec<u64>>,
79 num_transactions: usize,
80 payload_size: usize,
81 #[debug(skip)]
82 top_storage: Option<AtomicStore>,
83 leaf_storage: LedgerLog<LeafQueryData<Types>>,
84 block_storage: LedgerLog<BlockQueryData<Types>>,
85 vid_storage: LedgerLog<(VidCommonQueryData<Types>, Option<VidShare>)>,
86 latest_qc_chain: Option<[CertificatePair<Types>; 2]>,
87 cert2_storage: LedgerLog<Certificate2<Types>>,
88}
89
90impl<Types> FileSystemStorageInner<Types>
91where
92 Types: NodeType,
93 Header<Types>: QueryableHeader<Types>,
94 Payload<Types>: QueryablePayload<Types>,
95{
96 fn get_block_index(&self, id: BlockId<Types>) -> QueryResult<usize> {
97 match id {
98 BlockId::Number(n) => Ok(n),
99 BlockId::Hash(h) => {
100 Ok(*self.index_by_block_hash.get(&h).context(NotFoundSnafu)? as usize)
101 },
102 BlockId::PayloadHash(h) => {
103 Ok(*self.index_by_payload_hash.get(&h).context(NotFoundSnafu)? as usize)
104 },
105 }
106 }
107
108 fn get_block(&self, id: BlockId<Types>) -> QueryResult<BlockQueryData<Types>> {
109 self.block_storage
110 .iter()
111 .nth(self.get_block_index(id)?)
112 .context(NotFoundSnafu)?
113 .context(MissingSnafu)
114 }
115
116 fn get_header(&self, id: BlockId<Types>) -> QueryResult<Header<Types>> {
117 self.get_block(id).map(|block| block.header)
118 }
119
120 fn get_block_range<R>(&self, range: R) -> QueryResult<Vec<QueryResult<BlockQueryData<Types>>>>
121 where
122 R: RangeBounds<usize> + Send,
123 {
124 Ok(range_iter(self.block_storage.iter(), range).collect())
125 }
126}
127
128#[derive(Debug)]
130pub struct FileSystemStorage<Types: NodeType>
131where
132 Header<Types>: QueryableHeader<Types>,
133 Payload<Types>: QueryablePayload<Types>,
134{
135 inner: RwLock<FileSystemStorageInner<Types>>,
136 metrics: PrometheusMetrics,
137}
138
139impl<Types: NodeType> PrunerConfig for FileSystemStorage<Types>
140where
141 Header<Types>: QueryableHeader<Types>,
142 Payload<Types>: QueryablePayload<Types>,
143{
144}
145impl<Types: NodeType> PruneStorage for FileSystemStorage<Types>
146where
147 Header<Types>: QueryableHeader<Types>,
148 Payload<Types>: QueryablePayload<Types>,
149{
150 type Pruner = ();
151}
152
153impl<Types: NodeType> FileSystemStorage<Types>
154where
155 Payload<Types>: QueryablePayload<Types>,
156 Header<Types>: QueryableHeader<Types>,
157{
158 pub async fn create(path: &Path) -> Result<Self, PersistenceError> {
164 let mut loader = AtomicStoreLoader::create(path, "hotshot_data_source")?;
165 loader.retain_archives(1);
166 let data_source = Self::create_with_store(&mut loader).await?;
167 data_source.inner.write().await.top_storage = Some(AtomicStore::open(loader)?);
168 Ok(data_source)
169 }
170
171 pub async fn open(path: &Path) -> Result<Self, PersistenceError> {
177 let mut loader = AtomicStoreLoader::load(path, "hotshot_data_source")?;
178 loader.retain_archives(1);
179 let data_source = Self::open_with_store(&mut loader).await?;
180 data_source.inner.write().await.top_storage = Some(AtomicStore::open(loader)?);
181 Ok(data_source)
182 }
183
184 pub async fn create_with_store(
193 loader: &mut AtomicStoreLoader,
194 ) -> Result<Self, PersistenceError> {
195 Ok(Self {
196 inner: RwLock::new(FileSystemStorageInner {
197 index_by_leaf_hash: Default::default(),
198 index_by_block_hash: Default::default(),
199 index_by_payload_hash: Default::default(),
200 index_by_txn_hash: Default::default(),
201 index_by_time: Default::default(),
202 num_transactions: 0,
203 payload_size: 0,
204 top_storage: None,
205 leaf_storage: LedgerLog::create(loader, "leaves", CACHED_LEAVES_COUNT)?,
206 block_storage: LedgerLog::create(loader, "blocks", CACHED_BLOCKS_COUNT)?,
207 vid_storage: LedgerLog::create(loader, "vid_common", CACHED_VID_COMMON_COUNT)?,
208 cert2_storage: LedgerLog::create(loader, "cert2", CACHED_CERT2_COUNT)?,
209 latest_qc_chain: None,
210 }),
211 metrics: Default::default(),
212 })
213 }
214
215 pub async fn open_with_store(loader: &mut AtomicStoreLoader) -> Result<Self, PersistenceError> {
224 let leaf_storage =
225 LedgerLog::<LeafQueryData<Types>>::open(loader, "leaves", CACHED_LEAVES_COUNT)?;
226 let block_storage =
227 LedgerLog::<BlockQueryData<Types>>::open(loader, "blocks", CACHED_BLOCKS_COUNT)?;
228 let vid_storage = LedgerLog::<(VidCommonQueryData<Types>, Option<VidShare>)>::open(
229 loader,
230 "vid_common",
231 CACHED_VID_COMMON_COUNT,
232 )?;
233 let cert2_storage =
234 LedgerLog::<Certificate2<Types>>::open(loader, "cert2", CACHED_CERT2_COUNT)?;
235
236 let mut index_by_block_hash = HashMap::new();
237 let mut index_by_payload_hash = HashMap::new();
238 let mut index_by_time = BTreeMap::<u64, Vec<u64>>::new();
239 let index_by_leaf_hash = leaf_storage
240 .iter()
241 .flatten()
242 .map(|leaf| {
243 update_index_by_hash(&mut index_by_block_hash, leaf.block_hash(), leaf.height());
244 update_index_by_hash(
245 &mut index_by_payload_hash,
246 leaf.payload_hash(),
247 leaf.height(),
248 );
249 index_by_time
250 .entry(leaf.header().timestamp())
251 .or_default()
252 .push(leaf.height());
253 (leaf.hash(), leaf.height())
254 })
255 .collect();
256
257 let mut index_by_txn_hash = HashMap::new();
258 let mut num_transactions = 0;
259 let mut payload_size = 0;
260 for block in block_storage.iter().flatten() {
261 num_transactions += block.len();
262 payload_size += block.size() as usize;
263
264 let height = block.height();
265 for (_, txn) in block.enumerate() {
266 update_index_by_hash(&mut index_by_txn_hash, txn.commit(), height);
267 }
268 }
269
270 Ok(Self {
271 inner: RwLock::new(FileSystemStorageInner {
272 index_by_leaf_hash,
273 index_by_block_hash,
274 index_by_payload_hash,
275 index_by_txn_hash,
276 index_by_time,
277 num_transactions,
278 payload_size,
279 leaf_storage,
280 block_storage,
281 vid_storage,
282 cert2_storage,
283 top_storage: None,
284 latest_qc_chain: None,
285 }),
286 metrics: Default::default(),
287 })
288 }
289
290 pub async fn skip_version(&self) -> Result<(), PersistenceError> {
292 let mut inner = self.inner.write().await;
293 inner.leaf_storage.skip_version()?;
294 inner.block_storage.skip_version()?;
295 inner.vid_storage.skip_version()?;
296 inner.cert2_storage.skip_version()?;
297 if let Some(store) = &mut inner.top_storage {
298 store.commit_version()?;
299 }
300 Ok(())
301 }
302
303 pub async fn get_vid_share(&self, block_id: BlockId<Types>) -> QueryResult<VidShare> {
305 let mut tx = self.read().await.map_err(|err| QueryError::Error {
306 message: err.to_string(),
307 })?;
308 let share = tx.vid_share(block_id).await?;
309 Ok(share)
310 }
311
312 pub async fn get_vid_common(
314 &self,
315 block_id: BlockId<Types>,
316 ) -> QueryResult<VidCommonQueryData<Types>> {
317 let mut tx = self.read().await.map_err(|err| QueryError::Error {
318 message: err.to_string(),
319 })?;
320 let share = tx.get_vid_common(block_id).await?;
321 Ok(share)
322 }
323
324 pub async fn get_vid_common_metadata(
326 &self,
327 block_id: BlockId<Types>,
328 ) -> QueryResult<VidCommonMetadata<Types>> {
329 let mut tx = self.read().await.map_err(|err| QueryError::Error {
330 message: err.to_string(),
331 })?;
332 let share = tx.get_vid_common_metadata(block_id).await?;
333 Ok(share)
334 }
335}
336
337pub trait Revert {
338 fn revert(&mut self);
339}
340
341impl<Types> Revert for RwLockWriteGuard<'_, FileSystemStorageInner<Types>>
342where
343 Types: NodeType,
344 Header<Types>: QueryableHeader<Types>,
345 Payload<Types>: QueryablePayload<Types>,
346{
347 fn revert(&mut self) {
348 self.leaf_storage.revert_version().unwrap();
349 self.block_storage.revert_version().unwrap();
350 self.vid_storage.revert_version().unwrap();
351 self.cert2_storage.revert_version().unwrap();
352 }
353}
354
355impl<Types> Revert for RwLockReadGuard<'_, FileSystemStorageInner<Types>>
356where
357 Types: NodeType,
358 Header<Types>: QueryableHeader<Types>,
359 Payload<Types>: QueryablePayload<Types>,
360{
361 fn revert(&mut self) {
362 }
364}
365
366#[derive(Debug)]
367pub struct Transaction<T: Revert> {
368 inner: T,
369}
370
371impl<T: Revert> Drop for Transaction<T> {
372 fn drop(&mut self) {
373 self.inner.revert();
374 }
375}
376impl<Types> update::Transaction for Transaction<RwLockWriteGuard<'_, FileSystemStorageInner<Types>>>
377where
378 Types: NodeType,
379 Header<Types>: QueryableHeader<Types>,
380 Payload<Types>: QueryablePayload<Types>,
381{
382 async fn commit(mut self) -> anyhow::Result<()> {
383 self.inner.leaf_storage.commit_version().await?;
384 self.inner.block_storage.commit_version().await?;
385 self.inner.vid_storage.commit_version().await?;
386 self.inner.cert2_storage.commit_version().await?;
387 if let Some(store) = &mut self.inner.top_storage {
388 store.commit_version()?;
389 }
390 Ok(())
391 }
392
393 fn revert(self) -> impl Future + Send {
394 async move {}
396 }
397}
398
399impl<Types> update::Transaction for Transaction<RwLockReadGuard<'_, FileSystemStorageInner<Types>>>
400where
401 Types: NodeType,
402 Header<Types>: QueryableHeader<Types>,
403 Payload<Types>: QueryablePayload<Types>,
404{
405 async fn commit(self) -> anyhow::Result<()> {
406 Ok(())
408 }
409
410 fn revert(self) -> impl Future + Send {
411 async move {}
413 }
414}
415
416impl<Types: NodeType> VersionedDataSource for FileSystemStorage<Types>
417where
418 Header<Types>: QueryableHeader<Types>,
419 Payload<Types>: QueryablePayload<Types>,
420{
421 type Transaction<'a>
422 = Transaction<RwLockWriteGuard<'a, FileSystemStorageInner<Types>>>
423 where
424 Self: 'a;
425 type ReadOnly<'a>
426 = Transaction<RwLockReadGuard<'a, FileSystemStorageInner<Types>>>
427 where
428 Self: 'a;
429
430 async fn write(&self) -> anyhow::Result<Self::Transaction<'_>> {
431 Ok(Transaction {
432 inner: self.inner.write().await,
433 })
434 }
435
436 async fn read(&self) -> anyhow::Result<Self::ReadOnly<'_>> {
437 Ok(Transaction {
438 inner: self.inner.read().await,
439 })
440 }
441}
442fn range_iter<T>(
443 mut iter: Iter<'_, T>,
444 range: impl RangeBounds<usize>,
445) -> impl '_ + Iterator<Item = QueryResult<T>>
446where
447 T: Clone + Serialize + DeserializeOwned,
448{
449 let start = range.start_bound().cloned();
450 let end = range.end_bound().cloned();
451
452 let mut pos = match start {
454 Bound::Included(n) => {
455 if n > 0 {
456 iter.nth(n - 1);
457 }
458 n
459 },
460 Bound::Excluded(n) => {
461 iter.nth(n);
462 n + 1
463 },
464 Bound::Unbounded => 0,
465 };
466
467 iter::from_fn(move || {
468 let reached_end = match end {
470 Bound::Included(n) => pos > n,
471 Bound::Excluded(n) => pos >= n,
472 Bound::Unbounded => false,
473 };
474 if reached_end {
475 return None;
476 }
477 let opt = iter.next()?;
478 pos += 1;
479 Some(opt.context(MissingSnafu))
480 })
481}
482
483#[async_trait]
484impl<Types, T> AvailabilityStorage<Types> for Transaction<T>
485where
486 Types: NodeType,
487 Payload<Types>: QueryablePayload<Types>,
488 Header<Types>: QueryableHeader<Types>,
489 T: Revert + Deref<Target = FileSystemStorageInner<Types>> + Send + Sync,
490{
491 async fn get_leaf(&mut self, id: LeafId<Types>) -> QueryResult<LeafQueryData<Types>> {
492 let n = match id {
493 LeafId::Number(n) => n,
494 LeafId::Hash(h) => *self
495 .inner
496 .index_by_leaf_hash
497 .get(&h)
498 .context(NotFoundSnafu)? as usize,
499 };
500 self.inner
501 .leaf_storage
502 .iter()
503 .nth(n)
504 .context(NotFoundSnafu)?
505 .context(MissingSnafu)
506 }
507
508 async fn get_block(&mut self, id: BlockId<Types>) -> QueryResult<BlockQueryData<Types>> {
509 self.inner.get_block(id)
510 }
511
512 async fn get_header(&mut self, id: BlockId<Types>) -> QueryResult<Header<Types>> {
513 self.inner.get_header(id)
514 }
515
516 async fn get_payload(&mut self, id: BlockId<Types>) -> QueryResult<PayloadQueryData<Types>> {
517 self.get_block(id).await.map(PayloadQueryData::from)
518 }
519
520 async fn get_payload_metadata(
521 &mut self,
522 id: BlockId<Types>,
523 ) -> QueryResult<PayloadMetadata<Types>> {
524 self.get_block(id).await.map(PayloadMetadata::from)
525 }
526
527 async fn get_vid_common(
528 &mut self,
529 id: BlockId<Types>,
530 ) -> QueryResult<VidCommonQueryData<Types>> {
531 Ok(self
532 .inner
533 .vid_storage
534 .iter()
535 .nth(self.inner.get_block_index(id)?)
536 .context(NotFoundSnafu)?
537 .context(MissingSnafu)?
538 .0)
539 }
540
541 async fn get_vid_common_metadata(
542 &mut self,
543 id: BlockId<Types>,
544 ) -> QueryResult<VidCommonMetadata<Types>> {
545 self.get_vid_common(id).await.map(VidCommonMetadata::from)
546 }
547
548 async fn get_leaf_range<R>(
549 &mut self,
550 range: R,
551 ) -> QueryResult<Vec<QueryResult<LeafQueryData<Types>>>>
552 where
553 R: RangeBounds<usize> + Send,
554 {
555 Ok(range_iter(self.inner.leaf_storage.iter(), range).collect())
556 }
557
558 async fn get_block_range<R>(
559 &mut self,
560 range: R,
561 ) -> QueryResult<Vec<QueryResult<BlockQueryData<Types>>>>
562 where
563 R: RangeBounds<usize> + Send,
564 {
565 self.inner.get_block_range(range)
566 }
567
568 async fn get_payload_range<R>(
569 &mut self,
570 range: R,
571 ) -> QueryResult<Vec<QueryResult<PayloadQueryData<Types>>>>
572 where
573 R: RangeBounds<usize> + Send,
574 {
575 Ok(range_iter(self.inner.block_storage.iter(), range)
576 .map(|res| res.map(PayloadQueryData::from))
577 .collect())
578 }
579
580 async fn get_payload_metadata_range<R>(
581 &mut self,
582 range: R,
583 ) -> QueryResult<Vec<QueryResult<PayloadMetadata<Types>>>>
584 where
585 R: RangeBounds<usize> + Send + 'static,
586 {
587 Ok(range_iter(self.inner.block_storage.iter(), range)
588 .map(|res| res.map(PayloadMetadata::from))
589 .collect())
590 }
591
592 async fn get_vid_common_range<R>(
593 &mut self,
594 range: R,
595 ) -> QueryResult<Vec<QueryResult<VidCommonQueryData<Types>>>>
596 where
597 R: RangeBounds<usize> + Send,
598 {
599 Ok(range_iter(self.inner.vid_storage.iter(), range)
600 .map(|res| res.map(|(common, _)| common))
601 .collect())
602 }
603
604 async fn get_vid_common_metadata_range<R>(
605 &mut self,
606 range: R,
607 ) -> QueryResult<Vec<QueryResult<VidCommonMetadata<Types>>>>
608 where
609 R: RangeBounds<usize> + Send,
610 {
611 Ok(range_iter(self.inner.vid_storage.iter(), range)
612 .map(|res| res.map(|(common, _)| common.into()))
613 .collect())
614 }
615
616 async fn get_block_with_transaction(
617 &mut self,
618 hash: TransactionHash<Types>,
619 ) -> QueryResult<BlockQueryData<Types>> {
620 let height = self
621 .inner
622 .index_by_txn_hash
623 .get(&hash)
624 .context(NotFoundSnafu)?;
625 self.inner.get_block((*height as usize).into())
626 }
627}
628
629impl<Types: NodeType> UpdateAvailabilityStorage<Types>
630 for Transaction<RwLockWriteGuard<'_, FileSystemStorageInner<Types>>>
631where
632 Payload<Types>: QueryablePayload<Types>,
633 Header<Types>: QueryableHeader<Types>,
634{
635 async fn insert_qc_chain(
636 &mut self,
637 height: u64,
638 qc_chain: Option<[CertificatePair<Types>; 2]>,
639 ) -> anyhow::Result<()> {
640 if height + 1 >= (self.inner.leaf_storage.iter().len() as u64) {
641 if let Some(qc_chain) = qc_chain {
646 self.inner.latest_qc_chain = Some(qc_chain);
647 } else {
648 self.inner.latest_qc_chain = None;
651 }
652 }
653
654 Ok(())
655 }
656
657 async fn insert_cert2(
658 &mut self,
659 height: u64,
660 cert2: Certificate2<Types>,
661 ) -> anyhow::Result<()> {
662 self.inner.cert2_storage.insert(height as usize, cert2)?;
663 Ok(())
664 }
665
666 async fn insert_leaf_range<'a>(
667 &mut self,
668 leaves: impl Send + IntoIterator<Item = &'a LeafQueryData<Types>>,
669 ) -> anyhow::Result<()> {
670 for leaf in leaves {
671 if !self
672 .inner
673 .leaf_storage
674 .insert(leaf.height() as usize, leaf.clone())?
675 {
676 continue;
678 }
679 self.inner
680 .index_by_leaf_hash
681 .insert(leaf.hash(), leaf.height());
682 update_index_by_hash(
683 &mut self.inner.index_by_block_hash,
684 leaf.block_hash(),
685 leaf.height(),
686 );
687 update_index_by_hash(
688 &mut self.inner.index_by_payload_hash,
689 leaf.payload_hash(),
690 leaf.height(),
691 );
692 self.inner
693 .index_by_time
694 .entry(leaf.header().timestamp())
695 .or_default()
696 .push(leaf.height());
697 }
698
699 Ok(())
700 }
701
702 async fn insert_block_range<'a>(
703 &mut self,
704 blocks: impl Send + IntoIterator<IntoIter: Send, Item = &'a BlockQueryData<Types>>,
705 ) -> anyhow::Result<()> {
706 for block in blocks {
707 if !self
708 .inner
709 .block_storage
710 .insert(block.height() as usize, block.clone())?
711 {
712 continue;
714 }
715 self.inner.num_transactions += block.len();
716 self.inner.payload_size += block.size() as usize;
717 for (_, txn) in block.enumerate() {
718 update_index_by_hash(
719 &mut self.inner.index_by_txn_hash,
720 txn.commit(),
721 block.height(),
722 );
723 }
724 }
725 Ok(())
726 }
727
728 async fn insert_vid_range<'a>(
729 &mut self,
730 vid: impl Send
731 + IntoIterator<
732 IntoIter: Send,
733 Item = (&'a VidCommonQueryData<Types>, Option<&'a VidShare>),
734 >,
735 ) -> anyhow::Result<()> {
736 for (common, share) in vid {
737 self.inner
738 .vid_storage
739 .insert(common.height() as usize, (common.clone(), share.cloned()))?;
740 }
741 Ok(())
742 }
743}
744
745fn update_index_by_hash<H: Eq + Hash, P: Ord>(index: &mut HashMap<H, P>, hash: H, pos: P) {
750 match index.entry(hash) {
751 Entry::Occupied(mut e) => {
752 if &pos < e.get() {
753 e.insert(pos);
755 }
756 },
757 Entry::Vacant(e) => {
758 e.insert(pos);
759 },
760 }
761}
762
763#[async_trait]
764impl<Types, T> NodeStorage<Types> for Transaction<T>
765where
766 Types: NodeType,
767 Payload<Types>: QueryablePayload<Types>,
768 Header<Types>: QueryableHeader<Types>,
769 T: Revert + Deref<Target = FileSystemStorageInner<Types>> + Send + Sync,
770{
771 async fn block_height(&mut self) -> QueryResult<usize> {
772 Ok(self.inner.leaf_storage.iter().len())
773 }
774
775 async fn count_transactions_in_range(
776 &mut self,
777 range: impl RangeBounds<usize> + Send,
778 namespace: Option<NamespaceId<Types>>,
779 ) -> QueryResult<usize> {
780 if !matches!(range.start_bound(), Bound::Unbounded | Bound::Included(0))
781 || !matches!(range.end_bound(), Bound::Unbounded)
782 {
783 return Err(QueryError::Error {
784 message: "partial aggregates are not supported with file system backend".into(),
785 });
786 }
787
788 if namespace.is_some() {
789 return Err(QueryError::Error {
790 message: "file system does not support per-namespace stats".into(),
791 });
792 }
793
794 Ok(self.inner.num_transactions)
795 }
796
797 async fn payload_size_in_range(
798 &mut self,
799 range: impl RangeBounds<usize> + Send,
800 namespace: Option<NamespaceId<Types>>,
801 ) -> QueryResult<usize> {
802 if !matches!(range.start_bound(), Bound::Unbounded | Bound::Included(0))
803 || !matches!(range.end_bound(), Bound::Unbounded)
804 {
805 return Err(QueryError::Error {
806 message: "partial aggregates are not supported with file system backend".into(),
807 });
808 }
809
810 if namespace.is_some() {
811 return Err(QueryError::Error {
812 message: "file system does not support per-namespace stats".into(),
813 });
814 }
815
816 Ok(self.inner.payload_size)
817 }
818
819 async fn vid_share<ID>(&mut self, id: ID) -> QueryResult<VidShare>
820 where
821 ID: Into<BlockId<Types>> + Send + Sync,
822 {
823 self.inner
824 .vid_storage
825 .iter()
826 .nth(self.inner.get_block_index(id.into())?)
827 .context(NotFoundSnafu)?
828 .context(MissingSnafu)?
829 .1
830 .context(MissingSnafu)
831 }
832
833 async fn sync_status_for_range(
834 &mut self,
835 start: usize,
836 end: usize,
837 ) -> QueryResult<SyncStatusQueryData> {
838 Ok(SyncStatusQueryData {
839 leaves: self.inner.leaf_storage.sync_status(start, end),
840 blocks: self.inner.block_storage.sync_status(start, end),
841 vid_common: self.inner.vid_storage.sync_status(start, end),
842 pruned_height: None,
843 })
844 }
845
846 async fn get_header_window(
847 &mut self,
848 start: impl Into<WindowStart<Types>> + Send + Sync,
849 end: u64,
850 limit: usize,
851 ) -> QueryResult<TimeWindowQueryData<Header<Types>>> {
852 let first_block = match start.into() {
853 WindowStart::Height(h) => h,
854 WindowStart::Hash(h) => self.inner.get_header(h.into())?.block_number(),
855 WindowStart::Time(t) => {
856 let blocks = self
859 .inner
860 .index_by_time
861 .range(t..)
862 .next()
863 .context(NotFoundSnafu)?
864 .1;
865 blocks[0]
870 },
871 } as usize;
872
873 let mut res = TimeWindowQueryData::default();
874
875 if first_block > 0 {
877 res.prev = Some(self.inner.get_header((first_block - 1).into())?);
878 }
879
880 for block in self.inner.get_block_range(first_block..)? {
883 let header = block?.header().clone();
884 if header.timestamp() >= end {
885 res.next = Some(header);
886 break;
887 }
888 res.window.push(header);
889 if res.window.len() >= limit {
890 break;
891 }
892 }
893
894 Ok(res)
895 }
896
897 async fn latest_qc_chain(&mut self) -> QueryResult<Option<[CertificatePair<Types>; 2]>> {
898 Ok(self.inner.latest_qc_chain.clone())
899 }
900
901 async fn load_cert2(&mut self, height: u64) -> QueryResult<Option<Certificate2<Types>>> {
902 Ok(self
903 .inner
904 .cert2_storage
905 .iter()
906 .nth(height as usize)
907 .flatten())
908 }
909
910 async fn load_earliest_cert2(
911 &mut self,
912 height: u64,
913 ) -> QueryResult<Option<Certificate2<Types>>> {
914 Ok(self
915 .inner
916 .cert2_storage
917 .iter()
918 .skip(height as usize)
919 .flatten()
920 .next())
921 }
922}
923
924impl<Types, T: Revert + Send> AggregatesStorage<Types> for Transaction<T>
925where
926 Types: NodeType,
927 Header<Types>: QueryableHeader<Types>,
928{
929 async fn aggregates_height(&mut self) -> anyhow::Result<usize> {
930 Ok(0)
931 }
932
933 async fn load_prev_aggregate(&mut self) -> anyhow::Result<Option<Aggregate<Types>>> {
934 Ok(None)
935 }
936}
937
938impl<Types, T: Revert + Send> UpdateAggregatesStorage<Types> for Transaction<T>
939where
940 Types: NodeType,
941 Header<Types>: QueryableHeader<Types>,
942{
943 async fn update_aggregates(
944 &mut self,
945 _prev: Aggregate<Types>,
946 _blocks: &[PayloadMetadata<Types>],
947 ) -> anyhow::Result<Aggregate<Types>> {
948 Ok(Aggregate::default())
949 }
950}
951
952impl<T: Revert> PrunedHeightStorage for Transaction<T> {}
953
954impl<Types> HasMetrics for FileSystemStorage<Types>
955where
956 Types: NodeType,
957 Header<Types>: QueryableHeader<Types>,
958 Payload<Types>: QueryablePayload<Types>,
959{
960 fn metrics(&self) -> &PrometheusMetrics {
961 &self.metrics
962 }
963}