1use std::{
55 cmp::{max, min},
56 fmt::{Debug, Display},
57 iter::repeat_with,
58 marker::PhantomData,
59 ops::{Bound, Range, RangeBounds},
60 sync::Arc,
61 time::{Duration, Instant},
62};
63
64use anyhow::{Context, bail};
65use async_lock::Semaphore;
66use async_trait::async_trait;
67use backoff::{ExponentialBackoff, ExponentialBackoffBuilder, backoff::Backoff};
68use chrono::{DateTime, Utc};
69use derivative::Derivative;
70use futures::{
71 channel::oneshot,
72 future::{self, BoxFuture, Either, Future, FutureExt, join_all},
73 stream::{self, BoxStream, StreamExt},
74};
75use hotshot_types::{
76 data::VidShare,
77 simple_certificate::CertificatePair,
78 traits::{
79 metrics::{Counter, Gauge, Histogram, Metrics},
80 node_implementation::NodeType,
81 },
82};
83use jf_merkle_tree_compat::{MerkleTreeScheme, prelude::MerkleProof};
84use tagged_base64::TaggedBase64;
85use tokio::{spawn, sync::Mutex, time::sleep};
86use tracing::Instrument;
87
88use super::{
89 Transaction, VersionedDataSource,
90 notifier::Notifier,
91 storage::{
92 Aggregate, AggregatesStorage, AvailabilityStorage, ExplorerStorage,
93 MerklizedStateHeightStorage, MerklizedStateStorage, NodeStorage, UpdateAggregatesStorage,
94 UpdateAvailabilityStorage,
95 pruning::{PruneStorage, PrunedHeightDataSource, PrunedHeightStorage},
96 },
97};
98use crate::{
99 Header, Payload, QueryError, QueryResult,
100 availability::{
101 AvailabilityDataSource, BlockId, BlockInfo, BlockQueryData, BlockWithTransaction, Fetch,
102 FetchStream, HeaderQueryData, LeafId, LeafQueryData, NamespaceId, PayloadMetadata,
103 PayloadQueryData, QueryableHeader, QueryablePayload, TransactionHash,
104 UpdateAvailabilityData, VidCommonMetadata, VidCommonQueryData,
105 },
106 data_source::fetching::{leaf::RangeRequest, vid::VidCommonRangeFetcher},
107 explorer::{self, ExplorerDataSource},
108 fetching::{self, NonEmptyRange, Provider, request},
109 merklized_state::{
110 MerklizedState, MerklizedStateDataSource, MerklizedStateHeightPersistence, Snapshot,
111 },
112 metrics::PrometheusMetrics,
113 node::{
114 NodeDataSource, SyncStatus, SyncStatusQueryData, SyncStatusRange, TimeWindowQueryData,
115 WindowStart,
116 },
117 status::{HasMetrics, StatusDataSource},
118 task::BackgroundTask,
119 types::HeightIndexed,
120};
121
122mod block;
123mod header;
124mod leaf;
125mod transaction;
126mod vid;
127
128use self::{
129 block::{PayloadFetcher, PayloadRangeFetcher},
130 leaf::{LeafFetcher, LeafRangeFetcher},
131 transaction::TransactionRequest,
132 vid::{VidCommonFetcher, VidCommonRequest},
133};
134
/// Builder for configuring and constructing a [`FetchingDataSource`].
///
/// Collects storage, a remote fetch provider, and tunables for retry policy,
/// chunking, background tasks, and caching before `build()` is called.
pub struct Builder<Types, S, P> {
    /// Local persistent storage backend.
    storage: S,
    /// Remote provider used to fetch objects missing from local storage.
    provider: P,
    /// Backoff policy used when retrying failed operations.
    backoff: ExponentialBackoffBuilder,
    /// Size of the semaphore limiting concurrent retry attempts.
    rate_limit: usize,
    /// Chunk size used when loading ranges/streams of objects.
    range_chunk_size: usize,
    /// Interval between proactive scans for missing data.
    proactive_interval: Duration,
    /// Chunk size used by the proactive scanner.
    proactive_range_chunk_size: usize,
    /// Chunk size used when computing sync status.
    sync_status_chunk_size: usize,
    /// Delay applied after each stream item that required an active fetch.
    active_fetch_delay: Duration,
    /// Delay between consecutive chunk loads when streaming ranges.
    chunk_fetch_delay: Duration,
    /// Whether the proactive scanner background task is enabled.
    proactive_fetching: bool,
    /// Whether the aggregator background task is enabled.
    aggregator: bool,
    /// Aggregator chunk size; falls back to `range_chunk_size` when `None`.
    aggregator_chunk_size: Option<usize>,
    /// When true, only leaves are stored (no payloads or VID data).
    leaf_only: bool,
    /// How long a cached sync status result remains valid.
    sync_status_ttl: Duration,
    // Marker tying the builder to a `NodeType` without storing one.
    _types: PhantomData<Types>,
}
154
155impl<Types, S, P> Builder<Types, S, P> {
156 pub fn new(storage: S, provider: P) -> Self {
158 let mut default_backoff = ExponentialBackoffBuilder::default();
159 default_backoff
160 .with_initial_interval(Duration::from_secs(1))
161 .with_multiplier(2.)
162 .with_max_interval(Duration::from_secs(32))
163 .with_max_elapsed_time(Some(Duration::from_secs(64)));
164
165 Self {
166 storage,
167 provider,
168 backoff: default_backoff,
169 rate_limit: 32,
170 range_chunk_size: 25,
171 proactive_interval: Duration::from_hours(8),
172 proactive_range_chunk_size: 100,
173 sync_status_chunk_size: 100_000,
174 active_fetch_delay: Duration::from_millis(50),
175 chunk_fetch_delay: Duration::from_millis(100),
176 proactive_fetching: true,
177 aggregator: true,
178 aggregator_chunk_size: None,
179 leaf_only: false,
180 sync_status_ttl: Duration::from_mins(5),
181 _types: Default::default(),
182 }
183 }
184
185 pub fn leaf_only(mut self) -> Self {
186 self.leaf_only = true;
187 self
188 }
189
190 pub fn with_min_retry_interval(mut self, interval: Duration) -> Self {
192 self.backoff.with_initial_interval(interval);
193 self
194 }
195
196 pub fn with_max_retry_interval(mut self, interval: Duration) -> Self {
198 self.backoff.with_max_interval(interval);
199 self
200 }
201
202 pub fn with_retry_multiplier(mut self, multiplier: f64) -> Self {
204 self.backoff.with_multiplier(multiplier);
205 self
206 }
207
208 pub fn with_retry_randomization_factor(mut self, factor: f64) -> Self {
210 self.backoff.with_randomization_factor(factor);
211 self
212 }
213
214 pub fn with_retry_timeout(mut self, timeout: Duration) -> Self {
216 self.backoff.with_max_elapsed_time(Some(timeout));
217 self
218 }
219
220 pub fn with_rate_limit(mut self, with_rate_limit: usize) -> Self {
222 self.rate_limit = with_rate_limit;
223 self
224 }
225
226 pub fn with_range_chunk_size(mut self, range_chunk_size: usize) -> Self {
233 self.range_chunk_size = range_chunk_size;
234 self
235 }
236
237 pub fn with_proactive_interval(mut self, interval: Duration) -> Self {
241 self.proactive_interval = interval;
242 self
243 }
244
245 pub fn with_proactive_range_chunk_size(mut self, range_chunk_size: usize) -> Self {
251 self.proactive_range_chunk_size = range_chunk_size;
252 self
253 }
254
255 pub fn with_sync_status_chunk_size(mut self, chunk_size: usize) -> Self {
258 self.sync_status_chunk_size = chunk_size;
259 self
260 }
261
262 pub fn with_sync_status_ttl(mut self, ttl: Duration) -> Self {
268 self.sync_status_ttl = ttl;
269 self
270 }
271
272 pub fn with_active_fetch_delay(mut self, active_fetch_delay: Duration) -> Self {
278 self.active_fetch_delay = active_fetch_delay;
279 self
280 }
281
282 pub fn with_chunk_fetch_delay(mut self, chunk_fetch_delay: Duration) -> Self {
294 self.chunk_fetch_delay = chunk_fetch_delay;
295 self
296 }
297
298 pub fn disable_proactive_fetching(mut self) -> Self {
307 self.proactive_fetching = false;
308 self
309 }
310
311 pub fn disable_aggregator(mut self) -> Self {
316 self.aggregator = false;
317 self
318 }
319
320 pub fn with_aggregator_chunk_size(mut self, chunk_size: usize) -> Self {
329 self.aggregator_chunk_size = Some(chunk_size);
330 self
331 }
332
333 pub fn is_leaf_only(&self) -> bool {
334 self.leaf_only
335 }
336}
337
338impl<Types, S, P> Builder<Types, S, P>
339where
340 Types: NodeType,
341 Payload<Types>: QueryablePayload<Types>,
342 Header<Types>: QueryableHeader<Types>,
343 S: PruneStorage + VersionedDataSource + HasMetrics + 'static,
344 for<'a> S::ReadOnly<'a>: AvailabilityStorage<Types>
345 + PrunedHeightStorage
346 + NodeStorage<Types>
347 + AggregatesStorage<Types>,
348 for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types> + UpdateAggregatesStorage<Types>,
349 P: AvailabilityProvider<Types>,
350{
351 pub async fn build(self) -> anyhow::Result<FetchingDataSource<Types, S, P>> {
353 FetchingDataSource::new(self).await
354 }
355}
356
/// The primary data source: local storage backed by a fetcher that retrieves
/// missing objects from a remote provider, plus optional background tasks.
#[derive(Derivative)]
#[derivative(Clone(bound = ""), Debug(bound = "S: Debug, P: Debug"))]
pub struct FetchingDataSource<Types, S, P>
where
    Types: NodeType,
{
    /// Shared state for reading local objects and fetching missing ones.
    fetcher: Arc<Fetcher<Types, S, P>>,
    /// Proactive scanner background task, if enabled.
    scanner: Option<BackgroundTask>,
    /// Aggregator background task, if enabled.
    aggregator: Option<BackgroundTask>,
    /// Handle to the pruner (which may or may not have an active task).
    pruner: Pruner<Types, S>,
}
390
/// Handle to the background pruning task.
///
/// If pruning is not configured on the storage layer, no task is spawned and
/// `handle` is `None`.
#[derive(Derivative)]
#[derivative(Clone(bound = ""), Debug(bound = "S: Debug, "))]
pub struct Pruner<Types, S>
where
    Types: NodeType,
{
    /// The spawned pruning task, when pruning is enabled.
    handle: Option<BackgroundTask>,
    // Marker tying the pruner to its type parameters without storing them.
    _types: PhantomData<(Types, S)>,
}
400
impl<Types, S> Pruner<Types, S>
where
    Types: NodeType,
    Header<Types>: QueryableHeader<Types>,
    Payload<Types>: QueryablePayload<Types>,
    S: PruneStorage + Send + Sync + 'static,
{
    /// Spawn the background pruning loop, if the storage layer has a pruning
    /// configuration; otherwise return an inert handle.
    async fn new(storage: Arc<S>, backoff: ExponentialBackoff) -> Self {
        let cfg = storage.get_pruning_config();
        let Some(cfg) = cfg else {
            // Pruning disabled: no background task.
            return Self {
                handle: None,
                _types: Default::default(),
            };
        };

        let future = async move {
            // Run pruner passes forever, sleeping the configured interval
            // before each pass.
            for i in 1.. {
                sleep(cfg.interval()).await;

                tracing::warn!("starting pruner run {i} ");
                Self::prune(storage.clone(), &backoff).await;
            }
        };

        let task = BackgroundTask::spawn("pruner", future);

        Self {
            handle: Some(task),
            _types: Default::default(),
        }
    }

    /// Execute one pruner run: prune batch by batch until storage reports
    /// nothing left to prune, retrying failed batches with exponential
    /// backoff and abandoning the run once the backoff budget is exhausted.
    async fn prune(storage: Arc<S>, backoff: &ExponentialBackoff) {
        let mut pruner = S::Pruner::default();
        'run: loop {
            // Fresh backoff state per batch, so earlier failures do not eat
            // into the retry budget of later batches.
            let mut backoff = backoff.clone();
            backoff.reset();
            'batch: loop {
                match storage.prune(&mut pruner).await {
                    Ok(Some(height)) => {
                        // Batch succeeded; move on to the next batch.
                        tracing::warn!("Pruned to height {height}");
                        break 'batch;
                    },
                    Ok(None) => {
                        // `None` signals there is nothing further to prune.
                        tracing::warn!("pruner run complete.");
                        break 'run;
                    },
                    Err(e) => {
                        tracing::warn!("error pruning batch: {e:#}");
                        if let Some(delay) = backoff.next_backoff() {
                            sleep(delay).await;
                        } else {
                            // Backoff exhausted: give up on this run entirely.
                            tracing::error!("pruning run failed after too many errors: {e:#}");
                            break 'run;
                        }
                    },
                }
            }
        }
    }
}
466
467impl<Types, S, P> FetchingDataSource<Types, S, P>
468where
469 Types: NodeType,
470 Payload<Types>: QueryablePayload<Types>,
471 Header<Types>: QueryableHeader<Types>,
472 S: VersionedDataSource + PruneStorage + HasMetrics + 'static,
473 for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types> + UpdateAggregatesStorage<Types>,
474 for<'a> S::ReadOnly<'a>: AvailabilityStorage<Types>
475 + NodeStorage<Types>
476 + PrunedHeightStorage
477 + AggregatesStorage<Types>,
478 P: AvailabilityProvider<Types>,
479{
480 pub fn builder(storage: S, provider: P) -> Builder<Types, S, P> {
482 Builder::new(storage, provider)
483 }
484
485 async fn new(builder: Builder<Types, S, P>) -> anyhow::Result<Self> {
486 let leaf_only = builder.is_leaf_only();
487 let aggregator = builder.aggregator;
488 let aggregator_chunk_size = builder
489 .aggregator_chunk_size
490 .unwrap_or(builder.range_chunk_size);
491 let proactive_fetching = builder.proactive_fetching;
492 let proactive_interval = builder.proactive_interval;
493 let proactive_range_chunk_size = builder.proactive_range_chunk_size;
494 let backoff = builder.backoff.build();
495 let scanner_metrics = ScannerMetrics::new(builder.storage.metrics());
496 let aggregator_metrics = AggregatorMetrics::new(builder.storage.metrics());
497
498 let fetcher = Arc::new(Fetcher::new(builder).await?);
499 let scanner = if proactive_fetching && !leaf_only {
500 Some(BackgroundTask::spawn(
501 "proactive scanner",
502 fetcher.clone().proactive_scan(
503 proactive_interval,
504 proactive_range_chunk_size,
505 scanner_metrics,
506 ),
507 ))
508 } else {
509 None
510 };
511
512 let aggregator = if aggregator && !leaf_only {
513 Some(BackgroundTask::spawn(
514 "aggregator",
515 fetcher
516 .clone()
517 .aggregate(aggregator_chunk_size, aggregator_metrics),
518 ))
519 } else {
520 None
521 };
522
523 let storage = fetcher.storage.clone();
524
525 let pruner = Pruner::new(storage, backoff).await;
526 let ds = Self {
527 fetcher,
528 scanner,
529 pruner,
530 aggregator,
531 };
532
533 Ok(ds)
534 }
535
536 pub fn inner(&self) -> Arc<S> {
538 self.fetcher.storage.clone()
539 }
540}
541
542impl<Types, S, P> AsRef<S> for FetchingDataSource<Types, S, P>
543where
544 Types: NodeType,
545{
546 fn as_ref(&self) -> &S {
547 &self.fetcher.storage
548 }
549}
550
551impl<Types, S, P> HasMetrics for FetchingDataSource<Types, S, P>
552where
553 Types: NodeType,
554 S: HasMetrics,
555{
556 fn metrics(&self) -> &PrometheusMetrics {
557 self.as_ref().metrics()
558 }
559}
560
561#[async_trait]
562impl<Types, S, P> StatusDataSource for FetchingDataSource<Types, S, P>
563where
564 Types: NodeType,
565 Header<Types>: QueryableHeader<Types>,
566 S: VersionedDataSource + HasMetrics + Send + Sync + 'static,
567 for<'a> S::ReadOnly<'a>: NodeStorage<Types>,
568 P: Send + Sync,
569{
570 async fn block_height(&self) -> QueryResult<usize> {
571 let mut tx = self.read().await.map_err(|err| QueryError::Error {
572 message: err.to_string(),
573 })?;
574 tx.block_height().await
575 }
576}
577
578#[async_trait]
579impl<Types, S, P> PrunedHeightDataSource for FetchingDataSource<Types, S, P>
580where
581 Types: NodeType,
582 S: VersionedDataSource + HasMetrics + Send + Sync + 'static,
583 for<'a> S::ReadOnly<'a>: PrunedHeightStorage,
584 P: Send + Sync,
585{
586 async fn load_pruned_height(&self) -> anyhow::Result<Option<u64>> {
587 let mut tx = self.read().await?;
588 tx.load_pruned_height().await
589 }
590}
591
// All methods in this impl delegate to the shared `Fetcher`: single-object
// getters go through `Fetcher::get`, forward streams through `get_range`, and
// reverse streams through `get_range_rev`.
#[async_trait]
impl<Types, S, P> AvailabilityDataSource<Types> for FetchingDataSource<Types, S, P>
where
    Types: NodeType,
    Header<Types>: QueryableHeader<Types>,
    Payload<Types>: QueryablePayload<Types>,
    S: VersionedDataSource + 'static,
    for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>,
    for<'a> S::ReadOnly<'a>: AvailabilityStorage<Types> + NodeStorage<Types> + PrunedHeightStorage,
    P: AvailabilityProvider<Types>,
{
    async fn get_leaf<ID>(&self, id: ID) -> Fetch<LeafQueryData<Types>>
    where
        ID: Into<LeafId<Types>> + Send + Sync,
    {
        self.fetcher.get(id.into()).await
    }

    // Headers are fetched as `HeaderQueryData` and the inner header is
    // projected out of the resulting `Fetch`.
    async fn get_header<ID>(&self, id: ID) -> Fetch<Header<Types>>
    where
        ID: Into<BlockId<Types>> + Send + Sync,
    {
        self.fetcher
            .get::<HeaderQueryData<_>>(id.into())
            .await
            .map(|h| h.header)
    }

    async fn get_block<ID>(&self, id: ID) -> Fetch<BlockQueryData<Types>>
    where
        ID: Into<BlockId<Types>> + Send + Sync,
    {
        self.fetcher.get(id.into()).await
    }

    async fn get_payload<ID>(&self, id: ID) -> Fetch<PayloadQueryData<Types>>
    where
        ID: Into<BlockId<Types>> + Send + Sync,
    {
        self.fetcher.get(id.into()).await
    }

    async fn get_payload_metadata<ID>(&self, id: ID) -> Fetch<PayloadMetadata<Types>>
    where
        ID: Into<BlockId<Types>> + Send + Sync,
    {
        self.fetcher.get(id.into()).await
    }

    // VID lookups wrap the block ID in a `VidCommonRequest` so the fetcher
    // can distinguish them from plain block requests.
    async fn get_vid_common<ID>(&self, id: ID) -> Fetch<VidCommonQueryData<Types>>
    where
        ID: Into<BlockId<Types>> + Send + Sync,
    {
        self.fetcher.get(VidCommonRequest::from(id.into())).await
    }

    async fn get_vid_common_metadata<ID>(&self, id: ID) -> Fetch<VidCommonMetadata<Types>>
    where
        ID: Into<BlockId<Types>> + Send + Sync,
    {
        self.fetcher.get(VidCommonRequest::from(id.into())).await
    }

    async fn get_leaf_range<R>(&self, range: R) -> FetchStream<LeafQueryData<Types>>
    where
        R: RangeBounds<usize> + Send + 'static,
    {
        self.fetcher.clone().get_range(range)
    }

    async fn get_block_range<R>(&self, range: R) -> FetchStream<BlockQueryData<Types>>
    where
        R: RangeBounds<usize> + Send + 'static,
    {
        self.fetcher.clone().get_range(range)
    }

    // There is no dedicated header range fetcher: headers are derived by
    // streaming the corresponding leaves and cloning out each block header.
    async fn get_header_range<R>(&self, range: R) -> FetchStream<Header<Types>>
    where
        R: RangeBounds<usize> + Send + 'static,
    {
        let leaves: FetchStream<LeafQueryData<Types>> = self.fetcher.clone().get_range(range);

        leaves
            .map(|fetch| fetch.map(|leaf| leaf.leaf.block_header().clone()))
            .boxed()
    }

    async fn get_payload_range<R>(&self, range: R) -> FetchStream<PayloadQueryData<Types>>
    where
        R: RangeBounds<usize> + Send + 'static,
    {
        self.fetcher.clone().get_range(range)
    }

    async fn get_payload_metadata_range<R>(&self, range: R) -> FetchStream<PayloadMetadata<Types>>
    where
        R: RangeBounds<usize> + Send + 'static,
    {
        self.fetcher.clone().get_range(range)
    }

    async fn get_vid_common_range<R>(&self, range: R) -> FetchStream<VidCommonQueryData<Types>>
    where
        R: RangeBounds<usize> + Send + 'static,
    {
        self.fetcher.clone().get_range(range)
    }

    async fn get_vid_common_metadata_range<R>(
        &self,
        range: R,
    ) -> FetchStream<VidCommonMetadata<Types>>
    where
        R: RangeBounds<usize> + Send + 'static,
    {
        self.fetcher.clone().get_range(range)
    }

    // Reverse streams take an explicit (start bound, inclusive end) pair
    // rather than a `RangeBounds`, and yield items from `end` downwards.
    async fn get_leaf_range_rev(
        &self,
        start: Bound<usize>,
        end: usize,
    ) -> FetchStream<LeafQueryData<Types>> {
        self.fetcher.clone().get_range_rev(start, end)
    }

    async fn get_block_range_rev(
        &self,
        start: Bound<usize>,
        end: usize,
    ) -> FetchStream<BlockQueryData<Types>> {
        self.fetcher.clone().get_range_rev(start, end)
    }

    async fn get_payload_range_rev(
        &self,
        start: Bound<usize>,
        end: usize,
    ) -> FetchStream<PayloadQueryData<Types>> {
        self.fetcher.clone().get_range_rev(start, end)
    }

    async fn get_payload_metadata_range_rev(
        &self,
        start: Bound<usize>,
        end: usize,
    ) -> FetchStream<PayloadMetadata<Types>> {
        self.fetcher.clone().get_range_rev(start, end)
    }

    async fn get_vid_common_range_rev(
        &self,
        start: Bound<usize>,
        end: usize,
    ) -> FetchStream<VidCommonQueryData<Types>> {
        self.fetcher.clone().get_range_rev(start, end)
    }

    async fn get_vid_common_metadata_range_rev(
        &self,
        start: Bound<usize>,
        end: usize,
    ) -> FetchStream<VidCommonMetadata<Types>> {
        self.fetcher.clone().get_range_rev(start, end)
    }

    // NOTE(review): the `.clone()` here looks redundant — `Fetcher::get`
    // takes `self: &Arc<Self>`, so `self.fetcher.get(...)` (as used by the
    // other getters above) should suffice. Confirm and simplify.
    async fn get_block_containing_transaction(
        &self,
        h: TransactionHash<Types>,
    ) -> Fetch<BlockWithTransaction<Types>> {
        self.fetcher.clone().get(TransactionRequest::from(h)).await
    }
}
766
impl<Types, S, P> UpdateAvailabilityData<Types> for FetchingDataSource<Types, S, P>
where
    Types: NodeType,
    Header<Types>: QueryableHeader<Types>,
    Payload<Types>: QueryablePayload<Types>,
    S: VersionedDataSource + 'static,
    for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>,
    for<'a> S::ReadOnly<'a>: AvailabilityStorage<Types> + NodeStorage<Types> + PrunedHeightStorage,
    P: AvailabilityProvider<Types>,
{
    /// Ingest a new block's worth of data: always store the leaf and its
    /// certificate chain, and store the block payload and VID common data if
    /// present (or kick off fetches for whatever is missing).
    async fn append(&self, info: BlockInfo<Types>) -> anyhow::Result<()> {
        let height = info.height() as usize;

        // Persist the leaf together with its QC chain first.
        self.fetcher
            .store(&(info.leaf.clone(), info.qc_chain))
            .await;

        // Proactively request the parent leaf in case it is missing locally.
        leaf::trigger_fetch_for_parent(&self.fetcher, &info.leaf);

        // Use the provided block if there is one; otherwise try to get it via
        // the fetcher. If the result is not immediately ready, resolve the
        // fetch in a background task instead of blocking this append.
        let block = match info.block {
            Some(block) => Some(block),
            None => match self.fetcher.get::<BlockQueryData<Types>>(height).await {
                Fetch::Ready(block) => Some(block),
                Fetch::Pending(fut) => {
                    let span = tracing::info_span!("fetch missing block", height);
                    spawn(
                        async move {
                            tracing::info!("fetching missing block");
                            fut.await;
                        }
                        .instrument(span),
                    );
                    None
                },
            },
        };
        if let Some(block) = &block {
            self.fetcher.store(block).await;
        }
        // Same strategy for the VID common data.
        let vid = match info.vid_common {
            Some(vid) => Some(vid),
            None => match self.fetcher.get::<VidCommonQueryData<Types>>(height).await {
                Fetch::Ready(vid) => Some(vid),
                Fetch::Pending(fut) => {
                    let span = tracing::info_span!("fetch missing VID common", height);
                    spawn(
                        async move {
                            tracing::info!("fetching missing VID common");
                            fut.await;
                        }
                        .instrument(span),
                    );
                    None
                },
            },
        };
        if let Some(vid) = &vid {
            // VID common is stored alongside this node's share, if any.
            self.fetcher.store(&(vid.clone(), info.vid_share)).await;
        }

        // Notifications are sent only after the corresponding objects have
        // been stored — presumably so waiters woken here can immediately read
        // the data back (NOTE(review): confirm ordering requirement).
        info.leaf.notify(&self.fetcher.notifiers).await;
        if let Some(block) = &block {
            block.notify(&self.fetcher.notifiers).await;
        }
        if let Some(vid) = &vid {
            vid.notify(&self.fetcher.notifiers).await;
        }

        Ok(())
    }
}
858
// Transactions pass straight through to the underlying storage (via the
// fetcher, whose own `VersionedDataSource` impl delegates to `storage`).
impl<Types, S, P> VersionedDataSource for FetchingDataSource<Types, S, P>
where
    Types: NodeType,
    S: VersionedDataSource + Send + Sync,
    P: Send + Sync,
{
    type Transaction<'a>
        = S::Transaction<'a>
    where
        Self: 'a;
    type ReadOnly<'a>
        = S::ReadOnly<'a>
    where
        Self: 'a;

    /// Open a read-write transaction on the underlying storage.
    async fn write(&self) -> anyhow::Result<Self::Transaction<'_>> {
        self.fetcher.write().await
    }

    /// Open a read-only transaction on the underlying storage.
    async fn read(&self) -> anyhow::Result<Self::ReadOnly<'_>> {
        self.fetcher.read().await
    }
}
882
/// Shared state backing a [`FetchingDataSource`]: local storage plus the
/// machinery for fetching missing objects from a remote provider.
#[derive(Debug)]
struct Fetcher<Types, S, P>
where
    Types: NodeType,
{
    /// Local persistent storage.
    storage: Arc<S>,
    /// Wake-up channels for passive fetches waiting on objects to arrive.
    notifiers: Notifiers<Types>,
    /// Remote provider from which missing objects are requested.
    provider: Arc<P>,
    /// Fetcher for individual leaves.
    leaf_fetcher: Arc<LeafFetcher<Types, S, P>>,
    /// Fetcher for contiguous ranges of leaves.
    leaf_range_fetcher: Arc<LeafRangeFetcher<Types, S, P>>,
    /// Fetcher for payloads; `None` in leaf-only mode.
    payload_fetcher: Option<Arc<PayloadFetcher<Types, S, P>>>,
    /// Fetcher for payload ranges; `None` in leaf-only mode.
    payload_range_fetcher: Option<Arc<PayloadRangeFetcher<Types, S, P>>>,
    /// Fetcher for VID common data; `None` in leaf-only mode.
    vid_common_fetcher: Option<Arc<VidCommonFetcher<Types, S, P>>>,
    /// Fetcher for VID common ranges; `None` in leaf-only mode.
    vid_common_range_fetcher: Option<Arc<VidCommonRangeFetcher<Types, S, P>>>,
    /// Chunk size used when streaming ranges of objects.
    range_chunk_size: usize,
    /// Chunk size used when computing sync status.
    sync_status_chunk_size: usize,
    /// Delay applied after each stream item that required an active fetch.
    active_fetch_delay: Duration,
    /// Delay between consecutive chunk loads when streaming ranges.
    chunk_fetch_delay: Duration,
    /// Backoff policy for retry loops spawned when local reads fail.
    backoff: ExponentialBackoff,
    /// Semaphore limiting concurrent retry attempts across all tasks.
    retry_semaphore: Arc<Semaphore>,
    /// When true, only leaves are stored; payload/VID fetchers are absent.
    leaf_only: bool,
    /// Metrics for sync status computation.
    sync_status_metrics: SyncStatusMetrics,
    /// Cached sync status with a TTL, guarded by an async mutex.
    sync_status: Mutex<CachedSyncStatus>,
}
913
// The fetcher exposes its storage's transaction types directly.
impl<Types, S, P> VersionedDataSource for Fetcher<Types, S, P>
where
    Types: NodeType,
    S: VersionedDataSource + Send + Sync,
    P: Send + Sync,
{
    type Transaction<'a>
        = S::Transaction<'a>
    where
        Self: 'a;
    type ReadOnly<'a>
        = S::ReadOnly<'a>
    where
        Self: 'a;

    /// Open a read-write transaction on the underlying storage.
    async fn write(&self) -> anyhow::Result<Self::Transaction<'_>> {
        self.storage.write().await
    }

    /// Open a read-only transaction on the underlying storage.
    async fn read(&self) -> anyhow::Result<Self::ReadOnly<'_>> {
        self.storage.read().await
    }
}
937
938impl<Types, S, P> Fetcher<Types, S, P>
939where
940 Types: NodeType,
941 Header<Types>: QueryableHeader<Types>,
942 S: VersionedDataSource + HasMetrics + Sync,
943 for<'a> S::ReadOnly<'a>: PrunedHeightStorage + NodeStorage<Types>,
944{
945 pub async fn new(builder: Builder<Types, S, P>) -> anyhow::Result<Self> {
946 let retry_semaphore = Arc::new(Semaphore::new(builder.rate_limit));
947 let backoff = builder.backoff.build();
948
949 let (payload_fetcher, payload_range_fetcher, vid_common_fetcher, vid_common_range_fetcher) =
950 if builder.is_leaf_only() {
951 (None, None, None, None)
952 } else {
953 (
954 Some(Arc::new(fetching::Fetcher::new(
955 retry_semaphore.clone(),
956 backoff.clone(),
957 ))),
958 Some(Arc::new(fetching::Fetcher::new(
959 retry_semaphore.clone(),
960 backoff.clone(),
961 ))),
962 Some(Arc::new(fetching::Fetcher::new(
963 retry_semaphore.clone(),
964 backoff.clone(),
965 ))),
966 Some(Arc::new(fetching::Fetcher::new(
967 retry_semaphore.clone(),
968 backoff.clone(),
969 ))),
970 )
971 };
972 let leaf_fetcher = fetching::Fetcher::new(retry_semaphore.clone(), backoff.clone());
973 let leaf_range_fetcher = fetching::Fetcher::new(retry_semaphore.clone(), backoff.clone());
974
975 let leaf_only = builder.leaf_only;
976 let sync_status_metrics =
977 SyncStatusMetrics::new(builder.storage.metrics(), builder.sync_status_chunk_size);
978
979 Ok(Self {
980 storage: Arc::new(builder.storage),
981 notifiers: Default::default(),
982 provider: Arc::new(builder.provider),
983 leaf_fetcher: Arc::new(leaf_fetcher),
984 leaf_range_fetcher: Arc::new(leaf_range_fetcher),
985 payload_fetcher,
986 payload_range_fetcher,
987 vid_common_fetcher,
988 vid_common_range_fetcher,
989 range_chunk_size: builder.range_chunk_size,
990 sync_status_chunk_size: builder.sync_status_chunk_size,
991 active_fetch_delay: builder.active_fetch_delay,
992 chunk_fetch_delay: builder.chunk_fetch_delay,
993 backoff,
994 retry_semaphore,
995 leaf_only,
996 sync_status_metrics,
997 sync_status: Mutex::new(CachedSyncStatus::new(builder.sync_status_ttl)),
998 })
999 }
1000}
1001
1002impl<Types, S, P> Fetcher<Types, S, P>
1003where
1004 Types: NodeType,
1005 Header<Types>: QueryableHeader<Types>,
1006 Payload<Types>: QueryablePayload<Types>,
1007 S: VersionedDataSource + 'static,
1008 for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>,
1009 for<'a> S::ReadOnly<'a>: AvailabilityStorage<Types> + NodeStorage<Types> + PrunedHeightStorage,
1010 P: AvailabilityProvider<Types>,
1011{
1012 async fn get<T>(self: &Arc<Self>, req: impl Into<T::Request> + Send) -> Fetch<T>
1013 where
1014 T: Fetchable<Types>,
1015 {
1016 let req = req.into();
1017
1018 let passive_fetch = T::passive_fetch(&self.notifiers, req).await;
1028
1029 match self.try_get(req).await {
1030 Ok(Some(obj)) => return Fetch::Ready(obj),
1031 Ok(None) => return passive(req, passive_fetch),
1032 Err(err) => {
1033 tracing::warn!(
1034 ?req,
1035 "unable to fetch object; spawning a task to retry: {err:#}"
1036 );
1037 },
1038 }
1039
1040 let (send, recv) = oneshot::channel();
1042
1043 let fetcher = self.clone();
1044 let mut backoff = fetcher.backoff.clone();
1045 let span = tracing::warn_span!("get retry", ?req);
1046 spawn(
1047 async move {
1048 backoff.reset();
1049 let mut delay = backoff.next_backoff().unwrap_or(Duration::from_secs(1));
1050 loop {
1051 let res = {
1052 let _guard = fetcher.retry_semaphore.acquire().await;
1056 fetcher.try_get(req).await
1057 };
1058 match res {
1059 Ok(Some(obj)) => {
1060 tracing::info!(?req, "object was ready after retries");
1064 send.send(obj).ok();
1065 break;
1066 },
1067 Ok(None) => {
1068 tracing::info!(?req, "spawned fetch after retries");
1072 break;
1073 },
1074 Err(err) => {
1075 tracing::warn!(
1076 ?req,
1077 ?delay,
1078 "unable to fetch object, will retry: {err:#}"
1079 );
1080 sleep(delay).await;
1081 if let Some(next_delay) = backoff.next_backoff() {
1082 delay = next_delay;
1083 }
1084 },
1085 }
1086 }
1087 }
1088 .instrument(span),
1089 );
1090
1091 passive(req, select_some(passive_fetch, recv.map(Result::ok)))
1094 }
1095
1096 async fn try_get<T>(self: &Arc<Self>, req: T::Request) -> anyhow::Result<Option<T>>
1107 where
1108 T: Fetchable<Types>,
1109 {
1110 let mut tx = self.read().await.context("opening read transaction")?;
1111 match T::load(&mut tx, req).await {
1112 Ok(t) => Ok(Some(t)),
1113 Err(QueryError::Missing | QueryError::NotFound) => {
1114 tracing::debug!(?req, "object missing from local storage, will try to fetch");
1117 self.fetch::<T>(&mut tx, req).await?;
1118 Ok(None)
1119 },
1120 Err(err) => {
1121 bail!("failed to fetch resource {req:?} from local storage: {err:#}");
1124 },
1125 }
1126 }
1127
1128 fn get_range<R, T>(self: Arc<Self>, range: R) -> BoxStream<'static, Fetch<T>>
1140 where
1141 R: RangeBounds<usize> + Send + 'static,
1142 T: RangedFetchable<Types>,
1143 {
1144 let chunk_size = self.range_chunk_size;
1145 self.get_range_with_chunk_size(chunk_size, range)
1146 }
1147
1148 fn get_range_with_chunk_size<R, T>(
1150 self: Arc<Self>,
1151 chunk_size: usize,
1152 range: R,
1153 ) -> BoxStream<'static, Fetch<T>>
1154 where
1155 R: RangeBounds<usize> + Send + 'static,
1156 T: RangedFetchable<Types>,
1157 {
1158 let chunk_fetch_delay = self.chunk_fetch_delay;
1159 let active_fetch_delay = self.active_fetch_delay;
1160
1161 stream::iter(range_chunks(range, chunk_size))
1162 .then(move |chunk| {
1163 let self_clone = self.clone();
1164 async move {
1165 {
1166 let chunk = self_clone.get_chunk(chunk).await;
1167
1168 sleep(chunk_fetch_delay).await;
1173 stream::iter(chunk)
1174 }
1175 }
1176 })
1177 .flatten()
1178 .then(move |f| async move {
1179 match f {
1180 Fetch::Pending(_) => sleep(active_fetch_delay).await,
1184 Fetch::Ready(_) => (),
1185 };
1186 f
1187 })
1188 .boxed()
1189 }
1190
1191 fn get_range_rev<T>(
1198 self: Arc<Self>,
1199 start: Bound<usize>,
1200 end: usize,
1201 ) -> BoxStream<'static, Fetch<T>>
1202 where
1203 T: RangedFetchable<Types>,
1204 {
1205 let chunk_size = self.range_chunk_size;
1206 self.get_range_with_chunk_size_rev(chunk_size, start, end)
1207 }
1208
1209 fn get_range_with_chunk_size_rev<T>(
1211 self: Arc<Self>,
1212 chunk_size: usize,
1213 start: Bound<usize>,
1214 end: usize,
1215 ) -> BoxStream<'static, Fetch<T>>
1216 where
1217 T: RangedFetchable<Types>,
1218 {
1219 let chunk_fetch_delay = self.chunk_fetch_delay;
1220 let active_fetch_delay = self.active_fetch_delay;
1221
1222 stream::iter(range_chunks_rev(start, end, chunk_size))
1223 .then(move |chunk| {
1224 let self_clone = self.clone();
1225 async move {
1226 {
1227 let chunk = self_clone.get_chunk(chunk).await;
1228
1229 sleep(chunk_fetch_delay).await;
1234 stream::iter(chunk.into_iter().rev())
1235 }
1236 }
1237 })
1238 .flatten()
1239 .then(move |f| async move {
1240 match f {
1241 Fetch::Pending(_) => sleep(active_fetch_delay).await,
1245 Fetch::Ready(_) => (),
1246 };
1247 f
1248 })
1249 .boxed()
1250 }
1251
1252 async fn get_chunk<T>(self: &Arc<Self>, chunk: Range<usize>) -> Vec<Fetch<T>>
1259 where
1260 T: RangedFetchable<Types>,
1261 {
1262 let passive_fetches = join_all(
1266 chunk
1267 .clone()
1268 .map(|i| T::passive_fetch(&self.notifiers, i.into())),
1269 )
1270 .await;
1271
1272 match self.try_get_chunk(&chunk).await {
1273 Ok(objs) => {
1274 return objs
1277 .into_iter()
1278 .zip(passive_fetches)
1279 .enumerate()
1280 .map(move |(i, (obj, passive_fetch))| match obj {
1281 Some(obj) => Fetch::Ready(obj),
1282 None => passive(T::Request::from(chunk.start + i), passive_fetch),
1283 })
1284 .collect();
1285 },
1286 Err(err) => {
1287 tracing::warn!(
1288 ?chunk,
1289 "unable to fetch chunk; spawning a task to retry: {err:#}"
1290 );
1291 },
1292 }
1293
1294 let (send, recv): (Vec<_>, Vec<_>) =
1296 repeat_with(oneshot::channel).take(chunk.len()).unzip();
1297
1298 {
1299 let fetcher = self.clone();
1300 let mut backoff = fetcher.backoff.clone();
1301 let chunk = chunk.clone();
1302 let span = tracing::warn_span!("get_chunk retry", ?chunk);
1303 spawn(
1304 async move {
1305 backoff.reset();
1306 let mut delay = backoff.next_backoff().unwrap_or(Duration::from_secs(1));
1307 loop {
1308 let res = {
1309 let _guard = fetcher.retry_semaphore.acquire().await;
1314 fetcher.try_get_chunk(&chunk).await
1315 };
1316 match res {
1317 Ok(objs) => {
1318 for (i, (obj, sender)) in objs.into_iter().zip(send).enumerate() {
1319 if let Some(obj) = obj {
1320 tracing::info!(?chunk, i, "object was ready after retries");
1324 sender.send(obj).ok();
1325 } else {
1326 tracing::info!(?chunk, i, "spawned fetch after retries");
1331 }
1332 }
1333 break;
1334 },
1335 Err(err) => {
1336 tracing::warn!(
1337 ?chunk,
1338 ?delay,
1339 "unable to fetch chunk, will retry: {err:#}"
1340 );
1341 sleep(delay).await;
1342 if let Some(next_delay) = backoff.next_backoff() {
1343 delay = next_delay;
1344 }
1345 },
1346 }
1347 }
1348 }
1349 .instrument(span),
1350 );
1351 }
1352
1353 passive_fetches
1356 .into_iter()
1357 .zip(recv)
1358 .enumerate()
1359 .map(move |(i, (passive_fetch, recv))| {
1360 passive(
1361 T::Request::from(chunk.start + i),
1362 select_some(passive_fetch, recv.map(Result::ok)),
1363 )
1364 })
1365 .collect()
1366 }
1367
1368 async fn try_get_chunk<T>(
1381 self: &Arc<Self>,
1382 chunk: &Range<usize>,
1383 ) -> anyhow::Result<Vec<Option<T>>>
1384 where
1385 T: RangedFetchable<Types>,
1386 {
1387 let mut tx = self.read().await.context("opening read transaction")?;
1388 let ts = T::load_range(&mut tx, chunk.clone())
1389 .await
1390 .context(format!("when fetching items in range {chunk:?}"))?;
1391
1392 let ts = ts.into_iter().filter_map(ResultExt::ok_or_trace);
1399
1400 let mut results = Vec::with_capacity(chunk.len());
1402 for t in ts {
1403 while chunk.start + results.len() < t.height() as usize {
1405 tracing::debug!(
1406 "item {} in chunk not available, will be fetched",
1407 results.len()
1408 );
1409 self.fetch::<T>(&mut tx, (chunk.start + results.len()).into())
1410 .await?;
1411 results.push(None);
1412 }
1413
1414 results.push(Some(t));
1415 }
1416 while results.len() < chunk.len() {
1418 self.fetch::<T>(&mut tx, (chunk.start + results.len()).into())
1419 .await?;
1420 results.push(None);
1421 }
1422
1423 Ok(results)
1424 }
1425
1426 async fn fetch<T>(
1432 self: &Arc<Self>,
1433 tx: &mut <Self as VersionedDataSource>::ReadOnly<'_>,
1434 req: T::Request,
1435 ) -> anyhow::Result<()>
1436 where
1437 T: Fetchable<Types>,
1438 {
1439 tracing::debug!("fetching resource {req:?}");
1440
1441 let heights = Heights::load(tx)
1443 .await
1444 .context("failed to load heights; cannot definitively say object might exist")?;
1445 if req.might_exist(heights) {
1446 T::active_fetch(tx, self.clone(), req).await?;
1447 } else {
1448 tracing::debug!("not fetching object {req:?} that cannot exist at {heights:?}");
1449 }
1450 Ok(())
1451 }
1452
    /// Background task that periodically scans for and fetches missing data.
    ///
    /// Each iteration loads the sync status, then walks every `Missing` range
    /// of blocks and of VID common data, fetching them in reverse-order,
    /// alignment-sized chunks. Sleeps for `interval` between scans. Progress
    /// and backlog are reported through `metrics`.
    async fn proactive_scan(
        self: Arc<Self>,
        interval: Duration,
        chunk_size: usize,
        metrics: ScannerMetrics,
    ) {
        for i in 0.. {
            let span = tracing::warn_span!("proactive scan", i);
            metrics.running.set(1);
            metrics.current_scan.set(i);
            async {
                let sync_status = {
                    match self.sync_status().await {
                        Ok(st) => st,
                        Err(err) => {
                            // A failed status load skips this scan entirely;
                            // we will try again after `interval`.
                            tracing::warn!(
                                "unable to load sync status, scan will be skipped: {err:#}"
                            );
                            return;
                        },
                    }
                };
                tracing::info!(?sync_status, "starting scan");
                metrics.missing_blocks.set(sync_status.blocks.missing);
                metrics.missing_vid.set(sync_status.vid_common.missing);

                for range in sync_status.blocks.ranges {
                    metrics.scanned_blocks.set(range.start);
                    // Only `Missing` ranges need fetching; mark others scanned.
                    if range.status != SyncStatus::Missing {
                        metrics.scanned_blocks.set(range.end);
                        continue;
                    }

                    tracing::info!(?range, "fetching missing block range");

                    // Fetch newest-first in aligned chunks; `range.end - 1` is
                    // the inclusive upper bound expected by the helper.
                    for chunk in range_chunks_aligned_rev(
                        Bound::Included(range.start),
                        range.end - 1,
                        chunk_size,
                    ) {
                        tracing::info!(?chunk, "fetching missing block chunk");

                        // First await resolves the request; second awaits the
                        // resulting fetch until the data is available.
                        self.get::<NonEmptyRange<BlockQueryData<Types>>>(RangeRequest {
                            start: chunk.start as u64,
                            end: chunk.end as u64,
                        })
                        .await
                        .await;

                        // Decrease the backlog by the chunk size (negative delta).
                        metrics
                            .missing_blocks
                            .update((chunk.start as i64) - (chunk.end as i64));
                        metrics.scanned_blocks.set(chunk.end);
                    }
                }

                // Same scheme as above, but for VID common data.
                for range in sync_status.vid_common.ranges {
                    metrics.scanned_vid.set(range.start);
                    if range.status != SyncStatus::Missing {
                        metrics.scanned_vid.set(range.end);
                        continue;
                    }

                    tracing::info!(?range, "fetching missing VID range");
                    for chunk in range_chunks_aligned_rev(
                        Bound::Included(range.start),
                        range.end - 1,
                        chunk_size,
                    ) {
                        tracing::info!(?chunk, "fetching missing VID chunk");
                        self.get::<NonEmptyRange<VidCommonQueryData<Types>>>(RangeRequest {
                            start: chunk.start as u64,
                            end: chunk.end as u64,
                        })
                        .await
                        .await;

                        metrics
                            .missing_vid
                            .update((chunk.start as i64) - (chunk.end as i64));
                        metrics.scanned_vid.set(chunk.end);
                    }
                }

                tracing::info!("completed proactive scan, will scan again in {interval:?}");

                metrics.running.set(0);
            }
            .instrument(span)
            .await;

            sleep(interval).await;
        }
    }
1564}
1565
impl<Types, S, P> Fetcher<Types, S, P>
where
    Types: NodeType,
    Header<Types>: QueryableHeader<Types>,
    S: VersionedDataSource + 'static,
    for<'a> S::ReadOnly<'a>: NodeStorage<Types> + PrunedHeightStorage,
    P: Send + Sync,
{
    /// Compute the node's sync status, or return a cached copy if fresh.
    ///
    /// Holding the cache mutex for the full computation serializes concurrent
    /// callers, so they hit the freshly-updated cache instead of recomputing.
    /// The status is computed `sync_status_chunk_size` blocks at a time, each
    /// chunk in its own read transaction.
    async fn sync_status(&self) -> anyhow::Result<SyncStatusQueryData> {
        let mut cache = self.sync_status.lock().await;
        // Fast path: serve a cached result that has not yet expired.
        if let Some(sync_status) = cache.try_get() {
            return Ok(sync_status.clone());
        }
        tracing::debug!("updating sync status");

        let heights = {
            let mut tx = self
                .read()
                .await
                .context("opening transaction to load heights")?;
            Heights::load(&mut tx).await.context("loading heights")?
        };

        let mut res = SyncStatusQueryData {
            pruned_height: heights.pruned_height.map(|h| h as usize),
            ..Default::default()
        };
        // Everything at or below the pruned height is reported as one
        // `Pruned` range per resource type; chunk scanning starts above it.
        let start = if let Some(height) = res.pruned_height {
            let range = SyncStatusRange {
                status: SyncStatus::Pruned,
                start: 0,
                end: height + 1,
            };
            res.blocks.ranges.push(range);
            res.leaves.ranges.push(range);
            res.vid_common.ranges.push(range);

            height + 1
        } else {
            0
        };

        for chunk in range_chunks(
            start..(heights.height as usize),
            self.sync_status_chunk_size,
        ) {
            tracing::debug!(chunk.start, chunk.end, "checking sync status in sub-range");
            let metrics = self.sync_status_metrics.start_range(&chunk);
            let mut tx = self
                .read()
                .await
                .context("opening transaction to sync status range")?;
            let range_status = tx
                .sync_status_for_range(chunk.start, chunk.end)
                .await
                .context(format!("checking sync status in sub-range {chunk:?}"))?;
            tracing::debug!(
                chunk.start,
                chunk.end,
                ?range_status,
                "found sync status for range"
            );

            // Merge this chunk's per-resource ranges into the running result.
            res.blocks.extend(range_status.blocks);
            res.leaves.extend(range_status.leaves);
            res.vid_common.extend(range_status.vid_common);
            metrics.end();
        }

        cache.update(res.clone());
        Ok(res)
    }
}
1646
impl<Types, S, P> Fetcher<Types, S, P>
where
    Types: NodeType,
    Header<Types>: QueryableHeader<Types>,
    Payload<Types>: QueryablePayload<Types>,
    S: VersionedDataSource + 'static,
    for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types> + UpdateAggregatesStorage<Types>,
    for<'a> S::ReadOnly<'a>: AvailabilityStorage<Types>
        + NodeStorage<Types>
        + PrunedHeightStorage
        + AggregatesStorage<Types>,
    P: AvailabilityProvider<Types>,
{
    /// Background task that maintains aggregate statistics over blocks.
    ///
    /// Resumes from the last persisted aggregate (or from genesis), streams
    /// payload metadata from that height onward in chunks of up to
    /// `chunk_size`, and commits an updated aggregate per chunk. Storage
    /// failures are retried indefinitely so no chunk is ever skipped.
    #[tracing::instrument(skip_all)]
    async fn aggregate(self: Arc<Self>, chunk_size: usize, metrics: AggregatorMetrics) {
        loop {
            // Retry until we can load the previous aggregate checkpoint.
            let prev_aggregate = loop {
                let mut tx = match self.read().await {
                    Ok(tx) => tx,
                    Err(err) => {
                        tracing::error!("unable to open read tx: {err:#}");
                        sleep(Duration::from_secs(5)).await;
                        continue;
                    },
                };
                match tx.load_prev_aggregate().await {
                    Ok(agg) => break agg,
                    Err(err) => {
                        tracing::error!("unable to load previous aggregate: {err:#}");
                        sleep(Duration::from_secs(5)).await;
                        continue;
                    },
                }
            };

            // Resume one past the checkpoint height, or from 0 if none exists.
            let (start, mut prev_aggregate) = match prev_aggregate {
                Some(aggregate) => (aggregate.height as usize + 1, aggregate),
                None => (0, Aggregate::default()),
            };

            tracing::info!(start, "starting aggregator");
            metrics.height.set(start);

            // `ready_chunks` batches however many items are already resolved,
            // up to `chunk_size`, so we amortize one transaction per batch.
            let mut blocks = self
                .clone()
                .get_range_with_chunk_size::<_, PayloadMetadata<Types>>(chunk_size, start..)
                .then(Fetch::resolve)
                .ready_chunks(chunk_size)
                .boxed();
            while let Some(chunk) = blocks.next().await {
                let Some(last) = chunk.last() else {
                    tracing::warn!("ready_chunks returned an empty chunk");
                    continue;
                };
                let height = last.height();
                let num_blocks = chunk.len();
                tracing::debug!(
                    num_blocks,
                    height,
                    "updating aggregate statistics for chunk"
                );
                // Retry the update until the commit succeeds; only then do we
                // advance `prev_aggregate` past this chunk.
                loop {
                    let res = async {
                        let mut tx = self.write().await.context("opening transaction")?;
                        let aggregate =
                            tx.update_aggregates(prev_aggregate.clone(), &chunk).await?;
                        tx.commit().await.context("committing transaction")?;
                        prev_aggregate = aggregate;
                        anyhow::Result::<_>::Ok(())
                    }
                    .await;
                    match res {
                        Ok(()) => {
                            break;
                        },
                        Err(err) => {
                            tracing::warn!(
                                num_blocks,
                                height,
                                "failed to update aggregates for chunk: {err:#}"
                            );
                            sleep(Duration::from_secs(1)).await;
                        },
                    }
                }
                metrics.height.set(height as usize);
            }
            // The stream is unbounded (`start..`), so ending is unexpected;
            // restart the whole loop from the persisted checkpoint.
            tracing::warn!("aggregator block stream ended unexpectedly; will restart");
        }
    }
}
1739
1740impl<Types, S, P> Fetcher<Types, S, P>
1741where
1742 Types: NodeType,
1743 S: VersionedDataSource,
1744 for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>,
1745{
1746 async fn store_and_notify<T>(&self, obj: &T)
1748 where
1749 T: Storable<Types>,
1750 {
1751 self.store(obj).await;
1752
1753 obj.notify(&self.notifiers).await;
1778 }
1779
1780 async fn store<T>(&self, obj: &T)
1781 where
1782 T: Storable<Types>,
1783 {
1784 let try_store = || async {
1785 let mut tx = self.storage.write().await?;
1786 obj.clone().store(&mut tx, self.leaf_only).await?;
1787 tx.commit().await
1788 };
1789
1790 let mut backoff = self.backoff.clone();
1792 backoff.reset();
1793 loop {
1794 let Err(err) = try_store().await else {
1795 break;
1796 };
1797 tracing::warn!(
1801 obj = obj.debug_name(),
1802 "failed to store fetched object: {err:#}"
1803 );
1804
1805 let Some(delay) = backoff.next_backoff() else {
1806 break;
1807 };
1808 tracing::info!(?delay, "retrying failed operation");
1809 sleep(delay).await;
1810 }
1811 }
1812}
1813
/// Per-resource notification channels used to wake passive fetches when newly
/// available objects arrive.
#[derive(Debug)]
struct Notifiers<Types>
where
    Types: NodeType,
{
    /// Wakes subscribers waiting on blocks.
    block: Notifier<BlockQueryData<Types>>,
    /// Wakes subscribers waiting on leaves.
    leaf: Notifier<LeafQueryData<Types>>,
    /// Wakes subscribers waiting on VID common data.
    vid_common: Notifier<VidCommonQueryData<Types>>,
}
1823
1824impl<Types> Default for Notifiers<Types>
1825where
1826 Types: NodeType,
1827{
1828 fn default() -> Self {
1829 Self {
1830 block: Notifier::new(),
1831 leaf: Notifier::new(),
1832 vid_common: Notifier::new(),
1833 }
1834 }
1835}
1836
/// Snapshot of the chain extents, used to decide whether a requested object
/// can possibly exist in storage.
#[derive(Clone, Copy, Debug)]
struct Heights {
    /// Current block height; an exclusive upper bound on object heights.
    height: u64,
    /// Height up to and including which data has been pruned, if any.
    pruned_height: Option<u64>,
}
1842
1843impl Heights {
1844 async fn load<Types, T>(tx: &mut T) -> anyhow::Result<Self>
1845 where
1846 Types: NodeType,
1847 Header<Types>: QueryableHeader<Types>,
1848 T: NodeStorage<Types> + PrunedHeightStorage + Send,
1849 {
1850 let height = tx.block_height().await.context("loading block height")? as u64;
1851 let pruned_height = tx
1852 .load_pruned_height()
1853 .await
1854 .context("loading pruned height")?;
1855 Ok(Self {
1856 height,
1857 pruned_height,
1858 })
1859 }
1860
1861 fn might_exist(self, h: u64) -> bool {
1862 h < self.height && self.pruned_height.is_none_or(|ph| h > ph)
1863 }
1864}
1865
1866#[async_trait]
1867impl<Types, S, P, State, const ARITY: usize> MerklizedStateDataSource<Types, State, ARITY>
1868 for FetchingDataSource<Types, S, P>
1869where
1870 Types: NodeType,
1871 S: VersionedDataSource + 'static,
1872 for<'a> S::ReadOnly<'a>: MerklizedStateStorage<Types, State, ARITY>,
1873 P: Send + Sync,
1874 State: MerklizedState<Types, ARITY> + 'static,
1875 <State as MerkleTreeScheme>::Commitment: Send,
1876{
1877 async fn get_path(
1878 &self,
1879 snapshot: Snapshot<Types, State, ARITY>,
1880 key: State::Key,
1881 ) -> QueryResult<MerkleProof<State::Entry, State::Key, State::T, ARITY>> {
1882 let mut tx = self.read().await.map_err(|err| QueryError::Error {
1883 message: err.to_string(),
1884 })?;
1885 tx.get_path(snapshot, key).await
1886 }
1887}
1888
1889#[async_trait]
1890impl<Types, S, P> MerklizedStateHeightPersistence for FetchingDataSource<Types, S, P>
1891where
1892 Types: NodeType,
1893 Header<Types>: QueryableHeader<Types>,
1894 Payload<Types>: QueryablePayload<Types>,
1895 S: VersionedDataSource + 'static,
1896 for<'a> S::ReadOnly<'a>: MerklizedStateHeightStorage,
1897 P: Send + Sync,
1898{
1899 async fn get_last_state_height(&self) -> QueryResult<usize> {
1900 let mut tx = self.read().await.map_err(|err| QueryError::Error {
1901 message: err.to_string(),
1902 })?;
1903 tx.get_last_state_height().await
1904 }
1905}
1906
#[async_trait]
impl<Types, S, P> NodeDataSource<Types> for FetchingDataSource<Types, S, P>
where
    Types: NodeType,
    Header<Types>: QueryableHeader<Types>,
    S: VersionedDataSource + 'static,
    for<'a> S::ReadOnly<'a>: NodeStorage<Types> + PrunedHeightStorage,
    P: Send + Sync,
{
    /// Current block height, read from storage in a fresh read transaction.
    async fn block_height(&self) -> QueryResult<usize> {
        let mut tx = self.read().await.map_err(|err| QueryError::Error {
            message: err.to_string(),
        })?;
        tx.block_height().await
    }

    /// Count transactions in the given block range, optionally restricted to
    /// one namespace; delegates to storage.
    async fn count_transactions_in_range(
        &self,
        range: impl RangeBounds<usize> + Send,
        namespace: Option<NamespaceId<Types>>,
    ) -> QueryResult<usize> {
        let mut tx = self.read().await.map_err(|err| QueryError::Error {
            message: err.to_string(),
        })?;
        tx.count_transactions_in_range(range, namespace).await
    }

    /// Total payload size over the given block range, optionally restricted to
    /// one namespace; delegates to storage.
    async fn payload_size_in_range(
        &self,
        range: impl RangeBounds<usize> + Send,
        namespace: Option<NamespaceId<Types>>,
    ) -> QueryResult<usize> {
        let mut tx = self.read().await.map_err(|err| QueryError::Error {
            message: err.to_string(),
        })?;
        tx.payload_size_in_range(range, namespace).await
    }

    /// This node's VID share for the identified block; delegates to storage.
    async fn vid_share<ID>(&self, id: ID) -> QueryResult<VidShare>
    where
        ID: Into<BlockId<Types>> + Send + Sync,
    {
        let mut tx = self.read().await.map_err(|err| QueryError::Error {
            message: err.to_string(),
        })?;
        tx.vid_share(id).await
    }

    /// Sync status, served by the fetcher (which caches the computation).
    async fn sync_status(&self) -> QueryResult<SyncStatusQueryData> {
        self.fetcher
            .sync_status()
            .await
            .map_err(|err| QueryError::Error {
                message: format!("{err:#}"),
            })
    }

    /// Headers within a time window, up to `limit`; delegates to storage.
    async fn get_header_window(
        &self,
        start: impl Into<WindowStart<Types>> + Send + Sync,
        end: u64,
        limit: usize,
    ) -> QueryResult<TimeWindowQueryData<Header<Types>>> {
        let mut tx = self.read().await.map_err(|err| QueryError::Error {
            message: err.to_string(),
        })?;
        tx.get_header_window(start, end, limit).await
    }
}
1976
// Explorer API: every method opens a fresh read transaction and delegates to
// the underlying `ExplorerStorage` implementation.
#[async_trait]
impl<Types, S, P> ExplorerDataSource<Types> for FetchingDataSource<Types, S, P>
where
    Types: NodeType,
    Payload<Types>: QueryablePayload<Types>,
    Header<Types>: QueryableHeader<Types> + explorer::traits::ExplorerHeader<Types>,
    crate::Transaction<Types>: explorer::traits::ExplorerTransaction<Types>,
    S: VersionedDataSource + 'static,
    for<'a> S::ReadOnly<'a>: ExplorerStorage<Types>,
    P: Send + Sync,
{
    /// Summaries of a page of blocks; delegates to storage.
    async fn get_block_summaries(
        &self,
        request: explorer::query_data::GetBlockSummariesRequest<Types>,
    ) -> Result<
        Vec<explorer::query_data::BlockSummary<Types>>,
        explorer::query_data::GetBlockSummariesError,
    > {
        let mut tx = self.read().await.map_err(|err| QueryError::Error {
            message: err.to_string(),
        })?;
        tx.get_block_summaries(request).await
    }

    /// Full detail for one block; delegates to storage.
    async fn get_block_detail(
        &self,
        request: explorer::query_data::BlockIdentifier<Types>,
    ) -> Result<explorer::query_data::BlockDetail<Types>, explorer::query_data::GetBlockDetailError>
    {
        let mut tx = self.read().await.map_err(|err| QueryError::Error {
            message: err.to_string(),
        })?;
        tx.get_block_detail(request).await
    }

    /// Summaries of a page of transactions; delegates to storage.
    async fn get_transaction_summaries(
        &self,
        request: explorer::query_data::GetTransactionSummariesRequest<Types>,
    ) -> Result<
        Vec<explorer::query_data::TransactionSummary<Types>>,
        explorer::query_data::GetTransactionSummariesError,
    > {
        let mut tx = self.read().await.map_err(|err| QueryError::Error {
            message: err.to_string(),
        })?;
        tx.get_transaction_summaries(request).await
    }

    /// Full detail for one transaction; delegates to storage.
    async fn get_transaction_detail(
        &self,
        request: explorer::query_data::TransactionIdentifier<Types>,
    ) -> Result<
        explorer::query_data::TransactionDetailResponse<Types>,
        explorer::query_data::GetTransactionDetailError,
    > {
        let mut tx = self.read().await.map_err(|err| QueryError::Error {
            message: err.to_string(),
        })?;
        tx.get_transaction_detail(request).await
    }

    /// High-level explorer landing-page summary; delegates to storage.
    async fn get_explorer_summary(
        &self,
    ) -> Result<
        explorer::query_data::ExplorerSummary<Types>,
        explorer::query_data::GetExplorerSummaryError,
    > {
        let mut tx = self.read().await.map_err(|err| QueryError::Error {
            message: err.to_string(),
        })?;
        tx.get_explorer_summary().await
    }

    /// Search by tagged-base64 query string; delegates to storage.
    async fn get_search_results(
        &self,
        query: TaggedBase64,
    ) -> Result<
        explorer::query_data::SearchResult<Types>,
        explorer::query_data::GetSearchResultsError,
    > {
        let mut tx = self.read().await.map_err(|err| QueryError::Error {
            message: err.to_string(),
        })?;
        tx.get_search_results(query).await
    }
}
2063
/// Trait alias for a provider able to fetch every resource type the
/// availability API needs: leaves, payloads, blocks, and VID common data,
/// both singly and in ranges.
pub trait AvailabilityProvider<Types: NodeType>:
    Provider<Types, request::LeafRequest<Types>>
    + Provider<Types, request::LeafRangeRequest<Types>>
    + Provider<Types, request::PayloadRequest>
    + Provider<Types, request::BlockRangeRequest>
    + Provider<Types, request::VidCommonRequest>
    + Provider<Types, request::VidCommonRangeRequest>
    + Sync
    + 'static
{
}
// Blanket impl: anything that satisfies all the constituent `Provider`
// bounds is automatically an `AvailabilityProvider`.
impl<Types: NodeType, P> AvailabilityProvider<Types> for P where
    P: Provider<Types, request::LeafRequest<Types>>
        + Provider<Types, request::LeafRangeRequest<Types>>
        + Provider<Types, request::PayloadRequest>
        + Provider<Types, request::BlockRangeRequest>
        + Provider<Types, request::VidCommonRequest>
        + Provider<Types, request::VidCommonRangeRequest>
        + Sync
        + 'static
{
}
2087
/// A request identifying a resource that can be fetched on demand.
trait FetchRequest: Copy + Debug + Send + Sync + 'static {
    /// Whether the requested object might exist, given the current chain
    /// extents. The default is `true`: without more specific information,
    /// the object cannot be ruled out.
    fn might_exist(self, _heights: Heights) -> bool {
        true
    }
}
2103
/// A resource type that can be loaded from local storage or fetched from a
/// remote provider.
#[async_trait]
trait Fetchable<Types>: Clone + Send + Sync + 'static
where
    Types: NodeType,
    Header<Types>: QueryableHeader<Types>,
    Payload<Types>: QueryablePayload<Types>,
{
    /// The request type identifying objects of this resource type.
    type Request: FetchRequest;

    /// Whether this object satisfies the given request.
    fn satisfies(&self, req: Self::Request) -> bool;

    /// Start an active fetch of the requested object from a remote provider,
    /// using `tx` for any reads needed to validate or scope the request.
    async fn active_fetch<S, P>(
        tx: &mut impl AvailabilityStorage<Types>,
        fetcher: Arc<Fetcher<Types, S, P>>,
        req: Self::Request,
    ) -> anyhow::Result<()>
    where
        S: VersionedDataSource + 'static,
        for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>,
        for<'a> S::ReadOnly<'a>:
            AvailabilityStorage<Types> + NodeStorage<Types> + PrunedHeightStorage,
        P: AvailabilityProvider<Types>;

    /// Subscribe to notification of the requested object, resolving when some
    /// other task stores a matching object.
    async fn passive_fetch(notifiers: &Notifiers<Types>, req: Self::Request) -> PassiveFetch<Self>;

    /// Load the requested object from local storage only.
    async fn load<S>(storage: &mut S, req: Self::Request) -> QueryResult<Self>
    where
        S: AvailabilityStorage<Types>;
}
2163
/// Future produced by a passive fetch: resolves to the object once it is
/// notified, or to `None` if the notifier is dropped first.
type PassiveFetch<T> = BoxFuture<'static, Option<T>>;
2165
/// A [`Fetchable`] resource indexed by height, which can additionally be
/// loaded in contiguous height ranges.
#[async_trait]
trait RangedFetchable<Types>: Fetchable<Types, Request = Self::RangedRequest> + HeightIndexed
where
    Types: NodeType,
    Header<Types>: QueryableHeader<Types>,
    Payload<Types>: QueryablePayload<Types>,
{
    /// Request type constructible directly from a height (`From<usize>`).
    type RangedRequest: FetchRequest + From<usize> + Send;

    /// Load all available objects in `range` from local storage; each entry is
    /// an individual `QueryResult` so partial availability can be reported.
    async fn load_range<S, R>(storage: &mut S, range: R) -> QueryResult<Vec<QueryResult<Self>>>
    where
        S: AvailabilityStorage<Types>,
        R: RangeBounds<usize> + Send + 'static;
}
2181
/// An object that can be persisted to availability storage and announced to
/// waiting subscribers.
trait Storable<Types: NodeType>: Clone {
    /// Short human-readable description of the object, used in log messages.
    fn debug_name(&self) -> String;

    /// Notify subscribers that this object is now available.
    fn notify(&self, notifiers: &Notifiers<Types>) -> impl Send + Future<Output = ()>;

    /// Write this object into `storage`; `leaf_only` indicates the data source
    /// is running in leaf-only mode.
    fn store(
        &self,
        storage: &mut impl UpdateAvailabilityStorage<Types>,
        leaf_only: bool,
    ) -> impl Send + Future<Output = anyhow::Result<()>>;
}
2197
2198impl<Types: NodeType> HeightIndexed
2199 for (LeafQueryData<Types>, Option<[CertificatePair<Types>; 2]>)
2200{
2201 fn height(&self) -> u64 {
2202 self.0.height()
2203 }
2204}
2205
2206impl<Types: NodeType> Storable<Types>
2207 for (LeafQueryData<Types>, Option<[CertificatePair<Types>; 2]>)
2208{
2209 fn debug_name(&self) -> String {
2210 format!("leaf {} with QC chain", self.0.height())
2211 }
2212
2213 async fn notify(&self, notifiers: &Notifiers<Types>) {
2214 self.0.notify(notifiers).await;
2215 }
2216
2217 async fn store(
2218 &self,
2219 storage: &mut impl UpdateAvailabilityStorage<Types>,
2220 _leaf_only: bool,
2221 ) -> anyhow::Result<()> {
2222 storage
2223 .insert_leaf_with_qc_chain(&self.0, self.1.clone())
2224 .await
2225 }
2226}
2227
2228fn range_chunks<R>(range: R, chunk_size: usize) -> impl Iterator<Item = Range<usize>>
2230where
2231 R: RangeBounds<usize>,
2232{
2233 let Range { mut start, end } = range_to_bounds(range);
2235 std::iter::from_fn(move || {
2236 let chunk_end = min(start + chunk_size, end);
2237 if chunk_end == start {
2238 return None;
2239 }
2240
2241 let chunk = start..chunk_end;
2242 start = chunk_end;
2243 Some(chunk)
2244 })
2245}
2246
2247#[allow(dead_code)]
2253fn range_chunks_aligned<R>(range: R, alignment: usize) -> impl Iterator<Item = Range<usize>>
2254where
2255 R: RangeBounds<usize>,
2256{
2257 let Range { mut start, end } = range_to_bounds(range);
2259
2260 let first = if start.is_multiple_of(alignment) {
2262 None
2263 } else {
2264 let chunk_end = min(start.next_multiple_of(alignment), end);
2267 let chunk = start..chunk_end;
2268
2269 start = chunk_end;
2271 Some(chunk)
2272 };
2273
2274 first.into_iter().chain(range_chunks(start..end, alignment))
2275}
2276
/// Normalize any `RangeBounds<usize>` to a half-open `start..end` range, using
/// `0` and `usize::MAX` for unbounded ends.
fn range_to_bounds(range: impl RangeBounds<usize>) -> Range<usize> {
    let start = match range.start_bound() {
        Bound::Unbounded => 0,
        Bound::Included(&i) => i,
        Bound::Excluded(&i) => i + 1,
    };
    let end = match range.end_bound() {
        Bound::Unbounded => usize::MAX,
        Bound::Included(&i) => i + 1,
        Bound::Excluded(&i) => i,
    };
    start..end
}
2291
/// Split the range from `start` through `end` (inclusive) into chunks of at
/// most `chunk_size`, yielded in descending order. The last chunk yielded
/// (the lowest) may be shorter.
fn range_chunks_rev(
    start: Bound<usize>,
    end: usize,
    chunk_size: usize,
) -> impl Iterator<Item = Range<usize>> {
    // Normalize the start bound to an inclusive index.
    let start = match start {
        Bound::Unbounded => 0,
        Bound::Included(i) => i,
        Bound::Excluded(i) => i + 1,
    };
    // `end` is inclusive; track an exclusive upper cursor internally.
    let mut cursor = end + 1;

    std::iter::from_fn(move || {
        let chunk_start = max(start, cursor.saturating_sub(chunk_size));
        (chunk_start < cursor).then(|| {
            let chunk = chunk_start..cursor;
            cursor = chunk_start;
            chunk
        })
    })
}
2327
/// Split the range from `start` through `end` (inclusive) into chunks yielded
/// in descending order, such that every chunk except possibly the first
/// (highest) and last (lowest) starts on a multiple of `alignment` and is
/// exactly `alignment` long.
fn range_chunks_aligned_rev(
    start: Bound<usize>,
    end: usize,
    alignment: usize,
) -> impl Iterator<Item = Range<usize>> {
    // Normalize the start bound to an inclusive index.
    let start = match start {
        Bound::Included(i) => i,
        Bound::Excluded(i) => i + 1,
        Bound::Unbounded => 0,
    };
    // `end` is inclusive; use an exclusive upper bound internally.
    let mut end = end + 1;

    // If the range does not end on an alignment boundary, emit a short first
    // chunk from the last boundary below `end` (clamped to `start`) up to `end`.
    let rem = end % alignment;
    let first = if rem == 0 {
        None
    } else {
        let chunk_start = max(end - rem, start);
        let chunk = chunk_start..end;
        end = chunk_start;
        Some(chunk)
    };

    // Emit the remaining chunks in descending order. Iterating directly on the
    // exclusive bound (instead of converting back to an inclusive `end - 1` as
    // before) avoids an integer underflow — and thus a debug-build panic —
    // when the first chunk consumed the entire range down to zero.
    let rest = std::iter::from_fn(move || {
        let chunk_start = max(start, end.saturating_sub(alignment));
        (chunk_start < end).then(|| {
            let chunk = chunk_start..end;
            end = chunk_start;
            chunk
        })
    });

    first.into_iter().chain(rest)
}
2377
/// Extension trait for downgrading a `Result` to an `Option` while logging
/// the error instead of propagating it.
trait ResultExt<T, E> {
    /// `Some` on success; on failure, log the error at info level and return
    /// `None`.
    fn ok_or_trace(self) -> Option<T>
    where
        E: Display;
}
2383
2384impl<T, E> ResultExt<T, E> for Result<T, E> {
2385 fn ok_or_trace(self) -> Option<T>
2386 where
2387 E: Display,
2388 {
2389 match self {
2390 Ok(t) => Some(t),
2391 Err(err) => {
2392 tracing::info!(
2393 "error loading resource from local storage, will try to fetch: {err:#}"
2394 );
2395 None
2396 },
2397 }
2398 }
2399}
2400
/// Prometheus metrics published by the proactive scanner task.
#[derive(Debug)]
struct ScannerMetrics {
    /// 1 while a scan is in progress, 0 while sleeping between scans.
    running: Box<dyn Gauge>,
    /// Index of the current (or most recent) scan.
    current_scan: Box<dyn Gauge>,
    /// Height up to which blocks have been scanned in the current scan.
    scanned_blocks: Box<dyn Gauge>,
    /// Height up to which VID data has been scanned in the current scan.
    scanned_vid: Box<dyn Gauge>,
    /// Remaining count of missing blocks, decremented as chunks are fetched.
    missing_blocks: Box<dyn Gauge>,
    /// Remaining count of missing VID entries, decremented as chunks are fetched.
    missing_vid: Box<dyn Gauge>,
}
2416
2417impl ScannerMetrics {
2418 fn new(metrics: &PrometheusMetrics) -> Self {
2419 let group = metrics.subgroup("scanner".into());
2420 Self {
2421 running: group.create_gauge("running".into(), None),
2422 current_scan: group.create_gauge("current".into(), None),
2423 scanned_blocks: group.create_gauge("scanned_blocks".into(), None),
2424 scanned_vid: group.create_gauge("scanned_vid".into(), None),
2425 missing_blocks: group.create_gauge("missing_blocks".into(), None),
2426 missing_vid: group.create_gauge("missing_vid".into(), None),
2427 }
2428 }
2429}
2430
/// Prometheus metrics published by the aggregator task.
#[derive(Debug)]
struct AggregatorMetrics {
    /// Height through which aggregate statistics have been computed.
    height: Box<dyn Gauge>,
}
2436
2437impl AggregatorMetrics {
2438 fn new(metrics: &PrometheusMetrics) -> Self {
2439 let group = metrics.subgroup("aggregator".into());
2440 Self {
2441 height: group.create_gauge("height".into(), None),
2442 }
2443 }
2444}
2445
/// Prometheus metrics for the chunked sync-status computation.
#[derive(Debug)]
struct SyncStatusMetrics {
    /// Start of the sub-range currently being scanned.
    current_range_start: Box<dyn Gauge>,
    /// End of the sub-range currently being scanned.
    current_range_end: Box<dyn Gauge>,
    /// Wall-clock time (Unix seconds) at which the current range scan began.
    current_start_time: Box<dyn Gauge>,
    /// Average time spent per block scanned, in milliseconds.
    avg_rate: Box<dyn Histogram>,
    /// Total number of sub-ranges scanned since startup.
    ranges_scanned: Box<dyn Counter>,
    /// 1 while a range scan is in progress, 0 otherwise.
    running: Box<dyn Gauge>,
}
2455
2456impl SyncStatusMetrics {
2457 fn new(metrics: &PrometheusMetrics, size: usize) -> Self {
2458 let group = metrics.subgroup("sync_status".into());
2459 group.create_gauge("range_size".into(), None).set(size);
2460
2461 Self {
2462 current_range_start: group.create_gauge("current_range_start".into(), None),
2463 current_range_end: group.create_gauge("current_range_end".into(), None),
2464 current_start_time: group
2465 .create_gauge("current_range_start_time".into(), Some("s".into())),
2466 avg_rate: group
2467 .create_histogram("avg_time_per_block_scanned".into(), Some("ms".into())),
2468 ranges_scanned: group.create_counter("ranges_scanned".into(), None),
2469 running: group.create_gauge("running".into(), None),
2470 }
2471 }
2472
2473 fn start_range(&self, range: &Range<usize>) -> SyncStatusRangeMetrics<'_> {
2474 let start = Utc::now();
2475 self.current_range_start.set(range.start);
2476 self.current_range_end.set(range.end);
2477 self.current_start_time.set(start.timestamp() as usize);
2478 self.running.set(1);
2479 SyncStatusRangeMetrics {
2480 size: range.end - range.start,
2481 start,
2482 metrics: self,
2483 }
2484 }
2485}
2486
/// Guard for timing one sync-status range scan; call [`Self::end`] when the
/// scan finishes to record its duration.
#[must_use]
#[derive(Debug)]
struct SyncStatusRangeMetrics<'a> {
    /// Number of blocks in the range being scanned.
    size: usize,
    /// Time at which the scan started.
    start: DateTime<Utc>,
    /// The metrics group to record results into.
    metrics: &'a SyncStatusMetrics,
}
2494
2495impl<'a> SyncStatusRangeMetrics<'a> {
2496 fn end(self) {
2497 let elapsed = Utc::now() - self.start;
2498 self.metrics
2499 .avg_rate
2500 .add_point((elapsed.num_milliseconds() as f64) / (self.size as f64));
2501 self.metrics.ranges_scanned.add(1);
2502 self.metrics.running.set(0);
2503 }
2504}
2505
/// A sync status computation cached with a time-to-live.
#[derive(Debug)]
struct CachedSyncStatus {
    /// When the cached value was last refreshed.
    last_updated: Instant,
    /// How long a cached value remains valid.
    ttl: Duration,
    /// The cached status, if one has ever been computed.
    cached: Option<SyncStatusQueryData>,
}
2512
2513impl CachedSyncStatus {
2514 fn new(ttl: Duration) -> Self {
2515 Self {
2516 last_updated: Instant::now(),
2517 ttl,
2518 cached: None,
2519 }
2520 }
2521
2522 fn try_get(&self) -> Option<&SyncStatusQueryData> {
2524 if self.last_updated.elapsed() > self.ttl {
2525 return None;
2527 }
2528 self.cached.as_ref()
2529 }
2530
2531 fn update(&mut self, value: SyncStatusQueryData) {
2533 self.last_updated = Instant::now();
2534 self.cached = Some(value);
2535 }
2536}
2537
2538fn passive<T>(
2542 req: impl Debug + Send + 'static,
2543 fut: impl Future<Output = Option<T>> + Send + 'static,
2544) -> Fetch<T>
2545where
2546 T: Send + 'static,
2547{
2548 Fetch::Pending(
2549 fut.then(move |opt| async move {
2550 match opt {
2551 Some(t) => t,
2552 None => {
2553 panic!("notifier dropped without satisfying request {req:?}");
2568 },
2569 }
2570 })
2571 .boxed(),
2572 )
2573}
2574
2575async fn select_some<T>(
2577 a: impl Future<Output = Option<T>> + Unpin,
2578 b: impl Future<Output = Option<T>> + Unpin,
2579) -> Option<T> {
2580 match future::select(a, b).await {
2581 Either::Left((Some(a), _)) => Some(a),
2583 Either::Right((Some(b), _)) => Some(b),
2584
2585 Either::Left((None, b)) => b.await,
2587 Either::Right((None, a)) => a.await,
2588 }
2589}
2590
2591#[cfg(test)]
2592mod test {
2593 use hotshot_example_types::node_types::TEST_VERSIONS;
2594
2595 use super::*;
2596 use crate::{
2597 data_source::{
2598 sql::testing::TmpDb,
2599 storage::{SqlStorage, StorageConnectionType},
2600 },
2601 fetching::provider::NoFetching,
2602 testing::{consensus::MockSqlDataSource, mocks::MockTypes},
2603 };
2604
    /// Forward chunking: exact-multiple, short-final-chunk, and unbounded cases.
    #[test]
    fn test_range_chunks() {
        assert_eq!(
            range_chunks(0..=4, 2).collect::<Vec<_>>(),
            [0..2, 2..4, 4..5]
        );

        assert_eq!(
            range_chunks(0..=5, 2).collect::<Vec<_>>(),
            [0..2, 2..4, 4..6]
        );

        assert_eq!(
            range_chunks(0..5, 2).collect::<Vec<_>>(),
            [0..2, 2..4, 4..5]
        );

        assert_eq!(
            range_chunks(0..6, 2).collect::<Vec<_>>(),
            [0..2, 2..4, 4..6]
        );

        // Unbounded range: the iterator is infinite, so only sample a prefix.
        assert_eq!(
            range_chunks(0.., 2).take(5).collect::<Vec<_>>(),
            [0..2, 2..4, 4..6, 6..8, 8..10]
        );
    }
2637
    /// Aligned forward chunking: aligned start, unaligned start (short leading
    /// chunk), range smaller than alignment, and unbounded cases.
    #[test]
    fn test_range_chunks_aligned() {
        #![allow(clippy::single_range_in_vec_init)]

        assert_eq!(
            range_chunks_aligned(2..5, 2).collect::<Vec<_>>(),
            [2..4, 4..5]
        );

        assert_eq!(
            range_chunks_aligned(1..4, 2).collect::<Vec<_>>(),
            [1..2, 2..4]
        );

        assert_eq!(range_chunks_aligned(1..3, 10).collect::<Vec<_>>(), [1..3]);

        assert_eq!(
            range_chunks_aligned(1.., 2).take(5).collect::<Vec<_>>(),
            [1..2, 2..4, 4..6, 6..8, 8..10]
        );
    }
2663
    /// Reverse chunking (inclusive end): chunks are emitted highest-first and
    /// only the lowest chunk may be short.
    #[test]
    fn test_range_chunks_rev() {
        assert_eq!(
            range_chunks_rev(Bound::Included(0), 4, 2).collect::<Vec<_>>(),
            [3..5, 1..3, 0..1]
        );

        assert_eq!(
            range_chunks_rev(Bound::Included(0), 5, 2).collect::<Vec<_>>(),
            [4..6, 2..4, 0..2]
        );

        // Excluded start bound shifts the lowest chunk up by one.
        assert_eq!(
            range_chunks_rev(Bound::Excluded(0), 5, 2).collect::<Vec<_>>(),
            [4..6, 2..4, 1..2]
        );

        assert_eq!(
            range_chunks_rev(Bound::Excluded(0), 4, 2).collect::<Vec<_>>(),
            [3..5, 1..3]
        );
    }
2690
    /// Aligned reverse chunking: the first (highest) chunk may be short so
    /// that subsequent chunks start on alignment boundaries.
    #[test]
    fn test_range_chunks_aligned_rev() {
        #![allow(clippy::single_range_in_vec_init)]

        assert_eq!(
            range_chunks_aligned_rev(Bound::Included(1), 3, 2).collect::<Vec<_>>(),
            [2..4, 1..2]
        );

        assert_eq!(
            range_chunks_aligned_rev(Bound::Included(0), 2, 2).collect::<Vec<_>>(),
            [2..3, 0..2]
        );

        // Alignment larger than the whole range: a single unaligned chunk.
        assert_eq!(
            range_chunks_aligned_rev(Bound::Excluded(0), 3, 10).collect::<Vec<_>>(),
            [1..4]
        );
    }
2713
    /// Driver for sync-status tests: populate storage with leaves in the given
    /// `present_ranges` (half-open `(start, end)` pairs, sorted ascending),
    /// optionally mark everything below the first range as pruned, then check
    /// that the computed status reports the expected Pruned/Present/Missing
    /// ranges and missing count when scanned with the given `chunk_size`.
    async fn test_sync_status(chunk_size: usize, present_ranges: &[(usize, usize)]) {
        let block_height = present_ranges.last().unwrap().1;
        let storage = TmpDb::init().await;
        let db = SqlStorage::connect(storage.config(), StorageConnectionType::Query)
            .await
            .unwrap();
        // TTL of zero so each sync_status() call recomputes from storage.
        let ds = MockSqlDataSource::builder(db, NoFetching)
            .with_sync_status_chunk_size(chunk_size)
            .with_sync_status_ttl(Duration::ZERO)
            .build()
            .await
            .unwrap();

        // Build a chain of leaves by cloning genesis and bumping the height.
        let mut leaves: Vec<LeafQueryData<MockTypes>> = vec![
            LeafQueryData::<MockTypes>::genesis(
                &Default::default(),
                &Default::default(),
                TEST_VERSIONS.test,
            )
            .await,
        ];
        for i in 1..block_height {
            let mut leaf = leaves[i - 1].clone();
            leaf.leaf.block_header_mut().block_number = i as u64;
            leaves.push(leaf);
        }

        {
            let mut tx = ds.write().await.unwrap();

            // Insert only the leaves inside the "present" ranges.
            for &(start, end) in present_ranges {
                for leaf in &leaves[start..end] {
                    tracing::info!(height = leaf.height(), "insert leaf");
                    tx.insert_leaf(leaf).await.unwrap();
                }
            }

            // If the first present range does not start at 0, treat everything
            // below it as pruned rather than missing.
            if present_ranges[0].0 > 0 {
                tx.save_pruned_height((present_ranges[0].0 - 1) as u64)
                    .await
                    .unwrap();
            }

            tx.commit().await.unwrap();
        }

        let sync_status = ds.sync_status().await.unwrap().leaves;

        // Missing = total height minus present leaves minus the pruned prefix.
        let present: usize = present_ranges.iter().map(|(start, end)| end - start).sum();
        assert_eq!(
            sync_status.missing,
            block_height - present - present_ranges[0].0
        );

        // Walk the reported ranges, expecting an optional Pruned/Missing gap
        // before each Present range.
        let mut ranges = sync_status.ranges.into_iter();
        let mut prev = 0;
        for &(start, end) in present_ranges {
            if start != prev {
                let range = ranges.next().unwrap();
                assert_eq!(
                    range,
                    SyncStatusRange {
                        start: prev,
                        end: start,
                        status: if prev == 0 {
                            SyncStatus::Pruned
                        } else {
                            SyncStatus::Missing
                        },
                    }
                );
            }
            let range = ranges.next().unwrap();
            assert_eq!(
                range,
                SyncStatusRange {
                    start,
                    end,
                    status: SyncStatus::Present,
                }
            );
            prev = end;
        }

        // Any tail beyond the last present range must be reported Missing.
        if prev != block_height {
            let range = ranges.next().unwrap();
            assert_eq!(
                range,
                SyncStatusRange {
                    start: prev,
                    end: block_height,
                    status: SyncStatus::Missing,
                }
            );
        }

        assert_eq!(ranges.next(), None);
    }
2816
2817 #[tokio::test]
2818 #[test_log::test]
2819 async fn test_sync_status_multiple_chunks() {
2820 test_sync_status(10, &[(0, 1), (3, 5), (8, 10)]).await;
2821 }
2822
2823 #[tokio::test]
2824 #[test_log::test]
2825 async fn test_sync_status_multiple_chunks_present_range_overlapping_chunk() {
2826 test_sync_status(5, &[(1, 4)]).await;
2827 }
2828
2829 #[tokio::test]
2830 #[test_log::test]
2831 async fn test_sync_status_multiple_chunks_missing_range_overlapping_chunk() {
2832 test_sync_status(5, &[(0, 1), (4, 5)]).await;
2833 }
2834
2835 #[tokio::test]
2836 #[test_log::test]
2837 async fn test_load_range_incomplete() {
2838 let storage = TmpDb::init().await;
2839 let db = SqlStorage::connect(storage.config(), StorageConnectionType::Query)
2840 .await
2841 .unwrap();
2842 {
2843 let mut tx = db.write().await.unwrap();
2844 tx.insert_leaf(
2845 &LeafQueryData::<MockTypes>::genesis(
2846 &Default::default(),
2847 &Default::default(),
2848 TEST_VERSIONS.test,
2849 )
2850 .await,
2851 )
2852 .await
2853 .unwrap();
2854 tx.insert_block(
2855 &BlockQueryData::<MockTypes>::genesis(
2856 &Default::default(),
2857 &Default::default(),
2858 TEST_VERSIONS.test.base,
2859 )
2860 .await,
2861 )
2862 .await
2863 .unwrap();
2864 tx.insert_vid(
2865 &VidCommonQueryData::<MockTypes>::genesis(
2866 &Default::default(),
2867 &Default::default(),
2868 TEST_VERSIONS.test.base,
2869 )
2870 .await,
2871 None,
2872 )
2873 .await
2874 .unwrap();
2875 tx.commit().await.unwrap();
2876 }
2877
2878 let mut tx = db.read().await.unwrap();
2879 let req = RangeRequest { start: 0, end: 100 };
2880
2881 let err = <NonEmptyRange<BlockQueryData<MockTypes>>>::load(&mut tx, req)
2882 .await
2883 .unwrap_err();
2884 tracing::info!("loading partial block range failed as expected: {err:#}");
2885 assert!(matches!(err, QueryError::Missing));
2886
2887 let err =
2888 <NonEmptyRange<LeafQueryData<MockTypes>> as Fetchable<MockTypes>>::load(&mut tx, req)
2889 .await
2890 .unwrap_err();
2891 tracing::info!("loading partial leaf range failed as expected: {err:#}");
2892 assert!(matches!(err, QueryError::Missing));
2893
2894 let err = <NonEmptyRange<VidCommonQueryData<MockTypes>>>::load(&mut tx, req)
2895 .await
2896 .unwrap_err();
2897 tracing::info!("loading partial VID common range failed as expected: {err:#}");
2898 assert!(matches!(err, QueryError::Missing));
2899 }
2900}