1use std::{
55 cmp::{max, min},
56 fmt::{Debug, Display},
57 iter::repeat_with,
58 marker::PhantomData,
59 ops::{Bound, Range, RangeBounds},
60 sync::Arc,
61 time::{Duration, Instant},
62};
63
64use anyhow::{Context, bail};
65use async_lock::Semaphore;
66use async_trait::async_trait;
67use backoff::{ExponentialBackoff, ExponentialBackoffBuilder, backoff::Backoff};
68use chrono::{DateTime, Utc};
69use derivative::Derivative;
70use futures::{
71 channel::oneshot,
72 future::{self, BoxFuture, Either, Future, FutureExt, join_all},
73 stream::{self, BoxStream, StreamExt},
74};
75use hotshot_types::{
76 data::VidShare,
77 simple_certificate::CertificatePair,
78 traits::{
79 metrics::{Counter, Gauge, Histogram, Metrics},
80 node_implementation::NodeType,
81 },
82};
83use jf_merkle_tree_compat::{MerkleTreeScheme, prelude::MerkleProof};
84use tagged_base64::TaggedBase64;
85use tokio::{spawn, sync::Mutex, time::sleep};
86use tracing::Instrument;
87
88use super::{
89 Transaction, VersionedDataSource,
90 notifier::Notifier,
91 storage::{
92 Aggregate, AggregatesStorage, AvailabilityStorage, ExplorerStorage,
93 MerklizedStateHeightStorage, MerklizedStateStorage, NodeStorage, UpdateAggregatesStorage,
94 UpdateAvailabilityStorage,
95 pruning::{PruneStorage, PrunedHeightDataSource, PrunedHeightStorage},
96 },
97};
98use crate::{
99 Header, Payload, QueryError, QueryResult,
100 availability::{
101 AvailabilityDataSource, BlockId, BlockInfo, BlockQueryData, BlockWithTransaction,
102 Certificate2, Fetch, FetchStream, HeaderQueryData, LeafId, LeafQueryData, NamespaceId,
103 PayloadMetadata, PayloadQueryData, QueryableHeader, QueryablePayload, TransactionHash,
104 UpdateAvailabilityData, VidCommonMetadata, VidCommonQueryData,
105 },
106 data_source::fetching::{leaf::RangeRequest, vid::VidCommonRangeFetcher},
107 explorer::{self, ExplorerDataSource},
108 fetching::{self, NonEmptyRange, Provider, request},
109 merklized_state::{
110 MerklizedState, MerklizedStateDataSource, MerklizedStateHeightPersistence, Snapshot,
111 },
112 metrics::PrometheusMetrics,
113 node::{
114 NodeDataSource, SyncStatus, SyncStatusQueryData, SyncStatusRange, TimeWindowQueryData,
115 WindowStart,
116 },
117 status::{HasMetrics, StatusDataSource},
118 task::BackgroundTask,
119 types::HeightIndexed,
120};
121
122mod block;
123mod header;
124mod leaf;
125mod transaction;
126mod vid;
127
128use self::{
129 block::{PayloadFetcher, PayloadRangeFetcher},
130 leaf::{LeafFetcher, LeafRangeFetcher},
131 transaction::TransactionRequest,
132 vid::{VidCommonFetcher, VidCommonRequest},
133};
134
135pub struct Builder<Types, S, P> {
137 storage: S,
138 provider: P,
139 backoff: ExponentialBackoffBuilder,
140 rate_limit: usize,
141 range_chunk_size: usize,
142 proactive_interval: Duration,
143 proactive_range_chunk_size: usize,
144 sync_status_chunk_size: usize,
145 active_fetch_delay: Duration,
146 chunk_fetch_delay: Duration,
147 proactive_fetching: bool,
148 aggregator: bool,
149 aggregator_chunk_size: Option<usize>,
150 leaf_only: bool,
151 sync_status_ttl: Duration,
152 _types: PhantomData<Types>,
153}
154
155impl<Types, S, P> Builder<Types, S, P> {
156 pub fn new(storage: S, provider: P) -> Self {
158 let mut default_backoff = ExponentialBackoffBuilder::default();
159 default_backoff
160 .with_initial_interval(Duration::from_secs(1))
161 .with_multiplier(2.)
162 .with_max_interval(Duration::from_secs(32))
163 .with_max_elapsed_time(Some(Duration::from_secs(64)));
164
165 Self {
166 storage,
167 provider,
168 backoff: default_backoff,
169 rate_limit: 32,
170 range_chunk_size: 25,
171 proactive_interval: Duration::from_hours(8),
172 proactive_range_chunk_size: 100,
173 sync_status_chunk_size: 100_000,
174 active_fetch_delay: Duration::from_millis(50),
175 chunk_fetch_delay: Duration::from_millis(100),
176 proactive_fetching: true,
177 aggregator: true,
178 aggregator_chunk_size: None,
179 leaf_only: false,
180 sync_status_ttl: Duration::from_mins(5),
181 _types: Default::default(),
182 }
183 }
184
185 pub fn leaf_only(mut self) -> Self {
186 self.leaf_only = true;
187 self
188 }
189
190 pub fn with_min_retry_interval(mut self, interval: Duration) -> Self {
192 self.backoff.with_initial_interval(interval);
193 self
194 }
195
196 pub fn with_max_retry_interval(mut self, interval: Duration) -> Self {
198 self.backoff.with_max_interval(interval);
199 self
200 }
201
202 pub fn with_retry_multiplier(mut self, multiplier: f64) -> Self {
204 self.backoff.with_multiplier(multiplier);
205 self
206 }
207
208 pub fn with_retry_randomization_factor(mut self, factor: f64) -> Self {
210 self.backoff.with_randomization_factor(factor);
211 self
212 }
213
214 pub fn with_retry_timeout(mut self, timeout: Duration) -> Self {
216 self.backoff.with_max_elapsed_time(Some(timeout));
217 self
218 }
219
220 pub fn with_rate_limit(mut self, with_rate_limit: usize) -> Self {
222 self.rate_limit = with_rate_limit;
223 self
224 }
225
226 pub fn with_range_chunk_size(mut self, range_chunk_size: usize) -> Self {
233 self.range_chunk_size = range_chunk_size;
234 self
235 }
236
237 pub fn with_proactive_interval(mut self, interval: Duration) -> Self {
241 self.proactive_interval = interval;
242 self
243 }
244
245 pub fn with_proactive_range_chunk_size(mut self, range_chunk_size: usize) -> Self {
251 self.proactive_range_chunk_size = range_chunk_size;
252 self
253 }
254
255 pub fn with_sync_status_chunk_size(mut self, chunk_size: usize) -> Self {
258 self.sync_status_chunk_size = chunk_size;
259 self
260 }
261
262 pub fn with_sync_status_ttl(mut self, ttl: Duration) -> Self {
268 self.sync_status_ttl = ttl;
269 self
270 }
271
272 pub fn with_active_fetch_delay(mut self, active_fetch_delay: Duration) -> Self {
278 self.active_fetch_delay = active_fetch_delay;
279 self
280 }
281
282 pub fn with_chunk_fetch_delay(mut self, chunk_fetch_delay: Duration) -> Self {
294 self.chunk_fetch_delay = chunk_fetch_delay;
295 self
296 }
297
298 pub fn disable_proactive_fetching(mut self) -> Self {
307 self.proactive_fetching = false;
308 self
309 }
310
311 pub fn disable_aggregator(mut self) -> Self {
316 self.aggregator = false;
317 self
318 }
319
320 pub fn with_aggregator_chunk_size(mut self, chunk_size: usize) -> Self {
329 self.aggregator_chunk_size = Some(chunk_size);
330 self
331 }
332
333 pub fn is_leaf_only(&self) -> bool {
334 self.leaf_only
335 }
336}
337
338impl<Types, S, P> Builder<Types, S, P>
339where
340 Types: NodeType,
341 Payload<Types>: QueryablePayload<Types>,
342 Header<Types>: QueryableHeader<Types>,
343 S: PruneStorage + VersionedDataSource + HasMetrics + 'static,
344 for<'a> S::ReadOnly<'a>: AvailabilityStorage<Types>
345 + PrunedHeightStorage
346 + NodeStorage<Types>
347 + AggregatesStorage<Types>,
348 for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types> + UpdateAggregatesStorage<Types>,
349 P: AvailabilityProvider<Types>,
350{
351 pub async fn build(self) -> anyhow::Result<FetchingDataSource<Types, S, P>> {
353 FetchingDataSource::new(self).await
354 }
355}
356
/// A data source that serves availability data from local storage `S` and fetches
/// anything missing from a remote provider `P`.
///
/// Besides the shared `Fetcher`, it holds handles to the background tasks started by
/// `FetchingDataSource::new`: the proactive scanner, the aggregator and the pruner.
#[derive(Derivative)]
#[derivative(Clone(bound = ""), Debug(bound = "S: Debug, P: Debug"))]
pub struct FetchingDataSource<Types, S, P>
where
    Types: NodeType,
{
    /// Shared fetcher holding storage, provider and all fetching state.
    fetcher: Arc<Fetcher<Types, S, P>>,
    /// Proactive scanner task; `None` when proactive fetching is disabled or the source is
    /// leaf-only.
    scanner: Option<BackgroundTask>,
    /// Aggregator task; `None` when the aggregator is disabled or the source is leaf-only.
    aggregator: Option<BackgroundTask>,
    /// Pruner (whose task handle is `None` when the storage has no pruning config).
    pruner: Pruner<Types, S>,
}
390
/// Holder for the optional background task that periodically prunes storage.
#[derive(Derivative)]
#[derivative(Clone(bound = ""), Debug(bound = "S: Debug, "))]
pub struct Pruner<Types, S>
where
    Types: NodeType,
{
    /// The `"pruner"` background task, or `None` when the storage has no pruning config.
    handle: Option<BackgroundTask>,
    /// Type-level marker only; no values of `Types` or `S` are stored.
    _types: PhantomData<(Types, S)>,
}
400
401impl<Types, S> Pruner<Types, S>
402where
403 Types: NodeType,
404 Header<Types>: QueryableHeader<Types>,
405 Payload<Types>: QueryablePayload<Types>,
406 S: PruneStorage + Send + Sync + 'static,
407{
408 async fn new(storage: Arc<S>, backoff: ExponentialBackoff) -> Self {
409 let cfg = storage.get_pruning_config();
410 let Some(cfg) = cfg else {
411 return Self {
412 handle: None,
413 _types: Default::default(),
414 };
415 };
416
417 let future = async move {
418 for i in 1.. {
419 sleep(cfg.interval()).await;
422
423 tracing::warn!("starting pruner run {i} ");
424 Self::prune(storage.clone(), &backoff).await;
425 }
426 };
427
428 let task = BackgroundTask::spawn("pruner", future);
429
430 Self {
431 handle: Some(task),
432 _types: Default::default(),
433 }
434 }
435
436 async fn prune(storage: Arc<S>, backoff: &ExponentialBackoff) {
437 let mut pruner = S::Pruner::default();
439 'run: loop {
440 let mut backoff = backoff.clone();
441 backoff.reset();
442 'batch: loop {
443 match storage.prune(&mut pruner).await {
444 Ok(Some(height)) => {
445 tracing::warn!("Pruned to height {height}");
446 break 'batch;
447 },
448 Ok(None) => {
449 tracing::warn!("pruner run complete.");
450 break 'run;
451 },
452 Err(e) => {
453 tracing::warn!("error pruning batch: {e:#}");
454 if let Some(delay) = backoff.next_backoff() {
455 sleep(delay).await;
456 } else {
457 tracing::error!("pruning run failed after too many errors: {e:#}");
458 break 'run;
459 }
460 },
461 }
462 }
463 }
464 }
465}
466
467impl<Types, S, P> FetchingDataSource<Types, S, P>
468where
469 Types: NodeType,
470 Payload<Types>: QueryablePayload<Types>,
471 Header<Types>: QueryableHeader<Types>,
472 S: VersionedDataSource + PruneStorage + HasMetrics + 'static,
473 for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types> + UpdateAggregatesStorage<Types>,
474 for<'a> S::ReadOnly<'a>: AvailabilityStorage<Types>
475 + NodeStorage<Types>
476 + PrunedHeightStorage
477 + AggregatesStorage<Types>,
478 P: AvailabilityProvider<Types>,
479{
480 pub fn builder(storage: S, provider: P) -> Builder<Types, S, P> {
482 Builder::new(storage, provider)
483 }
484
485 async fn new(builder: Builder<Types, S, P>) -> anyhow::Result<Self> {
486 let leaf_only = builder.is_leaf_only();
487 let aggregator = builder.aggregator;
488 let aggregator_chunk_size = builder
489 .aggregator_chunk_size
490 .unwrap_or(builder.range_chunk_size);
491 let proactive_fetching = builder.proactive_fetching;
492 let proactive_interval = builder.proactive_interval;
493 let proactive_range_chunk_size = builder.proactive_range_chunk_size;
494 let backoff = builder.backoff.build();
495 let scanner_metrics = ScannerMetrics::new(builder.storage.metrics());
496 let aggregator_metrics = AggregatorMetrics::new(builder.storage.metrics());
497
498 let fetcher = Arc::new(Fetcher::new(builder).await?);
499 let scanner = if proactive_fetching && !leaf_only {
500 Some(BackgroundTask::spawn(
501 "proactive scanner",
502 fetcher.clone().proactive_scan(
503 proactive_interval,
504 proactive_range_chunk_size,
505 scanner_metrics,
506 ),
507 ))
508 } else {
509 None
510 };
511
512 let aggregator = if aggregator && !leaf_only {
513 Some(BackgroundTask::spawn(
514 "aggregator",
515 fetcher
516 .clone()
517 .aggregate(aggregator_chunk_size, aggregator_metrics),
518 ))
519 } else {
520 None
521 };
522
523 let storage = fetcher.storage.clone();
524
525 let pruner = Pruner::new(storage, backoff).await;
526 let ds = Self {
527 fetcher,
528 scanner,
529 pruner,
530 aggregator,
531 };
532
533 Ok(ds)
534 }
535
536 pub fn inner(&self) -> Arc<S> {
538 self.fetcher.storage.clone()
539 }
540}
541
542impl<Types, S, P> AsRef<S> for FetchingDataSource<Types, S, P>
543where
544 Types: NodeType,
545{
546 fn as_ref(&self) -> &S {
547 &self.fetcher.storage
548 }
549}
550
551impl<Types, S, P> HasMetrics for FetchingDataSource<Types, S, P>
552where
553 Types: NodeType,
554 S: HasMetrics,
555{
556 fn metrics(&self) -> &PrometheusMetrics {
557 self.as_ref().metrics()
558 }
559}
560
561#[async_trait]
562impl<Types, S, P> StatusDataSource for FetchingDataSource<Types, S, P>
563where
564 Types: NodeType,
565 Header<Types>: QueryableHeader<Types>,
566 S: VersionedDataSource + HasMetrics + Send + Sync + 'static,
567 for<'a> S::ReadOnly<'a>: NodeStorage<Types>,
568 P: Send + Sync,
569{
570 async fn block_height(&self) -> QueryResult<usize> {
571 let mut tx = self.read().await.map_err(|err| QueryError::Error {
572 message: err.to_string(),
573 })?;
574 tx.block_height().await
575 }
576}
577
578#[async_trait]
579impl<Types, S, P> PrunedHeightDataSource for FetchingDataSource<Types, S, P>
580where
581 Types: NodeType,
582 S: VersionedDataSource + HasMetrics + Send + Sync + 'static,
583 for<'a> S::ReadOnly<'a>: PrunedHeightStorage,
584 P: Send + Sync,
585{
586 async fn load_pruned_height(&self) -> anyhow::Result<Option<u64>> {
587 let mut tx = self.read().await?;
588 tx.load_pruned_height().await
589 }
590}
591
592#[async_trait]
593impl<Types, S, P> AvailabilityDataSource<Types> for FetchingDataSource<Types, S, P>
594where
595 Types: NodeType,
596 Header<Types>: QueryableHeader<Types>,
597 Payload<Types>: QueryablePayload<Types>,
598 S: VersionedDataSource + 'static,
599 for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>,
600 for<'a> S::ReadOnly<'a>: AvailabilityStorage<Types> + NodeStorage<Types> + PrunedHeightStorage,
601 P: AvailabilityProvider<Types>,
602{
603 async fn get_leaf<ID>(&self, id: ID) -> Fetch<LeafQueryData<Types>>
604 where
605 ID: Into<LeafId<Types>> + Send + Sync,
606 {
607 self.fetcher.get(id.into()).await
608 }
609
610 async fn get_header<ID>(&self, id: ID) -> Fetch<Header<Types>>
611 where
612 ID: Into<BlockId<Types>> + Send + Sync,
613 {
614 self.fetcher
615 .get::<HeaderQueryData<_>>(id.into())
616 .await
617 .map(|h| h.header)
618 }
619
620 async fn get_block<ID>(&self, id: ID) -> Fetch<BlockQueryData<Types>>
621 where
622 ID: Into<BlockId<Types>> + Send + Sync,
623 {
624 self.fetcher.get(id.into()).await
625 }
626
627 async fn get_payload<ID>(&self, id: ID) -> Fetch<PayloadQueryData<Types>>
628 where
629 ID: Into<BlockId<Types>> + Send + Sync,
630 {
631 self.fetcher.get(id.into()).await
632 }
633
634 async fn get_payload_metadata<ID>(&self, id: ID) -> Fetch<PayloadMetadata<Types>>
635 where
636 ID: Into<BlockId<Types>> + Send + Sync,
637 {
638 self.fetcher.get(id.into()).await
639 }
640
641 async fn get_vid_common<ID>(&self, id: ID) -> Fetch<VidCommonQueryData<Types>>
642 where
643 ID: Into<BlockId<Types>> + Send + Sync,
644 {
645 self.fetcher.get(VidCommonRequest::from(id.into())).await
646 }
647
648 async fn get_vid_common_metadata<ID>(&self, id: ID) -> Fetch<VidCommonMetadata<Types>>
649 where
650 ID: Into<BlockId<Types>> + Send + Sync,
651 {
652 self.fetcher.get(VidCommonRequest::from(id.into())).await
653 }
654
655 async fn get_leaf_range<R>(&self, range: R) -> FetchStream<LeafQueryData<Types>>
656 where
657 R: RangeBounds<usize> + Send + 'static,
658 {
659 self.fetcher.clone().get_range(range)
660 }
661
662 async fn get_block_range<R>(&self, range: R) -> FetchStream<BlockQueryData<Types>>
663 where
664 R: RangeBounds<usize> + Send + 'static,
665 {
666 self.fetcher.clone().get_range(range)
667 }
668
669 async fn get_header_range<R>(&self, range: R) -> FetchStream<Header<Types>>
670 where
671 R: RangeBounds<usize> + Send + 'static,
672 {
673 let leaves: FetchStream<LeafQueryData<Types>> = self.fetcher.clone().get_range(range);
674
675 leaves
676 .map(|fetch| fetch.map(|leaf| leaf.leaf.block_header().clone()))
677 .boxed()
678 }
679
680 async fn get_payload_range<R>(&self, range: R) -> FetchStream<PayloadQueryData<Types>>
681 where
682 R: RangeBounds<usize> + Send + 'static,
683 {
684 self.fetcher.clone().get_range(range)
685 }
686
687 async fn get_payload_metadata_range<R>(&self, range: R) -> FetchStream<PayloadMetadata<Types>>
688 where
689 R: RangeBounds<usize> + Send + 'static,
690 {
691 self.fetcher.clone().get_range(range)
692 }
693
694 async fn get_vid_common_range<R>(&self, range: R) -> FetchStream<VidCommonQueryData<Types>>
695 where
696 R: RangeBounds<usize> + Send + 'static,
697 {
698 self.fetcher.clone().get_range(range)
699 }
700
701 async fn get_vid_common_metadata_range<R>(
702 &self,
703 range: R,
704 ) -> FetchStream<VidCommonMetadata<Types>>
705 where
706 R: RangeBounds<usize> + Send + 'static,
707 {
708 self.fetcher.clone().get_range(range)
709 }
710
711 async fn get_leaf_range_rev(
712 &self,
713 start: Bound<usize>,
714 end: usize,
715 ) -> FetchStream<LeafQueryData<Types>> {
716 self.fetcher.clone().get_range_rev(start, end)
717 }
718
719 async fn get_block_range_rev(
720 &self,
721 start: Bound<usize>,
722 end: usize,
723 ) -> FetchStream<BlockQueryData<Types>> {
724 self.fetcher.clone().get_range_rev(start, end)
725 }
726
727 async fn get_payload_range_rev(
728 &self,
729 start: Bound<usize>,
730 end: usize,
731 ) -> FetchStream<PayloadQueryData<Types>> {
732 self.fetcher.clone().get_range_rev(start, end)
733 }
734
735 async fn get_payload_metadata_range_rev(
736 &self,
737 start: Bound<usize>,
738 end: usize,
739 ) -> FetchStream<PayloadMetadata<Types>> {
740 self.fetcher.clone().get_range_rev(start, end)
741 }
742
743 async fn get_vid_common_range_rev(
744 &self,
745 start: Bound<usize>,
746 end: usize,
747 ) -> FetchStream<VidCommonQueryData<Types>> {
748 self.fetcher.clone().get_range_rev(start, end)
749 }
750
751 async fn get_vid_common_metadata_range_rev(
752 &self,
753 start: Bound<usize>,
754 end: usize,
755 ) -> FetchStream<VidCommonMetadata<Types>> {
756 self.fetcher.clone().get_range_rev(start, end)
757 }
758
759 async fn get_block_containing_transaction(
760 &self,
761 h: TransactionHash<Types>,
762 ) -> Fetch<BlockWithTransaction<Types>> {
763 self.fetcher.clone().get(TransactionRequest::from(h)).await
764 }
765
766 async fn get_cert2(&self, height: u64) -> QueryResult<Option<Certificate2<Types>>> {
767 self.fetcher.get_cert2(height).await
768 }
769}
770
771impl<Types, S, P> UpdateAvailabilityData<Types> for FetchingDataSource<Types, S, P>
772where
773 Types: NodeType,
774 Header<Types>: QueryableHeader<Types>,
775 Payload<Types>: QueryablePayload<Types>,
776 S: VersionedDataSource + 'static,
777 for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>,
778 for<'a> S::ReadOnly<'a>: AvailabilityStorage<Types> + NodeStorage<Types> + PrunedHeightStorage,
779 P: AvailabilityProvider<Types>,
780{
781 async fn append(&self, info: BlockInfo<Types>) -> anyhow::Result<()> {
782 let height = info.height() as usize;
783
784 self.fetcher
786 .store(&(info.leaf.clone(), info.qc_chain, info.cert2.clone()))
787 .await;
788
789 leaf::trigger_fetch_for_parent(&self.fetcher, &info.leaf);
791
792 let block = match info.block {
806 Some(block) => Some(block),
807 None => match self.fetcher.get::<BlockQueryData<Types>>(height).await {
808 Fetch::Ready(block) => Some(block),
809 Fetch::Pending(fut) => {
810 let span = tracing::info_span!("fetch missing block", height);
811 spawn(
812 async move {
813 tracing::info!("fetching missing block");
814 fut.await;
815 }
816 .instrument(span),
817 );
818 None
819 },
820 },
821 };
822 if let Some(block) = &block {
823 self.fetcher.store(block).await;
824 }
825 let vid = match info.vid_common {
826 Some(vid) => Some(vid),
827 None => match self.fetcher.get::<VidCommonQueryData<Types>>(height).await {
828 Fetch::Ready(vid) => Some(vid),
829 Fetch::Pending(fut) => {
830 let span = tracing::info_span!("fetch missing VID common", height);
831 spawn(
832 async move {
833 tracing::info!("fetching missing VID common");
834 fut.await;
835 }
836 .instrument(span),
837 );
838 None
839 },
840 },
841 };
842 if let Some(vid) = &vid {
843 self.fetcher.store(&(vid.clone(), info.vid_share)).await;
844 }
845
846 info.leaf.notify(&self.fetcher.notifiers).await;
852 if let Some(block) = &block {
853 block.notify(&self.fetcher.notifiers).await;
854 }
855 if let Some(vid) = &vid {
856 vid.notify(&self.fetcher.notifiers).await;
857 }
858
859 Ok(())
860 }
861
862 async fn append_payload(&self, block: BlockQueryData<Types>) -> anyhow::Result<()> {
869 self.fetcher.store(&block).await;
871 block.notify(&self.fetcher.notifiers).await;
872 Ok(())
873 }
874}
875
876impl<Types, S, P> VersionedDataSource for FetchingDataSource<Types, S, P>
877where
878 Types: NodeType,
879 S: VersionedDataSource + Send + Sync,
880 P: Send + Sync,
881{
882 type Transaction<'a>
883 = S::Transaction<'a>
884 where
885 Self: 'a;
886 type ReadOnly<'a>
887 = S::ReadOnly<'a>
888 where
889 Self: 'a;
890
891 async fn write(&self) -> anyhow::Result<Self::Transaction<'_>> {
892 self.fetcher.write().await
893 }
894
895 async fn read(&self) -> anyhow::Result<Self::ReadOnly<'_>> {
896 self.fetcher.read().await
897 }
898}
899
/// Shared state behind a `FetchingDataSource`: local storage, the remote provider, the
/// per-object-type fetchers, and the settings that govern retries and range streaming.
#[derive(Debug)]
struct Fetcher<Types, S, P>
where
    Types: NodeType,
{
    /// Local storage; behind an `Arc` so the pruner can share it.
    storage: Arc<S>,
    /// Subscriptions used for passive fetches. `get`/`get_chunk` register interest here
    /// before checking storage, and appended objects are announced through it.
    notifiers: Notifiers<Types>,
    /// Remote provider. NOTE(review): presumably used by the active fetchers in the
    /// submodules; it is not read directly in this chunk.
    provider: Arc<P>,
    /// Leaf fetchers; always present, even in leaf-only mode.
    leaf_fetcher: Arc<LeafFetcher<Types, S, P>>,
    leaf_range_fetcher: Arc<LeafRangeFetcher<Types, S, P>>,
    /// Payload and VID-common fetchers; `None` in leaf-only mode.
    payload_fetcher: Option<Arc<PayloadFetcher<Types, S, P>>>,
    payload_range_fetcher: Option<Arc<PayloadRangeFetcher<Types, S, P>>>,
    vid_common_fetcher: Option<Arc<VidCommonFetcher<Types, S, P>>>,
    vid_common_range_fetcher: Option<Arc<VidCommonRangeFetcher<Types, S, P>>>,
    /// Default chunk size for `get_range` / `get_range_rev` streams.
    range_chunk_size: usize,
    /// Chunk size for sync-status computation (also passed to `SyncStatusMetrics::new`).
    sync_status_chunk_size: usize,
    /// Delay after yielding each still-pending item from a range stream.
    active_fetch_delay: Duration,
    /// Delay after loading each chunk of a range stream.
    chunk_fetch_delay: Duration,
    /// Backoff template for the spawned retry tasks; each task clones and resets it.
    backoff: ExponentialBackoff,
    /// Limits how many retry attempts run concurrently; sized by the builder's rate limit.
    retry_semaphore: Arc<Semaphore>,
    /// Whether only leaves are fetched.
    leaf_only: bool,
    sync_status_metrics: SyncStatusMetrics,
    /// Cached sync status, created with the builder's TTL.
    sync_status: Mutex<CachedSyncStatus>,
}
930
931impl<Types, S, P> VersionedDataSource for Fetcher<Types, S, P>
932where
933 Types: NodeType,
934 S: VersionedDataSource + Send + Sync,
935 P: Send + Sync,
936{
937 type Transaction<'a>
938 = S::Transaction<'a>
939 where
940 Self: 'a;
941 type ReadOnly<'a>
942 = S::ReadOnly<'a>
943 where
944 Self: 'a;
945
946 async fn write(&self) -> anyhow::Result<Self::Transaction<'_>> {
947 self.storage.write().await
948 }
949
950 async fn read(&self) -> anyhow::Result<Self::ReadOnly<'_>> {
951 self.storage.read().await
952 }
953}
954
955impl<Types, S, P> Fetcher<Types, S, P>
956where
957 Types: NodeType,
958 Header<Types>: QueryableHeader<Types>,
959 S: VersionedDataSource + HasMetrics + Sync,
960 for<'a> S::ReadOnly<'a>: PrunedHeightStorage + NodeStorage<Types>,
961{
962 pub async fn new(builder: Builder<Types, S, P>) -> anyhow::Result<Self> {
963 let retry_semaphore = Arc::new(Semaphore::new(builder.rate_limit));
964 let backoff = builder.backoff.build();
965
966 let (payload_fetcher, payload_range_fetcher, vid_common_fetcher, vid_common_range_fetcher) =
967 if builder.is_leaf_only() {
968 (None, None, None, None)
969 } else {
970 (
971 Some(Arc::new(fetching::Fetcher::new(
972 retry_semaphore.clone(),
973 backoff.clone(),
974 ))),
975 Some(Arc::new(fetching::Fetcher::new(
976 retry_semaphore.clone(),
977 backoff.clone(),
978 ))),
979 Some(Arc::new(fetching::Fetcher::new(
980 retry_semaphore.clone(),
981 backoff.clone(),
982 ))),
983 Some(Arc::new(fetching::Fetcher::new(
984 retry_semaphore.clone(),
985 backoff.clone(),
986 ))),
987 )
988 };
989 let leaf_fetcher = fetching::Fetcher::new(retry_semaphore.clone(), backoff.clone());
990 let leaf_range_fetcher = fetching::Fetcher::new(retry_semaphore.clone(), backoff.clone());
991
992 let leaf_only = builder.leaf_only;
993 let sync_status_metrics =
994 SyncStatusMetrics::new(builder.storage.metrics(), builder.sync_status_chunk_size);
995
996 Ok(Self {
997 storage: Arc::new(builder.storage),
998 notifiers: Default::default(),
999 provider: Arc::new(builder.provider),
1000 leaf_fetcher: Arc::new(leaf_fetcher),
1001 leaf_range_fetcher: Arc::new(leaf_range_fetcher),
1002 payload_fetcher,
1003 payload_range_fetcher,
1004 vid_common_fetcher,
1005 vid_common_range_fetcher,
1006 range_chunk_size: builder.range_chunk_size,
1007 sync_status_chunk_size: builder.sync_status_chunk_size,
1008 active_fetch_delay: builder.active_fetch_delay,
1009 chunk_fetch_delay: builder.chunk_fetch_delay,
1010 backoff,
1011 retry_semaphore,
1012 leaf_only,
1013 sync_status_metrics,
1014 sync_status: Mutex::new(CachedSyncStatus::new(builder.sync_status_ttl)),
1015 })
1016 }
1017}
1018
1019impl<Types, S, P> Fetcher<Types, S, P>
1020where
1021 Types: NodeType,
1022 Header<Types>: QueryableHeader<Types>,
1023 Payload<Types>: QueryablePayload<Types>,
1024 S: VersionedDataSource + 'static,
1025 for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>,
1026 for<'a> S::ReadOnly<'a>: AvailabilityStorage<Types> + NodeStorage<Types> + PrunedHeightStorage,
1027 P: AvailabilityProvider<Types>,
1028{
1029 async fn get<T>(self: &Arc<Self>, req: impl Into<T::Request> + Send) -> Fetch<T>
1030 where
1031 T: Fetchable<Types>,
1032 {
1033 let req = req.into();
1034
1035 let passive_fetch = T::passive_fetch(&self.notifiers, req).await;
1045
1046 match self.try_get(req).await {
1047 Ok(Some(obj)) => return Fetch::Ready(obj),
1048 Ok(None) => return passive(req, passive_fetch),
1049 Err(err) => {
1050 tracing::warn!(
1051 ?req,
1052 "unable to fetch object; spawning a task to retry: {err:#}"
1053 );
1054 },
1055 }
1056
1057 let (send, recv) = oneshot::channel();
1059
1060 let fetcher = self.clone();
1061 let mut backoff = fetcher.backoff.clone();
1062 let span = tracing::warn_span!("get retry", ?req);
1063 spawn(
1064 async move {
1065 backoff.reset();
1066 let mut delay = backoff.next_backoff().unwrap_or(Duration::from_secs(1));
1067 loop {
1068 let res = {
1069 let _guard = fetcher.retry_semaphore.acquire().await;
1073 fetcher.try_get(req).await
1074 };
1075 match res {
1076 Ok(Some(obj)) => {
1077 tracing::info!(?req, "object was ready after retries");
1081 send.send(obj).ok();
1082 break;
1083 },
1084 Ok(None) => {
1085 tracing::info!(?req, "spawned fetch after retries");
1089 break;
1090 },
1091 Err(err) => {
1092 tracing::warn!(
1093 ?req,
1094 ?delay,
1095 "unable to fetch object, will retry: {err:#}"
1096 );
1097 sleep(delay).await;
1098 if let Some(next_delay) = backoff.next_backoff() {
1099 delay = next_delay;
1100 }
1101 },
1102 }
1103 }
1104 }
1105 .instrument(span),
1106 );
1107
1108 passive(req, select_some(passive_fetch, recv.map(Result::ok)))
1111 }
1112
1113 async fn try_get<T>(self: &Arc<Self>, req: T::Request) -> anyhow::Result<Option<T>>
1124 where
1125 T: Fetchable<Types>,
1126 {
1127 let mut tx = self.read().await.context("opening read transaction")?;
1128 match T::load(&mut tx, req).await {
1129 Ok(t) => Ok(Some(t)),
1130 Err(QueryError::Missing | QueryError::NotFound) => {
1131 tracing::debug!(?req, "object missing from local storage, will try to fetch");
1134 self.fetch::<T>(&mut tx, req).await?;
1135 Ok(None)
1136 },
1137 Err(err) => {
1138 bail!("failed to fetch resource {req:?} from local storage: {err:#}");
1141 },
1142 }
1143 }
1144
1145 fn get_range<R, T>(self: Arc<Self>, range: R) -> BoxStream<'static, Fetch<T>>
1157 where
1158 R: RangeBounds<usize> + Send + 'static,
1159 T: RangedFetchable<Types>,
1160 {
1161 let chunk_size = self.range_chunk_size;
1162 self.get_range_with_chunk_size(chunk_size, range)
1163 }
1164
1165 fn get_range_with_chunk_size<R, T>(
1167 self: Arc<Self>,
1168 chunk_size: usize,
1169 range: R,
1170 ) -> BoxStream<'static, Fetch<T>>
1171 where
1172 R: RangeBounds<usize> + Send + 'static,
1173 T: RangedFetchable<Types>,
1174 {
1175 let chunk_fetch_delay = self.chunk_fetch_delay;
1176 let active_fetch_delay = self.active_fetch_delay;
1177
1178 stream::iter(range_chunks(range, chunk_size))
1179 .then(move |chunk| {
1180 let self_clone = self.clone();
1181 async move {
1182 {
1183 let chunk = self_clone.get_chunk(chunk).await;
1184
1185 sleep(chunk_fetch_delay).await;
1190 stream::iter(chunk)
1191 }
1192 }
1193 })
1194 .flatten()
1195 .then(move |f| async move {
1196 match f {
1197 Fetch::Pending(_) => sleep(active_fetch_delay).await,
1201 Fetch::Ready(_) => (),
1202 };
1203 f
1204 })
1205 .boxed()
1206 }
1207
1208 fn get_range_rev<T>(
1215 self: Arc<Self>,
1216 start: Bound<usize>,
1217 end: usize,
1218 ) -> BoxStream<'static, Fetch<T>>
1219 where
1220 T: RangedFetchable<Types>,
1221 {
1222 let chunk_size = self.range_chunk_size;
1223 self.get_range_with_chunk_size_rev(chunk_size, start, end)
1224 }
1225
1226 fn get_range_with_chunk_size_rev<T>(
1228 self: Arc<Self>,
1229 chunk_size: usize,
1230 start: Bound<usize>,
1231 end: usize,
1232 ) -> BoxStream<'static, Fetch<T>>
1233 where
1234 T: RangedFetchable<Types>,
1235 {
1236 let chunk_fetch_delay = self.chunk_fetch_delay;
1237 let active_fetch_delay = self.active_fetch_delay;
1238
1239 stream::iter(range_chunks_rev(start, end, chunk_size))
1240 .then(move |chunk| {
1241 let self_clone = self.clone();
1242 async move {
1243 {
1244 let chunk = self_clone.get_chunk(chunk).await;
1245
1246 sleep(chunk_fetch_delay).await;
1251 stream::iter(chunk.into_iter().rev())
1252 }
1253 }
1254 })
1255 .flatten()
1256 .then(move |f| async move {
1257 match f {
1258 Fetch::Pending(_) => sleep(active_fetch_delay).await,
1262 Fetch::Ready(_) => (),
1263 };
1264 f
1265 })
1266 .boxed()
1267 }
1268
1269 async fn get_chunk<T>(self: &Arc<Self>, chunk: Range<usize>) -> Vec<Fetch<T>>
1276 where
1277 T: RangedFetchable<Types>,
1278 {
1279 let passive_fetches = join_all(
1283 chunk
1284 .clone()
1285 .map(|i| T::passive_fetch(&self.notifiers, i.into())),
1286 )
1287 .await;
1288
1289 match self.try_get_chunk(&chunk).await {
1290 Ok(objs) => {
1291 return objs
1294 .into_iter()
1295 .zip(passive_fetches)
1296 .enumerate()
1297 .map(move |(i, (obj, passive_fetch))| match obj {
1298 Some(obj) => Fetch::Ready(obj),
1299 None => passive(T::Request::from(chunk.start + i), passive_fetch),
1300 })
1301 .collect();
1302 },
1303 Err(err) => {
1304 tracing::warn!(
1305 ?chunk,
1306 "unable to fetch chunk; spawning a task to retry: {err:#}"
1307 );
1308 },
1309 }
1310
1311 let (send, recv): (Vec<_>, Vec<_>) =
1313 repeat_with(oneshot::channel).take(chunk.len()).unzip();
1314
1315 {
1316 let fetcher = self.clone();
1317 let mut backoff = fetcher.backoff.clone();
1318 let chunk = chunk.clone();
1319 let span = tracing::warn_span!("get_chunk retry", ?chunk);
1320 spawn(
1321 async move {
1322 backoff.reset();
1323 let mut delay = backoff.next_backoff().unwrap_or(Duration::from_secs(1));
1324 loop {
1325 let res = {
1326 let _guard = fetcher.retry_semaphore.acquire().await;
1331 fetcher.try_get_chunk(&chunk).await
1332 };
1333 match res {
1334 Ok(objs) => {
1335 for (i, (obj, sender)) in objs.into_iter().zip(send).enumerate() {
1336 if let Some(obj) = obj {
1337 tracing::info!(?chunk, i, "object was ready after retries");
1341 sender.send(obj).ok();
1342 } else {
1343 tracing::info!(?chunk, i, "spawned fetch after retries");
1348 }
1349 }
1350 break;
1351 },
1352 Err(err) => {
1353 tracing::warn!(
1354 ?chunk,
1355 ?delay,
1356 "unable to fetch chunk, will retry: {err:#}"
1357 );
1358 sleep(delay).await;
1359 if let Some(next_delay) = backoff.next_backoff() {
1360 delay = next_delay;
1361 }
1362 },
1363 }
1364 }
1365 }
1366 .instrument(span),
1367 );
1368 }
1369
1370 passive_fetches
1373 .into_iter()
1374 .zip(recv)
1375 .enumerate()
1376 .map(move |(i, (passive_fetch, recv))| {
1377 passive(
1378 T::Request::from(chunk.start + i),
1379 select_some(passive_fetch, recv.map(Result::ok)),
1380 )
1381 })
1382 .collect()
1383 }
1384
1385 async fn try_get_chunk<T>(
1398 self: &Arc<Self>,
1399 chunk: &Range<usize>,
1400 ) -> anyhow::Result<Vec<Option<T>>>
1401 where
1402 T: RangedFetchable<Types>,
1403 {
1404 let mut tx = self.read().await.context("opening read transaction")?;
1405 let ts = T::load_range(&mut tx, chunk.clone())
1406 .await
1407 .context(format!("when fetching items in range {chunk:?}"))?;
1408
1409 let ts = ts.into_iter().filter_map(ResultExt::ok_or_trace);
1416
1417 let mut results = Vec::with_capacity(chunk.len());
1419 for t in ts {
1420 while chunk.start + results.len() < t.height() as usize {
1422 tracing::debug!(
1423 "item {} in chunk not available, will be fetched",
1424 results.len()
1425 );
1426 self.fetch::<T>(&mut tx, (chunk.start + results.len()).into())
1427 .await?;
1428 results.push(None);
1429 }
1430
1431 results.push(Some(t));
1432 }
1433 while results.len() < chunk.len() {
1435 self.fetch::<T>(&mut tx, (chunk.start + results.len()).into())
1436 .await?;
1437 results.push(None);
1438 }
1439
1440 Ok(results)
1441 }
1442
1443 async fn fetch<T>(
1449 self: &Arc<Self>,
1450 tx: &mut <Self as VersionedDataSource>::ReadOnly<'_>,
1451 req: T::Request,
1452 ) -> anyhow::Result<()>
1453 where
1454 T: Fetchable<Types>,
1455 {
1456 tracing::debug!("fetching resource {req:?}");
1457
1458 let heights = Heights::load(tx)
1460 .await
1461 .context("failed to load heights; cannot definitively say object might exist")?;
1462 if req.might_exist(heights) {
1463 T::active_fetch(tx, self.clone(), req).await?;
1464 } else {
1465 tracing::debug!("not fetching object {req:?} that cannot exist at {heights:?}");
1466 }
1467 Ok(())
1468 }
1469
1470 async fn proactive_scan(
1476 self: Arc<Self>,
1477 interval: Duration,
1478 chunk_size: usize,
1479 metrics: ScannerMetrics,
1480 ) {
1481 for i in 0.. {
1482 let span = tracing::warn_span!("proactive scan", i);
1483 metrics.running.set(1);
1484 metrics.current_scan.set(i);
1485 async {
1486 let sync_status = {
1487 match self.sync_status().await {
1488 Ok(st) => st,
1489 Err(err) => {
1490 tracing::warn!(
1491 "unable to load sync status, scan will be skipped: {err:#}"
1492 );
1493 return;
1494 },
1495 }
1496 };
1497 tracing::info!(?sync_status, "starting scan");
1498 metrics.missing_blocks.set(sync_status.blocks.missing);
1499 metrics.missing_vid.set(sync_status.vid_common.missing);
1500
1501 for range in sync_status.blocks.ranges {
1504 metrics.scanned_blocks.set(range.start);
1505 if range.status != SyncStatus::Missing {
1506 metrics.scanned_blocks.set(range.end);
1507 continue;
1508 }
1509
1510 tracing::info!(?range, "fetching missing block range");
1511
1512 for chunk in range_chunks_aligned(range.start..range.end, chunk_size) {
1515 tracing::info!(?chunk, "fetching missing block chunk");
1516
1517 self.get::<NonEmptyRange<BlockQueryData<Types>>>(RangeRequest {
1520 start: chunk.start as u64,
1521 end: chunk.end as u64,
1522 })
1523 .await
1524 .await;
1525
1526 metrics
1527 .missing_blocks
1528 .update((chunk.start as i64) - (chunk.end as i64));
1529 metrics.scanned_blocks.set(chunk.end);
1530 }
1531 }
1532
1533 for range in sync_status.vid_common.ranges {
1535 metrics.scanned_vid.set(range.start);
1536 if range.status != SyncStatus::Missing {
1537 metrics.scanned_vid.set(range.end);
1538 continue;
1539 }
1540
1541 tracing::info!(?range, "fetching missing VID range");
1542 for chunk in range_chunks_aligned(range.start..range.end, chunk_size) {
1543 tracing::info!(?chunk, "fetching missing VID chunk");
1544 self.get::<NonEmptyRange<VidCommonQueryData<Types>>>(RangeRequest {
1545 start: chunk.start as u64,
1546 end: chunk.end as u64,
1547 })
1548 .await
1549 .await;
1550
1551 metrics
1552 .missing_vid
1553 .update((chunk.start as i64) - (chunk.end as i64));
1554 metrics.scanned_vid.set(chunk.end);
1555 }
1556 }
1557
1558 tracing::info!("completed proactive scan, will scan again in {interval:?}");
1559
1560 metrics.running.set(0);
1562 }
1563 .instrument(span)
1564 .await;
1565
1566 sleep(interval).await;
1567 }
1568 }
1569}
1570
1571impl<Types, S, P> Fetcher<Types, S, P>
1572where
1573 Types: NodeType,
1574 Header<Types>: QueryableHeader<Types>,
1575 S: VersionedDataSource + 'static,
1576 for<'a> S::ReadOnly<'a>: NodeStorage<Types> + PrunedHeightStorage,
1577 P: Send + Sync,
1578{
1579 async fn sync_status(&self) -> anyhow::Result<SyncStatusQueryData> {
1580 let mut cache = self.sync_status.lock().await;
1585 if let Some(sync_status) = cache.try_get() {
1586 return Ok(sync_status.clone());
1587 }
1588 tracing::debug!("updating sync status");
1589
1590 let heights = {
1591 let mut tx = self
1592 .read()
1593 .await
1594 .context("opening transaction to load heights")?;
1595 Heights::load(&mut tx).await.context("loading heights")?
1596 };
1597
1598 let mut res = SyncStatusQueryData {
1599 pruned_height: heights.pruned_height.map(|h| h as usize),
1600 ..Default::default()
1601 };
1602 let start = if let Some(height) = res.pruned_height {
1603 let range = SyncStatusRange {
1605 status: SyncStatus::Pruned,
1606 start: 0,
1607 end: height + 1,
1608 };
1609 res.blocks.ranges.push(range);
1610 res.leaves.ranges.push(range);
1611 res.vid_common.ranges.push(range);
1612
1613 height + 1
1614 } else {
1615 0
1616 };
1617
1618 for chunk in range_chunks(
1621 start..(heights.height as usize),
1622 self.sync_status_chunk_size,
1623 ) {
1624 tracing::debug!(chunk.start, chunk.end, "checking sync status in sub-range");
1625 let metrics = self.sync_status_metrics.start_range(&chunk);
1626 let mut tx = self
1627 .read()
1628 .await
1629 .context("opening transaction to sync status range")?;
1630 let range_status = tx
1631 .sync_status_for_range(chunk.start, chunk.end)
1632 .await
1633 .context(format!("checking sync status in sub-range {chunk:?}"))?;
1634 tracing::debug!(
1635 chunk.start,
1636 chunk.end,
1637 ?range_status,
1638 "found sync status for range"
1639 );
1640
1641 res.blocks.extend(range_status.blocks);
1642 res.leaves.extend(range_status.leaves);
1643 res.vid_common.extend(range_status.vid_common);
1644 metrics.end();
1645 }
1646
1647 cache.update(res.clone());
1648 Ok(res)
1649 }
1650}
1651
impl<Types, S, P> Fetcher<Types, S, P>
where
    Types: NodeType,
    Header<Types>: QueryableHeader<Types>,
    Payload<Types>: QueryablePayload<Types>,
    S: VersionedDataSource + 'static,
    for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types> + UpdateAggregatesStorage<Types>,
    for<'a> S::ReadOnly<'a>: AvailabilityStorage<Types>
        + NodeStorage<Types>
        + PrunedHeightStorage
        + AggregatesStorage<Types>,
    P: AvailabilityProvider<Types>,
{
    /// Background task that maintains aggregate statistics over all blocks.
    ///
    /// It runs forever:
    /// 1. Load the last stored aggregate, retrying every 5 seconds on failure.
    /// 2. Stream `PayloadMetadata` for every block after it, in batches of up to `chunk_size`.
    /// 3. Fold each batch into the running aggregate with `update_aggregates` and commit it,
    ///    retrying every second until the commit succeeds.
    ///
    /// If the block stream ever ends, the task starts over from step 1. `metrics.height` tracks
    /// progress.
    #[tracing::instrument(skip_all)]
    async fn aggregate(self: Arc<Self>, chunk_size: usize, metrics: AggregatorMetrics) {
        loop {
            // Keep retrying until the previous aggregate (if any) can be read.
            let prev_aggregate = loop {
                let mut tx = match self.read().await {
                    Ok(tx) => tx,
                    Err(err) => {
                        tracing::error!("unable to open read tx: {err:#}");
                        sleep(Duration::from_secs(5)).await;
                        continue;
                    },
                };
                match tx.load_prev_aggregate().await {
                    Ok(agg) => break agg,
                    Err(err) => {
                        tracing::error!("unable to load previous aggregate: {err:#}");
                        sleep(Duration::from_secs(5)).await;
                        continue;
                    },
                }
            };

            // Resume right after the last aggregated height, or from 0 with an empty aggregate.
            let (start, mut prev_aggregate) = match prev_aggregate {
                Some(aggregate) => (aggregate.height as usize + 1, aggregate),
                None => (0, Aggregate::default()),
            };

            tracing::info!(start, "starting aggregator");
            metrics.height.set(start);

            // Resolve each fetch in order, then batch whatever is already available (up to
            // `chunk_size` items) so several blocks can be aggregated in one transaction.
            let mut blocks = self
                .clone()
                .get_range_with_chunk_size::<_, PayloadMetadata<Types>>(chunk_size, start..)
                .then(Fetch::resolve)
                .ready_chunks(chunk_size)
                .boxed();
            while let Some(chunk) = blocks.next().await {
                let Some(last) = chunk.last() else {
                    tracing::warn!("ready_chunks returned an empty chunk");
                    continue;
                };
                let height = last.height();
                let num_blocks = chunk.len();
                tracing::debug!(
                    num_blocks,
                    height,
                    "updating aggregate statistics for chunk"
                );
                // Retry this chunk until it commits. `prev_aggregate` is only advanced after a
                // successful commit, so a failed attempt is retried from the same starting point.
                loop {
                    let res = async {
                        let mut tx = self.write().await.context("opening transaction")?;
                        let aggregate =
                            tx.update_aggregates(prev_aggregate.clone(), &chunk).await?;
                        tx.commit().await.context("committing transaction")?;
                        prev_aggregate = aggregate;
                        anyhow::Result::<_>::Ok(())
                    }
                    .await;
                    match res {
                        Ok(()) => {
                            break;
                        },
                        Err(err) => {
                            tracing::warn!(
                                num_blocks,
                                height,
                                "failed to update aggregates for chunk: {err:#}"
                            );
                            sleep(Duration::from_secs(1)).await;
                        },
                    }
                }
                metrics.height.set(height as usize);
            }
            tracing::warn!("aggregator block stream ended unexpectedly; will restart");
        }
    }
}
1744
1745impl<Types, S, P> Fetcher<Types, S, P>
1746where
1747 Types: NodeType,
1748 Header<Types>: QueryableHeader<Types>,
1749 Payload<Types>: QueryablePayload<Types>,
1750 S: VersionedDataSource + 'static,
1751 for<'a> S::ReadOnly<'a>: NodeStorage<Types>,
1752 for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>,
1753 P: AvailabilityProvider<Types>,
1754{
1755 async fn get_cert2(self: &Arc<Self>, height: u64) -> QueryResult<Option<Certificate2<Types>>> {
1757 let mut tx = self.read().await.map_err(|err| QueryError::Error {
1758 message: err.to_string(),
1759 })?;
1760
1761 if let Some(cert2) = tx.load_cert2(height).await? {
1762 return Ok(Some(cert2));
1763 }
1764
1765 drop(tx);
1766
1767 let Some(cert2) = self
1768 .provider
1769 .fetch(request::Certificate2Request { height })
1770 .await
1771 .flatten()
1772 else {
1773 return Ok(None);
1774 };
1775
1776 self.store(&(height, cert2.clone())).await;
1777 Ok(Some(cert2))
1778 }
1779}
1780
1781impl<Types, S, P> Fetcher<Types, S, P>
1782where
1783 Types: NodeType,
1784 S: VersionedDataSource,
1785 for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>,
1786{
1787 async fn store_and_notify<T>(&self, obj: &T)
1789 where
1790 T: Storable<Types>,
1791 {
1792 self.store(obj).await;
1793
1794 obj.notify(&self.notifiers).await;
1819 }
1820
1821 async fn store<T>(&self, obj: &T)
1822 where
1823 T: Storable<Types>,
1824 {
1825 let try_store = || async {
1826 let mut tx = self.storage.write().await?;
1827 obj.clone().store(&mut tx, self.leaf_only).await?;
1828 tx.commit().await
1829 };
1830
1831 let mut backoff = self.backoff.clone();
1833 backoff.reset();
1834 loop {
1835 let Err(err) = try_store().await else {
1836 break;
1837 };
1838 tracing::warn!(
1842 obj = obj.debug_name(),
1843 "failed to store fetched object: {err:#}"
1844 );
1845
1846 let Some(delay) = backoff.next_backoff() else {
1847 break;
1848 };
1849 tracing::info!(?delay, "retrying failed operation");
1850 sleep(delay).await;
1851 }
1852 }
1853}
1854
/// Per-object-type notifiers owned by the fetcher.
///
/// After an object is stored, `Storable::notify` is called with these (see
/// `Fetcher::store_and_notify`). `Fetchable::passive_fetch` receives them to build passive
/// fetches.
#[derive(Debug)]
struct Notifiers<Types>
where
    Types: NodeType,
{
    /// Notifier for blocks.
    block: Notifier<BlockQueryData<Types>>,
    /// Notifier for leaves.
    leaf: Notifier<LeafQueryData<Types>>,
    /// Notifier for VID common data.
    vid_common: Notifier<VidCommonQueryData<Types>>,
}
1864
1865impl<Types> Default for Notifiers<Types>
1866where
1867 Types: NodeType,
1868{
1869 fn default() -> Self {
1870 Self {
1871 block: Notifier::new(),
1872 leaf: Notifier::new(),
1873 vid_common: Notifier::new(),
1874 }
1875 }
1876}
1877
/// Snapshot of the local block height and pruned height, used to decide whether an object at a
/// given height could exist locally.
#[derive(Clone, Copy, Debug)]
struct Heights {
    /// Current block height, as reported by `NodeStorage::block_height`. Objects exist only
    /// strictly below it.
    height: u64,
    /// Pruned height, if anything has been pruned. Objects at or below it no longer exist.
    pruned_height: Option<u64>,
}
1883
1884impl Heights {
1885 async fn load<Types, T>(tx: &mut T) -> anyhow::Result<Self>
1886 where
1887 Types: NodeType,
1888 Header<Types>: QueryableHeader<Types>,
1889 T: NodeStorage<Types> + PrunedHeightStorage + Send,
1890 {
1891 let height = tx.block_height().await.context("loading block height")? as u64;
1892 let pruned_height = tx
1893 .load_pruned_height()
1894 .await
1895 .context("loading pruned height")?;
1896 Ok(Self {
1897 height,
1898 pruned_height,
1899 })
1900 }
1901
1902 fn might_exist(self, h: u64) -> bool {
1903 h < self.height && self.pruned_height.is_none_or(|ph| h > ph)
1904 }
1905}
1906
1907#[async_trait]
1908impl<Types, S, P, State, const ARITY: usize> MerklizedStateDataSource<Types, State, ARITY>
1909 for FetchingDataSource<Types, S, P>
1910where
1911 Types: NodeType,
1912 S: VersionedDataSource + 'static,
1913 for<'a> S::ReadOnly<'a>: MerklizedStateStorage<Types, State, ARITY>,
1914 P: Send + Sync,
1915 State: MerklizedState<Types, ARITY> + 'static,
1916 <State as MerkleTreeScheme>::Commitment: Send,
1917{
1918 async fn get_path(
1919 &self,
1920 snapshot: Snapshot<Types, State, ARITY>,
1921 key: State::Key,
1922 ) -> QueryResult<MerkleProof<State::Entry, State::Key, State::T, ARITY>> {
1923 let mut tx = self.read().await.map_err(|err| QueryError::Error {
1924 message: err.to_string(),
1925 })?;
1926 tx.get_path(snapshot, key).await
1927 }
1928}
1929
1930#[async_trait]
1931impl<Types, S, P> MerklizedStateHeightPersistence for FetchingDataSource<Types, S, P>
1932where
1933 Types: NodeType,
1934 Header<Types>: QueryableHeader<Types>,
1935 Payload<Types>: QueryablePayload<Types>,
1936 S: VersionedDataSource + 'static,
1937 for<'a> S::ReadOnly<'a>: MerklizedStateHeightStorage,
1938 P: Send + Sync,
1939{
1940 async fn get_last_state_height(&self) -> QueryResult<usize> {
1941 let mut tx = self.read().await.map_err(|err| QueryError::Error {
1942 message: err.to_string(),
1943 })?;
1944 tx.get_last_state_height().await
1945 }
1946}
1947
1948#[async_trait]
1949impl<Types, S, P> NodeDataSource<Types> for FetchingDataSource<Types, S, P>
1950where
1951 Types: NodeType,
1952 Header<Types>: QueryableHeader<Types>,
1953 S: VersionedDataSource + 'static,
1954 for<'a> S::ReadOnly<'a>: NodeStorage<Types> + PrunedHeightStorage,
1955 P: Send + Sync,
1956{
1957 async fn block_height(&self) -> QueryResult<usize> {
1958 let mut tx = self.read().await.map_err(|err| QueryError::Error {
1959 message: err.to_string(),
1960 })?;
1961 tx.block_height().await
1962 }
1963
1964 async fn count_transactions_in_range(
1965 &self,
1966 range: impl RangeBounds<usize> + Send,
1967 namespace: Option<NamespaceId<Types>>,
1968 ) -> QueryResult<usize> {
1969 let mut tx = self.read().await.map_err(|err| QueryError::Error {
1970 message: err.to_string(),
1971 })?;
1972 tx.count_transactions_in_range(range, namespace).await
1973 }
1974
1975 async fn payload_size_in_range(
1976 &self,
1977 range: impl RangeBounds<usize> + Send,
1978 namespace: Option<NamespaceId<Types>>,
1979 ) -> QueryResult<usize> {
1980 let mut tx = self.read().await.map_err(|err| QueryError::Error {
1981 message: err.to_string(),
1982 })?;
1983 tx.payload_size_in_range(range, namespace).await
1984 }
1985
1986 async fn vid_share<ID>(&self, id: ID) -> QueryResult<VidShare>
1987 where
1988 ID: Into<BlockId<Types>> + Send + Sync,
1989 {
1990 let mut tx = self.read().await.map_err(|err| QueryError::Error {
1991 message: err.to_string(),
1992 })?;
1993 tx.vid_share(id).await
1994 }
1995
1996 async fn sync_status(&self) -> QueryResult<SyncStatusQueryData> {
1997 self.fetcher
1998 .sync_status()
1999 .await
2000 .map_err(|err| QueryError::Error {
2001 message: format!("{err:#}"),
2002 })
2003 }
2004
2005 async fn get_header_window(
2006 &self,
2007 start: impl Into<WindowStart<Types>> + Send + Sync,
2008 end: u64,
2009 limit: usize,
2010 ) -> QueryResult<TimeWindowQueryData<Header<Types>>> {
2011 let mut tx = self.read().await.map_err(|err| QueryError::Error {
2012 message: err.to_string(),
2013 })?;
2014 tx.get_header_window(start, end, limit).await
2015 }
2016}
2017
2018#[async_trait]
2019impl<Types, S, P> ExplorerDataSource<Types> for FetchingDataSource<Types, S, P>
2020where
2021 Types: NodeType,
2022 Payload<Types>: QueryablePayload<Types>,
2023 Header<Types>: QueryableHeader<Types> + explorer::traits::ExplorerHeader<Types>,
2024 crate::Transaction<Types>: explorer::traits::ExplorerTransaction<Types>,
2025 S: VersionedDataSource + 'static,
2026 for<'a> S::ReadOnly<'a>: ExplorerStorage<Types>,
2027 P: Send + Sync,
2028{
2029 async fn get_block_summaries(
2030 &self,
2031 request: explorer::query_data::GetBlockSummariesRequest<Types>,
2032 ) -> Result<
2033 Vec<explorer::query_data::BlockSummary<Types>>,
2034 explorer::query_data::GetBlockSummariesError,
2035 > {
2036 let mut tx = self.read().await.map_err(|err| QueryError::Error {
2037 message: err.to_string(),
2038 })?;
2039 tx.get_block_summaries(request).await
2040 }
2041
2042 async fn get_block_detail(
2043 &self,
2044 request: explorer::query_data::BlockIdentifier<Types>,
2045 ) -> Result<explorer::query_data::BlockDetail<Types>, explorer::query_data::GetBlockDetailError>
2046 {
2047 let mut tx = self.read().await.map_err(|err| QueryError::Error {
2048 message: err.to_string(),
2049 })?;
2050 tx.get_block_detail(request).await
2051 }
2052
2053 async fn get_transaction_summaries(
2054 &self,
2055 request: explorer::query_data::GetTransactionSummariesRequest<Types>,
2056 ) -> Result<
2057 Vec<explorer::query_data::TransactionSummary<Types>>,
2058 explorer::query_data::GetTransactionSummariesError,
2059 > {
2060 let mut tx = self.read().await.map_err(|err| QueryError::Error {
2061 message: err.to_string(),
2062 })?;
2063 tx.get_transaction_summaries(request).await
2064 }
2065
2066 async fn get_transaction_detail(
2067 &self,
2068 request: explorer::query_data::TransactionIdentifier<Types>,
2069 ) -> Result<
2070 explorer::query_data::TransactionDetailResponse<Types>,
2071 explorer::query_data::GetTransactionDetailError,
2072 > {
2073 let mut tx = self.read().await.map_err(|err| QueryError::Error {
2074 message: err.to_string(),
2075 })?;
2076 tx.get_transaction_detail(request).await
2077 }
2078
2079 async fn get_explorer_summary(
2080 &self,
2081 ) -> Result<
2082 explorer::query_data::ExplorerSummary<Types>,
2083 explorer::query_data::GetExplorerSummaryError,
2084 > {
2085 let mut tx = self.read().await.map_err(|err| QueryError::Error {
2086 message: err.to_string(),
2087 })?;
2088 tx.get_explorer_summary().await
2089 }
2090
2091 async fn get_search_results(
2092 &self,
2093 query: TaggedBase64,
2094 ) -> Result<
2095 explorer::query_data::SearchResult<Types>,
2096 explorer::query_data::GetSearchResultsError,
2097 > {
2098 let mut tx = self.read().await.map_err(|err| QueryError::Error {
2099 message: err.to_string(),
2100 })?;
2101 tx.get_search_results(query).await
2102 }
2103}
2104
/// A provider that can serve every request type the availability fetcher needs.
///
/// This is a shorthand for the combined `Provider` bounds. The blanket impl below makes every
/// type that satisfies them an `AvailabilityProvider` automatically.
pub trait AvailabilityProvider<Types: NodeType>:
    Provider<Types, request::LeafRequest>
    + Provider<Types, request::LeafRangeRequest>
    + Provider<Types, request::PayloadRequest>
    + Provider<Types, request::BlockRangeRequest>
    + Provider<Types, request::VidCommonRequest>
    + Provider<Types, request::VidCommonRangeRequest>
    + Provider<Types, request::Certificate2Request>
    + Sync
    + 'static
{
}
// Blanket impl: any provider satisfying all the required `Provider` bounds is an
// `AvailabilityProvider`.
impl<Types: NodeType, P> AvailabilityProvider<Types> for P where
    P: Provider<Types, request::LeafRequest>
        + Provider<Types, request::LeafRangeRequest>
        + Provider<Types, request::PayloadRequest>
        + Provider<Types, request::BlockRangeRequest>
        + Provider<Types, request::VidCommonRequest>
        + Provider<Types, request::VidCommonRangeRequest>
        + Provider<Types, request::Certificate2Request>
        + Sync
        + 'static
{
}
2130
/// A request identifying an object that can be fetched.
trait FetchRequest: Copy + Debug + Send + Sync + 'static {
    /// Whether the requested object might exist, given the current local `Heights`.
    ///
    /// `Fetcher::fetch` skips the active fetch when this returns `false`. The default
    /// conservatively returns `true`. NOTE(review): presumably height-based requests override
    /// this using `Heights::might_exist` — confirm against implementors.
    fn might_exist(self, _heights: Heights) -> bool {
        true
    }
}
2146
/// An object that can be loaded from local storage or, if it is missing, fetched from a
/// provider.
#[async_trait]
trait Fetchable<Types>: Clone + Send + Sync + 'static
where
    Types: NodeType,
    Header<Types>: QueryableHeader<Types>,
    Payload<Types>: QueryablePayload<Types>,
{
    /// The request type that identifies an instance of this object.
    type Request: FetchRequest;

    /// Whether this object is the one identified by `req`.
    fn satisfies(&self, req: Self::Request) -> bool;

    /// Start an active fetch for the object identified by `req`.
    ///
    /// `tx` is an open transaction on local storage and `fetcher` is the fetcher to use.
    /// `Fetcher::fetch` calls this only after `FetchRequest::might_exist` returns `true`.
    /// NOTE(review): presumably this spawns/queues the fetch rather than waiting for completion —
    /// confirm against implementors.
    async fn active_fetch<S, P>(
        tx: &mut impl AvailabilityStorage<Types>,
        fetcher: Arc<Fetcher<Types, S, P>>,
        req: Self::Request,
    ) -> anyhow::Result<()>
    where
        S: VersionedDataSource + 'static,
        for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>,
        for<'a> S::ReadOnly<'a>:
            AvailabilityStorage<Types> + NodeStorage<Types> + PrunedHeightStorage,
        P: AvailabilityProvider<Types>;

    /// Build a future that resolves to an object satisfying `req` once one is announced through
    /// `notifiers` (presumably by subscribing to the matching notifier).
    async fn passive_fetch(notifiers: &Notifiers<Types>, req: Self::Request) -> PassiveFetch<Self>;

    /// Load the object identified by `req` from local storage.
    async fn load<S>(storage: &mut S, req: Self::Request) -> QueryResult<Self>
    where
        S: AvailabilityStorage<Types>;
}
2206
/// Boxed future returned by `Fetchable::passive_fetch`. `passive` treats a `None` result as the
/// notifier having been dropped without satisfying the request.
type PassiveFetch<T> = BoxFuture<'static, Option<T>>;
2208
/// A height-indexed `Fetchable` object that can also be loaded in contiguous ranges.
#[async_trait]
trait RangedFetchable<Types>: Fetchable<Types, Request = Self::RangedRequest> + HeightIndexed
where
    Types: NodeType,
    Header<Types>: QueryableHeader<Types>,
    Payload<Types>: QueryablePayload<Types>,
{
    /// The request type; it can be built from a height (`From<usize>`).
    type RangedRequest: FetchRequest + From<usize> + Send;

    /// Load the locally available objects in `range`.
    ///
    /// The outer error means the range query itself failed; each inner result is the load of a
    /// single object. NOTE(review): `try_get_chunk` relies on the results being in increasing
    /// height order — confirm implementors guarantee this.
    async fn load_range<S, R>(storage: &mut S, range: R) -> QueryResult<Vec<QueryResult<Self>>>
    where
        S: AvailabilityStorage<Types>,
        R: RangeBounds<usize> + Send + 'static;
}
2224
/// An object the fetcher can persist in availability storage and announce to waiters.
trait Storable<Types: NodeType>: Clone {
    /// Short human-readable description of the object, used in log messages.
    fn debug_name(&self) -> String;

    /// Announce this object via `notifiers` (called by `Fetcher::store_and_notify` after storing).
    fn notify(&self, notifiers: &Notifiers<Types>) -> impl Send + Future<Output = ()>;

    /// Write the object using `storage`.
    ///
    /// `leaf_only` is the fetcher's `leaf_only` setting, as passed by `Fetcher::store`.
    /// Implementations may ignore it.
    fn store(
        &self,
        storage: &mut impl UpdateAvailabilityStorage<Types>,
        leaf_only: bool,
    ) -> impl Send + Future<Output = anyhow::Result<()>>;
}
2240
2241impl<Types: NodeType> Storable<Types>
2242 for (
2243 LeafQueryData<Types>,
2244 Option<[CertificatePair<Types>; 2]>,
2245 Option<Certificate2<Types>>,
2246 )
2247{
2248 fn debug_name(&self) -> String {
2249 format!("leaf {} with QC chain", self.0.height())
2250 }
2251
2252 async fn notify(&self, notifiers: &Notifiers<Types>) {
2253 self.0.notify(notifiers).await;
2254 }
2255
2256 async fn store(
2257 &self,
2258 storage: &mut impl UpdateAvailabilityStorage<Types>,
2259 _leaf_only: bool,
2260 ) -> anyhow::Result<()> {
2261 storage
2262 .insert_leaf_with_qc_chain(&self.0, self.1.clone())
2263 .await
2264 .context("inserting leaf with QC chain")?;
2265 if let Some(cert2) = &self.2 {
2266 storage.insert_cert2(self.0.height(), cert2.clone()).await?;
2267 }
2268 Ok(())
2269 }
2270}
2271
2272impl<Types: NodeType> Storable<Types> for (u64, Certificate2<Types>) {
2273 fn debug_name(&self) -> String {
2274 format!("cert2 at height {}", self.0)
2275 }
2276
2277 async fn notify(&self, _notifiers: &Notifiers<Types>) {
2278 }
2280
2281 async fn store(
2282 &self,
2283 storage: &mut impl UpdateAvailabilityStorage<Types>,
2284 _leaf_only: bool,
2285 ) -> anyhow::Result<()> {
2286 storage.insert_cert2(self.0, self.1.clone()).await
2287 }
2288}
2289
/// Split `range` into consecutive, non-empty sub-ranges of at most `chunk_size` elements.
///
/// An unbounded end is treated as `usize::MAX`, so `start..` yields chunks effectively forever.
/// Nothing is yielded if the range is empty (including `start > end`) or `chunk_size` is zero.
fn range_chunks<R>(range: R, chunk_size: usize) -> impl Iterator<Item = Range<usize>>
where
    R: RangeBounds<usize>,
{
    let Range { mut start, end } = range_to_bounds(range);
    std::iter::from_fn(move || {
        // Saturate so chunks near `usize::MAX` (e.g. from an unbounded range) cannot overflow.
        let chunk_end = min(start.saturating_add(chunk_size), end);
        // `<=` rather than `==`: for an inverted input range `chunk_end` is below `start`, and we
        // must stop instead of yielding a backwards range.
        if chunk_end <= start {
            return None;
        }

        let chunk = start..chunk_end;
        start = chunk_end;
        Some(chunk)
    })
}

/// Split `range` into non-empty chunks whose interior boundaries fall on multiples of
/// `alignment`.
///
/// If the range does not start on a multiple of `alignment`, the first chunk is shortened so
/// that every later chunk starts on an aligned boundary. The rest are produced by
/// [`range_chunks`]. Nothing is yielded for an empty range.
///
/// # Panics
///
/// Panics if `alignment` is zero and the range is non-empty with a nonzero start (from
/// [`usize::next_multiple_of`]).
fn range_chunks_aligned<R>(range: R, alignment: usize) -> impl Iterator<Item = Range<usize>>
where
    R: RangeBounds<usize>,
{
    let Range { mut start, end } = range_to_bounds(range);

    // Emit a short leading chunk only if it would contain something. Otherwise an empty input
    // like `3..3` would produce an empty `3..3` chunk, and `5..2` an inverted one.
    let first = if start < end && !start.is_multiple_of(alignment) {
        let chunk_end = min(start.next_multiple_of(alignment), end);
        let chunk = start..chunk_end;
        start = chunk_end;
        Some(chunk)
    } else {
        None
    };

    first.into_iter().chain(range_chunks(start..end, alignment))
}

/// Convert any `RangeBounds<usize>` into the equivalent half-open `Range`.
///
/// An unbounded start becomes 0 and an unbounded end becomes `usize::MAX`. An exclusive start or
/// inclusive end of `usize::MAX` saturates instead of overflowing.
fn range_to_bounds(range: impl RangeBounds<usize>) -> Range<usize> {
    let start = match range.start_bound() {
        Bound::Included(&i) => i,
        Bound::Excluded(&i) => i.saturating_add(1),
        Bound::Unbounded => 0,
    };
    let end = match range.end_bound() {
        Bound::Included(&i) => i.saturating_add(1),
        Bound::Excluded(&i) => i,
        Bound::Unbounded => usize::MAX,
    };
    start..end
}
2352
/// Split the range from `start` up to and including `end` into chunks of at most `chunk_size`
/// elements, yielded from the highest chunk down to the lowest.
///
/// Each chunk is a half-open `Range`, so the first one ends at `end + 1`. A `chunk_size` of zero
/// yields nothing.
fn range_chunks_rev(
    start: Bound<usize>,
    end: usize,
    chunk_size: usize,
) -> impl Iterator<Item = Range<usize>> {
    let lower = match start {
        Bound::Unbounded => 0,
        Bound::Included(i) => i,
        Bound::Excluded(i) => i + 1,
    };
    // `end` is inclusive; track an exclusive upper bound that walks downwards.
    let mut upper = end + 1;

    std::iter::from_fn(move || {
        let chunk_start = upper.saturating_sub(chunk_size).max(lower);
        (chunk_start < upper).then(|| {
            let chunk = chunk_start..upper;
            upper = chunk_start;
            chunk
        })
    })
}
2388
/// Extension for turning a `Result` into an `Option` while logging the error.
trait ResultExt<T, E> {
    /// Return the `Ok` value, or log the error (at info level) and return `None`.
    fn ok_or_trace(self) -> Option<T>
    where
        E: Display;
}
2394
2395impl<T, E> ResultExt<T, E> for Result<T, E> {
2396 fn ok_or_trace(self) -> Option<T>
2397 where
2398 E: Display,
2399 {
2400 match self {
2401 Ok(t) => Some(t),
2402 Err(err) => {
2403 tracing::info!(
2404 "error loading resource from local storage, will try to fetch: {err:#}"
2405 );
2406 None
2407 },
2408 }
2409 }
2410}
2411
/// Metrics reported by the proactive scanner (`Fetcher::proactive_scan`).
#[derive(Debug)]
struct ScannerMetrics {
    /// Set to 1 when a scan iteration starts and back to 0 when it finishes.
    running: Box<dyn Gauge>,
    /// Index of the current scan iteration.
    current_scan: Box<dyn Gauge>,
    /// Height up to which the current scan has processed block ranges.
    scanned_blocks: Box<dyn Gauge>,
    /// Height up to which the current scan has processed VID common ranges.
    scanned_vid: Box<dyn Gauge>,
    /// Blocks reported missing at the start of the scan, decremented as missing chunks are fetched.
    missing_blocks: Box<dyn Gauge>,
    /// VID common objects reported missing at the start of the scan, decremented as chunks are
    /// fetched.
    missing_vid: Box<dyn Gauge>,
}
2427
2428impl ScannerMetrics {
2429 fn new(metrics: &PrometheusMetrics) -> Self {
2430 let group = metrics.subgroup("scanner".into());
2431 Self {
2432 running: group.create_gauge("running".into(), None),
2433 current_scan: group.create_gauge("current".into(), None),
2434 scanned_blocks: group.create_gauge("scanned_blocks".into(), None),
2435 scanned_vid: group.create_gauge("scanned_vid".into(), None),
2436 missing_blocks: group.create_gauge("missing_blocks".into(), None),
2437 missing_vid: group.create_gauge("missing_vid".into(), None),
2438 }
2439 }
2440}
2441
/// Metrics reported by the aggregator task (`Fetcher::aggregate`).
#[derive(Debug)]
struct AggregatorMetrics {
    /// Aggregator progress. It is set to the starting height when the aggregator (re)starts, then
    /// to the height of the last block in each committed chunk.
    height: Box<dyn Gauge>,
}
2447
2448impl AggregatorMetrics {
2449 fn new(metrics: &PrometheusMetrics) -> Self {
2450 let group = metrics.subgroup("aggregator".into());
2451 Self {
2452 height: group.create_gauge("height".into(), None),
2453 }
2454 }
2455}
2456
/// Metrics describing progress of the sync status computation (`Fetcher::sync_status`).
#[derive(Debug)]
struct SyncStatusMetrics {
    /// Start of the sub-range currently being checked.
    current_range_start: Box<dyn Gauge>,
    /// End (exclusive) of the sub-range currently being checked.
    current_range_end: Box<dyn Gauge>,
    /// Unix timestamp, in seconds, at which the current sub-range was started.
    current_start_time: Box<dyn Gauge>,
    /// Milliseconds per block for each completed sub-range.
    avg_rate: Box<dyn Histogram>,
    /// Number of sub-ranges completed.
    ranges_scanned: Box<dyn Counter>,
    /// Set to 1 when a sub-range starts and back to 0 when it is completed.
    running: Box<dyn Gauge>,
}
2466
2467impl SyncStatusMetrics {
2468 fn new(metrics: &PrometheusMetrics, size: usize) -> Self {
2469 let group = metrics.subgroup("sync_status".into());
2470 group.create_gauge("range_size".into(), None).set(size);
2471
2472 Self {
2473 current_range_start: group.create_gauge("current_range_start".into(), None),
2474 current_range_end: group.create_gauge("current_range_end".into(), None),
2475 current_start_time: group
2476 .create_gauge("current_range_start_time".into(), Some("s".into())),
2477 avg_rate: group
2478 .create_histogram("avg_time_per_block_scanned".into(), Some("ms".into())),
2479 ranges_scanned: group.create_counter("ranges_scanned".into(), None),
2480 running: group.create_gauge("running".into(), None),
2481 }
2482 }
2483
2484 fn start_range(&self, range: &Range<usize>) -> SyncStatusRangeMetrics<'_> {
2485 let start = Utc::now();
2486 self.current_range_start.set(range.start);
2487 self.current_range_end.set(range.end);
2488 self.current_start_time.set(start.timestamp() as usize);
2489 self.running.set(1);
2490 SyncStatusRangeMetrics {
2491 size: range.end - range.start,
2492 start,
2493 metrics: self,
2494 }
2495 }
2496}
2497
/// Handle for one in-progress sync status sub-range, returned by
/// `SyncStatusMetrics::start_range`. Call `end` once the sub-range has been processed.
#[must_use]
#[derive(Debug)]
struct SyncStatusRangeMetrics<'a> {
    /// Number of blocks in the sub-range.
    size: usize,
    /// Wall-clock time at which the sub-range was started.
    start: DateTime<Utc>,
    /// Parent metrics to update.
    metrics: &'a SyncStatusMetrics,
}
2505
2506impl<'a> SyncStatusRangeMetrics<'a> {
2507 fn end(self) {
2508 let elapsed = Utc::now() - self.start;
2509 self.metrics
2510 .avg_rate
2511 .add_point((elapsed.num_milliseconds() as f64) / (self.size as f64));
2512 self.metrics.ranges_scanned.add(1);
2513 self.metrics.running.set(0);
2514 }
2515}
2516
/// A sync status value that is considered fresh for `ttl` after it was last updated.
#[derive(Debug)]
struct CachedSyncStatus {
    /// When `cached` was last set (or when the cache was created).
    last_updated: Instant,
    /// How long a cached value stays valid.
    ttl: Duration,
    /// The cached value, if one has been computed.
    cached: Option<SyncStatusQueryData>,
}
2523
2524impl CachedSyncStatus {
2525 fn new(ttl: Duration) -> Self {
2526 Self {
2527 last_updated: Instant::now(),
2528 ttl,
2529 cached: None,
2530 }
2531 }
2532
2533 fn try_get(&self) -> Option<&SyncStatusQueryData> {
2535 if self.last_updated.elapsed() > self.ttl {
2536 return None;
2538 }
2539 self.cached.as_ref()
2540 }
2541
2542 fn update(&mut self, value: SyncStatusQueryData) {
2544 self.last_updated = Instant::now();
2545 self.cached = Some(value);
2546 }
2547}
2548
2549fn passive<T>(
2553 req: impl Debug + Send + 'static,
2554 fut: impl Future<Output = Option<T>> + Send + 'static,
2555) -> Fetch<T>
2556where
2557 T: Send + 'static,
2558{
2559 Fetch::Pending(
2560 fut.then(move |opt| async move {
2561 match opt {
2562 Some(t) => t,
2563 None => {
2564 panic!("notifier dropped without satisfying request {req:?}");
2579 },
2580 }
2581 })
2582 .boxed(),
2583 )
2584}
2585
2586async fn select_some<T>(
2588 a: impl Future<Output = Option<T>> + Unpin,
2589 b: impl Future<Output = Option<T>> + Unpin,
2590) -> Option<T> {
2591 match future::select(a, b).await {
2592 Either::Left((Some(a), _)) => Some(a),
2594 Either::Right((Some(b), _)) => Some(b),
2595
2596 Either::Left((None, b)) => b.await,
2598 Either::Right((None, a)) => a.await,
2599 }
2600}
2601
2602#[cfg(test)]
2603mod test {
2604 use hotshot_example_types::node_types::TEST_VERSIONS;
2605
2606 use super::*;
2607 use crate::{
2608 data_source::{
2609 sql::testing::TmpDb,
2610 storage::{SqlStorage, StorageConnectionType},
2611 },
2612 fetching::provider::NoFetching,
2613 testing::{consensus::MockSqlDataSource, mocks::MockTypes},
2614 };
2615
2616 #[test]
2617 fn test_range_chunks() {
2618 assert_eq!(
2620 range_chunks(0..=4, 2).collect::<Vec<_>>(),
2621 [0..2, 2..4, 4..5]
2622 );
2623
2624 assert_eq!(
2626 range_chunks(0..=5, 2).collect::<Vec<_>>(),
2627 [0..2, 2..4, 4..6]
2628 );
2629
2630 assert_eq!(
2632 range_chunks(0..5, 2).collect::<Vec<_>>(),
2633 [0..2, 2..4, 4..5]
2634 );
2635
2636 assert_eq!(
2638 range_chunks(0..6, 2).collect::<Vec<_>>(),
2639 [0..2, 2..4, 4..6]
2640 );
2641
2642 assert_eq!(
2644 range_chunks(0.., 2).take(5).collect::<Vec<_>>(),
2645 [0..2, 2..4, 4..6, 6..8, 8..10]
2646 );
2647 }
2648
2649 #[test]
2650 fn test_range_chunks_aligned() {
2651 #![allow(clippy::single_range_in_vec_init)]
2652
2653 assert_eq!(
2655 range_chunks_aligned(2..5, 2).collect::<Vec<_>>(),
2656 [2..4, 4..5]
2657 );
2658
2659 assert_eq!(
2661 range_chunks_aligned(1..4, 2).collect::<Vec<_>>(),
2662 [1..2, 2..4]
2663 );
2664
2665 assert_eq!(range_chunks_aligned(1..3, 10).collect::<Vec<_>>(), [1..3]);
2667
2668 assert_eq!(
2670 range_chunks_aligned(1.., 2).take(5).collect::<Vec<_>>(),
2671 [1..2, 2..4, 4..6, 6..8, 8..10]
2672 );
2673 }
2674
2675 #[test]
2676 fn test_range_chunks_rev() {
2677 assert_eq!(
2679 range_chunks_rev(Bound::Included(0), 4, 2).collect::<Vec<_>>(),
2680 [3..5, 1..3, 0..1]
2681 );
2682
2683 assert_eq!(
2685 range_chunks_rev(Bound::Included(0), 5, 2).collect::<Vec<_>>(),
2686 [4..6, 2..4, 0..2]
2687 );
2688
2689 assert_eq!(
2691 range_chunks_rev(Bound::Excluded(0), 5, 2).collect::<Vec<_>>(),
2692 [4..6, 2..4, 1..2]
2693 );
2694
2695 assert_eq!(
2697 range_chunks_rev(Bound::Excluded(0), 4, 2).collect::<Vec<_>>(),
2698 [3..5, 1..3]
2699 );
2700 }
2701
2702 async fn test_sync_status(chunk_size: usize, present_ranges: &[(usize, usize)]) {
2703 let block_height = present_ranges.last().unwrap().1;
2704 let storage = TmpDb::init().await;
2705 let db = SqlStorage::connect(storage.config(), StorageConnectionType::Query)
2706 .await
2707 .unwrap();
2708 let ds = MockSqlDataSource::builder(db, NoFetching)
2709 .with_sync_status_chunk_size(chunk_size)
2710 .with_sync_status_ttl(Duration::ZERO)
2711 .build()
2712 .await
2713 .unwrap();
2714
2715 let mut leaves: Vec<LeafQueryData<MockTypes>> = vec![
2717 LeafQueryData::<MockTypes>::genesis(
2718 &Default::default(),
2719 &Default::default(),
2720 TEST_VERSIONS.test,
2721 )
2722 .await,
2723 ];
2724 for i in 1..block_height {
2725 let mut leaf = leaves[i - 1].clone();
2726 leaf.leaf.block_header_mut().block_number = i as u64;
2727 leaves.push(leaf);
2728 }
2729
2730 {
2732 let mut tx = ds.write().await.unwrap();
2733
2734 for &(start, end) in present_ranges {
2735 for leaf in &leaves[start..end] {
2736 tracing::info!(height = leaf.height(), "insert leaf");
2737 tx.insert_leaf(leaf).await.unwrap();
2738 }
2739 }
2740
2741 if present_ranges[0].0 > 0 {
2742 tx.save_pruned_height((present_ranges[0].0 - 1) as u64)
2743 .await
2744 .unwrap();
2745 }
2746
2747 tx.commit().await.unwrap();
2748 }
2749
2750 let sync_status = ds.sync_status().await.unwrap().leaves;
2751
2752 let present: usize = present_ranges.iter().map(|(start, end)| end - start).sum();
2754 assert_eq!(
2755 sync_status.missing,
2756 block_height - present - present_ranges[0].0
2757 );
2758
2759 let mut ranges = sync_status.ranges.into_iter();
2761 let mut prev = 0;
2762 for &(start, end) in present_ranges {
2763 if start != prev {
2764 let range = ranges.next().unwrap();
2765 assert_eq!(
2766 range,
2767 SyncStatusRange {
2768 start: prev,
2769 end: start,
2770 status: if prev == 0 {
2771 SyncStatus::Pruned
2772 } else {
2773 SyncStatus::Missing
2774 },
2775 }
2776 );
2777 }
2778 let range = ranges.next().unwrap();
2779 assert_eq!(
2780 range,
2781 SyncStatusRange {
2782 start,
2783 end,
2784 status: SyncStatus::Present,
2785 }
2786 );
2787 prev = end;
2788 }
2789
2790 if prev != block_height {
2791 let range = ranges.next().unwrap();
2792 assert_eq!(
2793 range,
2794 SyncStatusRange {
2795 start: prev,
2796 end: block_height,
2797 status: SyncStatus::Missing,
2798 }
2799 );
2800 }
2801
2802 assert_eq!(ranges.next(), None);
2803 }
2804
2805 #[tokio::test]
2806 #[test_log::test]
2807 async fn test_sync_status_multiple_chunks() {
2808 test_sync_status(10, &[(0, 1), (3, 5), (8, 10)]).await;
2809 }
2810
2811 #[tokio::test]
2812 #[test_log::test]
2813 async fn test_sync_status_multiple_chunks_present_range_overlapping_chunk() {
2814 test_sync_status(5, &[(1, 4)]).await;
2815 }
2816
2817 #[tokio::test]
2818 #[test_log::test]
2819 async fn test_sync_status_multiple_chunks_missing_range_overlapping_chunk() {
2820 test_sync_status(5, &[(0, 1), (4, 5)]).await;
2821 }
2822
2823 #[tokio::test]
2824 #[test_log::test]
2825 async fn test_load_range_incomplete() {
2826 let storage = TmpDb::init().await;
2827 let db = SqlStorage::connect(storage.config(), StorageConnectionType::Query)
2828 .await
2829 .unwrap();
2830 {
2831 let mut tx = db.write().await.unwrap();
2832 tx.insert_leaf(
2833 &LeafQueryData::<MockTypes>::genesis(
2834 &Default::default(),
2835 &Default::default(),
2836 TEST_VERSIONS.test,
2837 )
2838 .await,
2839 )
2840 .await
2841 .unwrap();
2842 tx.insert_block(
2843 &BlockQueryData::<MockTypes>::genesis(
2844 &Default::default(),
2845 &Default::default(),
2846 TEST_VERSIONS.test.base,
2847 )
2848 .await,
2849 )
2850 .await
2851 .unwrap();
2852 tx.insert_vid(
2853 &VidCommonQueryData::<MockTypes>::genesis(
2854 &Default::default(),
2855 &Default::default(),
2856 TEST_VERSIONS.test.base,
2857 )
2858 .await,
2859 None,
2860 )
2861 .await
2862 .unwrap();
2863 tx.commit().await.unwrap();
2864 }
2865
2866 let mut tx = db.read().await.unwrap();
2867 let req = RangeRequest { start: 0, end: 100 };
2868
2869 let err = <NonEmptyRange<BlockQueryData<MockTypes>>>::load(&mut tx, req)
2870 .await
2871 .unwrap_err();
2872 tracing::info!("loading partial block range failed as expected: {err:#}");
2873 assert!(matches!(err, QueryError::Missing));
2874
2875 let err =
2876 <NonEmptyRange<LeafQueryData<MockTypes>> as Fetchable<MockTypes>>::load(&mut tx, req)
2877 .await
2878 .unwrap_err();
2879 tracing::info!("loading partial leaf range failed as expected: {err:#}");
2880 assert!(matches!(err, QueryError::Missing));
2881
2882 let err = <NonEmptyRange<VidCommonQueryData<MockTypes>>>::load(&mut tx, req)
2883 .await
2884 .unwrap_err();
2885 tracing::info!("loading partial VID common range failed as expected: {err:#}");
2886 assert!(matches!(err, QueryError::Missing));
2887 }
2888}