hotshot_query_service/data_source/
fetching.rs

1// Copyright (c) 2022 Espresso Systems (espressosys.com)
2// This file is part of the HotShot Query Service library.
3//
4// This program is free software: you can redistribute it and/or modify it under the terms of the GNU
5// General Public License as published by the Free Software Foundation, either version 3 of the
6// License, or (at your option) any later version.
7// This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without
8// even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
9// General Public License for more details.
10// You should have received a copy of the GNU General Public License along with this program. If not,
11// see <https://www.gnu.org/licenses/>.
12
13//! Asynchronous retrieval of missing data.
14//!
15//! [`FetchingDataSource`] combines a local storage implementation with a remote data availability
16//! provider to create a data sources which caches data locally, but which is capable of fetching
17//! missing data from a remote source, either proactively or on demand.
18//!
19//! This implementation supports three kinds of data fetching.
20//!
21//! # Proactive Fetching
22//!
23//! Proactive fetching means actively scanning the local database for missing objects and
24//! proactively retrieving them from a remote provider, even if those objects have yet to be
25//! requested by a client. Doing this increases the chance of success and decreases latency when a
26//! client does eventually ask for those objects. This is also the mechanism by which a query
27//! service joining a network late, or having been offline for some time, is able to catch up with
28//! the events on the network that it missed.
29//!
30//! Proactive fetching is implemented by a background task which performs periodic scans of the
31//! database, identifying and retrieving missing objects. This task is generally low priority, since
32//! missing objects are rare, and it will take care not to monopolize resources that could be used
33//! to serve requests.
34//!
35//! # Active Fetching
36//!
37//! Active fetching means reaching out to a remote data availability provider to retrieve a missing
38//! resource, upon receiving a request for that resource from a client. Not every request for a
39//! missing resource triggers an active fetch. To avoid spamming peers with requests for missing
40//! data, we only actively fetch resources that are known to exist somewhere. This means we can
41//! actively fetch leaves and headers when a client requests a leaf or header by height, whose height
42//! is less than the current chain height. We can fetch a block when the corresponding header exists
43//! (corresponding based on height, hash, or payload hash) or can be actively fetched.
44//!
45//! # Passive Fetching
46//!
47//! For requests that cannot be actively fetched (for example, a block requested by hash, where we
48//! do not have a header proving that a block with that hash exists), we use passive fetching. This
49//! essentially means waiting passively until the query service receives an object that satisfies
50//! the request. This object may be received because it was actively fetched in response to a
51//! different request for the same object, one that permitted an active fetch. Or it may have been
52//! fetched [proactively](#proactive-fetching).
53
54use std::{
55    cmp::{max, min},
56    fmt::{Debug, Display},
57    iter::repeat_with,
58    marker::PhantomData,
59    ops::{Bound, Range, RangeBounds},
60    sync::Arc,
61    time::{Duration, Instant},
62};
63
64use anyhow::{Context, bail};
65use async_lock::Semaphore;
66use async_trait::async_trait;
67use backoff::{ExponentialBackoff, ExponentialBackoffBuilder, backoff::Backoff};
68use chrono::{DateTime, Utc};
69use derivative::Derivative;
70use futures::{
71    channel::oneshot,
72    future::{self, BoxFuture, Either, Future, FutureExt, join_all},
73    stream::{self, BoxStream, StreamExt},
74};
75use hotshot_types::{
76    data::VidShare,
77    simple_certificate::CertificatePair,
78    traits::{
79        metrics::{Counter, Gauge, Histogram, Metrics},
80        node_implementation::NodeType,
81    },
82};
83use jf_merkle_tree_compat::{MerkleTreeScheme, prelude::MerkleProof};
84use tagged_base64::TaggedBase64;
85use tokio::{spawn, sync::Mutex, time::sleep};
86use tracing::Instrument;
87
88use super::{
89    Transaction, VersionedDataSource,
90    notifier::Notifier,
91    storage::{
92        Aggregate, AggregatesStorage, AvailabilityStorage, ExplorerStorage,
93        MerklizedStateHeightStorage, MerklizedStateStorage, NodeStorage, UpdateAggregatesStorage,
94        UpdateAvailabilityStorage,
95        pruning::{PruneStorage, PrunedHeightDataSource, PrunedHeightStorage},
96    },
97};
98use crate::{
99    Header, Payload, QueryError, QueryResult,
100    availability::{
101        AvailabilityDataSource, BlockId, BlockInfo, BlockQueryData, BlockWithTransaction, Fetch,
102        FetchStream, HeaderQueryData, LeafId, LeafQueryData, NamespaceId, PayloadMetadata,
103        PayloadQueryData, QueryableHeader, QueryablePayload, TransactionHash,
104        UpdateAvailabilityData, VidCommonMetadata, VidCommonQueryData,
105    },
106    data_source::fetching::{leaf::RangeRequest, vid::VidCommonRangeFetcher},
107    explorer::{self, ExplorerDataSource},
108    fetching::{self, NonEmptyRange, Provider, request},
109    merklized_state::{
110        MerklizedState, MerklizedStateDataSource, MerklizedStateHeightPersistence, Snapshot,
111    },
112    metrics::PrometheusMetrics,
113    node::{
114        NodeDataSource, SyncStatus, SyncStatusQueryData, SyncStatusRange, TimeWindowQueryData,
115        WindowStart,
116    },
117    status::{HasMetrics, StatusDataSource},
118    task::BackgroundTask,
119    types::HeightIndexed,
120};
121
122mod block;
123mod header;
124mod leaf;
125mod transaction;
126mod vid;
127
128use self::{
129    block::{PayloadFetcher, PayloadRangeFetcher},
130    leaf::{LeafFetcher, LeafRangeFetcher},
131    transaction::TransactionRequest,
132    vid::{VidCommonFetcher, VidCommonRequest},
133};
134
/// Builder for [`FetchingDataSource`] with configuration.
pub struct Builder<Types, S, P> {
    // Local persistent storage backing the data source.
    storage: S,
    // Remote data availability provider used to fetch missing objects.
    provider: P,
    // Retry/backoff policy for failed operations (see the `with_*_retry_*` setters).
    backoff: ExponentialBackoffBuilder,
    // Maximum number of simultaneous fetches.
    rate_limit: usize,
    // Number of items to process at a time when loading a range or stream.
    range_chunk_size: usize,
    // Time between proactive fetching scans.
    proactive_interval: Duration,
    // Chunk size used by the proactive scanner (independent of `range_chunk_size`).
    proactive_range_chunk_size: usize,
    // Number of items per transaction when scanning the database for missing objects.
    sync_status_chunk_size: usize,
    // Delay between active fetches during proactive scans, to avoid overloading peers.
    active_fetch_delay: Duration,
    // Delay between chunk fetches during proactive scans, to limit CPU/storage pressure.
    chunk_fetch_delay: Duration,
    // Whether to run the proactive scanner background task.
    proactive_fetching: bool,
    // Whether to run the aggregator background task.
    aggregator: bool,
    // Chunk size for the aggregator; `None` means fall back to `range_chunk_size`.
    aggregator_chunk_size: Option<usize>,
    // Leaf-only mode: skip payload/VID handling (disables scanner and aggregator).
    leaf_only: bool,
    // How long cached sync status results remain valid.
    sync_status_ttl: Duration,
    _types: PhantomData<Types>,
}
154
155impl<Types, S, P> Builder<Types, S, P> {
156    /// Construct a new builder with the given storage and fetcher and the default options.
157    pub fn new(storage: S, provider: P) -> Self {
158        let mut default_backoff = ExponentialBackoffBuilder::default();
159        default_backoff
160            .with_initial_interval(Duration::from_secs(1))
161            .with_multiplier(2.)
162            .with_max_interval(Duration::from_secs(32))
163            .with_max_elapsed_time(Some(Duration::from_secs(64)));
164
165        Self {
166            storage,
167            provider,
168            backoff: default_backoff,
169            rate_limit: 32,
170            range_chunk_size: 25,
171            proactive_interval: Duration::from_hours(8),
172            proactive_range_chunk_size: 100,
173            sync_status_chunk_size: 100_000,
174            active_fetch_delay: Duration::from_millis(50),
175            chunk_fetch_delay: Duration::from_millis(100),
176            proactive_fetching: true,
177            aggregator: true,
178            aggregator_chunk_size: None,
179            leaf_only: false,
180            sync_status_ttl: Duration::from_mins(5),
181            _types: Default::default(),
182        }
183    }
184
185    pub fn leaf_only(mut self) -> Self {
186        self.leaf_only = true;
187        self
188    }
189
190    /// Set the minimum delay between retries of failed operations.
191    pub fn with_min_retry_interval(mut self, interval: Duration) -> Self {
192        self.backoff.with_initial_interval(interval);
193        self
194    }
195
196    /// Set the maximum delay between retries of failed operations.
197    pub fn with_max_retry_interval(mut self, interval: Duration) -> Self {
198        self.backoff.with_max_interval(interval);
199        self
200    }
201
202    /// Set the multiplier for exponential backoff when retrying failed requests.
203    pub fn with_retry_multiplier(mut self, multiplier: f64) -> Self {
204        self.backoff.with_multiplier(multiplier);
205        self
206    }
207
208    /// Set the randomization factor for randomized backoff when retrying failed requests.
209    pub fn with_retry_randomization_factor(mut self, factor: f64) -> Self {
210        self.backoff.with_randomization_factor(factor);
211        self
212    }
213
214    /// Set the maximum time to retry failed operations before giving up.
215    pub fn with_retry_timeout(mut self, timeout: Duration) -> Self {
216        self.backoff.with_max_elapsed_time(Some(timeout));
217        self
218    }
219
220    /// Set the maximum number of simultaneous fetches.
221    pub fn with_rate_limit(mut self, with_rate_limit: usize) -> Self {
222        self.rate_limit = with_rate_limit;
223        self
224    }
225
226    /// Set the number of items to process at a time when loading a range or stream.
227    ///
228    /// This determines:
229    /// * The number of objects to load from storage in a single request
230    /// * The number of objects to buffer in memory per request/stream
231    /// * The number of concurrent notification subscriptions per request/stream
232    pub fn with_range_chunk_size(mut self, range_chunk_size: usize) -> Self {
233        self.range_chunk_size = range_chunk_size;
234        self
235    }
236
237    /// Set the time interval between proactive fetching scans.
238    ///
239    /// See [proactive fetching](self#proactive-fetching).
240    pub fn with_proactive_interval(mut self, interval: Duration) -> Self {
241        self.proactive_interval = interval;
242        self
243    }
244
245    /// Set the number of items to process at a time when scanning for proactive fetching.
246    ///
247    /// This is similar to [`Self::with_range_chunk_size`], but only affects the chunk size for
248    /// proactive fetching scans, not for normal subscription streams. This can be useful to tune
249    /// the proactive scanner to be more or less greedy with persistent storage resources.
250    pub fn with_proactive_range_chunk_size(mut self, range_chunk_size: usize) -> Self {
251        self.proactive_range_chunk_size = range_chunk_size;
252        self
253    }
254
255    /// Set the number of items to process in a single transaction when scanning the database for
256    /// missing objects.
257    pub fn with_sync_status_chunk_size(mut self, chunk_size: usize) -> Self {
258        self.sync_status_chunk_size = chunk_size;
259        self
260    }
261
262    /// Duration to cache sync status results for.
263    ///
264    /// Computing the sync status is expensive, and it typically doesn't change that quickly. Thus,
265    /// it makes sense to cache the results whenever we do compute it, and return those cached
266    /// results if they are not too old.
267    pub fn with_sync_status_ttl(mut self, ttl: Duration) -> Self {
268        self.sync_status_ttl = ttl;
269        self
270    }
271
272    /// Add a delay between active fetches in proactive scans.
273    ///
274    /// This can be used to limit the rate at which this query service makes requests to other query
275    /// services during proactive scans. This is useful if the query service has a lot of blocks to
276    /// catch up on, as without a delay, scanning can be extremely burdensome on the peer.
277    pub fn with_active_fetch_delay(mut self, active_fetch_delay: Duration) -> Self {
278        self.active_fetch_delay = active_fetch_delay;
279        self
280    }
281
282    /// Adds a delay between chunk fetches during proactive scans.
283    ///
284    /// In a proactive scan, we retrieve a range of objects from a provider or local storage (e.g., a database).
285    /// Without a delay between fetching these chunks, the process can become very CPU-intensive, especially
286    /// when chunks are retrieved from local storage. While there is already a delay for active fetches
287    /// (`active_fetch_delay`), situations may arise when subscribed to an old stream that fetches most of the data
288    /// from local storage.
289    ///
290    /// This additional delay helps to limit constant maximum CPU usage
291    /// and ensures that local storage remains accessible to all processes,
292    /// not just the proactive scanner.
293    pub fn with_chunk_fetch_delay(mut self, chunk_fetch_delay: Duration) -> Self {
294        self.chunk_fetch_delay = chunk_fetch_delay;
295        self
296    }
297
298    /// Run without [proactive fetching](self#proactive-fetching).
299    ///
300    /// This can reduce load on the CPU and the database, but increases the probability that
301    /// requests will fail due to missing resources. If resources are constrained, it is recommended
302    /// to run with rare proactive fetching (see
303    /// [`with_major_scan_interval`](Self::with_major_scan_interval),
304    /// [`with_minor_scan_interval`](Self::with_minor_scan_interval)), rather than disabling it
305    /// entirely.
306    pub fn disable_proactive_fetching(mut self) -> Self {
307        self.proactive_fetching = false;
308        self
309    }
310
311    /// Run without an aggregator.
312    ///
313    /// This can reduce load on the CPU and the database, but it will cause aggregate statistics
314    /// (such as transaction counts) not to update.
315    pub fn disable_aggregator(mut self) -> Self {
316        self.aggregator = false;
317        self
318    }
319
320    /// Set the number of items to process at a time when computing aggregate statistics.
321    ///
322    /// This is similar to [`Self::with_range_chunk_size`], but only affects the chunk size for
323    /// the aggregator task, not for normal subscription streams. This can be useful to tune
324    /// the aggregator to be more or less greedy with persistent storage resources.
325    ///
326    /// By default (i.e. if this method is not called) the proactive range chunk size will be set to
327    /// whatever the normal range chunk size is.
328    pub fn with_aggregator_chunk_size(mut self, chunk_size: usize) -> Self {
329        self.aggregator_chunk_size = Some(chunk_size);
330        self
331    }
332
333    pub fn is_leaf_only(&self) -> bool {
334        self.leaf_only
335    }
336}
337
impl<Types, S, P> Builder<Types, S, P>
where
    Types: NodeType,
    Payload<Types>: QueryablePayload<Types>,
    Header<Types>: QueryableHeader<Types>,
    S: PruneStorage + VersionedDataSource + HasMetrics + 'static,
    for<'a> S::ReadOnly<'a>: AvailabilityStorage<Types>
        + PrunedHeightStorage
        + NodeStorage<Types>
        + AggregatesStorage<Types>,
    for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types> + UpdateAggregatesStorage<Types>,
    P: AvailabilityProvider<Types>,
{
    /// Build a [`FetchingDataSource`] with these options.
    ///
    /// Consumes the builder and constructs the data source, spawning its background tasks
    /// (proactive scanner, aggregator, pruner) as configured.
    ///
    /// # Errors
    ///
    /// Returns an error if the underlying [`FetchingDataSource::new`] initialization fails.
    pub async fn build(self) -> anyhow::Result<FetchingDataSource<Types, S, P>> {
        FetchingDataSource::new(self).await
    }
}
356
357/// The most basic kind of data source.
358///
359/// A data source is constructed modularly by combining a [storage](super::storage) implementation
360/// with a [Fetcher](crate::fetching::Fetcher). The former allows the query service to store the
361/// data it has persistently in an easily accessible storage medium, such as the local file system
362/// or a database. This allows it to answer queries efficiently and to maintain its state across
363/// restarts. The latter allows the query service to fetch data that is missing from its storage
364/// from an external data availability provider, such as the Tiramisu DA network or another instance
365/// of the query service.
366///
367/// These two components of a data source are combined in [`FetchingDataSource`], which is the
368/// lowest level kind of data source available. It simply uses the storage implementation to fetch
369/// data when available, and fills in everything else using the fetcher. Various kinds of data
370/// sources can be constructed out of [`FetchingDataSource`] by changing the storage and fetcher
371/// implementations used, and more complex data sources can be built on top using data source
372/// combinators.
#[derive(Derivative)]
#[derivative(Clone(bound = ""), Debug(bound = "S: Debug, P: Debug"))]
pub struct FetchingDataSource<Types, S, P>
where
    Types: NodeType,
{
    // The fetcher manages retrieval of resources from both local storage and a remote provider. It
    // encapsulates the data which may need to be shared with a long-lived task or future that
    // implements the asynchronous fetching of a particular object. This is why it gets its own
    // type, wrapped in an [`Arc`] for easy, efficient cloning.
    fetcher: Arc<Fetcher<Types, S, P>>,
    // The proactive scanner task. This is only saved here so that we can cancel it on drop.
    scanner: Option<BackgroundTask>,
    // The aggregator task, which derives aggregate statistics from a block stream.
    aggregator: Option<BackgroundTask>,
    // The pruner task (if pruning is configured). Saved here so it is cancelled on drop.
    pruner: Pruner<Types, S>,
}
390
391#[derive(Derivative)]
392#[derivative(Clone(bound = ""), Debug(bound = "S: Debug,   "))]
393pub struct Pruner<Types, S>
394where
395    Types: NodeType,
396{
397    handle: Option<BackgroundTask>,
398    _types: PhantomData<(Types, S)>,
399}
400
impl<Types, S> Pruner<Types, S>
where
    Types: NodeType,
    Header<Types>: QueryableHeader<Types>,
    Payload<Types>: QueryablePayload<Types>,
    S: PruneStorage + Send + Sync + 'static,
{
    // Spawn the periodic pruning task, if the storage has a pruning configuration.
    //
    // Returns a `Pruner` with no task handle when `get_pruning_config()` is `None`.
    async fn new(storage: Arc<S>, backoff: ExponentialBackoff) -> Self {
        let cfg = storage.get_pruning_config();
        let Some(cfg) = cfg else {
            // Pruning is not configured; nothing to spawn.
            return Self {
                handle: None,
                _types: Default::default(),
            };
        };

        let future = async move {
            // Run forever, one pruner run per configured interval. `i` is just a run counter
            // for logging.
            for i in 1.. {
                // Delay before we start the pruner run to avoid a useless and expensive prune
                // immediately on startup.
                sleep(cfg.interval()).await;

                tracing::warn!("starting pruner run {i} ");
                Self::prune(storage.clone(), &backoff).await;
            }
        };

        let task = BackgroundTask::spawn("pruner", future);

        Self {
            handle: Some(task),
            _types: Default::default(),
        }
    }

    // Execute a single pruner run, pruning batch by batch until the run is complete or retries
    // are exhausted.
    async fn prune(storage: Arc<S>, backoff: &ExponentialBackoff) {
        // We loop until the whole pruner run is complete. `storage.prune` returns
        // `Ok(Some(height))` after each successful batch and `Ok(None)` once the run is done.
        let mut pruner = S::Pruner::default();
        'run: loop {
            // Fresh backoff state for each batch, so one flaky batch doesn't exhaust the retry
            // budget of subsequent batches.
            let mut backoff = backoff.clone();
            backoff.reset();
            'batch: loop {
                match storage.prune(&mut pruner).await {
                    Ok(Some(height)) => {
                        tracing::warn!("Pruned to height {height}");
                        break 'batch;
                    },
                    Ok(None) => {
                        tracing::warn!("pruner run complete.");
                        break 'run;
                    },
                    Err(e) => {
                        // Retry the failed batch with exponential backoff; abandon the whole run
                        // once the backoff policy is exhausted.
                        tracing::warn!("error pruning batch: {e:#}");
                        if let Some(delay) = backoff.next_backoff() {
                            sleep(delay).await;
                        } else {
                            tracing::error!("pruning run failed after too many errors: {e:#}");
                            break 'run;
                        }
                    },
                }
            }
        }
    }
}
466
impl<Types, S, P> FetchingDataSource<Types, S, P>
where
    Types: NodeType,
    Payload<Types>: QueryablePayload<Types>,
    Header<Types>: QueryableHeader<Types>,
    S: VersionedDataSource + PruneStorage + HasMetrics + 'static,
    for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types> + UpdateAggregatesStorage<Types>,
    for<'a> S::ReadOnly<'a>: AvailabilityStorage<Types>
        + NodeStorage<Types>
        + PrunedHeightStorage
        + AggregatesStorage<Types>,
    P: AvailabilityProvider<Types>,
{
    /// Build a [`FetchingDataSource`] with the given `storage` and `provider`.
    pub fn builder(storage: S, provider: P) -> Builder<Types, S, P> {
        Builder::new(storage, provider)
    }

    // Construct the data source from a builder, spawning the configured background tasks.
    async fn new(builder: Builder<Types, S, P>) -> anyhow::Result<Self> {
        // Copy out the configuration needed for the background tasks *before* the builder is
        // consumed by `Fetcher::new` below.
        let leaf_only = builder.is_leaf_only();
        let aggregator = builder.aggregator;
        let aggregator_chunk_size = builder
            .aggregator_chunk_size
            .unwrap_or(builder.range_chunk_size);
        let proactive_fetching = builder.proactive_fetching;
        let proactive_interval = builder.proactive_interval;
        let proactive_range_chunk_size = builder.proactive_range_chunk_size;
        let backoff = builder.backoff.build();
        let scanner_metrics = ScannerMetrics::new(builder.storage.metrics());
        let aggregator_metrics = AggregatorMetrics::new(builder.storage.metrics());

        let fetcher = Arc::new(Fetcher::new(builder).await?);
        // In leaf-only mode, neither the proactive scanner nor the aggregator is started, since
        // both operate on full blocks.
        let scanner = if proactive_fetching && !leaf_only {
            Some(BackgroundTask::spawn(
                "proactive scanner",
                fetcher.clone().proactive_scan(
                    proactive_interval,
                    proactive_range_chunk_size,
                    scanner_metrics,
                ),
            ))
        } else {
            None
        };

        let aggregator = if aggregator && !leaf_only {
            Some(BackgroundTask::spawn(
                "aggregator",
                fetcher
                    .clone()
                    .aggregate(aggregator_chunk_size, aggregator_metrics),
            ))
        } else {
            None
        };

        let storage = fetcher.storage.clone();

        // The pruner spawns its own task only if the storage has a pruning configuration.
        let pruner = Pruner::new(storage, backoff).await;
        let ds = Self {
            fetcher,
            scanner,
            pruner,
            aggregator,
        };

        Ok(ds)
    }

    /// Get a copy of the (shared) inner storage
    pub fn inner(&self) -> Arc<S> {
        self.fetcher.storage.clone()
    }
}
541
542impl<Types, S, P> AsRef<S> for FetchingDataSource<Types, S, P>
543where
544    Types: NodeType,
545{
546    fn as_ref(&self) -> &S {
547        &self.fetcher.storage
548    }
549}
550
551impl<Types, S, P> HasMetrics for FetchingDataSource<Types, S, P>
552where
553    Types: NodeType,
554    S: HasMetrics,
555{
556    fn metrics(&self) -> &PrometheusMetrics {
557        self.as_ref().metrics()
558    }
559}
560
561#[async_trait]
562impl<Types, S, P> StatusDataSource for FetchingDataSource<Types, S, P>
563where
564    Types: NodeType,
565    Header<Types>: QueryableHeader<Types>,
566    S: VersionedDataSource + HasMetrics + Send + Sync + 'static,
567    for<'a> S::ReadOnly<'a>: NodeStorage<Types>,
568    P: Send + Sync,
569{
570    async fn block_height(&self) -> QueryResult<usize> {
571        let mut tx = self.read().await.map_err(|err| QueryError::Error {
572            message: err.to_string(),
573        })?;
574        tx.block_height().await
575    }
576}
577
578#[async_trait]
579impl<Types, S, P> PrunedHeightDataSource for FetchingDataSource<Types, S, P>
580where
581    Types: NodeType,
582    S: VersionedDataSource + HasMetrics + Send + Sync + 'static,
583    for<'a> S::ReadOnly<'a>: PrunedHeightStorage,
584    P: Send + Sync,
585{
586    async fn load_pruned_height(&self) -> anyhow::Result<Option<u64>> {
587        let mut tx = self.read().await?;
588        tx.load_pruned_height().await
589    }
590}
591
// Availability queries delegate to the shared `Fetcher`, which serves each request from local
// storage when possible and otherwise returns a `Fetch` that resolves via active or passive
// fetching (see the module-level docs).
#[async_trait]
impl<Types, S, P> AvailabilityDataSource<Types> for FetchingDataSource<Types, S, P>
where
    Types: NodeType,
    Header<Types>: QueryableHeader<Types>,
    Payload<Types>: QueryablePayload<Types>,
    S: VersionedDataSource + 'static,
    for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>,
    for<'a> S::ReadOnly<'a>: AvailabilityStorage<Types> + NodeStorage<Types> + PrunedHeightStorage,
    P: AvailabilityProvider<Types>,
{
    async fn get_leaf<ID>(&self, id: ID) -> Fetch<LeafQueryData<Types>>
    where
        ID: Into<LeafId<Types>> + Send + Sync,
    {
        self.fetcher.get(id.into()).await
    }

    async fn get_header<ID>(&self, id: ID) -> Fetch<Header<Types>>
    where
        ID: Into<BlockId<Types>> + Send + Sync,
    {
        // Headers are fetched as `HeaderQueryData` and unwrapped to the bare header.
        self.fetcher
            .get::<HeaderQueryData<_>>(id.into())
            .await
            .map(|h| h.header)
    }

    async fn get_block<ID>(&self, id: ID) -> Fetch<BlockQueryData<Types>>
    where
        ID: Into<BlockId<Types>> + Send + Sync,
    {
        self.fetcher.get(id.into()).await
    }

    async fn get_payload<ID>(&self, id: ID) -> Fetch<PayloadQueryData<Types>>
    where
        ID: Into<BlockId<Types>> + Send + Sync,
    {
        self.fetcher.get(id.into()).await
    }

    async fn get_payload_metadata<ID>(&self, id: ID) -> Fetch<PayloadMetadata<Types>>
    where
        ID: Into<BlockId<Types>> + Send + Sync,
    {
        self.fetcher.get(id.into()).await
    }

    async fn get_vid_common<ID>(&self, id: ID) -> Fetch<VidCommonQueryData<Types>>
    where
        ID: Into<BlockId<Types>> + Send + Sync,
    {
        // VID queries go through a dedicated request type wrapping the block ID.
        self.fetcher.get(VidCommonRequest::from(id.into())).await
    }

    async fn get_vid_common_metadata<ID>(&self, id: ID) -> Fetch<VidCommonMetadata<Types>>
    where
        ID: Into<BlockId<Types>> + Send + Sync,
    {
        self.fetcher.get(VidCommonRequest::from(id.into())).await
    }

    async fn get_leaf_range<R>(&self, range: R) -> FetchStream<LeafQueryData<Types>>
    where
        R: RangeBounds<usize> + Send + 'static,
    {
        // Range queries consume an `Arc` clone of the fetcher, since the returned stream may
        // outlive this call.
        self.fetcher.clone().get_range(range)
    }

    async fn get_block_range<R>(&self, range: R) -> FetchStream<BlockQueryData<Types>>
    where
        R: RangeBounds<usize> + Send + 'static,
    {
        self.fetcher.clone().get_range(range)
    }

    async fn get_header_range<R>(&self, range: R) -> FetchStream<Header<Types>>
    where
        R: RangeBounds<usize> + Send + 'static,
    {
        // There is no dedicated header range fetcher; derive headers from the leaf stream.
        let leaves: FetchStream<LeafQueryData<Types>> = self.fetcher.clone().get_range(range);

        leaves
            .map(|fetch| fetch.map(|leaf| leaf.leaf.block_header().clone()))
            .boxed()
    }

    async fn get_payload_range<R>(&self, range: R) -> FetchStream<PayloadQueryData<Types>>
    where
        R: RangeBounds<usize> + Send + 'static,
    {
        self.fetcher.clone().get_range(range)
    }

    async fn get_payload_metadata_range<R>(&self, range: R) -> FetchStream<PayloadMetadata<Types>>
    where
        R: RangeBounds<usize> + Send + 'static,
    {
        self.fetcher.clone().get_range(range)
    }

    async fn get_vid_common_range<R>(&self, range: R) -> FetchStream<VidCommonQueryData<Types>>
    where
        R: RangeBounds<usize> + Send + 'static,
    {
        self.fetcher.clone().get_range(range)
    }

    async fn get_vid_common_metadata_range<R>(
        &self,
        range: R,
    ) -> FetchStream<VidCommonMetadata<Types>>
    where
        R: RangeBounds<usize> + Send + 'static,
    {
        self.fetcher.clone().get_range(range)
    }

    async fn get_leaf_range_rev(
        &self,
        start: Bound<usize>,
        end: usize,
    ) -> FetchStream<LeafQueryData<Types>> {
        self.fetcher.clone().get_range_rev(start, end)
    }

    async fn get_block_range_rev(
        &self,
        start: Bound<usize>,
        end: usize,
    ) -> FetchStream<BlockQueryData<Types>> {
        self.fetcher.clone().get_range_rev(start, end)
    }

    async fn get_payload_range_rev(
        &self,
        start: Bound<usize>,
        end: usize,
    ) -> FetchStream<PayloadQueryData<Types>> {
        self.fetcher.clone().get_range_rev(start, end)
    }

    async fn get_payload_metadata_range_rev(
        &self,
        start: Bound<usize>,
        end: usize,
    ) -> FetchStream<PayloadMetadata<Types>> {
        self.fetcher.clone().get_range_rev(start, end)
    }

    async fn get_vid_common_range_rev(
        &self,
        start: Bound<usize>,
        end: usize,
    ) -> FetchStream<VidCommonQueryData<Types>> {
        self.fetcher.clone().get_range_rev(start, end)
    }

    async fn get_vid_common_metadata_range_rev(
        &self,
        start: Bound<usize>,
        end: usize,
    ) -> FetchStream<VidCommonMetadata<Types>> {
        self.fetcher.clone().get_range_rev(start, end)
    }

    async fn get_block_containing_transaction(
        &self,
        h: TransactionHash<Types>,
    ) -> Fetch<BlockWithTransaction<Types>> {
        // NOTE(review): the `clone()` here looks redundant — other single-object getters above
        // call `self.fetcher.get(..)` without cloning the `Arc`. Confirm whether it can be
        // dropped.
        self.fetcher.clone().get(TransactionRequest::from(h)).await
    }
}
766
impl<Types, S, P> UpdateAvailabilityData<Types> for FetchingDataSource<Types, S, P>
where
    Types: NodeType,
    Header<Types>: QueryableHeader<Types>,
    Payload<Types>: QueryablePayload<Types>,
    S: VersionedDataSource + 'static,
    for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>,
    for<'a> S::ReadOnly<'a>: AvailabilityStorage<Types> + NodeStorage<Types> + PrunedHeightStorage,
    P: AvailabilityProvider<Types>,
{
    /// Ingest a newly decided block: store the leaf, block payload, and VID data (spawning
    /// fetches for any that are missing), then notify tasks waiting on any of these objects.
    ///
    /// The ordering here is deliberate: all objects are stored before any notifications are sent,
    /// so that by the time a waiting fetch resolves, the corresponding data is already available.
    async fn append(&self, info: BlockInfo<Types>) -> anyhow::Result<()> {
        let height = info.height() as usize;

        // Save the new decided leaf.
        self.fetcher
            .store(&(info.leaf.clone(), info.qc_chain))
            .await;

        // Trigger a fetch of the parent leaf, if we don't already have it.
        leaf::trigger_fetch_for_parent(&self.fetcher, &info.leaf);

        // Store and notify the block data and VID common, if available. Spawn a fetch to retrieve
        // it, if not.
        //
        // Note a special case here: if the data was not available in the decide event, but _is_
        // available locally in the database, without having to spawn a fetch for it, we _must_
        // notify now. Thus, we must pattern match to distinguish `Fetch::Ready`/`Fetch::Pending`.
        //
        // Why? As soon as we inserted the leaf, the corresponding object may become available, if
        // we already had an identical payload/VID common in the database, from a different block.
        // Then calling `get()` will not spawn a fetch/notification, and existing fetches waiting
        // for the newly decided object to arrive will miss it. Thus, if `get()` returned a `Ready`
        // object, it is our responsibility, as the task processing newly decided objects, to make
        // sure those fetches get notified.
        let block = match info.block {
            Some(block) => Some(block),
            None => match self.fetcher.get::<BlockQueryData<Types>>(height).await {
                Fetch::Ready(block) => Some(block),
                Fetch::Pending(fut) => {
                    // Resolve the missing block in the background; we do not block the
                    // decide-processing path on a potentially slow fetch.
                    let span = tracing::info_span!("fetch missing block", height);
                    spawn(
                        async move {
                            tracing::info!("fetching missing block");
                            fut.await;
                        }
                        .instrument(span),
                    );
                    None
                },
            },
        };
        if let Some(block) = &block {
            self.fetcher.store(block).await;
        }
        let vid = match info.vid_common {
            Some(vid) => Some(vid),
            None => match self.fetcher.get::<VidCommonQueryData<Types>>(height).await {
                Fetch::Ready(vid) => Some(vid),
                Fetch::Pending(fut) => {
                    // As with the block above, retrieve missing VID common asynchronously.
                    let span = tracing::info_span!("fetch missing VID common", height);
                    spawn(
                        async move {
                            tracing::info!("fetching missing VID common");
                            fut.await;
                        }
                        .instrument(span),
                    );
                    None
                },
            },
        };
        if let Some(vid) = &vid {
            self.fetcher.store(&(vid.clone(), info.vid_share)).await;
        }

        // Send notifications for the new objects after storing all of them. This ensures that as
        // soon as a fetch for any of these objects resolves, the corresponding data will
        // immediately be available. This isn't strictly required for correctness; after all,
        // objects can generally be fetched as asynchronously as we want. But this is the most
        // intuitive behavior to provide when possible.
        info.leaf.notify(&self.fetcher.notifiers).await;
        if let Some(block) = &block {
            block.notify(&self.fetcher.notifiers).await;
        }
        if let Some(vid) = &vid {
            vid.notify(&self.fetcher.notifiers).await;
        }

        Ok(())
    }
}
858
// Transaction management for the data source is pure delegation: transactions are opened on the
// inner `Fetcher`, which in turn uses the wrapped storage implementation.
impl<Types, S, P> VersionedDataSource for FetchingDataSource<Types, S, P>
where
    Types: NodeType,
    S: VersionedDataSource + Send + Sync,
    P: Send + Sync,
{
    // The transaction types are exactly those of the underlying storage.
    type Transaction<'a>
        = S::Transaction<'a>
    where
        Self: 'a;
    type ReadOnly<'a>
        = S::ReadOnly<'a>
    where
        Self: 'a;

    async fn write(&self) -> anyhow::Result<Self::Transaction<'_>> {
        self.fetcher.write().await
    }

    async fn read(&self) -> anyhow::Result<Self::ReadOnly<'_>> {
        self.fetcher.read().await
    }
}
882
/// Asynchronous retrieval and storage of [`Fetchable`] resources.
#[derive(Debug)]
struct Fetcher<Types, S, P>
where
    Types: NodeType,
{
    // Local persistent storage; read and write transactions are opened against this.
    storage: Arc<S>,
    // Wake-up channels for passive fetches; subscribers are notified when a requested object is
    // stored.
    notifiers: Notifiers<Types>,
    // Remote data availability provider used to actively fetch missing objects.
    provider: Arc<P>,
    // Active-fetch machinery for individual leaves and for leaf ranges.
    leaf_fetcher: Arc<LeafFetcher<Types, S, P>>,
    leaf_range_fetcher: Arc<LeafRangeFetcher<Types, S, P>>,
    // Payload and VID fetchers are `None` in leaf-only mode (see `new`), where these objects are
    // never fetched.
    payload_fetcher: Option<Arc<PayloadFetcher<Types, S, P>>>,
    payload_range_fetcher: Option<Arc<PayloadRangeFetcher<Types, S, P>>>,
    vid_common_fetcher: Option<Arc<VidCommonFetcher<Types, S, P>>>,
    vid_common_range_fetcher: Option<Arc<VidCommonRangeFetcher<Types, S, P>>>,
    // Default number of objects loaded per chunk in range queries.
    range_chunk_size: usize,
    // Chunk size used when computing sync status (also reported via `SyncStatusMetrics`).
    sync_status_chunk_size: usize,
    // Duration to sleep after each active fetch,
    active_fetch_delay: Duration,
    // Duration to sleep after each chunk fetched
    chunk_fetch_delay: Duration,
    // Exponential backoff when retrying failed operations.
    backoff: ExponentialBackoff,
    // Semaphore limiting the number of simultaneous DB accesses we can have from tasks spawned to
    // retry failed loads.
    retry_semaphore: Arc<Semaphore>,
    // Whether this data source operates in leaf-only mode.
    leaf_only: bool,
    // Metrics describing how in-sync local storage is.
    sync_status_metrics: SyncStatusMetrics,
    // Cached sync status with a TTL, so repeated status queries need not always hit the database.
    sync_status: Mutex<CachedSyncStatus>,
}
913
// The fetcher is a thin transactional wrapper around the underlying storage: opening a read or
// write transaction delegates directly to `storage`.
impl<Types, S, P> VersionedDataSource for Fetcher<Types, S, P>
where
    Types: NodeType,
    S: VersionedDataSource + Send + Sync,
    P: Send + Sync,
{
    // Reuse the storage implementation's transaction types directly.
    type Transaction<'a>
        = S::Transaction<'a>
    where
        Self: 'a;
    type ReadOnly<'a>
        = S::ReadOnly<'a>
    where
        Self: 'a;

    async fn write(&self) -> anyhow::Result<Self::Transaction<'_>> {
        self.storage.write().await
    }

    async fn read(&self) -> anyhow::Result<Self::ReadOnly<'_>> {
        self.storage.read().await
    }
}
937
938impl<Types, S, P> Fetcher<Types, S, P>
939where
940    Types: NodeType,
941    Header<Types>: QueryableHeader<Types>,
942    S: VersionedDataSource + HasMetrics + Sync,
943    for<'a> S::ReadOnly<'a>: PrunedHeightStorage + NodeStorage<Types>,
944{
945    pub async fn new(builder: Builder<Types, S, P>) -> anyhow::Result<Self> {
946        let retry_semaphore = Arc::new(Semaphore::new(builder.rate_limit));
947        let backoff = builder.backoff.build();
948
949        let (payload_fetcher, payload_range_fetcher, vid_common_fetcher, vid_common_range_fetcher) =
950            if builder.is_leaf_only() {
951                (None, None, None, None)
952            } else {
953                (
954                    Some(Arc::new(fetching::Fetcher::new(
955                        retry_semaphore.clone(),
956                        backoff.clone(),
957                    ))),
958                    Some(Arc::new(fetching::Fetcher::new(
959                        retry_semaphore.clone(),
960                        backoff.clone(),
961                    ))),
962                    Some(Arc::new(fetching::Fetcher::new(
963                        retry_semaphore.clone(),
964                        backoff.clone(),
965                    ))),
966                    Some(Arc::new(fetching::Fetcher::new(
967                        retry_semaphore.clone(),
968                        backoff.clone(),
969                    ))),
970                )
971            };
972        let leaf_fetcher = fetching::Fetcher::new(retry_semaphore.clone(), backoff.clone());
973        let leaf_range_fetcher = fetching::Fetcher::new(retry_semaphore.clone(), backoff.clone());
974
975        let leaf_only = builder.leaf_only;
976        let sync_status_metrics =
977            SyncStatusMetrics::new(builder.storage.metrics(), builder.sync_status_chunk_size);
978
979        Ok(Self {
980            storage: Arc::new(builder.storage),
981            notifiers: Default::default(),
982            provider: Arc::new(builder.provider),
983            leaf_fetcher: Arc::new(leaf_fetcher),
984            leaf_range_fetcher: Arc::new(leaf_range_fetcher),
985            payload_fetcher,
986            payload_range_fetcher,
987            vid_common_fetcher,
988            vid_common_range_fetcher,
989            range_chunk_size: builder.range_chunk_size,
990            sync_status_chunk_size: builder.sync_status_chunk_size,
991            active_fetch_delay: builder.active_fetch_delay,
992            chunk_fetch_delay: builder.chunk_fetch_delay,
993            backoff,
994            retry_semaphore,
995            leaf_only,
996            sync_status_metrics,
997            sync_status: Mutex::new(CachedSyncStatus::new(builder.sync_status_ttl)),
998        })
999    }
1000}
1001
1002impl<Types, S, P> Fetcher<Types, S, P>
1003where
1004    Types: NodeType,
1005    Header<Types>: QueryableHeader<Types>,
1006    Payload<Types>: QueryablePayload<Types>,
1007    S: VersionedDataSource + 'static,
1008    for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>,
1009    for<'a> S::ReadOnly<'a>: AvailabilityStorage<Types> + NodeStorage<Types> + PrunedHeightStorage,
1010    P: AvailabilityProvider<Types>,
1011{
    /// Get an object from local storage if available, or fetch it if it is missing.
    ///
    /// The returned [`Fetch`] is `Ready` if the object was loaded from the local database, and
    /// `Pending` otherwise. A pending fetch resolves once the object is stored locally, whether by
    /// an active fetch spawned for this request (where possible) or by some other task.
    async fn get<T>(self: &Arc<Self>, req: impl Into<T::Request> + Send) -> Fetch<T>
    where
        T: Fetchable<Types>,
    {
        let req = req.into();

        // Subscribe to notifications before we check storage for the requested object. This ensures
        // that this operation will always eventually succeed as long as the requested object
        // actually exists (or will exist). We will either find it in our local storage and succeed
        // immediately, or (if it exists) someone will *later* come and add it to storage, at which
        // point we will get a notification causing this passive fetch to resolve.
        //
        // Note the "someone" who later fetches the object and adds it to storage may be an active
        // fetch triggered by this very requests, in cases where that is possible, but it need not
        // be.
        let passive_fetch = T::passive_fetch(&self.notifiers, req).await;

        match self.try_get(req).await {
            Ok(Some(obj)) => return Fetch::Ready(obj),
            Ok(None) => return passive(req, passive_fetch),
            Err(err) => {
                // Local storage errored; fall through and spawn a background retry task below.
                tracing::warn!(
                    ?req,
                    "unable to fetch object; spawning a task to retry: {err:#}"
                );
            },
        }

        // We'll use this channel to get the object back if we successfully load it on retry.
        let (send, recv) = oneshot::channel();

        let fetcher = self.clone();
        let mut backoff = fetcher.backoff.clone();
        let span = tracing::warn_span!("get retry", ?req);
        spawn(
            async move {
                // Start this retry task from the beginning of the backoff schedule.
                backoff.reset();
                let mut delay = backoff.next_backoff().unwrap_or(Duration::from_secs(1));
                loop {
                    let res = {
                        // Limit the number of simultaneous retry tasks hitting the database. When
                        // the database is down, we might have a lot of these tasks running, and if
                        // they all hit the DB at once, they are only going to make things worse.
                        let _guard = fetcher.retry_semaphore.acquire().await;
                        fetcher.try_get(req).await
                    };
                    match res {
                        Ok(Some(obj)) => {
                            // If the object was immediately available after all, signal the
                            // original fetch. We probably just temporarily couldn't access it due
                            // to database errors.
                            tracing::info!(?req, "object was ready after retries");
                            send.send(obj).ok();
                            break;
                        },
                        Ok(None) => {
                            // The object was not immediately available after all, but we have
                            // successfully spawned a fetch for it if possible. The spawned fetch
                            // will notify the original request once it completes.
                            tracing::info!(?req, "spawned fetch after retries");
                            break;
                        },
                        Err(err) => {
                            tracing::warn!(
                                ?req,
                                ?delay,
                                "unable to fetch object, will retry: {err:#}"
                            );
                            sleep(delay).await;
                            // Increase the delay for the next attempt; once the backoff schedule
                            // is exhausted, keep using the last delay.
                            if let Some(next_delay) = backoff.next_backoff() {
                                delay = next_delay;
                            }
                        },
                    }
                }
            }
            .instrument(span),
        );

        // Wait for the object to be fetched, either from the local database on retry or from
        // another provider eventually.
        passive(req, select_some(passive_fetch, recv.map(Result::ok)))
    }
1095
1096    /// Try to get an object from local storage or initialize a fetch if it is missing.
1097    ///
1098    /// There are three possible scenarios in this function, indicated by the return type:
1099    /// * `Ok(Some(obj))`: the requested object was available locally and successfully retrieved
1100    ///   from the database; no fetch was spawned
1101    /// * `Ok(None)`: the requested object was not available locally, but a fetch was successfully
1102    ///   spawned if possible (in other words, if a fetch was not spawned, it was determined that
1103    ///   the requested object is not fetchable)
1104    /// * `Err(_)`: it could not be determined whether the object was available locally or whether
1105    ///   it could be fetched; no fetch was spawned even though the object may be fetchable
1106    async fn try_get<T>(self: &Arc<Self>, req: T::Request) -> anyhow::Result<Option<T>>
1107    where
1108        T: Fetchable<Types>,
1109    {
1110        let mut tx = self.read().await.context("opening read transaction")?;
1111        match T::load(&mut tx, req).await {
1112            Ok(t) => Ok(Some(t)),
1113            Err(QueryError::Missing | QueryError::NotFound) => {
1114                // We successfully queried the database, but the object wasn't there. Try to
1115                // fetch it.
1116                tracing::debug!(?req, "object missing from local storage, will try to fetch");
1117                self.fetch::<T>(&mut tx, req).await?;
1118                Ok(None)
1119            },
1120            Err(err) => {
1121                // An error occurred while querying the database. We don't know if we need to fetch
1122                // the object or not. Return an error so we can try again.
1123                bail!("failed to fetch resource {req:?} from local storage: {err:#}");
1124            },
1125        }
1126    }
1127
1128    /// Get a range of objects from local storage or a provider.
1129    ///
1130    /// Convert a finite stream of fallible local storage lookups into a (possibly infinite) stream
1131    /// of infallible fetches. Objects in `range` are loaded from local storage. Any gaps or missing
1132    /// objects are filled by fetching from a provider. Items in the resulting stream are futures
1133    /// that will never fail to produce a resource, although they may block indefinitely if the
1134    /// resource needs to be fetched.
1135    ///
1136    /// Objects are loaded and fetched in chunks, which strikes a good balance of limiting the total
1137    /// number of storage and network requests, while also keeping the amount of simultaneous
1138    /// resource consumption bounded.
1139    fn get_range<R, T>(self: Arc<Self>, range: R) -> BoxStream<'static, Fetch<T>>
1140    where
1141        R: RangeBounds<usize> + Send + 'static,
1142        T: RangedFetchable<Types>,
1143    {
1144        let chunk_size = self.range_chunk_size;
1145        self.get_range_with_chunk_size(chunk_size, range)
1146    }
1147
1148    /// Same as [`Self::get_range`], but uses the given chunk size instead of the default.
1149    fn get_range_with_chunk_size<R, T>(
1150        self: Arc<Self>,
1151        chunk_size: usize,
1152        range: R,
1153    ) -> BoxStream<'static, Fetch<T>>
1154    where
1155        R: RangeBounds<usize> + Send + 'static,
1156        T: RangedFetchable<Types>,
1157    {
1158        let chunk_fetch_delay = self.chunk_fetch_delay;
1159        let active_fetch_delay = self.active_fetch_delay;
1160
1161        stream::iter(range_chunks(range, chunk_size))
1162            .then(move |chunk| {
1163                let self_clone = self.clone();
1164                async move {
1165                    {
1166                        let chunk = self_clone.get_chunk(chunk).await;
1167
1168                        // Introduce a delay (`chunk_fetch_delay`) between fetching chunks. This
1169                        // helps to limit constant high CPU usage when fetching long range of data,
1170                        // especially for older streams that fetch most of the data from local
1171                        // storage.
1172                        sleep(chunk_fetch_delay).await;
1173                        stream::iter(chunk)
1174                    }
1175                }
1176            })
1177            .flatten()
1178            .then(move |f| async move {
1179                match f {
1180                    // Introduce a delay (`active_fetch_delay`) for active fetches to reduce load on
1181                    // the catchup provider. The delay applies between pending fetches, not between
1182                    // chunks.
1183                    Fetch::Pending(_) => sleep(active_fetch_delay).await,
1184                    Fetch::Ready(_) => (),
1185                };
1186                f
1187            })
1188            .boxed()
1189    }
1190
1191    /// Same as [`Self::get_range`], but yields objects in reverse order by height.
1192    ///
1193    /// Note that unlike [`Self::get_range`], which accepts any range and yields an infinite stream
1194    /// if the range has no upper bound, this function requires there to be a defined upper bound,
1195    /// otherwise we don't know where the reversed stream should _start_. The `end` bound given here
1196    /// is inclusive; i.e. the first item yielded by the stream will have height `end`.
1197    fn get_range_rev<T>(
1198        self: Arc<Self>,
1199        start: Bound<usize>,
1200        end: usize,
1201    ) -> BoxStream<'static, Fetch<T>>
1202    where
1203        T: RangedFetchable<Types>,
1204    {
1205        let chunk_size = self.range_chunk_size;
1206        self.get_range_with_chunk_size_rev(chunk_size, start, end)
1207    }
1208
1209    /// Same as [`Self::get_range_rev`], but uses the given chunk size instead of the default.
1210    fn get_range_with_chunk_size_rev<T>(
1211        self: Arc<Self>,
1212        chunk_size: usize,
1213        start: Bound<usize>,
1214        end: usize,
1215    ) -> BoxStream<'static, Fetch<T>>
1216    where
1217        T: RangedFetchable<Types>,
1218    {
1219        let chunk_fetch_delay = self.chunk_fetch_delay;
1220        let active_fetch_delay = self.active_fetch_delay;
1221
1222        stream::iter(range_chunks_rev(start, end, chunk_size))
1223            .then(move |chunk| {
1224                let self_clone = self.clone();
1225                async move {
1226                    {
1227                        let chunk = self_clone.get_chunk(chunk).await;
1228
1229                        // Introduce a delay (`chunk_fetch_delay`) between fetching chunks. This
1230                        // helps to limit constant high CPU usage when fetching long range of data,
1231                        // especially for older streams that fetch most of the data from local
1232                        // storage
1233                        sleep(chunk_fetch_delay).await;
1234                        stream::iter(chunk.into_iter().rev())
1235                    }
1236                }
1237            })
1238            .flatten()
1239            .then(move |f| async move {
1240                match f {
1241                    // Introduce a delay (`active_fetch_delay`) for active fetches to reduce load on
1242                    // the catchup provider. The delay applies between pending fetches, not between
1243                    // chunks.
1244                    Fetch::Pending(_) => sleep(active_fetch_delay).await,
1245                    Fetch::Ready(_) => (),
1246                };
1247                f
1248            })
1249            .boxed()
1250    }
1251
    /// Get a range of objects from local storage or a provider.
    ///
    /// This method is similar to `get_range`, except that:
    /// * It fetches all desired objects together, as a single chunk
    /// * It loads the object or triggers fetches right now rather than providing a lazy stream
    ///   which only fetches objects when polled.
    ///
    /// Returns one [`Fetch`] per index in `chunk`, in order.
    async fn get_chunk<T>(self: &Arc<Self>, chunk: Range<usize>) -> Vec<Fetch<T>>
    where
        T: RangedFetchable<Types>,
    {
        // Subscribe to notifications first. As in [`get`](Self::get), this ensures we won't miss
        // any notifications sent in between checking local storage and triggering a fetch if
        // necessary.
        let passive_fetches = join_all(
            chunk
                .clone()
                .map(|i| T::passive_fetch(&self.notifiers, i.into())),
        )
        .await;

        match self.try_get_chunk(&chunk).await {
            Ok(objs) => {
                // Convert to fetches. Objects which are not immediately available (`None` in the
                // chunk) become passive fetches awaiting a notification of availability.
                return objs
                    .into_iter()
                    .zip(passive_fetches)
                    .enumerate()
                    .map(move |(i, (obj, passive_fetch))| match obj {
                        Some(obj) => Fetch::Ready(obj),
                        None => passive(T::Request::from(chunk.start + i), passive_fetch),
                    })
                    .collect();
            },
            Err(err) => {
                // Local storage errored; fall through and spawn a background retry task below.
                tracing::warn!(
                    ?chunk,
                    "unable to fetch chunk; spawning a task to retry: {err:#}"
                );
            },
        }

        // We'll use these channels to get the objects back that we successfully load on retry.
        // One oneshot channel per position in the chunk.
        let (send, recv): (Vec<_>, Vec<_>) =
            repeat_with(oneshot::channel).take(chunk.len()).unzip();

        {
            let fetcher = self.clone();
            let mut backoff = fetcher.backoff.clone();
            let chunk = chunk.clone();
            let span = tracing::warn_span!("get_chunk retry", ?chunk);
            spawn(
                async move {
                    // Start this retry task from the beginning of the backoff schedule.
                    backoff.reset();
                    let mut delay = backoff.next_backoff().unwrap_or(Duration::from_secs(1));
                    loop {
                        let res = {
                            // Limit the number of simultaneous retry tasks hitting the database.
                            // When the database is down, we might have a lot of these tasks
                            // running, and if they all hit the DB at once, they are only going to
                            // make things worse.
                            let _guard = fetcher.retry_semaphore.acquire().await;
                            fetcher.try_get_chunk(&chunk).await
                        };
                        match res {
                            Ok(objs) => {
                                for (i, (obj, sender)) in objs.into_iter().zip(send).enumerate() {
                                    if let Some(obj) = obj {
                                        // If the object was immediately available after all, signal
                                        // the original fetch. We probably just temporarily couldn't
                                        // access it due to database errors.
                                        tracing::info!(?chunk, i, "object was ready after retries");
                                        sender.send(obj).ok();
                                    } else {
                                        // The object was not immediately available after all, but
                                        // we have successfully spawned a fetch for it if possible.
                                        // The spawned fetch will notify the original request once
                                        // it completes.
                                        tracing::info!(?chunk, i, "spawned fetch after retries");
                                    }
                                }
                                break;
                            },
                            Err(err) => {
                                tracing::warn!(
                                    ?chunk,
                                    ?delay,
                                    "unable to fetch chunk, will retry: {err:#}"
                                );
                                sleep(delay).await;
                                // Increase the delay for the next attempt; once the backoff
                                // schedule is exhausted, keep using the last delay.
                                if let Some(next_delay) = backoff.next_backoff() {
                                    delay = next_delay;
                                }
                            },
                        }
                    }
                }
                .instrument(span),
            );
        }

        // Wait for the objects to be fetched, either from the local database on retry or from
        // another provider eventually.
        passive_fetches
            .into_iter()
            .zip(recv)
            .enumerate()
            .map(move |(i, (passive_fetch, recv))| {
                passive(
                    T::Request::from(chunk.start + i),
                    select_some(passive_fetch, recv.map(Result::ok)),
                )
            })
            .collect()
    }
1367
    /// Try to get a range of objects from local storage, initializing fetches if any are missing.
    ///
    /// If this function succeeded, then for each object in the requested range, either:
    /// * the object was available locally, and corresponds to `Some(_)` object in the result
    /// * the object was not available locally (and corresponds to `None` in the result), but a
    ///   fetch was successfully spawned if possible (in other words, if a fetch was not spawned, it
    ///   was determined that the requested object is not fetchable)
    ///
    /// This function will fail if it could not be determined which objects in the requested range
    /// are available locally, or if, for any missing object, it could not be determined whether
    /// that object is fetchable. In this case, there may be no fetch spawned for certain objects in
    /// the requested range, even if those objects are actually fetchable.
    async fn try_get_chunk<T>(
        self: &Arc<Self>,
        chunk: &Range<usize>,
    ) -> anyhow::Result<Vec<Option<T>>>
    where
        T: RangedFetchable<Types>,
    {
        let mut tx = self.read().await.context("opening read transaction")?;
        let ts = T::load_range(&mut tx, chunk.clone())
            .await
            .context(format!("when fetching items in range {chunk:?}"))?;

        // Log and discard error information; we want a list of Option where None indicates an
        // object that needs to be fetched. Note that we don't use `FetchRequest::might_exist` to
        // silence the logs here when an object is missing that is not expected to exist at all.
        // When objects are not expected to exist, `load_range` should just return a truncated list
        // rather than returning `Err` objects, so if there are errors in here they are unexpected
        // and we do want to log them.
        let ts = ts.into_iter().filter_map(ResultExt::ok_or_trace);

        // Kick off a fetch for each missing object. The gap-filling logic below relies on
        // `load_range` yielding objects in ascending order of height: each loaded object's height
        // tells us how many entries before it are missing from local storage.
        let mut results = Vec::with_capacity(chunk.len());
        for t in ts {
            // Fetch missing objects that should come before `t`.
            while chunk.start + results.len() < t.height() as usize {
                tracing::debug!(
                    "item {} in chunk not available, will be fetched",
                    results.len()
                );
                self.fetch::<T>(&mut tx, (chunk.start + results.len()).into())
                    .await?;
                results.push(None);
            }

            results.push(Some(t));
        }
        // Fetch missing objects from the end of the range.
        while results.len() < chunk.len() {
            self.fetch::<T>(&mut tx, (chunk.start + results.len()).into())
                .await?;
            results.push(None);
        }

        Ok(results)
    }
1425
1426    /// Spawn an active fetch for the requested object, if possible.
1427    ///
1428    /// On success, either an active fetch for `req` has been spawned, or it has been determined
1429    /// that `req` is not fetchable. Fails if it cannot be determined (e.g. due to errors in the
1430    /// local database) whether `req` is fetchable or not.
1431    async fn fetch<T>(
1432        self: &Arc<Self>,
1433        tx: &mut <Self as VersionedDataSource>::ReadOnly<'_>,
1434        req: T::Request,
1435    ) -> anyhow::Result<()>
1436    where
1437        T: Fetchable<Types>,
1438    {
1439        tracing::debug!("fetching resource {req:?}");
1440
1441        // Trigger an active fetch from a remote provider if possible.
1442        let heights = Heights::load(tx)
1443            .await
1444            .context("failed to load heights; cannot definitively say object might exist")?;
1445        if req.might_exist(heights) {
1446            T::active_fetch(tx, self.clone(), req).await?;
1447        } else {
1448            tracing::debug!("not fetching object {req:?} that cannot exist at {heights:?}");
1449        }
1450        Ok(())
1451    }
1452
    /// Proactively search for and retrieve missing objects.
    ///
    /// This function will proactively identify and retrieve blocks and leaves which are missing
    /// from storage. It will run until cancelled, thus, it is meant to be spawned as a background
    /// task rather than called synchronously.
    ///
    /// # Parameters
    /// * `interval`: time to sleep between the end of one scan and the start of the next
    /// * `chunk_size`: number of objects fetched per request; chunks are aligned to multiples of
    ///   this size to improve cacheability for the upstream provider
    /// * `metrics`: gauges updated as the scan progresses, for observability
    async fn proactive_scan(
        self: Arc<Self>,
        interval: Duration,
        chunk_size: usize,
        metrics: ScannerMetrics,
    ) {
        // Scans run forever; `i` numbers them for the tracing span and the `current_scan` gauge.
        for i in 0.. {
            let span = tracing::warn_span!("proactive scan", i);
            metrics.running.set(1);
            metrics.current_scan.set(i);
            async {
                // Snapshot the sync status: which height ranges of blocks and VID common data
                // are missing. If we cannot even load the status, skip this scan entirely and
                // try again after the next interval.
                //
                // NOTE(review): on this early return, `metrics.running` stays at 1 for the whole
                // sleep between scans; confirm whether it should be reset to 0 here as well.
                let sync_status = {
                    match self.sync_status().await {
                        Ok(st) => st,
                        Err(err) => {
                            tracing::warn!(
                                "unable to load sync status, scan will be skipped: {err:#}"
                            );
                            return;
                        },
                    }
                };
                tracing::info!(?sync_status, "starting scan");
                metrics.missing_blocks.set(sync_status.blocks.missing);
                metrics.missing_vid.set(sync_status.vid_common.missing);

                // Fetch missing blocks. This will also trigger a fetch for the corresponding
                // missing leaves.
                for range in sync_status.blocks.ranges {
                    metrics.scanned_blocks.set(range.start);
                    // Only ranges whose status is `Missing` need fetching; anything else (e.g.
                    // present or pruned data) is simply counted as scanned.
                    if range.status != SyncStatus::Missing {
                        metrics.scanned_blocks.set(range.end);
                        continue;
                    }

                    tracing::info!(?range, "fetching missing block range");

                    // Break the range into manageable, aligned chunks (which improves cacheability
                    // for the upstream server).
                    //
                    // We iterate in reverse order because leaves are inherently fetched in reverse,
                    // since we cannot (actively) fetch a leaf until we have the subsequent leaf,
                    // which tells us what the hash of its parent should be.
                    for chunk in range_chunks_aligned_rev(
                        Bound::Included(range.start),
                        range.end - 1,
                        chunk_size,
                    ) {
                        tracing::info!(?chunk, "fetching missing block chunk");

                        // Requesting the blocks in this range is enough to trigger an active fetch
                        // of the corresponding leaves as well, if they are missing. The double
                        // await first obtains the `Fetch` handle, then waits for it to resolve.
                        self.get::<NonEmptyRange<BlockQueryData<Types>>>(RangeRequest {
                            start: chunk.start as u64,
                            end: chunk.end as u64,
                        })
                        .await
                        .await;

                        // `start - end` is negative, so this decrements the missing-blocks gauge
                        // by the size of the chunk just fetched.
                        metrics
                            .missing_blocks
                            .update((chunk.start as i64) - (chunk.end as i64));
                        metrics.scanned_blocks.set(chunk.end);
                    }
                }

                // Do the same for VID.
                for range in sync_status.vid_common.ranges {
                    metrics.scanned_vid.set(range.start);
                    if range.status != SyncStatus::Missing {
                        metrics.scanned_vid.set(range.end);
                        continue;
                    }

                    tracing::info!(?range, "fetching missing VID range");
                    for chunk in range_chunks_aligned_rev(
                        Bound::Included(range.start),
                        range.end - 1,
                        chunk_size,
                    ) {
                        tracing::info!(?chunk, "fetching missing VID chunk");
                        self.get::<NonEmptyRange<VidCommonQueryData<Types>>>(RangeRequest {
                            start: chunk.start as u64,
                            end: chunk.end as u64,
                        })
                        .await
                        .await;

                        // As above: a negative update, decrementing by the chunk size.
                        metrics
                            .missing_vid
                            .update((chunk.start as i64) - (chunk.end as i64));
                        metrics.scanned_vid.set(chunk.end);
                    }
                }

                tracing::info!("completed proactive scan, will scan again in {interval:?}");

                // Reset metrics.
                metrics.running.set(0);
            }
            .instrument(span)
            .await;

            sleep(interval).await;
        }
    }
1564}
1565
impl<Types, S, P> Fetcher<Types, S, P>
where
    Types: NodeType,
    Header<Types>: QueryableHeader<Types>,
    S: VersionedDataSource + 'static,
    for<'a> S::ReadOnly<'a>: NodeStorage<Types> + PrunedHeightStorage,
    P: Send + Sync,
{
    /// Compute (or retrieve from cache) the sync status of the data set.
    ///
    /// The result describes, for blocks, leaves, and VID common data, which height ranges are
    /// present, missing, or pruned. The result is cached in `self.sync_status` so repeated calls
    /// do not each re-run the expensive storage scans.
    ///
    /// # Errors
    ///
    /// Fails if a read transaction cannot be opened or any of the underlying storage queries
    /// fail; in that case the cache is left unpopulated.
    async fn sync_status(&self) -> anyhow::Result<SyncStatusQueryData> {
        // Check the cache first. This prevents the expensive sync_status queries from being run too
        // often, and also ensures that if two tasks try to get the sync status at the same time,
        // only one will actually compute it; the other will find the cache populated by the time it
        // gets a lock on the mutex.
        //
        // Note that the mutex guard is deliberately held for the rest of the function, so that
        // concurrent callers serialize on the computation instead of duplicating it.
        let mut cache = self.sync_status.lock().await;
        if let Some(sync_status) = cache.try_get() {
            return Ok(sync_status.clone());
        }
        tracing::debug!("updating sync status");

        // Load the current block height and pruned height in a short-lived read transaction.
        let heights = {
            let mut tx = self
                .read()
                .await
                .context("opening transaction to load heights")?;
            Heights::load(&mut tx).await.context("loading heights")?
        };

        let mut res = SyncStatusQueryData {
            pruned_height: heights.pruned_height.map(|h| h as usize),
            ..Default::default()
        };
        // Scan from just above the pruned height (everything at or below it is gone) or from
        // genesis if nothing has been pruned.
        let start = if let Some(height) = res.pruned_height {
            // Add an initial range for pruned data.
            let range = SyncStatusRange {
                status: SyncStatus::Pruned,
                start: 0,
                end: height + 1,
            };
            res.blocks.ranges.push(range);
            res.leaves.ranges.push(range);
            res.vid_common.ranges.push(range);

            height + 1
        } else {
            0
        };

        // Break the range into manageable chunks, so we don't hold any one database transaction
        // open for too long.
        for chunk in range_chunks(
            start..(heights.height as usize),
            self.sync_status_chunk_size,
        ) {
            tracing::debug!(chunk.start, chunk.end, "checking sync status in sub-range");
            let metrics = self.sync_status_metrics.start_range(&chunk);
            // A fresh read transaction per chunk, consistent with the chunking rationale above.
            let mut tx = self
                .read()
                .await
                .context("opening transaction to sync status range")?;
            let range_status = tx
                .sync_status_for_range(chunk.start, chunk.end)
                .await
                .context(format!("checking sync status in sub-range {chunk:?}"))?;
            tracing::debug!(
                chunk.start,
                chunk.end,
                ?range_status,
                "found sync status for range"
            );

            // Merge this chunk's ranges into the accumulated result.
            res.blocks.extend(range_status.blocks);
            res.leaves.extend(range_status.leaves);
            res.vid_common.extend(range_status.vid_common);
            metrics.end();
        }

        // Populate the cache for subsequent callers before releasing the lock.
        cache.update(res.clone());
        Ok(res)
    }
}
1646
impl<Types, S, P> Fetcher<Types, S, P>
where
    Types: NodeType,
    Header<Types>: QueryableHeader<Types>,
    Payload<Types>: QueryablePayload<Types>,
    S: VersionedDataSource + 'static,
    for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types> + UpdateAggregatesStorage<Types>,
    for<'a> S::ReadOnly<'a>: AvailabilityStorage<Types>
        + NodeStorage<Types>
        + PrunedHeightStorage
        + AggregatesStorage<Types>,
    P: AvailabilityProvider<Types>,
{
    /// Continuously update aggregate statistics as new blocks become available.
    ///
    /// Resumes from the last aggregate persisted in storage (or from genesis if there is none),
    /// then folds batches of up to `chunk_size` payload metadata objects into the running
    /// aggregate, committing each batch in its own transaction. Runs until cancelled; meant to be
    /// spawned as a background task.
    #[tracing::instrument(skip_all)]
    async fn aggregate(self: Arc<Self>, chunk_size: usize, metrics: AggregatorMetrics) {
        loop {
            // Load the previously committed aggregate, retrying every 5 seconds on storage
            // errors; this task cannot make progress without it.
            let prev_aggregate = loop {
                let mut tx = match self.read().await {
                    Ok(tx) => tx,
                    Err(err) => {
                        tracing::error!("unable to open read tx: {err:#}");
                        sleep(Duration::from_secs(5)).await;
                        continue;
                    },
                };
                match tx.load_prev_aggregate().await {
                    Ok(agg) => break agg,
                    Err(err) => {
                        tracing::error!("unable to load previous aggregate: {err:#}");
                        sleep(Duration::from_secs(5)).await;
                        continue;
                    },
                }
            };

            // Resume just above the last aggregated height, or from 0 with a default aggregate
            // if nothing has been aggregated yet.
            let (start, mut prev_aggregate) = match prev_aggregate {
                Some(aggregate) => (aggregate.height as usize + 1, aggregate),
                None => (0, Aggregate::default()),
            };

            tracing::info!(start, "starting aggregator");
            metrics.height.set(start);

            // An unbounded stream of payload metadata from `start` onwards; `ready_chunks`
            // batches however many items are immediately available, up to `chunk_size`.
            let mut blocks = self
                .clone()
                .get_range_with_chunk_size::<_, PayloadMetadata<Types>>(chunk_size, start..)
                .then(Fetch::resolve)
                .ready_chunks(chunk_size)
                .boxed();
            while let Some(chunk) = blocks.next().await {
                let Some(last) = chunk.last() else {
                    // This is not supposed to happen, but if the chunk is empty, just skip it.
                    tracing::warn!("ready_chunks returned an empty chunk");
                    continue;
                };
                let height = last.height();
                let num_blocks = chunk.len();
                tracing::debug!(
                    num_blocks,
                    height,
                    "updating aggregate statistics for chunk"
                );
                // Retry this chunk's update+commit until it succeeds; `prev_aggregate` is only
                // advanced after a successful commit, so retries are consistent.
                loop {
                    let res = async {
                        let mut tx = self.write().await.context("opening transaction")?;
                        let aggregate =
                            tx.update_aggregates(prev_aggregate.clone(), &chunk).await?;
                        tx.commit().await.context("committing transaction")?;
                        prev_aggregate = aggregate;
                        anyhow::Result::<_>::Ok(())
                    }
                    .await;
                    match res {
                        Ok(()) => {
                            break;
                        },
                        Err(err) => {
                            tracing::warn!(
                                num_blocks,
                                height,
                                "failed to update aggregates for chunk: {err:#}"
                            );
                            sleep(Duration::from_secs(1)).await;
                        },
                    }
                }
                metrics.height.set(height as usize);
            }
            // The stream is unbounded, so reaching here is unexpected; restart from the last
            // committed aggregate.
            tracing::warn!("aggregator block stream ended unexpectedly; will restart");
        }
    }
}
1739
1740impl<Types, S, P> Fetcher<Types, S, P>
1741where
1742    Types: NodeType,
1743    S: VersionedDataSource,
1744    for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>,
1745{
1746    /// Store an object and notify anyone waiting on this object that it is available.
1747    async fn store_and_notify<T>(&self, obj: &T)
1748    where
1749        T: Storable<Types>,
1750    {
1751        self.store(obj).await;
1752
1753        // Send a notification about the newly received object. It is important that we do this
1754        // _after_ our attempt to store the object in local storage, otherwise there is a potential
1755        // missed notification deadlock:
1756        // * we send the notification
1757        // * a task calls [`get`](Self::get) or [`get_chunk`](Self::get_chunk), finds that the
1758        //   requested object is not in storage, and begins waiting for a notification
1759        // * we store the object. This ensures that no other task will be triggered to fetch it,
1760        //   which means no one will ever notify the waiting task.
1761        //
1762        // Note that we send the notification regardless of whether the store actually succeeded or
1763        // not. This is to avoid _another_ subtle deadlock: if we failed to notify just because we
1764        // failed to store, some fetches might not resolve, even though the object in question has
1765        // actually been fetched. This should actually be ok, because as long as the object is not
1766        // in storage, eventually some other task will come along and fetch, store, and notify about
1767        // it. However, this is certainly not ideal, since we could resolve those pending fetches
1768        // right now, and it causes bigger problems when the fetch that fails to resolve is the
1769        // proactive scanner task, who is often the one that would eventually come along and
1770        // re-fetch the object.
1771        //
1772        // The key thing to note is that it does no harm to notify even if we fail to store: at best
1773        // we wake some tasks up sooner; at worst, anyone who misses the notification still
1774        // satisfies the invariant that we only wait on notifications for objects which are not in
1775        // storage, and eventually some other task will come along, find the object missing from
1776        // storage, and re-fetch it.
1777        obj.notify(&self.notifiers).await;
1778    }
1779
1780    async fn store<T>(&self, obj: &T)
1781    where
1782        T: Storable<Types>,
1783    {
1784        let try_store = || async {
1785            let mut tx = self.storage.write().await?;
1786            obj.clone().store(&mut tx, self.leaf_only).await?;
1787            tx.commit().await
1788        };
1789
1790        // Store the object in local storage, so we can avoid fetching it in the future.
1791        let mut backoff = self.backoff.clone();
1792        backoff.reset();
1793        loop {
1794            let Err(err) = try_store().await else {
1795                break;
1796            };
1797            // It is unfortunate if this fails, but we can still proceed by notifying with the
1798            // object that we fetched, keeping it in memory. Log the error, retry a few times, and
1799            // eventually move on.
1800            tracing::warn!(
1801                obj = obj.debug_name(),
1802                "failed to store fetched object: {err:#}"
1803            );
1804
1805            let Some(delay) = backoff.next_backoff() else {
1806                break;
1807            };
1808            tracing::info!(?delay, "retrying failed operation");
1809            sleep(delay).await;
1810        }
1811    }
1812}
1813
/// The set of per-object-type notification channels used to wake passive fetches when an object
/// becomes available.
#[derive(Debug)]
struct Notifiers<Types>
where
    Types: NodeType,
{
    /// Notifies waiters for block data.
    block: Notifier<BlockQueryData<Types>>,
    /// Notifies waiters for leaves.
    leaf: Notifier<LeafQueryData<Types>>,
    /// Notifies waiters for VID common data.
    vid_common: Notifier<VidCommonQueryData<Types>>,
}
1823
1824impl<Types> Default for Notifiers<Types>
1825where
1826    Types: NodeType,
1827{
1828    fn default() -> Self {
1829        Self {
1830            block: Notifier::new(),
1831            leaf: Notifier::new(),
1832            vid_common: Notifier::new(),
1833        }
1834    }
1835}
1836
/// A snapshot of the heights used to decide whether an object can exist in storage.
#[derive(Clone, Copy, Debug)]
struct Heights {
    /// The current block height; an exclusive upper bound on the heights of existing objects.
    height: u64,
    /// The greatest pruned height, if any pruning has occurred; objects at or below this height
    /// are gone from storage.
    pruned_height: Option<u64>,
}
1842
1843impl Heights {
1844    async fn load<Types, T>(tx: &mut T) -> anyhow::Result<Self>
1845    where
1846        Types: NodeType,
1847        Header<Types>: QueryableHeader<Types>,
1848        T: NodeStorage<Types> + PrunedHeightStorage + Send,
1849    {
1850        let height = tx.block_height().await.context("loading block height")? as u64;
1851        let pruned_height = tx
1852            .load_pruned_height()
1853            .await
1854            .context("loading pruned height")?;
1855        Ok(Self {
1856            height,
1857            pruned_height,
1858        })
1859    }
1860
1861    fn might_exist(self, h: u64) -> bool {
1862        h < self.height && self.pruned_height.is_none_or(|ph| h > ph)
1863    }
1864}
1865
1866#[async_trait]
1867impl<Types, S, P, State, const ARITY: usize> MerklizedStateDataSource<Types, State, ARITY>
1868    for FetchingDataSource<Types, S, P>
1869where
1870    Types: NodeType,
1871    S: VersionedDataSource + 'static,
1872    for<'a> S::ReadOnly<'a>: MerklizedStateStorage<Types, State, ARITY>,
1873    P: Send + Sync,
1874    State: MerklizedState<Types, ARITY> + 'static,
1875    <State as MerkleTreeScheme>::Commitment: Send,
1876{
1877    async fn get_path(
1878        &self,
1879        snapshot: Snapshot<Types, State, ARITY>,
1880        key: State::Key,
1881    ) -> QueryResult<MerkleProof<State::Entry, State::Key, State::T, ARITY>> {
1882        let mut tx = self.read().await.map_err(|err| QueryError::Error {
1883            message: err.to_string(),
1884        })?;
1885        tx.get_path(snapshot, key).await
1886    }
1887}
1888
1889#[async_trait]
1890impl<Types, S, P> MerklizedStateHeightPersistence for FetchingDataSource<Types, S, P>
1891where
1892    Types: NodeType,
1893    Header<Types>: QueryableHeader<Types>,
1894    Payload<Types>: QueryablePayload<Types>,
1895    S: VersionedDataSource + 'static,
1896    for<'a> S::ReadOnly<'a>: MerklizedStateHeightStorage,
1897    P: Send + Sync,
1898{
1899    async fn get_last_state_height(&self) -> QueryResult<usize> {
1900        let mut tx = self.read().await.map_err(|err| QueryError::Error {
1901            message: err.to_string(),
1902        })?;
1903        tx.get_last_state_height().await
1904    }
1905}
1906
1907#[async_trait]
1908impl<Types, S, P> NodeDataSource<Types> for FetchingDataSource<Types, S, P>
1909where
1910    Types: NodeType,
1911    Header<Types>: QueryableHeader<Types>,
1912    S: VersionedDataSource + 'static,
1913    for<'a> S::ReadOnly<'a>: NodeStorage<Types> + PrunedHeightStorage,
1914    P: Send + Sync,
1915{
1916    async fn block_height(&self) -> QueryResult<usize> {
1917        let mut tx = self.read().await.map_err(|err| QueryError::Error {
1918            message: err.to_string(),
1919        })?;
1920        tx.block_height().await
1921    }
1922
1923    async fn count_transactions_in_range(
1924        &self,
1925        range: impl RangeBounds<usize> + Send,
1926        namespace: Option<NamespaceId<Types>>,
1927    ) -> QueryResult<usize> {
1928        let mut tx = self.read().await.map_err(|err| QueryError::Error {
1929            message: err.to_string(),
1930        })?;
1931        tx.count_transactions_in_range(range, namespace).await
1932    }
1933
1934    async fn payload_size_in_range(
1935        &self,
1936        range: impl RangeBounds<usize> + Send,
1937        namespace: Option<NamespaceId<Types>>,
1938    ) -> QueryResult<usize> {
1939        let mut tx = self.read().await.map_err(|err| QueryError::Error {
1940            message: err.to_string(),
1941        })?;
1942        tx.payload_size_in_range(range, namespace).await
1943    }
1944
1945    async fn vid_share<ID>(&self, id: ID) -> QueryResult<VidShare>
1946    where
1947        ID: Into<BlockId<Types>> + Send + Sync,
1948    {
1949        let mut tx = self.read().await.map_err(|err| QueryError::Error {
1950            message: err.to_string(),
1951        })?;
1952        tx.vid_share(id).await
1953    }
1954
1955    async fn sync_status(&self) -> QueryResult<SyncStatusQueryData> {
1956        self.fetcher
1957            .sync_status()
1958            .await
1959            .map_err(|err| QueryError::Error {
1960                message: format!("{err:#}"),
1961            })
1962    }
1963
1964    async fn get_header_window(
1965        &self,
1966        start: impl Into<WindowStart<Types>> + Send + Sync,
1967        end: u64,
1968        limit: usize,
1969    ) -> QueryResult<TimeWindowQueryData<Header<Types>>> {
1970        let mut tx = self.read().await.map_err(|err| QueryError::Error {
1971            message: err.to_string(),
1972        })?;
1973        tx.get_header_window(start, end, limit).await
1974    }
1975}
1976
1977#[async_trait]
1978impl<Types, S, P> ExplorerDataSource<Types> for FetchingDataSource<Types, S, P>
1979where
1980    Types: NodeType,
1981    Payload<Types>: QueryablePayload<Types>,
1982    Header<Types>: QueryableHeader<Types> + explorer::traits::ExplorerHeader<Types>,
1983    crate::Transaction<Types>: explorer::traits::ExplorerTransaction<Types>,
1984    S: VersionedDataSource + 'static,
1985    for<'a> S::ReadOnly<'a>: ExplorerStorage<Types>,
1986    P: Send + Sync,
1987{
1988    async fn get_block_summaries(
1989        &self,
1990        request: explorer::query_data::GetBlockSummariesRequest<Types>,
1991    ) -> Result<
1992        Vec<explorer::query_data::BlockSummary<Types>>,
1993        explorer::query_data::GetBlockSummariesError,
1994    > {
1995        let mut tx = self.read().await.map_err(|err| QueryError::Error {
1996            message: err.to_string(),
1997        })?;
1998        tx.get_block_summaries(request).await
1999    }
2000
2001    async fn get_block_detail(
2002        &self,
2003        request: explorer::query_data::BlockIdentifier<Types>,
2004    ) -> Result<explorer::query_data::BlockDetail<Types>, explorer::query_data::GetBlockDetailError>
2005    {
2006        let mut tx = self.read().await.map_err(|err| QueryError::Error {
2007            message: err.to_string(),
2008        })?;
2009        tx.get_block_detail(request).await
2010    }
2011
2012    async fn get_transaction_summaries(
2013        &self,
2014        request: explorer::query_data::GetTransactionSummariesRequest<Types>,
2015    ) -> Result<
2016        Vec<explorer::query_data::TransactionSummary<Types>>,
2017        explorer::query_data::GetTransactionSummariesError,
2018    > {
2019        let mut tx = self.read().await.map_err(|err| QueryError::Error {
2020            message: err.to_string(),
2021        })?;
2022        tx.get_transaction_summaries(request).await
2023    }
2024
2025    async fn get_transaction_detail(
2026        &self,
2027        request: explorer::query_data::TransactionIdentifier<Types>,
2028    ) -> Result<
2029        explorer::query_data::TransactionDetailResponse<Types>,
2030        explorer::query_data::GetTransactionDetailError,
2031    > {
2032        let mut tx = self.read().await.map_err(|err| QueryError::Error {
2033            message: err.to_string(),
2034        })?;
2035        tx.get_transaction_detail(request).await
2036    }
2037
2038    async fn get_explorer_summary(
2039        &self,
2040    ) -> Result<
2041        explorer::query_data::ExplorerSummary<Types>,
2042        explorer::query_data::GetExplorerSummaryError,
2043    > {
2044        let mut tx = self.read().await.map_err(|err| QueryError::Error {
2045            message: err.to_string(),
2046        })?;
2047        tx.get_explorer_summary().await
2048    }
2049
2050    async fn get_search_results(
2051        &self,
2052        query: TaggedBase64,
2053    ) -> Result<
2054        explorer::query_data::SearchResult<Types>,
2055        explorer::query_data::GetSearchResultsError,
2056    > {
2057        let mut tx = self.read().await.map_err(|err| QueryError::Error {
2058            message: err.to_string(),
2059        })?;
2060        tx.get_search_results(query).await
2061    }
2062}
2063
/// A provider which can be used as a fetcher by the availability service.
///
/// This is a trait alias: it declares no methods of its own, and simply bundles together the
/// [`Provider`] implementations for every request type the availability service issues. The
/// blanket impl below makes any type satisfying those bounds an `AvailabilityProvider`
/// automatically.
pub trait AvailabilityProvider<Types: NodeType>:
    Provider<Types, request::LeafRequest<Types>>
    + Provider<Types, request::LeafRangeRequest<Types>>
    + Provider<Types, request::PayloadRequest>
    + Provider<Types, request::BlockRangeRequest>
    + Provider<Types, request::VidCommonRequest>
    + Provider<Types, request::VidCommonRangeRequest>
    + Sync
    + 'static
{
}
// Blanket implementation: any provider serving all the required request types qualifies.
impl<Types: NodeType, P> AvailabilityProvider<Types> for P where
    P: Provider<Types, request::LeafRequest<Types>>
        + Provider<Types, request::LeafRangeRequest<Types>>
        + Provider<Types, request::PayloadRequest>
        + Provider<Types, request::BlockRangeRequest>
        + Provider<Types, request::VidCommonRequest>
        + Provider<Types, request::VidCommonRangeRequest>
        + Sync
        + 'static
{
}
2087
/// A succinct description of an object to be fetched, cheap to copy and pass between tasks.
trait FetchRequest: Copy + Debug + Send + Sync + 'static {
    /// Indicate whether it is possible this object could exist.
    ///
    /// This can filter out requests quickly for objects that cannot possibly exist, such as
    /// requests for objects with a height greater than the current block height. Not only does this
    /// let us fail faster for such requests (without touching storage at all), it also helps keep
    /// logging quieter when we fail to fetch an object because the user made a bad request, while
    /// still being fairly loud when we fail to fetch an object that might have really existed.
    ///
    /// This method is conservative: it returns `true` if it cannot tell whether the given object
    /// could exist or not.
    fn might_exist(self, _heights: Heights) -> bool {
        // Default: assume the object might exist; request types that can rule objects out (e.g.
        // by height) override this.
        true
    }
}
2103
/// Objects which can be fetched from a remote DA provider and cached in local storage.
///
/// This trait lets us abstract over leaves, blocks, and other types that can be fetched. Thus, the
/// logistics of fetching are shared between all objects, and only the low-level particulars are
/// type-specific.
#[async_trait]
trait Fetchable<Types>: Clone + Send + Sync + 'static
where
    Types: NodeType,
    Header<Types>: QueryableHeader<Types>,
    Payload<Types>: QueryablePayload<Types>,
{
    /// A succinct specification of the object to be fetched.
    type Request: FetchRequest;

    /// Does this object satisfy the given request?
    ///
    /// Used to filter notifications: a passive fetch for `req` only resolves with an object for
    /// which this returns `true`.
    fn satisfies(&self, req: Self::Request) -> bool;

    /// Spawn a task to fetch the object from a remote provider, if possible.
    ///
    /// An active fetch will only be triggered if:
    /// * There is not already an active fetch in progress for the same object
    /// * The requested object is known to exist. For example, we will fetch a leaf by height but
    ///   not by hash, since we can't guarantee that a leaf with an arbitrary hash exists. Note that
    ///   this function assumes `req.might_exist()` has already been checked before calling it, and
    ///   so may do unnecessary work if the caller does not ensure this.
    ///
    /// If we do trigger an active fetch for an object, any passive listeners for the object will be
    /// notified once it has been retrieved. If we do not trigger an active fetch for an object,
    /// this function does nothing. In either case, as long as the requested object does in fact
    /// exist, we will eventually receive it passively, since we will eventually receive all blocks
    /// and leaves that are ever produced. Active fetching merely helps us receive certain objects
    /// sooner.
    ///
    /// This function fails if it _might_ be possible to actively fetch the requested object, but we
    /// were unable to do so (e.g. due to errors in the database).
    async fn active_fetch<S, P>(
        tx: &mut impl AvailabilityStorage<Types>,
        fetcher: Arc<Fetcher<Types, S, P>>,
        req: Self::Request,
    ) -> anyhow::Result<()>
    where
        S: VersionedDataSource + 'static,
        for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>,
        for<'a> S::ReadOnly<'a>:
            AvailabilityStorage<Types> + NodeStorage<Types> + PrunedHeightStorage,
        P: AvailabilityProvider<Types>;

    /// Wait for someone else to fetch the object.
    ///
    /// Returns a future which resolves once a matching object is delivered via the notifiers.
    async fn passive_fetch(notifiers: &Notifiers<Types>, req: Self::Request) -> PassiveFetch<Self>;

    /// Load an object from local storage.
    ///
    /// This function assumes `req.might_exist()` has already been checked before calling it, and so
    /// may do unnecessary work if the caller does not ensure this.
    async fn load<S>(storage: &mut S, req: Self::Request) -> QueryResult<Self>
    where
        S: AvailabilityStorage<Types>;
}
2163
/// A boxed future produced by a passive fetch; resolves to `Some` object once it is delivered, or
/// `None` if the wait ends without the object (exact semantics depend on the notifier — see
/// [`Fetchable::passive_fetch`]).
type PassiveFetch<T> = BoxFuture<'static, Option<T>>;
2165
/// Fetchable objects which can additionally be loaded in contiguous ranges, indexed by height.
#[async_trait]
trait RangedFetchable<Types>: Fetchable<Types, Request = Self::RangedRequest> + HeightIndexed
where
    Types: NodeType,
    Header<Types>: QueryableHeader<Types>,
    Payload<Types>: QueryablePayload<Types>,
{
    /// The request type for one object in a range; must be constructible from a bare height.
    type RangedRequest: FetchRequest + From<usize> + Send;

    /// Load a range of these objects from local storage.
    ///
    /// Each entry in the returned vector is itself a `QueryResult`, so individual objects within
    /// the range can be missing without failing the whole load.
    async fn load_range<S, R>(storage: &mut S, range: R) -> QueryResult<Vec<QueryResult<Self>>>
    where
        S: AvailabilityStorage<Types>,
        R: RangeBounds<usize> + Send + 'static;
}
2181
/// An object which can be stored in the database.
trait Storable<Types: NodeType>: Clone {
    /// The name of this object, for debugging purposes.
    fn debug_name(&self) -> String;

    /// Notify anyone waiting for this object that it has become available.
    fn notify(&self, notifiers: &Notifiers<Types>) -> impl Send + Future<Output = ()>;

    /// Store the object in the local database.
    ///
    /// NOTE(review): `leaf_only` presumably indicates the data source runs in leaf-only mode, so
    /// implementations may skip persisting non-leaf data — confirm against implementors.
    fn store(
        &self,
        storage: &mut impl UpdateAvailabilityStorage<Types>,
        leaf_only: bool,
    ) -> impl Send + Future<Output = anyhow::Result<()>>;
}
2197
2198impl<Types: NodeType> HeightIndexed
2199    for (LeafQueryData<Types>, Option<[CertificatePair<Types>; 2]>)
2200{
2201    fn height(&self) -> u64 {
2202        self.0.height()
2203    }
2204}
2205
2206impl<Types: NodeType> Storable<Types>
2207    for (LeafQueryData<Types>, Option<[CertificatePair<Types>; 2]>)
2208{
2209    fn debug_name(&self) -> String {
2210        format!("leaf {} with QC chain", self.0.height())
2211    }
2212
2213    async fn notify(&self, notifiers: &Notifiers<Types>) {
2214        self.0.notify(notifiers).await;
2215    }
2216
2217    async fn store(
2218        &self,
2219        storage: &mut impl UpdateAvailabilityStorage<Types>,
2220        _leaf_only: bool,
2221    ) -> anyhow::Result<()> {
2222        storage
2223            .insert_leaf_with_qc_chain(&self.0, self.1.clone())
2224            .await
2225    }
2226}
2227
2228/// Break a range into fixed-size chunks.
2229fn range_chunks<R>(range: R, chunk_size: usize) -> impl Iterator<Item = Range<usize>>
2230where
2231    R: RangeBounds<usize>,
2232{
2233    // Transform range to explicit start (inclusive) and end (exclusive) bounds.
2234    let Range { mut start, end } = range_to_bounds(range);
2235    std::iter::from_fn(move || {
2236        let chunk_end = min(start + chunk_size, end);
2237        if chunk_end == start {
2238            return None;
2239        }
2240
2241        let chunk = start..chunk_end;
2242        start = chunk_end;
2243        Some(chunk)
2244    })
2245}
2246
2247/// Break a range into fixed-alignment chunks.
2248///
2249/// Each chunk is of size `alignment`, and starts on a multiple of `alignment`, with the possible
2250/// exception of the first chunk (which may be misaligned and small) and the last (which may be
2251/// small).
2252#[allow(dead_code)]
2253fn range_chunks_aligned<R>(range: R, alignment: usize) -> impl Iterator<Item = Range<usize>>
2254where
2255    R: RangeBounds<usize>,
2256{
2257    // Transform range to explicit start (inclusive) and end (exclusive) bounds.
2258    let Range { mut start, end } = range_to_bounds(range);
2259
2260    // If necessary, generate a partial first chunk to force the remaining chunks into alignment.
2261    let first = if start.is_multiple_of(alignment) {
2262        None
2263    } else {
2264        // The partial first chunk ends at the next multiple of the alignment, or at the end of the
2265        // overall range, whichever comes first.
2266        let chunk_end = min(start.next_multiple_of(alignment), end);
2267        let chunk = start..chunk_end;
2268
2269        // Start the series of aligned chunks at the end of the partial first chunk.
2270        start = chunk_end;
2271        Some(chunk)
2272    };
2273
2274    first.into_iter().chain(range_chunks(start..end, alignment))
2275}
2276
/// Transform a range to explicit start (inclusive) and end (exclusive) bounds.
fn range_to_bounds(range: impl RangeBounds<usize>) -> Range<usize> {
    // Inclusive lower bound: an excluded start means the range begins one past it.
    let start = match range.start_bound() {
        Bound::Unbounded => 0,
        Bound::Included(&i) => i,
        Bound::Excluded(&i) => i + 1,
    };
    // Exclusive upper bound: an included end means the range extends one past it; an unbounded
    // range is capped at the largest representable end point.
    let end = match range.end_bound() {
        Bound::Unbounded => usize::MAX,
        Bound::Included(&i) => i + 1,
        Bound::Excluded(&i) => i,
    };
    start..end
}
2291
/// Break a range into fixed-size chunks, starting from the end and moving towards the start.
///
/// While the chunks are yielded in reverse order, from `end` to `start`, each individual chunk is
/// in the usual ascending order. That is, the first chunk ends with `end` and the last chunk starts
/// with `start`.
///
/// Note that unlike [`range_chunks`], which accepts any range and yields an infinite iterator if
/// the range has no upper bound, this function requires there to be a defined upper bound,
/// otherwise we don't know where the reversed iterator should _start_. The `end` bound given here
/// is inclusive; i.e. the end of the first chunk yielded by the stream will be exactly `end`.
/// (`end` must therefore be less than `usize::MAX`, since it is converted to an exclusive bound.)
fn range_chunks_rev(
    start: Bound<usize>,
    end: usize,
    chunk_size: usize,
) -> impl Iterator<Item = Range<usize>> {
    // Normalize the start bound to an inclusive index.
    let start = match start {
        Bound::Unbounded => 0,
        Bound::Included(i) => i,
        Bound::Excluded(i) => i + 1,
    };
    // Normalize the inclusive `end` to an exclusive index.
    let mut end = end + 1;

    std::iter::from_fn(move || {
        // Each chunk reaches back `chunk_size` elements from the current end, clamped at `start`.
        let chunk_start = max(start, end.saturating_sub(chunk_size));
        (chunk_start < end).then(|| {
            let chunk = chunk_start..end;
            end = chunk_start;
            chunk
        })
    })
}
2327
2328/// Break a range into fixed-alignment chunks, starting from the end and moving towards the start.
2329///
2330/// Each chunk is of size `alignment`, and starts on a multiple of `alignment` (that is, the lower
2331/// bound an _exclusive_ upper bound of each chunk are multiples of `alignment`), with the possible
2332/// exception of the first chunk (the last chunk in numerical order, which may be small) and the
2333/// last (which may be misaligned and small).
2334///
2335/// While the chunks are yielded in reverse order, from `end` to `start`, each individual chunk is
2336/// in the usual ascending order. That is, the first chunk ends with `end` and the last chunk starts
2337/// with `start`.
2338///
2339/// Note that unlike [`range_chunks_aligned`], which accepts any range and yields an infinite
2340/// iterator if the range has no upper bound, this function requires there to be a defined upper
2341/// bound, otherwise we don't know where the reversed iterator should _start_. The `end` bound given
2342/// here is inclusive; i.e. the end of the first chunk yielded by the stream will be exactly `end`.
2343fn range_chunks_aligned_rev(
2344    start: Bound<usize>,
2345    end: usize,
2346    alignment: usize,
2347) -> impl Iterator<Item = Range<usize>> {
2348    // Transform the start bound to be inclusive.
2349    let start = match start {
2350        Bound::Included(i) => i,
2351        Bound::Excluded(i) => i + 1,
2352        Bound::Unbounded => 0,
2353    };
2354    // Transform the end bound to be exclusive.
2355    let mut end = end + 1;
2356
2357    // If necessary, generate a partial first chunk to force the remaining chunks into alignment.
2358    let first = if end.is_multiple_of(alignment) {
2359        None
2360    } else {
2361        // The partial first chunk starts at the previous multiple of the alignment, or at the start
2362        // of the overall range, whichever comes first.
2363        let next_multiple = end.next_multiple_of(alignment);
2364        let prev_multiple = next_multiple - alignment;
2365        let chunk_start = max(prev_multiple, start);
2366        let chunk = chunk_start..end;
2367
2368        // Start the reverse series of aligned chunks at the start of the partial first chunk.
2369        end = chunk_start;
2370        Some(chunk)
2371    };
2372
2373    first
2374        .into_iter()
2375        .chain(range_chunks_rev(Bound::Included(start), end - 1, alignment))
2376}
2377
/// Extension trait for converting a [`Result`] into an [`Option`] while logging the error.
trait ResultExt<T, E> {
    /// Convert `Ok(t)` to `Some(t)`; on `Err`, log the error and return `None`.
    fn ok_or_trace(self) -> Option<T>
    where
        E: Display;
}
2383
2384impl<T, E> ResultExt<T, E> for Result<T, E> {
2385    fn ok_or_trace(self) -> Option<T>
2386    where
2387        E: Display,
2388    {
2389        match self {
2390            Ok(t) => Some(t),
2391            Err(err) => {
2392                tracing::info!(
2393                    "error loading resource from local storage, will try to fetch: {err:#}"
2394                );
2395                None
2396            },
2397        }
2398    }
2399}
2400
/// Prometheus metrics exported by the proactive scanner task (subgroup `scanner`).
#[derive(Debug)]
struct ScannerMetrics {
    /// Whether a scan is currently running (1) or not (0).
    running: Box<dyn Gauge>,
    /// The number of the scan that is currently running.
    current_scan: Box<dyn Gauge>,
    /// Number of blocks processed in the current scan.
    scanned_blocks: Box<dyn Gauge>,
    /// Number of VID entries processed in the current scan.
    scanned_vid: Box<dyn Gauge>,
    /// The number of missing blocks discovered and not yet resolved in the current scan.
    missing_blocks: Box<dyn Gauge>,
    /// The number of missing VID entries discovered and not yet resolved in the current scan.
    missing_vid: Box<dyn Gauge>,
}
2416
2417impl ScannerMetrics {
2418    fn new(metrics: &PrometheusMetrics) -> Self {
2419        let group = metrics.subgroup("scanner".into());
2420        Self {
2421            running: group.create_gauge("running".into(), None),
2422            current_scan: group.create_gauge("current".into(), None),
2423            scanned_blocks: group.create_gauge("scanned_blocks".into(), None),
2424            scanned_vid: group.create_gauge("scanned_vid".into(), None),
2425            missing_blocks: group.create_gauge("missing_blocks".into(), None),
2426            missing_vid: group.create_gauge("missing_vid".into(), None),
2427        }
2428    }
2429}
2430
/// Prometheus metrics exported by the aggregator task (subgroup `aggregator`).
#[derive(Debug)]
struct AggregatorMetrics {
    /// The block height for which aggregate statistics are currently available.
    height: Box<dyn Gauge>,
}
2436
2437impl AggregatorMetrics {
2438    fn new(metrics: &PrometheusMetrics) -> Self {
2439        let group = metrics.subgroup("aggregator".into());
2440        Self {
2441            height: group.create_gauge("height".into(), None),
2442        }
2443    }
2444}
2445
/// Prometheus metrics tracking progress of sync status scans (subgroup `sync_status`).
#[derive(Debug)]
struct SyncStatusMetrics {
    /// Start (inclusive) of the block range currently being scanned.
    current_range_start: Box<dyn Gauge>,
    /// End (exclusive) of the block range currently being scanned.
    current_range_end: Box<dyn Gauge>,
    /// Unix timestamp (in seconds) at which the current range scan started.
    current_start_time: Box<dyn Gauge>,
    /// Average scan time per block (in ms), sampled once per completed range.
    avg_rate: Box<dyn Histogram>,
    /// Total number of ranges scanned.
    ranges_scanned: Box<dyn Counter>,
    /// Whether a range scan is currently running (1) or not (0).
    running: Box<dyn Gauge>,
}
2455
2456impl SyncStatusMetrics {
2457    fn new(metrics: &PrometheusMetrics, size: usize) -> Self {
2458        let group = metrics.subgroup("sync_status".into());
2459        group.create_gauge("range_size".into(), None).set(size);
2460
2461        Self {
2462            current_range_start: group.create_gauge("current_range_start".into(), None),
2463            current_range_end: group.create_gauge("current_range_end".into(), None),
2464            current_start_time: group
2465                .create_gauge("current_range_start_time".into(), Some("s".into())),
2466            avg_rate: group
2467                .create_histogram("avg_time_per_block_scanned".into(), Some("ms".into())),
2468            ranges_scanned: group.create_counter("ranges_scanned".into(), None),
2469            running: group.create_gauge("running".into(), None),
2470        }
2471    }
2472
2473    fn start_range(&self, range: &Range<usize>) -> SyncStatusRangeMetrics<'_> {
2474        let start = Utc::now();
2475        self.current_range_start.set(range.start);
2476        self.current_range_end.set(range.end);
2477        self.current_start_time.set(start.timestamp() as usize);
2478        self.running.set(1);
2479        SyncStatusRangeMetrics {
2480            size: range.end - range.start,
2481            start,
2482            metrics: self,
2483        }
2484    }
2485}
2486
/// Handle for an in-progress range scan, created by [`SyncStatusMetrics::start_range`].
///
/// Call [`end`](Self::end) when the scan completes to record the elapsed time and clear the
/// running flag; `#[must_use]` guards against dropping the handle without doing so.
#[must_use]
#[derive(Debug)]
struct SyncStatusRangeMetrics<'a> {
    // Number of blocks in the range being scanned.
    size: usize,
    // Wall-clock time at which the scan of this range started.
    start: DateTime<Utc>,
    // The parent metrics to update when the scan finishes.
    metrics: &'a SyncStatusMetrics,
}
2494
2495impl<'a> SyncStatusRangeMetrics<'a> {
2496    fn end(self) {
2497        let elapsed = Utc::now() - self.start;
2498        self.metrics
2499            .avg_rate
2500            .add_point((elapsed.num_milliseconds() as f64) / (self.size as f64));
2501        self.metrics.ranges_scanned.add(1);
2502        self.metrics.running.set(0);
2503    }
2504}
2505
/// A cached sync status which expires after a fixed time-to-live.
#[derive(Debug)]
struct CachedSyncStatus {
    // When `cached` was last refreshed via `update`.
    last_updated: Instant,
    // How long a cached value remains fresh.
    ttl: Duration,
    // The most recently cached value, if any.
    cached: Option<SyncStatusQueryData>,
}
2512
2513impl CachedSyncStatus {
2514    fn new(ttl: Duration) -> Self {
2515        Self {
2516            last_updated: Instant::now(),
2517            ttl,
2518            cached: None,
2519        }
2520    }
2521
2522    /// Return the cached sync status, if present and fresh.
2523    fn try_get(&self) -> Option<&SyncStatusQueryData> {
2524        if self.last_updated.elapsed() > self.ttl {
2525            // Cached value is stale.
2526            return None;
2527        }
2528        self.cached.as_ref()
2529    }
2530
2531    /// Refresh the cache with an updated value.
2532    fn update(&mut self, value: SyncStatusQueryData) {
2533        self.last_updated = Instant::now();
2534        self.cached = Some(value);
2535    }
2536}
2537
2538/// Turn a fallible passive fetch future into an infallible "fetch".
2539///
2540/// Basically, we ignore failures due to a channel sender being dropped, which should never happen.
2541fn passive<T>(
2542    req: impl Debug + Send + 'static,
2543    fut: impl Future<Output = Option<T>> + Send + 'static,
2544) -> Fetch<T>
2545where
2546    T: Send + 'static,
2547{
2548    Fetch::Pending(
2549        fut.then(move |opt| async move {
2550            match opt {
2551                Some(t) => t,
2552                None => {
2553                    // If `passive_fetch` returns `None`, it means the notifier was dropped without
2554                    // ever sending a notification. In this case, the correct behavior is actually
2555                    // to block forever (unless the `Fetch` itself is dropped), since the semantics
2556                    // of `Fetch` are to never fail. This is analogous to fetching an object which
2557                    // doesn't actually exist: the `Fetch` will never return.
2558                    //
2559                    // However, for ease of debugging, and since this is never expected to happen in
2560                    // normal usage, we panic instead. This should only happen in two cases:
2561                    // * The server was shut down (dropping the notifier) without cleaning up some
2562                    //   background tasks. This will not affect runtime behavior, but should be
2563                    //   fixed if it happens.
2564                    // * There is a very unexpected runtime bug resulting in the notifier being
2565                    //   dropped. If this happens, things are very broken in any case, and it is
2566                    //   better to panic loudly than simply block forever.
2567                    panic!("notifier dropped without satisfying request {req:?}");
2568                },
2569            }
2570        })
2571        .boxed(),
2572    )
2573}
2574
2575/// Get the result of the first future to return `Some`, if either do.
2576async fn select_some<T>(
2577    a: impl Future<Output = Option<T>> + Unpin,
2578    b: impl Future<Output = Option<T>> + Unpin,
2579) -> Option<T> {
2580    match future::select(a, b).await {
2581        // If the first future resolves with `Some`, immediately return the result.
2582        Either::Left((Some(a), _)) => Some(a),
2583        Either::Right((Some(b), _)) => Some(b),
2584
2585        // If the first future resolves with `None`, wait for the result of the second future.
2586        Either::Left((None, b)) => b.await,
2587        Either::Right((None, a)) => a.await,
2588    }
2589}
2590
2591#[cfg(test)]
2592mod test {
2593    use hotshot_example_types::node_types::TEST_VERSIONS;
2594
2595    use super::*;
2596    use crate::{
2597        data_source::{
2598            sql::testing::TmpDb,
2599            storage::{SqlStorage, StorageConnectionType},
2600        },
2601        fetching::provider::NoFetching,
2602        testing::{consensus::MockSqlDataSource, mocks::MockTypes},
2603    };
2604
2605    #[test]
2606    fn test_range_chunks() {
2607        // Inclusive bounds, partial last chunk.
2608        assert_eq!(
2609            range_chunks(0..=4, 2).collect::<Vec<_>>(),
2610            [0..2, 2..4, 4..5]
2611        );
2612
2613        // Inclusive bounds, complete last chunk.
2614        assert_eq!(
2615            range_chunks(0..=5, 2).collect::<Vec<_>>(),
2616            [0..2, 2..4, 4..6]
2617        );
2618
2619        // Exclusive bounds, partial last chunk.
2620        assert_eq!(
2621            range_chunks(0..5, 2).collect::<Vec<_>>(),
2622            [0..2, 2..4, 4..5]
2623        );
2624
2625        // Exclusive bounds, complete last chunk.
2626        assert_eq!(
2627            range_chunks(0..6, 2).collect::<Vec<_>>(),
2628            [0..2, 2..4, 4..6]
2629        );
2630
2631        // Unbounded.
2632        assert_eq!(
2633            range_chunks(0.., 2).take(5).collect::<Vec<_>>(),
2634            [0..2, 2..4, 4..6, 6..8, 8..10]
2635        );
2636    }
2637
2638    #[test]
2639    fn test_range_chunks_aligned() {
2640        #![allow(clippy::single_range_in_vec_init)]
2641
2642        // Aligned first chunk, partial last chunk.
2643        assert_eq!(
2644            range_chunks_aligned(2..5, 2).collect::<Vec<_>>(),
2645            [2..4, 4..5]
2646        );
2647
2648        // Misaligned first chunk, complete last chunk.
2649        assert_eq!(
2650            range_chunks_aligned(1..4, 2).collect::<Vec<_>>(),
2651            [1..2, 2..4]
2652        );
2653
2654        // Incomplete chunk.
2655        assert_eq!(range_chunks_aligned(1..3, 10).collect::<Vec<_>>(), [1..3]);
2656
2657        // Unbounded.
2658        assert_eq!(
2659            range_chunks_aligned(1.., 2).take(5).collect::<Vec<_>>(),
2660            [1..2, 2..4, 4..6, 6..8, 8..10]
2661        );
2662    }
2663
2664    #[test]
2665    fn test_range_chunks_rev() {
2666        // Inclusive bounds, partial last chunk.
2667        assert_eq!(
2668            range_chunks_rev(Bound::Included(0), 4, 2).collect::<Vec<_>>(),
2669            [3..5, 1..3, 0..1]
2670        );
2671
2672        // Inclusive bounds, complete last chunk.
2673        assert_eq!(
2674            range_chunks_rev(Bound::Included(0), 5, 2).collect::<Vec<_>>(),
2675            [4..6, 2..4, 0..2]
2676        );
2677
2678        // Exclusive bounds, partial last chunk.
2679        assert_eq!(
2680            range_chunks_rev(Bound::Excluded(0), 5, 2).collect::<Vec<_>>(),
2681            [4..6, 2..4, 1..2]
2682        );
2683
2684        // Exclusive bounds, complete last chunk.
2685        assert_eq!(
2686            range_chunks_rev(Bound::Excluded(0), 4, 2).collect::<Vec<_>>(),
2687            [3..5, 1..3]
2688        );
2689    }
2690
2691    #[test]
2692    fn test_range_chunks_aligned_rev() {
2693        #![allow(clippy::single_range_in_vec_init)]
2694
2695        // Aligned first chunk, partial last chunk.
2696        assert_eq!(
2697            range_chunks_aligned_rev(Bound::Included(1), 3, 2).collect::<Vec<_>>(),
2698            [2..4, 1..2]
2699        );
2700
2701        // Misaligned first chunk, complete last chunk.
2702        assert_eq!(
2703            range_chunks_aligned_rev(Bound::Included(0), 2, 2).collect::<Vec<_>>(),
2704            [2..3, 0..2]
2705        );
2706
2707        // Incomplete chunk.
2708        assert_eq!(
2709            range_chunks_aligned_rev(Bound::Excluded(0), 3, 10).collect::<Vec<_>>(),
2710            [1..4]
2711        );
2712    }
2713
    /// Generic sync status test.
    ///
    /// Populates a fresh SQL data source with leaves covering the `[start, end)` height ranges in
    /// `present_ranges`, marks everything below the first present range as pruned, then checks
    /// that `sync_status` reports the expected missing count and the expected sequence of
    /// pruned/missing/present ranges.
    ///
    /// `chunk_size` controls the chunking of the sync status scan, so callers can exercise ranges
    /// that fall within, span, or align with chunk boundaries.
    async fn test_sync_status(chunk_size: usize, present_ranges: &[(usize, usize)]) {
        // The overall block height is the end of the last present range.
        let block_height = present_ranges.last().unwrap().1;
        let storage = TmpDb::init().await;
        let db = SqlStorage::connect(storage.config(), StorageConnectionType::Query)
            .await
            .unwrap();
        // A zero TTL forces every `sync_status` call to recompute rather than hit the cache.
        let ds = MockSqlDataSource::builder(db, NoFetching)
            .with_sync_status_chunk_size(chunk_size)
            .with_sync_status_ttl(Duration::ZERO)
            .build()
            .await
            .unwrap();

        // Generate some mock leaves to insert.
        let mut leaves: Vec<LeafQueryData<MockTypes>> = vec![
            LeafQueryData::<MockTypes>::genesis(
                &Default::default(),
                &Default::default(),
                TEST_VERSIONS.test,
            )
            .await,
        ];
        for i in 1..block_height {
            // Each leaf is a copy of the previous one with the block number bumped.
            let mut leaf = leaves[i - 1].clone();
            leaf.leaf.block_header_mut().block_number = i as u64;
            leaves.push(leaf);
        }

        // Set up.
        {
            let mut tx = ds.write().await.unwrap();

            // Insert only the leaves in the present ranges; everything else stays missing.
            for &(start, end) in present_ranges {
                for leaf in &leaves[start..end] {
                    tracing::info!(height = leaf.height(), "insert leaf");
                    tx.insert_leaf(leaf).await.unwrap();
                }
            }

            // If the data doesn't start at height 0, treat the gap below as pruned.
            if present_ranges[0].0 > 0 {
                tx.save_pruned_height((present_ranges[0].0 - 1) as u64)
                    .await
                    .unwrap();
            }

            tx.commit().await.unwrap();
        }

        let sync_status = ds.sync_status().await.unwrap().leaves;

        // Verify missing.
        // Missing = total height minus present leaves minus the pruned prefix.
        let present: usize = present_ranges.iter().map(|(start, end)| end - start).sum();
        assert_eq!(
            sync_status.missing,
            block_height - present - present_ranges[0].0
        );

        // Verify ranges.
        let mut ranges = sync_status.ranges.into_iter();
        let mut prev = 0;
        for &(start, end) in present_ranges {
            // Any gap before this present range must be reported as pruned (if it is the prefix
            // below the pruned height) or missing.
            if start != prev {
                let range = ranges.next().unwrap();
                assert_eq!(
                    range,
                    SyncStatusRange {
                        start: prev,
                        end: start,
                        status: if prev == 0 {
                            SyncStatus::Pruned
                        } else {
                            SyncStatus::Missing
                        },
                    }
                );
            }
            // The present range itself is reported next.
            let range = ranges.next().unwrap();
            assert_eq!(
                range,
                SyncStatusRange {
                    start,
                    end,
                    status: SyncStatus::Present,
                }
            );
            prev = end;
        }

        // Any tail beyond the last present range is reported as missing.
        if prev != block_height {
            let range = ranges.next().unwrap();
            assert_eq!(
                range,
                SyncStatusRange {
                    start: prev,
                    end: block_height,
                    status: SyncStatus::Missing,
                }
            );
        }

        // No further ranges should be reported.
        assert_eq!(ranges.next(), None);
    }
2816
    #[tokio::test]
    #[test_log::test]
    async fn test_sync_status_multiple_chunks() {
        // Several disjoint present ranges, with the gaps between them missing.
        test_sync_status(10, &[(0, 1), (3, 5), (8, 10)]).await;
    }
2822
    #[tokio::test]
    #[test_log::test]
    async fn test_sync_status_multiple_chunks_present_range_overlapping_chunk() {
        // A present range that begins and ends strictly inside the scanned chunk.
        test_sync_status(5, &[(1, 4)]).await;
    }
2828
    #[tokio::test]
    #[test_log::test]
    async fn test_sync_status_multiple_chunks_missing_range_overlapping_chunk() {
        // A missing range in the interior of the chunk, with present data at both chunk edges.
        test_sync_status(5, &[(0, 1), (4, 5)]).await;
    }
2834
    /// Loading a `NonEmptyRange` over a range that is only partially available must fail with
    /// [`QueryError::Missing`] rather than silently returning a truncated result.
    #[tokio::test]
    #[test_log::test]
    async fn test_load_range_incomplete() {
        let storage = TmpDb::init().await;
        let db = SqlStorage::connect(storage.config(), StorageConnectionType::Query)
            .await
            .unwrap();
        // Populate storage with only the genesis leaf, block, and VID common data, so any range
        // extending past height 0 is incomplete.
        {
            let mut tx = db.write().await.unwrap();
            tx.insert_leaf(
                &LeafQueryData::<MockTypes>::genesis(
                    &Default::default(),
                    &Default::default(),
                    TEST_VERSIONS.test,
                )
                .await,
            )
            .await
            .unwrap();
            tx.insert_block(
                &BlockQueryData::<MockTypes>::genesis(
                    &Default::default(),
                    &Default::default(),
                    TEST_VERSIONS.test.base,
                )
                .await,
            )
            .await
            .unwrap();
            tx.insert_vid(
                &VidCommonQueryData::<MockTypes>::genesis(
                    &Default::default(),
                    &Default::default(),
                    TEST_VERSIONS.test.base,
                )
                .await,
                None,
            )
            .await
            .unwrap();
            tx.commit().await.unwrap();
        }

        // Request far more than is available: only height 0 is present.
        let mut tx = db.read().await.unwrap();
        let req = RangeRequest { start: 0, end: 100 };

        let err = <NonEmptyRange<BlockQueryData<MockTypes>>>::load(&mut tx, req)
            .await
            .unwrap_err();
        tracing::info!("loading partial block range failed as expected: {err:#}");
        assert!(matches!(err, QueryError::Missing));

        let err =
            <NonEmptyRange<LeafQueryData<MockTypes>> as Fetchable<MockTypes>>::load(&mut tx, req)
                .await
                .unwrap_err();
        tracing::info!("loading partial leaf range failed as expected: {err:#}");
        assert!(matches!(err, QueryError::Missing));

        let err = <NonEmptyRange<VidCommonQueryData<MockTypes>>>::load(&mut tx, req)
            .await
            .unwrap_err();
        tracing::info!("loading partial VID common range failed as expected: {err:#}");
        assert!(matches!(err, QueryError::Missing));
    }
2900}