Skip to main content

hotshot_query_service/data_source/
fetching.rs

1// Copyright (c) 2022 Espresso Systems (espressosys.com)
2// This file is part of the HotShot Query Service library.
3//
4// This program is free software: you can redistribute it and/or modify it under the terms of the GNU
5// General Public License as published by the Free Software Foundation, either version 3 of the
6// License, or (at your option) any later version.
7// This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without
8// even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
9// General Public License for more details.
10// You should have received a copy of the GNU General Public License along with this program. If not,
11// see <https://www.gnu.org/licenses/>.
12
13//! Asynchronous retrieval of missing data.
14//!
15//! [`FetchingDataSource`] combines a local storage implementation with a remote data availability
//! provider to create a data source which caches data locally, but which is capable of fetching
17//! missing data from a remote source, either proactively or on demand.
18//!
19//! This implementation supports three kinds of data fetching.
20//!
21//! # Proactive Fetching
22//!
23//! Proactive fetching means actively scanning the local database for missing objects and
24//! proactively retrieving them from a remote provider, even if those objects have yet to be
25//! requested by a client. Doing this increases the chance of success and decreases latency when a
26//! client does eventually ask for those objects. This is also the mechanism by which a query
27//! service joining a network late, or having been offline for some time, is able to catch up with
28//! the events on the network that it missed.
29//!
30//! Proactive fetching is implemented by a background task which performs periodic scans of the
31//! database, identifying and retrieving missing objects. This task is generally low priority, since
32//! missing objects are rare, and it will take care not to monopolize resources that could be used
33//! to serve requests.
34//!
35//! # Active Fetching
36//!
37//! Active fetching means reaching out to a remote data availability provider to retrieve a missing
38//! resource, upon receiving a request for that resource from a client. Not every request for a
39//! missing resource triggers an active fetch. To avoid spamming peers with requests for missing
40//! data, we only actively fetch resources that are known to exist somewhere. This means we can
//! actively fetch leaves and headers when we receive a request for a leaf or header by height, whose height
42//! is less than the current chain height. We can fetch a block when the corresponding header exists
43//! (corresponding based on height, hash, or payload hash) or can be actively fetched.
44//!
45//! # Passive Fetching
46//!
47//! For requests that cannot be actively fetched (for example, a block requested by hash, where we
48//! do not have a header proving that a block with that hash exists), we use passive fetching. This
49//! essentially means waiting passively until the query service receives an object that satisfies
//! the request. This object may be received because it was actively fetched in response to a
51//! different request for the same object, one that permitted an active fetch. Or it may have been
52//! fetched [proactively](#proactive-fetching).
53
54use std::{
55    cmp::{max, min},
56    fmt::{Debug, Display},
57    iter::repeat_with,
58    marker::PhantomData,
59    ops::{Bound, Range, RangeBounds},
60    sync::Arc,
61    time::{Duration, Instant},
62};
63
64use anyhow::{Context, bail};
65use async_lock::Semaphore;
66use async_trait::async_trait;
67use backoff::{ExponentialBackoff, ExponentialBackoffBuilder, backoff::Backoff};
68use chrono::{DateTime, Utc};
69use derivative::Derivative;
70use futures::{
71    channel::oneshot,
72    future::{self, BoxFuture, Either, Future, FutureExt, join_all},
73    stream::{self, BoxStream, StreamExt},
74};
75use hotshot_types::{
76    data::VidShare,
77    simple_certificate::CertificatePair,
78    traits::{
79        metrics::{Counter, Gauge, Histogram, Metrics},
80        node_implementation::NodeType,
81    },
82};
83use jf_merkle_tree_compat::{MerkleTreeScheme, prelude::MerkleProof};
84use tagged_base64::TaggedBase64;
85use tokio::{spawn, sync::Mutex, time::sleep};
86use tracing::Instrument;
87
88use super::{
89    Transaction, VersionedDataSource,
90    notifier::Notifier,
91    storage::{
92        Aggregate, AggregatesStorage, AvailabilityStorage, ExplorerStorage,
93        MerklizedStateHeightStorage, MerklizedStateStorage, NodeStorage, UpdateAggregatesStorage,
94        UpdateAvailabilityStorage,
95        pruning::{PruneStorage, PrunedHeightDataSource, PrunedHeightStorage},
96    },
97};
98use crate::{
99    Header, Payload, QueryError, QueryResult,
100    availability::{
101        AvailabilityDataSource, BlockId, BlockInfo, BlockQueryData, BlockWithTransaction,
102        Certificate2, Fetch, FetchStream, HeaderQueryData, LeafId, LeafQueryData, NamespaceId,
103        PayloadMetadata, PayloadQueryData, QueryableHeader, QueryablePayload, TransactionHash,
104        UpdateAvailabilityData, VidCommonMetadata, VidCommonQueryData,
105    },
106    data_source::fetching::{leaf::RangeRequest, vid::VidCommonRangeFetcher},
107    explorer::{self, ExplorerDataSource},
108    fetching::{self, NonEmptyRange, Provider, request},
109    merklized_state::{
110        MerklizedState, MerklizedStateDataSource, MerklizedStateHeightPersistence, Snapshot,
111    },
112    metrics::PrometheusMetrics,
113    node::{
114        NodeDataSource, SyncStatus, SyncStatusQueryData, SyncStatusRange, TimeWindowQueryData,
115        WindowStart,
116    },
117    status::{HasMetrics, StatusDataSource},
118    task::BackgroundTask,
119    types::HeightIndexed,
120};
121
122mod block;
123mod header;
124mod leaf;
125mod transaction;
126mod vid;
127
128use self::{
129    block::{PayloadFetcher, PayloadRangeFetcher},
130    leaf::{LeafFetcher, LeafRangeFetcher},
131    transaction::TransactionRequest,
132    vid::{VidCommonFetcher, VidCommonRequest},
133};
134
135/// Builder for [`FetchingDataSource`] with configuration.
136pub struct Builder<Types, S, P> {
137    storage: S,
138    provider: P,
139    backoff: ExponentialBackoffBuilder,
140    rate_limit: usize,
141    range_chunk_size: usize,
142    proactive_interval: Duration,
143    proactive_range_chunk_size: usize,
144    sync_status_chunk_size: usize,
145    active_fetch_delay: Duration,
146    chunk_fetch_delay: Duration,
147    proactive_fetching: bool,
148    aggregator: bool,
149    aggregator_chunk_size: Option<usize>,
150    leaf_only: bool,
151    sync_status_ttl: Duration,
152    _types: PhantomData<Types>,
153}
154
155impl<Types, S, P> Builder<Types, S, P> {
156    /// Construct a new builder with the given storage and fetcher and the default options.
157    pub fn new(storage: S, provider: P) -> Self {
158        let mut default_backoff = ExponentialBackoffBuilder::default();
159        default_backoff
160            .with_initial_interval(Duration::from_secs(1))
161            .with_multiplier(2.)
162            .with_max_interval(Duration::from_secs(32))
163            .with_max_elapsed_time(Some(Duration::from_secs(64)));
164
165        Self {
166            storage,
167            provider,
168            backoff: default_backoff,
169            rate_limit: 32,
170            range_chunk_size: 25,
171            proactive_interval: Duration::from_hours(8),
172            proactive_range_chunk_size: 100,
173            sync_status_chunk_size: 100_000,
174            active_fetch_delay: Duration::from_millis(50),
175            chunk_fetch_delay: Duration::from_millis(100),
176            proactive_fetching: true,
177            aggregator: true,
178            aggregator_chunk_size: None,
179            leaf_only: false,
180            sync_status_ttl: Duration::from_mins(5),
181            _types: Default::default(),
182        }
183    }
184
185    pub fn leaf_only(mut self) -> Self {
186        self.leaf_only = true;
187        self
188    }
189
190    /// Set the minimum delay between retries of failed operations.
191    pub fn with_min_retry_interval(mut self, interval: Duration) -> Self {
192        self.backoff.with_initial_interval(interval);
193        self
194    }
195
196    /// Set the maximum delay between retries of failed operations.
197    pub fn with_max_retry_interval(mut self, interval: Duration) -> Self {
198        self.backoff.with_max_interval(interval);
199        self
200    }
201
202    /// Set the multiplier for exponential backoff when retrying failed requests.
203    pub fn with_retry_multiplier(mut self, multiplier: f64) -> Self {
204        self.backoff.with_multiplier(multiplier);
205        self
206    }
207
208    /// Set the randomization factor for randomized backoff when retrying failed requests.
209    pub fn with_retry_randomization_factor(mut self, factor: f64) -> Self {
210        self.backoff.with_randomization_factor(factor);
211        self
212    }
213
214    /// Set the maximum time to retry failed operations before giving up.
215    pub fn with_retry_timeout(mut self, timeout: Duration) -> Self {
216        self.backoff.with_max_elapsed_time(Some(timeout));
217        self
218    }
219
220    /// Set the maximum number of simultaneous fetches.
221    pub fn with_rate_limit(mut self, with_rate_limit: usize) -> Self {
222        self.rate_limit = with_rate_limit;
223        self
224    }
225
226    /// Set the number of items to process at a time when loading a range or stream.
227    ///
228    /// This determines:
229    /// * The number of objects to load from storage in a single request
230    /// * The number of objects to buffer in memory per request/stream
231    /// * The number of concurrent notification subscriptions per request/stream
232    pub fn with_range_chunk_size(mut self, range_chunk_size: usize) -> Self {
233        self.range_chunk_size = range_chunk_size;
234        self
235    }
236
237    /// Set the time interval between proactive fetching scans.
238    ///
239    /// See [proactive fetching](self#proactive-fetching).
240    pub fn with_proactive_interval(mut self, interval: Duration) -> Self {
241        self.proactive_interval = interval;
242        self
243    }
244
245    /// Set the number of items to process at a time when scanning for proactive fetching.
246    ///
247    /// This is similar to [`Self::with_range_chunk_size`], but only affects the chunk size for
248    /// proactive fetching scans, not for normal subscription streams. This can be useful to tune
249    /// the proactive scanner to be more or less greedy with persistent storage resources.
250    pub fn with_proactive_range_chunk_size(mut self, range_chunk_size: usize) -> Self {
251        self.proactive_range_chunk_size = range_chunk_size;
252        self
253    }
254
255    /// Set the number of items to process in a single transaction when scanning the database for
256    /// missing objects.
257    pub fn with_sync_status_chunk_size(mut self, chunk_size: usize) -> Self {
258        self.sync_status_chunk_size = chunk_size;
259        self
260    }
261
262    /// Duration to cache sync status results for.
263    ///
264    /// Computing the sync status is expensive, and it typically doesn't change that quickly. Thus,
265    /// it makes sense to cache the results whenever we do compute it, and return those cached
266    /// results if they are not too old.
267    pub fn with_sync_status_ttl(mut self, ttl: Duration) -> Self {
268        self.sync_status_ttl = ttl;
269        self
270    }
271
272    /// Add a delay between active fetches in proactive scans.
273    ///
274    /// This can be used to limit the rate at which this query service makes requests to other query
275    /// services during proactive scans. This is useful if the query service has a lot of blocks to
276    /// catch up on, as without a delay, scanning can be extremely burdensome on the peer.
277    pub fn with_active_fetch_delay(mut self, active_fetch_delay: Duration) -> Self {
278        self.active_fetch_delay = active_fetch_delay;
279        self
280    }
281
282    /// Adds a delay between chunk fetches during proactive scans.
283    ///
284    /// In a proactive scan, we retrieve a range of objects from a provider or local storage (e.g., a database).
285    /// Without a delay between fetching these chunks, the process can become very CPU-intensive, especially
286    /// when chunks are retrieved from local storage. While there is already a delay for active fetches
287    /// (`active_fetch_delay`), situations may arise when subscribed to an old stream that fetches most of the data
288    /// from local storage.
289    ///
290    /// This additional delay helps to limit constant maximum CPU usage
291    /// and ensures that local storage remains accessible to all processes,
292    /// not just the proactive scanner.
293    pub fn with_chunk_fetch_delay(mut self, chunk_fetch_delay: Duration) -> Self {
294        self.chunk_fetch_delay = chunk_fetch_delay;
295        self
296    }
297
298    /// Run without [proactive fetching](self#proactive-fetching).
299    ///
300    /// This can reduce load on the CPU and the database, but increases the probability that
301    /// requests will fail due to missing resources. If resources are constrained, it is recommended
302    /// to run with rare proactive fetching (see
303    /// [`with_major_scan_interval`](Self::with_major_scan_interval),
304    /// [`with_minor_scan_interval`](Self::with_minor_scan_interval)), rather than disabling it
305    /// entirely.
306    pub fn disable_proactive_fetching(mut self) -> Self {
307        self.proactive_fetching = false;
308        self
309    }
310
311    /// Run without an aggregator.
312    ///
313    /// This can reduce load on the CPU and the database, but it will cause aggregate statistics
314    /// (such as transaction counts) not to update.
315    pub fn disable_aggregator(mut self) -> Self {
316        self.aggregator = false;
317        self
318    }
319
320    /// Set the number of items to process at a time when computing aggregate statistics.
321    ///
322    /// This is similar to [`Self::with_range_chunk_size`], but only affects the chunk size for
323    /// the aggregator task, not for normal subscription streams. This can be useful to tune
324    /// the aggregator to be more or less greedy with persistent storage resources.
325    ///
326    /// By default (i.e. if this method is not called) the proactive range chunk size will be set to
327    /// whatever the normal range chunk size is.
328    pub fn with_aggregator_chunk_size(mut self, chunk_size: usize) -> Self {
329        self.aggregator_chunk_size = Some(chunk_size);
330        self
331    }
332
333    pub fn is_leaf_only(&self) -> bool {
334        self.leaf_only
335    }
336}
337
338impl<Types, S, P> Builder<Types, S, P>
339where
340    Types: NodeType,
341    Payload<Types>: QueryablePayload<Types>,
342    Header<Types>: QueryableHeader<Types>,
343    S: PruneStorage + VersionedDataSource + HasMetrics + 'static,
344    for<'a> S::ReadOnly<'a>: AvailabilityStorage<Types>
345        + PrunedHeightStorage
346        + NodeStorage<Types>
347        + AggregatesStorage<Types>,
348    for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types> + UpdateAggregatesStorage<Types>,
349    P: AvailabilityProvider<Types>,
350{
351    /// Build a [`FetchingDataSource`] with these options.
352    pub async fn build(self) -> anyhow::Result<FetchingDataSource<Types, S, P>> {
353        FetchingDataSource::new(self).await
354    }
355}
356
357/// The most basic kind of data source.
358///
359/// A data source is constructed modularly by combining a [storage](super::storage) implementation
360/// with a [Fetcher](crate::fetching::Fetcher). The former allows the query service to store the
361/// data it has persistently in an easily accessible storage medium, such as the local file system
362/// or a database. This allows it to answer queries efficiently and to maintain its state across
363/// restarts. The latter allows the query service to fetch data that is missing from its storage
364/// from an external data availability provider, such as the Tiramisu DA network or another instance
365/// of the query service.
366///
367/// These two components of a data source are combined in [`FetchingDataSource`], which is the
368/// lowest level kind of data source available. It simply uses the storage implementation to fetch
369/// data when available, and fills in everything else using the fetcher. Various kinds of data
370/// sources can be constructed out of [`FetchingDataSource`] by changing the storage and fetcher
371/// implementations used, and more complex data sources can be built on top using data source
372/// combinators.
373#[derive(Derivative)]
374#[derivative(Clone(bound = ""), Debug(bound = "S: Debug, P: Debug"))]
375pub struct FetchingDataSource<Types, S, P>
376where
377    Types: NodeType,
378{
379    // The fetcher manages retrieval of resources from both local storage and a remote provider. It
380    // encapsulates the data which may need to be shared with a long-lived task or future that
381    // implements the asynchronous fetching of a particular object. This is why it gets its own
382    // type, wrapped in an [`Arc`] for easy, efficient cloning.
383    fetcher: Arc<Fetcher<Types, S, P>>,
384    // The proactive scanner task. This is only saved here so that we can cancel it on drop.
385    scanner: Option<BackgroundTask>,
386    // The aggregator task, which derives aggregate statistics from a block stream.
387    aggregator: Option<BackgroundTask>,
388    pruner: Pruner<Types, S>,
389}
390
391#[derive(Derivative)]
392#[derivative(Clone(bound = ""), Debug(bound = "S: Debug,   "))]
393pub struct Pruner<Types, S>
394where
395    Types: NodeType,
396{
397    handle: Option<BackgroundTask>,
398    _types: PhantomData<(Types, S)>,
399}
400
401impl<Types, S> Pruner<Types, S>
402where
403    Types: NodeType,
404    Header<Types>: QueryableHeader<Types>,
405    Payload<Types>: QueryablePayload<Types>,
406    S: PruneStorage + Send + Sync + 'static,
407{
408    async fn new(storage: Arc<S>, backoff: ExponentialBackoff) -> Self {
409        let cfg = storage.get_pruning_config();
410        let Some(cfg) = cfg else {
411            return Self {
412                handle: None,
413                _types: Default::default(),
414            };
415        };
416
417        let future = async move {
418            for i in 1.. {
419                // Delay before we start the pruner run to avoid a useless and expensive prune
420                // immediately on startup.
421                sleep(cfg.interval()).await;
422
423                tracing::warn!("starting pruner run {i} ");
424                Self::prune(storage.clone(), &backoff).await;
425            }
426        };
427
428        let task = BackgroundTask::spawn("pruner", future);
429
430        Self {
431            handle: Some(task),
432            _types: Default::default(),
433        }
434    }
435
436    async fn prune(storage: Arc<S>, backoff: &ExponentialBackoff) {
437        // We loop until the whole run pruner run is complete
438        let mut pruner = S::Pruner::default();
439        'run: loop {
440            let mut backoff = backoff.clone();
441            backoff.reset();
442            'batch: loop {
443                match storage.prune(&mut pruner).await {
444                    Ok(Some(height)) => {
445                        tracing::warn!("Pruned to height {height}");
446                        break 'batch;
447                    },
448                    Ok(None) => {
449                        tracing::warn!("pruner run complete.");
450                        break 'run;
451                    },
452                    Err(e) => {
453                        tracing::warn!("error pruning batch: {e:#}");
454                        if let Some(delay) = backoff.next_backoff() {
455                            sleep(delay).await;
456                        } else {
457                            tracing::error!("pruning run failed after too many errors: {e:#}");
458                            break 'run;
459                        }
460                    },
461                }
462            }
463        }
464    }
465}
466
467impl<Types, S, P> FetchingDataSource<Types, S, P>
468where
469    Types: NodeType,
470    Payload<Types>: QueryablePayload<Types>,
471    Header<Types>: QueryableHeader<Types>,
472    S: VersionedDataSource + PruneStorage + HasMetrics + 'static,
473    for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types> + UpdateAggregatesStorage<Types>,
474    for<'a> S::ReadOnly<'a>: AvailabilityStorage<Types>
475        + NodeStorage<Types>
476        + PrunedHeightStorage
477        + AggregatesStorage<Types>,
478    P: AvailabilityProvider<Types>,
479{
480    /// Build a [`FetchingDataSource`] with the given `storage` and `provider`.
481    pub fn builder(storage: S, provider: P) -> Builder<Types, S, P> {
482        Builder::new(storage, provider)
483    }
484
485    async fn new(builder: Builder<Types, S, P>) -> anyhow::Result<Self> {
486        let leaf_only = builder.is_leaf_only();
487        let aggregator = builder.aggregator;
488        let aggregator_chunk_size = builder
489            .aggregator_chunk_size
490            .unwrap_or(builder.range_chunk_size);
491        let proactive_fetching = builder.proactive_fetching;
492        let proactive_interval = builder.proactive_interval;
493        let proactive_range_chunk_size = builder.proactive_range_chunk_size;
494        let backoff = builder.backoff.build();
495        let scanner_metrics = ScannerMetrics::new(builder.storage.metrics());
496        let aggregator_metrics = AggregatorMetrics::new(builder.storage.metrics());
497
498        let fetcher = Arc::new(Fetcher::new(builder).await?);
499        let scanner = if proactive_fetching && !leaf_only {
500            Some(BackgroundTask::spawn(
501                "proactive scanner",
502                fetcher.clone().proactive_scan(
503                    proactive_interval,
504                    proactive_range_chunk_size,
505                    scanner_metrics,
506                ),
507            ))
508        } else {
509            None
510        };
511
512        let aggregator = if aggregator && !leaf_only {
513            Some(BackgroundTask::spawn(
514                "aggregator",
515                fetcher
516                    .clone()
517                    .aggregate(aggregator_chunk_size, aggregator_metrics),
518            ))
519        } else {
520            None
521        };
522
523        let storage = fetcher.storage.clone();
524
525        let pruner = Pruner::new(storage, backoff).await;
526        let ds = Self {
527            fetcher,
528            scanner,
529            pruner,
530            aggregator,
531        };
532
533        Ok(ds)
534    }
535
536    /// Get a copy of the (shared) inner storage
537    pub fn inner(&self) -> Arc<S> {
538        self.fetcher.storage.clone()
539    }
540}
541
542impl<Types, S, P> AsRef<S> for FetchingDataSource<Types, S, P>
543where
544    Types: NodeType,
545{
546    fn as_ref(&self) -> &S {
547        &self.fetcher.storage
548    }
549}
550
551impl<Types, S, P> HasMetrics for FetchingDataSource<Types, S, P>
552where
553    Types: NodeType,
554    S: HasMetrics,
555{
556    fn metrics(&self) -> &PrometheusMetrics {
557        self.as_ref().metrics()
558    }
559}
560
561#[async_trait]
562impl<Types, S, P> StatusDataSource for FetchingDataSource<Types, S, P>
563where
564    Types: NodeType,
565    Header<Types>: QueryableHeader<Types>,
566    S: VersionedDataSource + HasMetrics + Send + Sync + 'static,
567    for<'a> S::ReadOnly<'a>: NodeStorage<Types>,
568    P: Send + Sync,
569{
570    async fn block_height(&self) -> QueryResult<usize> {
571        let mut tx = self.read().await.map_err(|err| QueryError::Error {
572            message: err.to_string(),
573        })?;
574        tx.block_height().await
575    }
576}
577
578#[async_trait]
579impl<Types, S, P> PrunedHeightDataSource for FetchingDataSource<Types, S, P>
580where
581    Types: NodeType,
582    S: VersionedDataSource + HasMetrics + Send + Sync + 'static,
583    for<'a> S::ReadOnly<'a>: PrunedHeightStorage,
584    P: Send + Sync,
585{
586    async fn load_pruned_height(&self) -> anyhow::Result<Option<u64>> {
587        let mut tx = self.read().await?;
588        tx.load_pruned_height().await
589    }
590}
591
592#[async_trait]
593impl<Types, S, P> AvailabilityDataSource<Types> for FetchingDataSource<Types, S, P>
594where
595    Types: NodeType,
596    Header<Types>: QueryableHeader<Types>,
597    Payload<Types>: QueryablePayload<Types>,
598    S: VersionedDataSource + 'static,
599    for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>,
600    for<'a> S::ReadOnly<'a>: AvailabilityStorage<Types> + NodeStorage<Types> + PrunedHeightStorage,
601    P: AvailabilityProvider<Types>,
602{
603    async fn get_leaf<ID>(&self, id: ID) -> Fetch<LeafQueryData<Types>>
604    where
605        ID: Into<LeafId<Types>> + Send + Sync,
606    {
607        self.fetcher.get(id.into()).await
608    }
609
610    async fn get_header<ID>(&self, id: ID) -> Fetch<Header<Types>>
611    where
612        ID: Into<BlockId<Types>> + Send + Sync,
613    {
614        self.fetcher
615            .get::<HeaderQueryData<_>>(id.into())
616            .await
617            .map(|h| h.header)
618    }
619
620    async fn get_block<ID>(&self, id: ID) -> Fetch<BlockQueryData<Types>>
621    where
622        ID: Into<BlockId<Types>> + Send + Sync,
623    {
624        self.fetcher.get(id.into()).await
625    }
626
627    async fn get_payload<ID>(&self, id: ID) -> Fetch<PayloadQueryData<Types>>
628    where
629        ID: Into<BlockId<Types>> + Send + Sync,
630    {
631        self.fetcher.get(id.into()).await
632    }
633
634    async fn get_payload_metadata<ID>(&self, id: ID) -> Fetch<PayloadMetadata<Types>>
635    where
636        ID: Into<BlockId<Types>> + Send + Sync,
637    {
638        self.fetcher.get(id.into()).await
639    }
640
641    async fn get_vid_common<ID>(&self, id: ID) -> Fetch<VidCommonQueryData<Types>>
642    where
643        ID: Into<BlockId<Types>> + Send + Sync,
644    {
645        self.fetcher.get(VidCommonRequest::from(id.into())).await
646    }
647
648    async fn get_vid_common_metadata<ID>(&self, id: ID) -> Fetch<VidCommonMetadata<Types>>
649    where
650        ID: Into<BlockId<Types>> + Send + Sync,
651    {
652        self.fetcher.get(VidCommonRequest::from(id.into())).await
653    }
654
655    async fn get_leaf_range<R>(&self, range: R) -> FetchStream<LeafQueryData<Types>>
656    where
657        R: RangeBounds<usize> + Send + 'static,
658    {
659        self.fetcher.clone().get_range(range)
660    }
661
662    async fn get_block_range<R>(&self, range: R) -> FetchStream<BlockQueryData<Types>>
663    where
664        R: RangeBounds<usize> + Send + 'static,
665    {
666        self.fetcher.clone().get_range(range)
667    }
668
669    async fn get_header_range<R>(&self, range: R) -> FetchStream<Header<Types>>
670    where
671        R: RangeBounds<usize> + Send + 'static,
672    {
673        let leaves: FetchStream<LeafQueryData<Types>> = self.fetcher.clone().get_range(range);
674
675        leaves
676            .map(|fetch| fetch.map(|leaf| leaf.leaf.block_header().clone()))
677            .boxed()
678    }
679
680    async fn get_payload_range<R>(&self, range: R) -> FetchStream<PayloadQueryData<Types>>
681    where
682        R: RangeBounds<usize> + Send + 'static,
683    {
684        self.fetcher.clone().get_range(range)
685    }
686
687    async fn get_payload_metadata_range<R>(&self, range: R) -> FetchStream<PayloadMetadata<Types>>
688    where
689        R: RangeBounds<usize> + Send + 'static,
690    {
691        self.fetcher.clone().get_range(range)
692    }
693
694    async fn get_vid_common_range<R>(&self, range: R) -> FetchStream<VidCommonQueryData<Types>>
695    where
696        R: RangeBounds<usize> + Send + 'static,
697    {
698        self.fetcher.clone().get_range(range)
699    }
700
701    async fn get_vid_common_metadata_range<R>(
702        &self,
703        range: R,
704    ) -> FetchStream<VidCommonMetadata<Types>>
705    where
706        R: RangeBounds<usize> + Send + 'static,
707    {
708        self.fetcher.clone().get_range(range)
709    }
710
711    async fn get_leaf_range_rev(
712        &self,
713        start: Bound<usize>,
714        end: usize,
715    ) -> FetchStream<LeafQueryData<Types>> {
716        self.fetcher.clone().get_range_rev(start, end)
717    }
718
719    async fn get_block_range_rev(
720        &self,
721        start: Bound<usize>,
722        end: usize,
723    ) -> FetchStream<BlockQueryData<Types>> {
724        self.fetcher.clone().get_range_rev(start, end)
725    }
726
727    async fn get_payload_range_rev(
728        &self,
729        start: Bound<usize>,
730        end: usize,
731    ) -> FetchStream<PayloadQueryData<Types>> {
732        self.fetcher.clone().get_range_rev(start, end)
733    }
734
735    async fn get_payload_metadata_range_rev(
736        &self,
737        start: Bound<usize>,
738        end: usize,
739    ) -> FetchStream<PayloadMetadata<Types>> {
740        self.fetcher.clone().get_range_rev(start, end)
741    }
742
743    async fn get_vid_common_range_rev(
744        &self,
745        start: Bound<usize>,
746        end: usize,
747    ) -> FetchStream<VidCommonQueryData<Types>> {
748        self.fetcher.clone().get_range_rev(start, end)
749    }
750
751    async fn get_vid_common_metadata_range_rev(
752        &self,
753        start: Bound<usize>,
754        end: usize,
755    ) -> FetchStream<VidCommonMetadata<Types>> {
756        self.fetcher.clone().get_range_rev(start, end)
757    }
758
759    async fn get_block_containing_transaction(
760        &self,
761        h: TransactionHash<Types>,
762    ) -> Fetch<BlockWithTransaction<Types>> {
763        self.fetcher.clone().get(TransactionRequest::from(h)).await
764    }
765
766    async fn get_cert2(&self, height: u64) -> QueryResult<Option<Certificate2<Types>>> {
767        self.fetcher.get_cert2(height).await
768    }
769}
770
// Write path for newly decided data. It persists what the decide event carried, wakes up any
// fetches waiting on it, and leaves background fetches running for whatever was missing.
impl<Types, S, P> UpdateAvailabilityData<Types> for FetchingDataSource<Types, S, P>
where
    Types: NodeType,
    Header<Types>: QueryableHeader<Types>,
    Payload<Types>: QueryablePayload<Types>,
    S: VersionedDataSource + 'static,
    for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>,
    for<'a> S::ReadOnly<'a>: AvailabilityStorage<Types> + NodeStorage<Types> + PrunedHeightStorage,
    P: AvailabilityProvider<Types>,
{
    /// Persist a newly decided block and notify any fetches waiting on it.
    ///
    /// Steps, in order:
    /// 1. Store the decided leaf together with its QC chain and `cert2`.
    /// 2. Trigger a fetch of the parent leaf if it is not already available.
    /// 3. For the block payload and the VID common data, use the copy in `info` if present.
    ///    Otherwise load it from local storage if it is already there. Otherwise leave a
    ///    background task waiting on the pending fetch returned by `Fetcher::get`.
    /// 4. Once everything is stored, notify waiters about the leaf. Also notify about whichever
    ///    of the block / VID common were obtained in step 3.
    ///
    /// As written, this always returns `Ok(())`, because the results of the `store` calls are not
    /// inspected here. NOTE(review): presumably storage failures are handled inside
    /// `Fetcher::store`. Confirm.
    async fn append(&self, info: BlockInfo<Types>) -> anyhow::Result<()> {
        let height = info.height() as usize;

        // Save the new decided leaf.
        self.fetcher
            .store(&(info.leaf.clone(), info.qc_chain, info.cert2.clone()))
            .await;

        // Trigger a fetch of the parent leaf, if we don't already have it.
        leaf::trigger_fetch_for_parent(&self.fetcher, &info.leaf);

        // Store and notify the block data and VID common, if available. Spawn a fetch to retrieve
        // it, if not.
        //
        // Note a special case here: if the data was not available in the decide event, but _is_
        // available locally in the database, without having to spawn a fetch for it, we _must_
        // notify now. Thus, we must pattern match to distinguish `Fetch::Ready`/`Fetch::Pending`.
        //
        // Why? As soon as we inserted the leaf, the corresponding object may become available, if
        // we already had an identical payload/VID common in the database, from a different block.
        // Then calling `get()` will not spawn a fetch/notification, and existing fetches waiting
        // for the newly decided object to arrive will miss it. Thus, if `get()` returned a `Ready`
        // object, it is our responsibility, as the task processing newly decided objects, to make
        // sure those fetches get notified.
        let block = match info.block {
            Some(block) => Some(block),
            None => match self.fetcher.get::<BlockQueryData<Types>>(height).await {
                Fetch::Ready(block) => Some(block),
                Fetch::Pending(fut) => {
                    // `get` has already started an active fetch, if one is possible (via
                    // `try_get` -> `fetch`). This task only drives the pending future to
                    // completion under a tracing span.
                    let span = tracing::info_span!("fetch missing block", height);
                    spawn(
                        async move {
                            tracing::info!("fetching missing block");
                            fut.await;
                        }
                        .instrument(span),
                    );
                    None
                },
            },
        };
        if let Some(block) = &block {
            self.fetcher.store(block).await;
        }
        // Same handling as for the block above, but for VID common data.
        let vid = match info.vid_common {
            Some(vid) => Some(vid),
            None => match self.fetcher.get::<VidCommonQueryData<Types>>(height).await {
                Fetch::Ready(vid) => Some(vid),
                Fetch::Pending(fut) => {
                    let span = tracing::info_span!("fetch missing VID common", height);
                    spawn(
                        async move {
                            tracing::info!("fetching missing VID common");
                            fut.await;
                        }
                        .instrument(span),
                    );
                    None
                },
            },
        };
        if let Some(vid) = &vid {
            // The VID share from the decide event (if any) is stored alongside the common data.
            self.fetcher.store(&(vid.clone(), info.vid_share)).await;
        }

        // Send notifications for the new objects after storing all of them. This ensures that as
        // soon as a fetch for any of these objects resolves, the corresponding data will
        // immediately be available. This isn't strictly required for correctness; after all,
        // objects can generally be fetched as asynchronously as we want. But this is the most
        // intuitive behavior to provide when possible.
        info.leaf.notify(&self.fetcher.notifiers).await;
        if let Some(block) = &block {
            block.notify(&self.fetcher.notifiers).await;
        }
        if let Some(vid) = &vid {
            vid.notify(&self.fetcher.notifiers).await;
        }

        Ok(())
    }

    /// Append a payload for a block whose leaf was already decided without one.
    ///
    /// In the new protocol, decide events can arrive before VID reconstruction
    /// has produced the block payload, so [`append`](Self::append) may persist
    /// a leaf with no payload attached. The payload is then back-filled here
    /// once it becomes available, leaving the rest of the block info untouched.
    ///
    /// Unlike [`append`](Self::append), this stores and notifies only the block itself. It does
    /// not touch the leaf or VID common data, and it does not trigger a parent-leaf fetch. As
    /// written, it always returns `Ok(())`.
    async fn append_payload(&self, block: BlockQueryData<Types>) -> anyhow::Result<()> {
        // Write to storage and notify any pending fetchers waiting on this height.
        self.fetcher.store(&block).await;
        block.notify(&self.fetcher.notifiers).await;
        Ok(())
    }
}
875
876impl<Types, S, P> VersionedDataSource for FetchingDataSource<Types, S, P>
877where
878    Types: NodeType,
879    S: VersionedDataSource + Send + Sync,
880    P: Send + Sync,
881{
882    type Transaction<'a>
883        = S::Transaction<'a>
884    where
885        Self: 'a;
886    type ReadOnly<'a>
887        = S::ReadOnly<'a>
888    where
889        Self: 'a;
890
891    async fn write(&self) -> anyhow::Result<Self::Transaction<'_>> {
892        self.fetcher.write().await
893    }
894
895    async fn read(&self) -> anyhow::Result<Self::ReadOnly<'_>> {
896        self.fetcher.read().await
897    }
898}
899
/// Asynchronous retrieval and storage of [`Fetchable`] resources.
///
/// Pairs local storage (`storage`) with a remote provider (`provider`). It also holds one fetcher
/// per kind of object for active fetches, and the notifiers that passive fetches subscribe to and
/// that are signalled when objects are stored.
#[derive(Debug)]
struct Fetcher<Types, S, P>
where
    Types: NodeType,
{
    // Local storage backend, shared via `Arc` with spawned tasks.
    storage: Arc<S>,
    // Subscription points used by passive fetches (`T::passive_fetch`) and signalled by
    // `notify` when new objects are stored.
    notifiers: Notifiers<Types>,
    // Remote availability provider, taken from the builder.
    provider: Arc<P>,
    // Fetchers for single leaves and leaf ranges; always present, even in leaf-only mode.
    leaf_fetcher: Arc<LeafFetcher<Types, S, P>>,
    leaf_range_fetcher: Arc<LeafRangeFetcher<Types, S, P>>,
    // Fetchers for payloads and VID common data; all `None` when built in leaf-only mode.
    payload_fetcher: Option<Arc<PayloadFetcher<Types, S, P>>>,
    payload_range_fetcher: Option<Arc<PayloadRangeFetcher<Types, S, P>>>,
    vid_common_fetcher: Option<Arc<VidCommonFetcher<Types, S, P>>>,
    vid_common_range_fetcher: Option<Arc<VidCommonRangeFetcher<Types, S, P>>>,
    // Default number of objects loaded per chunk by `get_range` / `get_range_rev`.
    range_chunk_size: usize,
    // Chunk size handed to `SyncStatusMetrics::new`. Presumably also used when computing sync
    // status (not visible in this section).
    sync_status_chunk_size: usize,
    // Duration to sleep before yielding each pending (active) fetch from a range stream.
    active_fetch_delay: Duration,
    // Duration to sleep after each chunk is loaded by a range stream.
    chunk_fetch_delay: Duration,
    // Exponential backoff when retrying failed operations.
    backoff: ExponentialBackoff,
    // Semaphore limiting the number of simultaneous DB accesses we can have from tasks spawned to
    // retry failed loads.
    retry_semaphore: Arc<Semaphore>,
    // Whether this fetcher was built in leaf-only mode.
    leaf_only: bool,
    // Metrics describing synchronization progress.
    sync_status_metrics: SyncStatusMetrics,
    // Cached sync status, constructed with the builder's `sync_status_ttl`.
    sync_status: Mutex<CachedSyncStatus>,
}
930
931impl<Types, S, P> VersionedDataSource for Fetcher<Types, S, P>
932where
933    Types: NodeType,
934    S: VersionedDataSource + Send + Sync,
935    P: Send + Sync,
936{
937    type Transaction<'a>
938        = S::Transaction<'a>
939    where
940        Self: 'a;
941    type ReadOnly<'a>
942        = S::ReadOnly<'a>
943    where
944        Self: 'a;
945
946    async fn write(&self) -> anyhow::Result<Self::Transaction<'_>> {
947        self.storage.write().await
948    }
949
950    async fn read(&self) -> anyhow::Result<Self::ReadOnly<'_>> {
951        self.storage.read().await
952    }
953}
954
955impl<Types, S, P> Fetcher<Types, S, P>
956where
957    Types: NodeType,
958    Header<Types>: QueryableHeader<Types>,
959    S: VersionedDataSource + HasMetrics + Sync,
960    for<'a> S::ReadOnly<'a>: PrunedHeightStorage + NodeStorage<Types>,
961{
962    pub async fn new(builder: Builder<Types, S, P>) -> anyhow::Result<Self> {
963        let retry_semaphore = Arc::new(Semaphore::new(builder.rate_limit));
964        let backoff = builder.backoff.build();
965
966        let (payload_fetcher, payload_range_fetcher, vid_common_fetcher, vid_common_range_fetcher) =
967            if builder.is_leaf_only() {
968                (None, None, None, None)
969            } else {
970                (
971                    Some(Arc::new(fetching::Fetcher::new(
972                        retry_semaphore.clone(),
973                        backoff.clone(),
974                    ))),
975                    Some(Arc::new(fetching::Fetcher::new(
976                        retry_semaphore.clone(),
977                        backoff.clone(),
978                    ))),
979                    Some(Arc::new(fetching::Fetcher::new(
980                        retry_semaphore.clone(),
981                        backoff.clone(),
982                    ))),
983                    Some(Arc::new(fetching::Fetcher::new(
984                        retry_semaphore.clone(),
985                        backoff.clone(),
986                    ))),
987                )
988            };
989        let leaf_fetcher = fetching::Fetcher::new(retry_semaphore.clone(), backoff.clone());
990        let leaf_range_fetcher = fetching::Fetcher::new(retry_semaphore.clone(), backoff.clone());
991
992        let leaf_only = builder.leaf_only;
993        let sync_status_metrics =
994            SyncStatusMetrics::new(builder.storage.metrics(), builder.sync_status_chunk_size);
995
996        Ok(Self {
997            storage: Arc::new(builder.storage),
998            notifiers: Default::default(),
999            provider: Arc::new(builder.provider),
1000            leaf_fetcher: Arc::new(leaf_fetcher),
1001            leaf_range_fetcher: Arc::new(leaf_range_fetcher),
1002            payload_fetcher,
1003            payload_range_fetcher,
1004            vid_common_fetcher,
1005            vid_common_range_fetcher,
1006            range_chunk_size: builder.range_chunk_size,
1007            sync_status_chunk_size: builder.sync_status_chunk_size,
1008            active_fetch_delay: builder.active_fetch_delay,
1009            chunk_fetch_delay: builder.chunk_fetch_delay,
1010            backoff,
1011            retry_semaphore,
1012            leaf_only,
1013            sync_status_metrics,
1014            sync_status: Mutex::new(CachedSyncStatus::new(builder.sync_status_ttl)),
1015        })
1016    }
1017}
1018
1019impl<Types, S, P> Fetcher<Types, S, P>
1020where
1021    Types: NodeType,
1022    Header<Types>: QueryableHeader<Types>,
1023    Payload<Types>: QueryablePayload<Types>,
1024    S: VersionedDataSource + 'static,
1025    for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>,
1026    for<'a> S::ReadOnly<'a>: AvailabilityStorage<Types> + NodeStorage<Types> + PrunedHeightStorage,
1027    P: AvailabilityProvider<Types>,
1028{
    /// Get an object from local storage, or fetch it if it is missing.
    ///
    /// Returns [`Fetch::Ready`] if the object was loaded from local storage. Otherwise, returns a
    /// pending fetch that resolves once the object is stored and a notification is sent.
    ///
    /// If local storage could not be queried at all, a background task retries the lookup with
    /// exponential backoff. That task's database access is limited by `retry_semaphore`. The
    /// returned fetch then resolves from either the retried load or a notification, combined
    /// with `select_some`.
    async fn get<T>(self: &Arc<Self>, req: impl Into<T::Request> + Send) -> Fetch<T>
    where
        T: Fetchable<Types>,
    {
        let req = req.into();

        // Subscribe to notifications before we check storage for the requested object. This ensures
        // that this operation will always eventually succeed as long as the requested object
        // actually exists (or will exist). We will either find it in our local storage and succeed
        // immediately, or (if it exists) someone will *later* come and add it to storage, at which
        // point we will get a notification causing this passive fetch to resolve.
        //
        // Note the "someone" who later fetches the object and adds it to storage may be an active
        // fetch triggered by this very request, in cases where that is possible, but it need not
        // be.
        let passive_fetch = T::passive_fetch(&self.notifiers, req).await;

        match self.try_get(req).await {
            Ok(Some(obj)) => return Fetch::Ready(obj),
            Ok(None) => return passive(req, passive_fetch),
            Err(err) => {
                tracing::warn!(
                    ?req,
                    "unable to fetch object; spawning a task to retry: {err:#}"
                );
            },
        }

        // We'll use this channel to get the object back if we successfully load it on retry.
        let (send, recv) = oneshot::channel();

        let fetcher = self.clone();
        let mut backoff = fetcher.backoff.clone();
        let span = tracing::warn_span!("get retry", ?req);
        spawn(
            async move {
                backoff.reset();
                let mut delay = backoff.next_backoff().unwrap_or(Duration::from_secs(1));
                loop {
                    let res = {
                        // Limit the number of simultaneous retry tasks hitting the database. When
                        // the database is down, we might have a lot of these tasks running, and if
                        // they all hit the DB at once, they are only going to make things worse.
                        let _guard = fetcher.retry_semaphore.acquire().await;
                        fetcher.try_get(req).await
                    };
                    match res {
                        Ok(Some(obj)) => {
                            // If the object was immediately available after all, signal the
                            // original fetch. We probably just temporarily couldn't access it due
                            // to database errors.
                            tracing::info!(?req, "object was ready after retries");
                            send.send(obj).ok();
                            break;
                        },
                        Ok(None) => {
                            // The object was not immediately available after all, but we have
                            // successfully spawned a fetch for it if possible. The spawned fetch
                            // will notify the original request once it completes.
                            //
                            // `send` is dropped here, so `recv` yields an error, which is mapped
                            // to `None` below; the caller is left waiting on the passive fetch.
                            tracing::info!(?req, "spawned fetch after retries");
                            break;
                        },
                        Err(err) => {
                            tracing::warn!(
                                ?req,
                                ?delay,
                                "unable to fetch object, will retry: {err:#}"
                            );
                            sleep(delay).await;
                            // Once the backoff policy is exhausted, keep retrying at the last
                            // delay rather than giving up.
                            if let Some(next_delay) = backoff.next_backoff() {
                                delay = next_delay;
                            }
                        },
                    }
                }
            }
            .instrument(span),
        );

        // Wait for the object to be fetched, either from the local database on retry or from
        // another provider eventually.
        passive(req, select_some(passive_fetch, recv.map(Result::ok)))
    }
1112
1113    /// Try to get an object from local storage or initialize a fetch if it is missing.
1114    ///
1115    /// There are three possible scenarios in this function, indicated by the return type:
1116    /// * `Ok(Some(obj))`: the requested object was available locally and successfully retrieved
1117    ///   from the database; no fetch was spawned
1118    /// * `Ok(None)`: the requested object was not available locally, but a fetch was successfully
1119    ///   spawned if possible (in other words, if a fetch was not spawned, it was determined that
1120    ///   the requested object is not fetchable)
1121    /// * `Err(_)`: it could not be determined whether the object was available locally or whether
1122    ///   it could be fetched; no fetch was spawned even though the object may be fetchable
1123    async fn try_get<T>(self: &Arc<Self>, req: T::Request) -> anyhow::Result<Option<T>>
1124    where
1125        T: Fetchable<Types>,
1126    {
1127        let mut tx = self.read().await.context("opening read transaction")?;
1128        match T::load(&mut tx, req).await {
1129            Ok(t) => Ok(Some(t)),
1130            Err(QueryError::Missing | QueryError::NotFound) => {
1131                // We successfully queried the database, but the object wasn't there. Try to
1132                // fetch it.
1133                tracing::debug!(?req, "object missing from local storage, will try to fetch");
1134                self.fetch::<T>(&mut tx, req).await?;
1135                Ok(None)
1136            },
1137            Err(err) => {
1138                // An error occurred while querying the database. We don't know if we need to fetch
1139                // the object or not. Return an error so we can try again.
1140                bail!("failed to fetch resource {req:?} from local storage: {err:#}");
1141            },
1142        }
1143    }
1144
1145    /// Get a range of objects from local storage or a provider.
1146    ///
1147    /// Convert a finite stream of fallible local storage lookups into a (possibly infinite) stream
1148    /// of infallible fetches. Objects in `range` are loaded from local storage. Any gaps or missing
1149    /// objects are filled by fetching from a provider. Items in the resulting stream are futures
1150    /// that will never fail to produce a resource, although they may block indefinitely if the
1151    /// resource needs to be fetched.
1152    ///
1153    /// Objects are loaded and fetched in chunks, which strikes a good balance of limiting the total
1154    /// number of storage and network requests, while also keeping the amount of simultaneous
1155    /// resource consumption bounded.
1156    fn get_range<R, T>(self: Arc<Self>, range: R) -> BoxStream<'static, Fetch<T>>
1157    where
1158        R: RangeBounds<usize> + Send + 'static,
1159        T: RangedFetchable<Types>,
1160    {
1161        let chunk_size = self.range_chunk_size;
1162        self.get_range_with_chunk_size(chunk_size, range)
1163    }
1164
1165    /// Same as [`Self::get_range`], but uses the given chunk size instead of the default.
1166    fn get_range_with_chunk_size<R, T>(
1167        self: Arc<Self>,
1168        chunk_size: usize,
1169        range: R,
1170    ) -> BoxStream<'static, Fetch<T>>
1171    where
1172        R: RangeBounds<usize> + Send + 'static,
1173        T: RangedFetchable<Types>,
1174    {
1175        let chunk_fetch_delay = self.chunk_fetch_delay;
1176        let active_fetch_delay = self.active_fetch_delay;
1177
1178        stream::iter(range_chunks(range, chunk_size))
1179            .then(move |chunk| {
1180                let self_clone = self.clone();
1181                async move {
1182                    {
1183                        let chunk = self_clone.get_chunk(chunk).await;
1184
1185                        // Introduce a delay (`chunk_fetch_delay`) between fetching chunks. This
1186                        // helps to limit constant high CPU usage when fetching long range of data,
1187                        // especially for older streams that fetch most of the data from local
1188                        // storage.
1189                        sleep(chunk_fetch_delay).await;
1190                        stream::iter(chunk)
1191                    }
1192                }
1193            })
1194            .flatten()
1195            .then(move |f| async move {
1196                match f {
1197                    // Introduce a delay (`active_fetch_delay`) for active fetches to reduce load on
1198                    // the catchup provider. The delay applies between pending fetches, not between
1199                    // chunks.
1200                    Fetch::Pending(_) => sleep(active_fetch_delay).await,
1201                    Fetch::Ready(_) => (),
1202                };
1203                f
1204            })
1205            .boxed()
1206    }
1207
1208    /// Same as [`Self::get_range`], but yields objects in reverse order by height.
1209    ///
1210    /// Note that unlike [`Self::get_range`], which accepts any range and yields an infinite stream
1211    /// if the range has no upper bound, this function requires there to be a defined upper bound,
1212    /// otherwise we don't know where the reversed stream should _start_. The `end` bound given here
1213    /// is inclusive; i.e. the first item yielded by the stream will have height `end`.
1214    fn get_range_rev<T>(
1215        self: Arc<Self>,
1216        start: Bound<usize>,
1217        end: usize,
1218    ) -> BoxStream<'static, Fetch<T>>
1219    where
1220        T: RangedFetchable<Types>,
1221    {
1222        let chunk_size = self.range_chunk_size;
1223        self.get_range_with_chunk_size_rev(chunk_size, start, end)
1224    }
1225
1226    /// Same as [`Self::get_range_rev`], but uses the given chunk size instead of the default.
1227    fn get_range_with_chunk_size_rev<T>(
1228        self: Arc<Self>,
1229        chunk_size: usize,
1230        start: Bound<usize>,
1231        end: usize,
1232    ) -> BoxStream<'static, Fetch<T>>
1233    where
1234        T: RangedFetchable<Types>,
1235    {
1236        let chunk_fetch_delay = self.chunk_fetch_delay;
1237        let active_fetch_delay = self.active_fetch_delay;
1238
1239        stream::iter(range_chunks_rev(start, end, chunk_size))
1240            .then(move |chunk| {
1241                let self_clone = self.clone();
1242                async move {
1243                    {
1244                        let chunk = self_clone.get_chunk(chunk).await;
1245
1246                        // Introduce a delay (`chunk_fetch_delay`) between fetching chunks. This
1247                        // helps to limit constant high CPU usage when fetching long range of data,
1248                        // especially for older streams that fetch most of the data from local
1249                        // storage
1250                        sleep(chunk_fetch_delay).await;
1251                        stream::iter(chunk.into_iter().rev())
1252                    }
1253                }
1254            })
1255            .flatten()
1256            .then(move |f| async move {
1257                match f {
1258                    // Introduce a delay (`active_fetch_delay`) for active fetches to reduce load on
1259                    // the catchup provider. The delay applies between pending fetches, not between
1260                    // chunks.
1261                    Fetch::Pending(_) => sleep(active_fetch_delay).await,
1262                    Fetch::Ready(_) => (),
1263                };
1264                f
1265            })
1266            .boxed()
1267    }
1268
    /// Get a chunk of objects from local storage or a provider.
    ///
    /// This method is similar to `get_range`, except that:
    /// * It fetches all desired objects together, as a single chunk
    /// * It loads the object or triggers fetches right now rather than providing a lazy stream
    ///   which only fetches objects when polled.
    ///
    /// The result has one entry per height in `chunk`, in ascending order. Objects available
    /// locally are [`Fetch::Ready`]; the rest are pending fetches.
    ///
    /// If local storage cannot be queried, a background task retries the whole chunk with
    /// exponential backoff. Its database access is limited by `retry_semaphore`. Each pending
    /// entry then resolves from either the retried load or a notification.
    async fn get_chunk<T>(self: &Arc<Self>, chunk: Range<usize>) -> Vec<Fetch<T>>
    where
        T: RangedFetchable<Types>,
    {
        // Subscribe to notifications first. As in [`get`](Self::get), this ensures we won't miss
        // any notifications sent in between checking local storage and triggering a fetch if
        // necessary.
        let passive_fetches = join_all(
            chunk
                .clone()
                .map(|i| T::passive_fetch(&self.notifiers, i.into())),
        )
        .await;

        match self.try_get_chunk(&chunk).await {
            Ok(objs) => {
                // Convert to fetches. Objects which are not immediately available (`None` in the
                // chunk) become passive fetches awaiting a notification of availability.
                return objs
                    .into_iter()
                    .zip(passive_fetches)
                    .enumerate()
                    .map(move |(i, (obj, passive_fetch))| match obj {
                        Some(obj) => Fetch::Ready(obj),
                        None => passive(T::Request::from(chunk.start + i), passive_fetch),
                    })
                    .collect();
            },
            Err(err) => {
                tracing::warn!(
                    ?chunk,
                    "unable to fetch chunk; spawning a task to retry: {err:#}"
                );
            },
        }

        // We'll use these channels to get the objects back that we successfully load on retry.
        // There is one channel per height in the chunk, in the same order as `passive_fetches`.
        let (send, recv): (Vec<_>, Vec<_>) =
            repeat_with(oneshot::channel).take(chunk.len()).unzip();

        {
            let fetcher = self.clone();
            let mut backoff = fetcher.backoff.clone();
            let chunk = chunk.clone();
            let span = tracing::warn_span!("get_chunk retry", ?chunk);
            spawn(
                async move {
                    backoff.reset();
                    let mut delay = backoff.next_backoff().unwrap_or(Duration::from_secs(1));
                    loop {
                        let res = {
                            // Limit the number of simultaneous retry tasks hitting the database.
                            // When the database is down, we might have a lot of these tasks
                            // running, and if they all hit the DB at once, they are only going to
                            // make things worse.
                            let _guard = fetcher.retry_semaphore.acquire().await;
                            fetcher.try_get_chunk(&chunk).await
                        };
                        match res {
                            Ok(objs) => {
                                // Each sender is consumed here. Senders for missing objects are
                                // dropped, so their receivers yield `Err`, which the caller maps
                                // to `None`; those entries keep waiting on their passive fetch.
                                for (i, (obj, sender)) in objs.into_iter().zip(send).enumerate() {
                                    if let Some(obj) = obj {
                                        // If the object was immediately available after all, signal
                                        // the original fetch. We probably just temporarily couldn't
                                        // access it due to database errors.
                                        tracing::info!(?chunk, i, "object was ready after retries");
                                        sender.send(obj).ok();
                                    } else {
                                        // The object was not immediately available after all, but
                                        // we have successfully spawned a fetch for it if possible.
                                        // The spawned fetch will notify the original request once
                                        // it completes.
                                        tracing::info!(?chunk, i, "spawned fetch after retries");
                                    }
                                }
                                break;
                            },
                            Err(err) => {
                                tracing::warn!(
                                    ?chunk,
                                    ?delay,
                                    "unable to fetch chunk, will retry: {err:#}"
                                );
                                sleep(delay).await;
                                // Once the backoff policy is exhausted, keep retrying at the last
                                // delay rather than giving up.
                                if let Some(next_delay) = backoff.next_backoff() {
                                    delay = next_delay;
                                }
                            },
                        }
                    }
                }
                .instrument(span),
            );
        }

        // Wait for the objects to be fetched, either from the local database on retry or from
        // another provider eventually.
        passive_fetches
            .into_iter()
            .zip(recv)
            .enumerate()
            .map(move |(i, (passive_fetch, recv))| {
                passive(
                    T::Request::from(chunk.start + i),
                    select_some(passive_fetch, recv.map(Result::ok)),
                )
            })
            .collect()
    }
1384
1385    /// Try to get a range of objects from local storage, initializing fetches if any are missing.
1386    ///
1387    /// If this function succeeded, then for each object in the requested range, either:
1388    /// * the object was available locally, and corresponds to `Some(_)` object in the result
1389    /// * the object was not available locally (and corresponds to `None` in the result), but a
1390    ///   fetch was successfully spawned if possible (in other words, if a fetch was not spawned, it
1391    ///   was determined that the requested object is not fetchable)
1392    ///
1393    /// This function will fail if it could not be determined which objects in the requested range
1394    /// are available locally, or if, for any missing object, it could not be determined whether
1395    /// that object is fetchable. In this case, there may be no fetch spawned for certain objects in
1396    /// the requested range, even if those objects are actually fetchable.
1397    async fn try_get_chunk<T>(
1398        self: &Arc<Self>,
1399        chunk: &Range<usize>,
1400    ) -> anyhow::Result<Vec<Option<T>>>
1401    where
1402        T: RangedFetchable<Types>,
1403    {
1404        let mut tx = self.read().await.context("opening read transaction")?;
1405        let ts = T::load_range(&mut tx, chunk.clone())
1406            .await
1407            .context(format!("when fetching items in range {chunk:?}"))?;
1408
1409        // Log and discard error information; we want a list of Option where None indicates an
1410        // object that needs to be fetched. Note that we don't use `FetchRequest::might_exist` to
1411        // silence the logs here when an object is missing that is not expected to exist at all.
1412        // When objects are not expected to exist, `load_range` should just return a truncated list
1413        // rather than returning `Err` objects, so if there are errors in here they are unexpected
1414        // and we do want to log them.
1415        let ts = ts.into_iter().filter_map(ResultExt::ok_or_trace);
1416
1417        // Kick off a fetch for each missing object.
1418        let mut results = Vec::with_capacity(chunk.len());
1419        for t in ts {
1420            // Fetch missing objects that should come before `t`.
1421            while chunk.start + results.len() < t.height() as usize {
1422                tracing::debug!(
1423                    "item {} in chunk not available, will be fetched",
1424                    results.len()
1425                );
1426                self.fetch::<T>(&mut tx, (chunk.start + results.len()).into())
1427                    .await?;
1428                results.push(None);
1429            }
1430
1431            results.push(Some(t));
1432        }
1433        // Fetch missing objects from the end of the range.
1434        while results.len() < chunk.len() {
1435            self.fetch::<T>(&mut tx, (chunk.start + results.len()).into())
1436                .await?;
1437            results.push(None);
1438        }
1439
1440        Ok(results)
1441    }
1442
1443    /// Spawn an active fetch for the requested object, if possible.
1444    ///
1445    /// On success, either an active fetch for `req` has been spawned, or it has been determined
1446    /// that `req` is not fetchable. Fails if it cannot be determined (e.g. due to errors in the
1447    /// local database) whether `req` is fetchable or not.
1448    async fn fetch<T>(
1449        self: &Arc<Self>,
1450        tx: &mut <Self as VersionedDataSource>::ReadOnly<'_>,
1451        req: T::Request,
1452    ) -> anyhow::Result<()>
1453    where
1454        T: Fetchable<Types>,
1455    {
1456        tracing::debug!("fetching resource {req:?}");
1457
1458        // Trigger an active fetch from a remote provider if possible.
1459        let heights = Heights::load(tx)
1460            .await
1461            .context("failed to load heights; cannot definitively say object might exist")?;
1462        if req.might_exist(heights) {
1463            T::active_fetch(tx, self.clone(), req).await?;
1464        } else {
1465            tracing::debug!("not fetching object {req:?} that cannot exist at {heights:?}");
1466        }
1467        Ok(())
1468    }
1469
1470    /// Proactively search for and retrieve missing objects.
1471    ///
1472    /// This function will proactively identify and retrieve blocks and leaves which are missing
1473    /// from storage. It will run until cancelled, thus, it is meant to be spawned as a background
1474    /// task rather than called synchronously.
1475    async fn proactive_scan(
1476        self: Arc<Self>,
1477        interval: Duration,
1478        chunk_size: usize,
1479        metrics: ScannerMetrics,
1480    ) {
1481        for i in 0.. {
1482            let span = tracing::warn_span!("proactive scan", i);
1483            metrics.running.set(1);
1484            metrics.current_scan.set(i);
1485            async {
1486                let sync_status = {
1487                    match self.sync_status().await {
1488                        Ok(st) => st,
1489                        Err(err) => {
1490                            tracing::warn!(
1491                                "unable to load sync status, scan will be skipped: {err:#}"
1492                            );
1493                            return;
1494                        },
1495                    }
1496                };
1497                tracing::info!(?sync_status, "starting scan");
1498                metrics.missing_blocks.set(sync_status.blocks.missing);
1499                metrics.missing_vid.set(sync_status.vid_common.missing);
1500
1501                // Fetch missing blocks. This will also trigger a fetch for the corresponding
1502                // missing leaves.
1503                for range in sync_status.blocks.ranges {
1504                    metrics.scanned_blocks.set(range.start);
1505                    if range.status != SyncStatus::Missing {
1506                        metrics.scanned_blocks.set(range.end);
1507                        continue;
1508                    }
1509
1510                    tracing::info!(?range, "fetching missing block range");
1511
1512                    // Break the range into manageable, aligned chunks (which improves cacheability
1513                    // for the upstream server).
1514                    for chunk in range_chunks_aligned(range.start..range.end, chunk_size) {
1515                        tracing::info!(?chunk, "fetching missing block chunk");
1516
1517                        // Fetching the payload metadata is enough to trigger an active fetch of the
1518                        // corresponding leaf and the full block if they are missing.
1519                        self.get::<NonEmptyRange<BlockQueryData<Types>>>(RangeRequest {
1520                            start: chunk.start as u64,
1521                            end: chunk.end as u64,
1522                        })
1523                        .await
1524                        .await;
1525
1526                        metrics
1527                            .missing_blocks
1528                            .update((chunk.start as i64) - (chunk.end as i64));
1529                        metrics.scanned_blocks.set(chunk.end);
1530                    }
1531                }
1532
1533                // Do the same for VID.
1534                for range in sync_status.vid_common.ranges {
1535                    metrics.scanned_vid.set(range.start);
1536                    if range.status != SyncStatus::Missing {
1537                        metrics.scanned_vid.set(range.end);
1538                        continue;
1539                    }
1540
1541                    tracing::info!(?range, "fetching missing VID range");
1542                    for chunk in range_chunks_aligned(range.start..range.end, chunk_size) {
1543                        tracing::info!(?chunk, "fetching missing VID chunk");
1544                        self.get::<NonEmptyRange<VidCommonQueryData<Types>>>(RangeRequest {
1545                            start: chunk.start as u64,
1546                            end: chunk.end as u64,
1547                        })
1548                        .await
1549                        .await;
1550
1551                        metrics
1552                            .missing_vid
1553                            .update((chunk.start as i64) - (chunk.end as i64));
1554                        metrics.scanned_vid.set(chunk.end);
1555                    }
1556                }
1557
1558                tracing::info!("completed proactive scan, will scan again in {interval:?}");
1559
1560                // Reset metrics.
1561                metrics.running.set(0);
1562            }
1563            .instrument(span)
1564            .await;
1565
1566            sleep(interval).await;
1567        }
1568    }
1569}
1570
1571impl<Types, S, P> Fetcher<Types, S, P>
1572where
1573    Types: NodeType,
1574    Header<Types>: QueryableHeader<Types>,
1575    S: VersionedDataSource + 'static,
1576    for<'a> S::ReadOnly<'a>: NodeStorage<Types> + PrunedHeightStorage,
1577    P: Send + Sync,
1578{
1579    async fn sync_status(&self) -> anyhow::Result<SyncStatusQueryData> {
1580        // Check the cache first. This prevents the expensive sync_status queries from being run too
1581        // often, and also ensures that if two tasks try to get the sync status at the same time,
1582        // only one will actually compute it; the other will find the cache populated by the time it
1583        // gets a lock on the mutex.
1584        let mut cache = self.sync_status.lock().await;
1585        if let Some(sync_status) = cache.try_get() {
1586            return Ok(sync_status.clone());
1587        }
1588        tracing::debug!("updating sync status");
1589
1590        let heights = {
1591            let mut tx = self
1592                .read()
1593                .await
1594                .context("opening transaction to load heights")?;
1595            Heights::load(&mut tx).await.context("loading heights")?
1596        };
1597
1598        let mut res = SyncStatusQueryData {
1599            pruned_height: heights.pruned_height.map(|h| h as usize),
1600            ..Default::default()
1601        };
1602        let start = if let Some(height) = res.pruned_height {
1603            // Add an initial range for pruned data.
1604            let range = SyncStatusRange {
1605                status: SyncStatus::Pruned,
1606                start: 0,
1607                end: height + 1,
1608            };
1609            res.blocks.ranges.push(range);
1610            res.leaves.ranges.push(range);
1611            res.vid_common.ranges.push(range);
1612
1613            height + 1
1614        } else {
1615            0
1616        };
1617
1618        // Break the range into manageable chunks, so we don't hold any one database transaction
1619        // open for too long.
1620        for chunk in range_chunks(
1621            start..(heights.height as usize),
1622            self.sync_status_chunk_size,
1623        ) {
1624            tracing::debug!(chunk.start, chunk.end, "checking sync status in sub-range");
1625            let metrics = self.sync_status_metrics.start_range(&chunk);
1626            let mut tx = self
1627                .read()
1628                .await
1629                .context("opening transaction to sync status range")?;
1630            let range_status = tx
1631                .sync_status_for_range(chunk.start, chunk.end)
1632                .await
1633                .context(format!("checking sync status in sub-range {chunk:?}"))?;
1634            tracing::debug!(
1635                chunk.start,
1636                chunk.end,
1637                ?range_status,
1638                "found sync status for range"
1639            );
1640
1641            res.blocks.extend(range_status.blocks);
1642            res.leaves.extend(range_status.leaves);
1643            res.vid_common.extend(range_status.vid_common);
1644            metrics.end();
1645        }
1646
1647        cache.update(res.clone());
1648        Ok(res)
1649    }
1650}
1651
1652impl<Types, S, P> Fetcher<Types, S, P>
1653where
1654    Types: NodeType,
1655    Header<Types>: QueryableHeader<Types>,
1656    Payload<Types>: QueryablePayload<Types>,
1657    S: VersionedDataSource + 'static,
1658    for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types> + UpdateAggregatesStorage<Types>,
1659    for<'a> S::ReadOnly<'a>: AvailabilityStorage<Types>
1660        + NodeStorage<Types>
1661        + PrunedHeightStorage
1662        + AggregatesStorage<Types>,
1663    P: AvailabilityProvider<Types>,
1664{
1665    #[tracing::instrument(skip_all)]
1666    async fn aggregate(self: Arc<Self>, chunk_size: usize, metrics: AggregatorMetrics) {
1667        loop {
1668            let prev_aggregate = loop {
1669                let mut tx = match self.read().await {
1670                    Ok(tx) => tx,
1671                    Err(err) => {
1672                        tracing::error!("unable to open read tx: {err:#}");
1673                        sleep(Duration::from_secs(5)).await;
1674                        continue;
1675                    },
1676                };
1677                match tx.load_prev_aggregate().await {
1678                    Ok(agg) => break agg,
1679                    Err(err) => {
1680                        tracing::error!("unable to load previous aggregate: {err:#}");
1681                        sleep(Duration::from_secs(5)).await;
1682                        continue;
1683                    },
1684                }
1685            };
1686
1687            let (start, mut prev_aggregate) = match prev_aggregate {
1688                Some(aggregate) => (aggregate.height as usize + 1, aggregate),
1689                None => (0, Aggregate::default()),
1690            };
1691
1692            tracing::info!(start, "starting aggregator");
1693            metrics.height.set(start);
1694
1695            let mut blocks = self
1696                .clone()
1697                .get_range_with_chunk_size::<_, PayloadMetadata<Types>>(chunk_size, start..)
1698                .then(Fetch::resolve)
1699                .ready_chunks(chunk_size)
1700                .boxed();
1701            while let Some(chunk) = blocks.next().await {
1702                let Some(last) = chunk.last() else {
1703                    // This is not supposed to happen, but if the chunk is empty, just skip it.
1704                    tracing::warn!("ready_chunks returned an empty chunk");
1705                    continue;
1706                };
1707                let height = last.height();
1708                let num_blocks = chunk.len();
1709                tracing::debug!(
1710                    num_blocks,
1711                    height,
1712                    "updating aggregate statistics for chunk"
1713                );
1714                loop {
1715                    let res = async {
1716                        let mut tx = self.write().await.context("opening transaction")?;
1717                        let aggregate =
1718                            tx.update_aggregates(prev_aggregate.clone(), &chunk).await?;
1719                        tx.commit().await.context("committing transaction")?;
1720                        prev_aggregate = aggregate;
1721                        anyhow::Result::<_>::Ok(())
1722                    }
1723                    .await;
1724                    match res {
1725                        Ok(()) => {
1726                            break;
1727                        },
1728                        Err(err) => {
1729                            tracing::warn!(
1730                                num_blocks,
1731                                height,
1732                                "failed to update aggregates for chunk: {err:#}"
1733                            );
1734                            sleep(Duration::from_secs(1)).await;
1735                        },
1736                    }
1737                }
1738                metrics.height.set(height as usize);
1739            }
1740            tracing::warn!("aggregator block stream ended unexpectedly; will restart");
1741        }
1742    }
1743}
1744
1745impl<Types, S, P> Fetcher<Types, S, P>
1746where
1747    Types: NodeType,
1748    Header<Types>: QueryableHeader<Types>,
1749    Payload<Types>: QueryablePayload<Types>,
1750    S: VersionedDataSource + 'static,
1751    for<'a> S::ReadOnly<'a>: NodeStorage<Types>,
1752    for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>,
1753    P: AvailabilityProvider<Types>,
1754{
1755    /// Load a cert2 from availability storage
1756    async fn get_cert2(self: &Arc<Self>, height: u64) -> QueryResult<Option<Certificate2<Types>>> {
1757        let mut tx = self.read().await.map_err(|err| QueryError::Error {
1758            message: err.to_string(),
1759        })?;
1760
1761        if let Some(cert2) = tx.load_cert2(height).await? {
1762            return Ok(Some(cert2));
1763        }
1764
1765        drop(tx);
1766
1767        let Some(cert2) = self
1768            .provider
1769            .fetch(request::Certificate2Request { height })
1770            .await
1771            .flatten()
1772        else {
1773            return Ok(None);
1774        };
1775
1776        self.store(&(height, cert2.clone())).await;
1777        Ok(Some(cert2))
1778    }
1779}
1780
1781impl<Types, S, P> Fetcher<Types, S, P>
1782where
1783    Types: NodeType,
1784    S: VersionedDataSource,
1785    for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>,
1786{
1787    /// Store an object and notify anyone waiting on this object that it is available.
1788    async fn store_and_notify<T>(&self, obj: &T)
1789    where
1790        T: Storable<Types>,
1791    {
1792        self.store(obj).await;
1793
1794        // Send a notification about the newly received object. It is important that we do this
1795        // _after_ our attempt to store the object in local storage, otherwise there is a potential
1796        // missed notification deadlock:
1797        // * we send the notification
1798        // * a task calls [`get`](Self::get) or [`get_chunk`](Self::get_chunk), finds that the
1799        //   requested object is not in storage, and begins waiting for a notification
1800        // * we store the object. This ensures that no other task will be triggered to fetch it,
1801        //   which means no one will ever notify the waiting task.
1802        //
1803        // Note that we send the notification regardless of whether the store actually succeeded or
1804        // not. This is to avoid _another_ subtle deadlock: if we failed to notify just because we
1805        // failed to store, some fetches might not resolve, even though the object in question has
1806        // actually been fetched. This should actually be ok, because as long as the object is not
1807        // in storage, eventually some other task will come along and fetch, store, and notify about
1808        // it. However, this is certainly not ideal, since we could resolve those pending fetches
1809        // right now, and it causes bigger problems when the fetch that fails to resolve is the
1810        // proactive scanner task, who is often the one that would eventually come along and
1811        // re-fetch the object.
1812        //
1813        // The key thing to note is that it does no harm to notify even if we fail to store: at best
1814        // we wake some tasks up sooner; at worst, anyone who misses the notification still
1815        // satisfies the invariant that we only wait on notifications for objects which are not in
1816        // storage, and eventually some other task will come along, find the object missing from
1817        // storage, and re-fetch it.
1818        obj.notify(&self.notifiers).await;
1819    }
1820
1821    async fn store<T>(&self, obj: &T)
1822    where
1823        T: Storable<Types>,
1824    {
1825        let try_store = || async {
1826            let mut tx = self.storage.write().await?;
1827            obj.clone().store(&mut tx, self.leaf_only).await?;
1828            tx.commit().await
1829        };
1830
1831        // Store the object in local storage, so we can avoid fetching it in the future.
1832        let mut backoff = self.backoff.clone();
1833        backoff.reset();
1834        loop {
1835            let Err(err) = try_store().await else {
1836                break;
1837            };
1838            // It is unfortunate if this fails, but we can still proceed by notifying with the
1839            // object that we fetched, keeping it in memory. Log the error, retry a few times, and
1840            // eventually move on.
1841            tracing::warn!(
1842                obj = obj.debug_name(),
1843                "failed to store fetched object: {err:#}"
1844            );
1845
1846            let Some(delay) = backoff.next_backoff() else {
1847                break;
1848            };
1849            tracing::info!(?delay, "retrying failed operation");
1850            sleep(delay).await;
1851        }
1852    }
1853}
1854
/// One [`Notifier`] per kind of object that tasks can wait for passively.
///
/// `Fetcher::store_and_notify` passes these to `Storable::notify` after attempting to store a
/// freshly fetched object, so that tasks waiting for that object can be woken up.
#[derive(Debug)]
struct Notifiers<Types>
where
    Types: NodeType,
{
    /// Notifier for blocks.
    block: Notifier<BlockQueryData<Types>>,
    /// Notifier for leaves.
    leaf: Notifier<LeafQueryData<Types>>,
    /// Notifier for VID common data.
    vid_common: Notifier<VidCommonQueryData<Types>>,
}
1864
1865impl<Types> Default for Notifiers<Types>
1866where
1867    Types: NodeType,
1868{
1869    fn default() -> Self {
1870        Self {
1871            block: Notifier::new(),
1872            leaf: Notifier::new(),
1873            vid_common: Notifier::new(),
1874        }
1875    }
1876}
1877
/// Snapshot of the heights used to decide whether an object could possibly exist locally.
#[derive(Clone, Copy, Debug)]
struct Heights {
    /// Current block height. Objects can exist only at heights strictly below this value.
    height: u64,
    /// Height up to and including which data has been pruned, or `None` if nothing is pruned.
    pruned_height: Option<u64>,
}
1883
1884impl Heights {
1885    async fn load<Types, T>(tx: &mut T) -> anyhow::Result<Self>
1886    where
1887        Types: NodeType,
1888        Header<Types>: QueryableHeader<Types>,
1889        T: NodeStorage<Types> + PrunedHeightStorage + Send,
1890    {
1891        let height = tx.block_height().await.context("loading block height")? as u64;
1892        let pruned_height = tx
1893            .load_pruned_height()
1894            .await
1895            .context("loading pruned height")?;
1896        Ok(Self {
1897            height,
1898            pruned_height,
1899        })
1900    }
1901
1902    fn might_exist(self, h: u64) -> bool {
1903        h < self.height && self.pruned_height.is_none_or(|ph| h > ph)
1904    }
1905}
1906
1907#[async_trait]
1908impl<Types, S, P, State, const ARITY: usize> MerklizedStateDataSource<Types, State, ARITY>
1909    for FetchingDataSource<Types, S, P>
1910where
1911    Types: NodeType,
1912    S: VersionedDataSource + 'static,
1913    for<'a> S::ReadOnly<'a>: MerklizedStateStorage<Types, State, ARITY>,
1914    P: Send + Sync,
1915    State: MerklizedState<Types, ARITY> + 'static,
1916    <State as MerkleTreeScheme>::Commitment: Send,
1917{
1918    async fn get_path(
1919        &self,
1920        snapshot: Snapshot<Types, State, ARITY>,
1921        key: State::Key,
1922    ) -> QueryResult<MerkleProof<State::Entry, State::Key, State::T, ARITY>> {
1923        let mut tx = self.read().await.map_err(|err| QueryError::Error {
1924            message: err.to_string(),
1925        })?;
1926        tx.get_path(snapshot, key).await
1927    }
1928}
1929
1930#[async_trait]
1931impl<Types, S, P> MerklizedStateHeightPersistence for FetchingDataSource<Types, S, P>
1932where
1933    Types: NodeType,
1934    Header<Types>: QueryableHeader<Types>,
1935    Payload<Types>: QueryablePayload<Types>,
1936    S: VersionedDataSource + 'static,
1937    for<'a> S::ReadOnly<'a>: MerklizedStateHeightStorage,
1938    P: Send + Sync,
1939{
1940    async fn get_last_state_height(&self) -> QueryResult<usize> {
1941        let mut tx = self.read().await.map_err(|err| QueryError::Error {
1942            message: err.to_string(),
1943        })?;
1944        tx.get_last_state_height().await
1945    }
1946}
1947
1948#[async_trait]
1949impl<Types, S, P> NodeDataSource<Types> for FetchingDataSource<Types, S, P>
1950where
1951    Types: NodeType,
1952    Header<Types>: QueryableHeader<Types>,
1953    S: VersionedDataSource + 'static,
1954    for<'a> S::ReadOnly<'a>: NodeStorage<Types> + PrunedHeightStorage,
1955    P: Send + Sync,
1956{
1957    async fn block_height(&self) -> QueryResult<usize> {
1958        let mut tx = self.read().await.map_err(|err| QueryError::Error {
1959            message: err.to_string(),
1960        })?;
1961        tx.block_height().await
1962    }
1963
1964    async fn count_transactions_in_range(
1965        &self,
1966        range: impl RangeBounds<usize> + Send,
1967        namespace: Option<NamespaceId<Types>>,
1968    ) -> QueryResult<usize> {
1969        let mut tx = self.read().await.map_err(|err| QueryError::Error {
1970            message: err.to_string(),
1971        })?;
1972        tx.count_transactions_in_range(range, namespace).await
1973    }
1974
1975    async fn payload_size_in_range(
1976        &self,
1977        range: impl RangeBounds<usize> + Send,
1978        namespace: Option<NamespaceId<Types>>,
1979    ) -> QueryResult<usize> {
1980        let mut tx = self.read().await.map_err(|err| QueryError::Error {
1981            message: err.to_string(),
1982        })?;
1983        tx.payload_size_in_range(range, namespace).await
1984    }
1985
1986    async fn vid_share<ID>(&self, id: ID) -> QueryResult<VidShare>
1987    where
1988        ID: Into<BlockId<Types>> + Send + Sync,
1989    {
1990        let mut tx = self.read().await.map_err(|err| QueryError::Error {
1991            message: err.to_string(),
1992        })?;
1993        tx.vid_share(id).await
1994    }
1995
1996    async fn sync_status(&self) -> QueryResult<SyncStatusQueryData> {
1997        self.fetcher
1998            .sync_status()
1999            .await
2000            .map_err(|err| QueryError::Error {
2001                message: format!("{err:#}"),
2002            })
2003    }
2004
2005    async fn get_header_window(
2006        &self,
2007        start: impl Into<WindowStart<Types>> + Send + Sync,
2008        end: u64,
2009        limit: usize,
2010    ) -> QueryResult<TimeWindowQueryData<Header<Types>>> {
2011        let mut tx = self.read().await.map_err(|err| QueryError::Error {
2012            message: err.to_string(),
2013        })?;
2014        tx.get_header_window(start, end, limit).await
2015    }
2016}
2017
2018#[async_trait]
2019impl<Types, S, P> ExplorerDataSource<Types> for FetchingDataSource<Types, S, P>
2020where
2021    Types: NodeType,
2022    Payload<Types>: QueryablePayload<Types>,
2023    Header<Types>: QueryableHeader<Types> + explorer::traits::ExplorerHeader<Types>,
2024    crate::Transaction<Types>: explorer::traits::ExplorerTransaction<Types>,
2025    S: VersionedDataSource + 'static,
2026    for<'a> S::ReadOnly<'a>: ExplorerStorage<Types>,
2027    P: Send + Sync,
2028{
2029    async fn get_block_summaries(
2030        &self,
2031        request: explorer::query_data::GetBlockSummariesRequest<Types>,
2032    ) -> Result<
2033        Vec<explorer::query_data::BlockSummary<Types>>,
2034        explorer::query_data::GetBlockSummariesError,
2035    > {
2036        let mut tx = self.read().await.map_err(|err| QueryError::Error {
2037            message: err.to_string(),
2038        })?;
2039        tx.get_block_summaries(request).await
2040    }
2041
2042    async fn get_block_detail(
2043        &self,
2044        request: explorer::query_data::BlockIdentifier<Types>,
2045    ) -> Result<explorer::query_data::BlockDetail<Types>, explorer::query_data::GetBlockDetailError>
2046    {
2047        let mut tx = self.read().await.map_err(|err| QueryError::Error {
2048            message: err.to_string(),
2049        })?;
2050        tx.get_block_detail(request).await
2051    }
2052
2053    async fn get_transaction_summaries(
2054        &self,
2055        request: explorer::query_data::GetTransactionSummariesRequest<Types>,
2056    ) -> Result<
2057        Vec<explorer::query_data::TransactionSummary<Types>>,
2058        explorer::query_data::GetTransactionSummariesError,
2059    > {
2060        let mut tx = self.read().await.map_err(|err| QueryError::Error {
2061            message: err.to_string(),
2062        })?;
2063        tx.get_transaction_summaries(request).await
2064    }
2065
2066    async fn get_transaction_detail(
2067        &self,
2068        request: explorer::query_data::TransactionIdentifier<Types>,
2069    ) -> Result<
2070        explorer::query_data::TransactionDetailResponse<Types>,
2071        explorer::query_data::GetTransactionDetailError,
2072    > {
2073        let mut tx = self.read().await.map_err(|err| QueryError::Error {
2074            message: err.to_string(),
2075        })?;
2076        tx.get_transaction_detail(request).await
2077    }
2078
2079    async fn get_explorer_summary(
2080        &self,
2081    ) -> Result<
2082        explorer::query_data::ExplorerSummary<Types>,
2083        explorer::query_data::GetExplorerSummaryError,
2084    > {
2085        let mut tx = self.read().await.map_err(|err| QueryError::Error {
2086            message: err.to_string(),
2087        })?;
2088        tx.get_explorer_summary().await
2089    }
2090
2091    async fn get_search_results(
2092        &self,
2093        query: TaggedBase64,
2094    ) -> Result<
2095        explorer::query_data::SearchResult<Types>,
2096        explorer::query_data::GetSearchResultsError,
2097    > {
2098        let mut tx = self.read().await.map_err(|err| QueryError::Error {
2099            message: err.to_string(),
2100        })?;
2101        tx.get_search_results(query).await
2102    }
2103}
2104
/// A provider which can be used as a fetcher by the availability service.
///
/// This is an alias for the full set of [`Provider`] capabilities the fetcher relies on:
/// * single leaves, payloads, VID common data and `Certificate2`s
/// * ranges of leaves, blocks and VID common data
///
/// It declares no methods of its own.
pub trait AvailabilityProvider<Types: NodeType>:
    Provider<Types, request::LeafRequest>
    + Provider<Types, request::LeafRangeRequest>
    + Provider<Types, request::PayloadRequest>
    + Provider<Types, request::BlockRangeRequest>
    + Provider<Types, request::VidCommonRequest>
    + Provider<Types, request::VidCommonRangeRequest>
    + Provider<Types, request::Certificate2Request>
    + Sync
    + 'static
{
}
// Blanket impl: any type with all of the required `Provider` impls is automatically an
// `AvailabilityProvider`, so nothing needs to implement this trait by hand.
impl<Types: NodeType, P> AvailabilityProvider<Types> for P where
    P: Provider<Types, request::LeafRequest>
        + Provider<Types, request::LeafRangeRequest>
        + Provider<Types, request::PayloadRequest>
        + Provider<Types, request::BlockRangeRequest>
        + Provider<Types, request::VidCommonRequest>
        + Provider<Types, request::VidCommonRangeRequest>
        + Provider<Types, request::Certificate2Request>
        + Sync
        + 'static
{
}
2130
/// A succinct, cheaply copyable specification of an object to fetch.
///
/// Requests are `Copy` and `Debug`, so they can be passed around freely and included in log
/// messages.
trait FetchRequest: Copy + Debug + Send + Sync + 'static {
    /// Indicate whether it is possible this object could exist.
    ///
    /// This can filter out requests quickly for objects that cannot possibly exist, such as
    /// requests for objects with a height greater than the current block height. Not only does this
    /// let us fail faster for such requests (without touching storage at all), it also helps keep
    /// logging quieter when we fail to fetch an object because the user made a bad request, while
    /// still being fairly loud when we fail to fetch an object that might have really existed.
    ///
    /// This method is conservative: it returns `true` if it cannot tell whether the given object
    /// could exist or not.
    ///
    /// `_heights` holds the current block height and pruned height. This default implementation
    /// ignores them and always returns `true`.
    fn might_exist(self, _heights: Heights) -> bool {
        true
    }
}
2146
/// Objects which can be fetched from a remote DA provider and cached in local storage.
///
/// This trait lets us abstract over leaves, blocks, and other types that can be fetched. Thus, the
/// logistics of fetching are shared between all objects, and only the low-level particulars are
/// type-specific.
#[async_trait]
trait Fetchable<Types>: Clone + Send + Sync + 'static
where
    Types: NodeType,
    Header<Types>: QueryableHeader<Types>,
    Payload<Types>: QueryablePayload<Types>,
{
    /// A succinct specification of the object to be fetched.
    type Request: FetchRequest;

    /// Does this object satisfy the given request?
    fn satisfies(&self, req: Self::Request) -> bool;

    /// Spawn a task to fetch the object from a remote provider, if possible.
    ///
    /// An active fetch will only be triggered if:
    /// * There is not already an active fetch in progress for the same object
    /// * The requested object is known to exist. For example, we will fetch a leaf by height but
    ///   not by hash, since we can't guarantee that a leaf with an arbitrary hash exists. Note that
    ///   this function assumes `req.might_exist()` has already been checked before calling it, and
    ///   so may do unnecessary work if the caller does not ensure this.
    ///
    /// If we do trigger an active fetch for an object, any passive listeners for the object will be
    /// notified once it has been retrieved. If we do not trigger an active fetch for an object,
    /// this function does nothing. In either case, as long as the requested object does in fact
    /// exist, we will eventually receive it passively, since we will eventually receive all blocks
    /// and leaves that are ever produced. Active fetching merely helps us receive certain objects
    /// sooner.
    ///
    /// `tx` is an open read transaction on local storage. `fetcher` is the shared fetcher that
    /// any spawned fetch will use.
    ///
    /// This function fails if it _might_ be possible to actively fetch the requested object, but we
    /// were unable to do so (e.g. due to errors in the database).
    async fn active_fetch<S, P>(
        tx: &mut impl AvailabilityStorage<Types>,
        fetcher: Arc<Fetcher<Types, S, P>>,
        req: Self::Request,
    ) -> anyhow::Result<()>
    where
        S: VersionedDataSource + 'static,
        for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>,
        for<'a> S::ReadOnly<'a>:
            AvailabilityStorage<Types> + NodeStorage<Types> + PrunedHeightStorage,
        P: AvailabilityProvider<Types>;

    /// Wait for someone else to fetch the object.
    ///
    /// The returned future is `'static` (see [`PassiveFetch`]), so it can outlive the borrow of
    /// `notifiers`.
    async fn passive_fetch(notifiers: &Notifiers<Types>, req: Self::Request) -> PassiveFetch<Self>;

    /// Load an object from local storage.
    ///
    /// This function assumes `req.might_exist()` has already been checked before calling it, and so
    /// may do unnecessary work if the caller does not ensure this.
    ///
    /// NOTE(review): presumably returns an error both when the object is absent and when the
    /// lookup itself fails; confirm against the implementations.
    async fn load<S>(storage: &mut S, req: Self::Request) -> QueryResult<Self>
    where
        S: AvailabilityStorage<Types>;
}
2206
/// A boxed, `'static` future waiting for some other task to supply an object of type `T`.
///
/// NOTE(review): `None` presumably means the wait ended without the object being delivered
/// (e.g. the notifier went away); confirm against the `Notifier` implementation.
type PassiveFetch<T> = BoxFuture<'static, Option<T>>;
2208
/// A [`Fetchable`] object that is indexed by height, and so can also be loaded in ranges.
#[async_trait]
trait RangedFetchable<Types>: Fetchable<Types, Request = Self::RangedRequest> + HeightIndexed
where
    Types: NodeType,
    Header<Types>: QueryableHeader<Types>,
    Payload<Types>: QueryablePayload<Types>,
{
    /// Request type for a single object in a range.
    ///
    /// It is constructible from a height (`From<usize>`), which lets range code request
    /// individual missing objects by position.
    type RangedRequest: FetchRequest + From<usize> + Send;

    /// Load a range of these objects from local storage.
    ///
    /// The outer result reports failure of the range query as a whole; each inner result is for
    /// an individual object. NOTE(review): whether missing objects appear as inner errors or
    /// are simply omitted depends on the storage implementation; confirm.
    async fn load_range<S, R>(storage: &mut S, range: R) -> QueryResult<Vec<QueryResult<Self>>>
    where
        S: AvailabilityStorage<Types>,
        R: RangeBounds<usize> + Send + 'static;
}
2224
/// An object which can be stored in the database.
trait Storable<Types: NodeType>: Clone {
    /// The name of this object, for debugging purposes.
    ///
    /// This is used in log messages when storing the object fails.
    fn debug_name(&self) -> String;

    /// Notify anyone waiting for this object that it has become available.
    fn notify(&self, notifiers: &Notifiers<Types>) -> impl Send + Future<Output = ()>;

    /// Store the object in the local database.
    ///
    /// `leaf_only` is the fetcher's `leaf_only` setting, passed through by `Fetcher::store`.
    /// Implementations that have nothing to skip may ignore it.
    fn store(
        &self,
        storage: &mut impl UpdateAvailabilityStorage<Types>,
        leaf_only: bool,
    ) -> impl Send + Future<Output = anyhow::Result<()>>;
}
2240
2241impl<Types: NodeType> Storable<Types>
2242    for (
2243        LeafQueryData<Types>,
2244        Option<[CertificatePair<Types>; 2]>,
2245        Option<Certificate2<Types>>,
2246    )
2247{
2248    fn debug_name(&self) -> String {
2249        format!("leaf {} with QC chain", self.0.height())
2250    }
2251
2252    async fn notify(&self, notifiers: &Notifiers<Types>) {
2253        self.0.notify(notifiers).await;
2254    }
2255
2256    async fn store(
2257        &self,
2258        storage: &mut impl UpdateAvailabilityStorage<Types>,
2259        _leaf_only: bool,
2260    ) -> anyhow::Result<()> {
2261        storage
2262            .insert_leaf_with_qc_chain(&self.0, self.1.clone())
2263            .await
2264            .context("inserting leaf with QC chain")?;
2265        if let Some(cert2) = &self.2 {
2266            storage.insert_cert2(self.0.height(), cert2.clone()).await?;
2267        }
2268        Ok(())
2269    }
2270}
2271
2272impl<Types: NodeType> Storable<Types> for (u64, Certificate2<Types>) {
2273    fn debug_name(&self) -> String {
2274        format!("cert2 at height {}", self.0)
2275    }
2276
2277    async fn notify(&self, _notifiers: &Notifiers<Types>) {
2278        // No passive listeners for cert2.
2279    }
2280
2281    async fn store(
2282        &self,
2283        storage: &mut impl UpdateAvailabilityStorage<Types>,
2284        _leaf_only: bool,
2285    ) -> anyhow::Result<()> {
2286        storage.insert_cert2(self.0, self.1.clone()).await
2287    }
2288}
2289
/// Break a range into fixed-size chunks.
///
/// Chunks are consecutive half-open ranges of `chunk_size` elements, except that the last chunk
/// may be shorter. An empty or inverted range, or a `chunk_size` of 0, yields no chunks. A range
/// with no upper bound is chunked up to `usize::MAX`, which for practical purposes never ends.
fn range_chunks<R>(range: R, chunk_size: usize) -> impl Iterator<Item = Range<usize>>
where
    R: RangeBounds<usize>,
{
    // Transform range to explicit start (inclusive) and end (exclusive) bounds.
    let Range { mut start, end } = range_to_bounds(range);
    std::iter::from_fn(move || {
        // Saturating, so chunking a range that reaches `usize::MAX` cannot overflow.
        let chunk_end = min(start.saturating_add(chunk_size), end);
        // `<=` rather than `==`: for an inverted range (`start > end`), `chunk_end` is below
        // `start`, and we must stop instead of yielding a single backwards chunk.
        if chunk_end <= start {
            return None;
        }

        let chunk = start..chunk_end;
        start = chunk_end;
        Some(chunk)
    })
}

/// Break a range into fixed-alignment chunks.
///
/// Each chunk is of size `alignment`, and starts on a multiple of `alignment`, with the possible
/// exception of the first chunk (which may be misaligned and small) and the last (which may be
/// small). An empty or inverted range yields no chunks. `alignment` is expected to be nonzero.
fn range_chunks_aligned<R>(range: R, alignment: usize) -> impl Iterator<Item = Range<usize>>
where
    R: RangeBounds<usize>,
{
    // Transform range to explicit start (inclusive) and end (exclusive) bounds.
    let Range { mut start, end } = range_to_bounds(range);

    // If necessary, generate a partial first chunk to force the remaining chunks into alignment.
    // An empty or inverted range needs no such chunk (and must not produce a backwards one).
    let first = if start >= end || start.is_multiple_of(alignment) {
        None
    } else {
        // The partial first chunk ends at the next multiple of the alignment, or at the end of the
        // overall range, whichever comes first. If the next multiple is not representable, there
        // is no aligned boundary before the end, so the chunk simply runs to the end.
        let next_aligned = start.checked_next_multiple_of(alignment).unwrap_or(end);
        let chunk_end = min(next_aligned, end);
        let chunk = start..chunk_end;

        // Start the series of aligned chunks at the end of the partial first chunk.
        start = chunk_end;
        Some(chunk)
    };

    first.into_iter().chain(range_chunks(start..end, alignment))
}

/// Transform a range to explicit start (inclusive) and end (exclusive) bounds.
///
/// Bound adjustments saturate instead of overflowing. For example, `0..=usize::MAX` becomes
/// `0..usize::MAX` (the same upper limit used for an unbounded end) instead of panicking in debug
/// builds or wrapping to an empty `0..0` in release builds.
fn range_to_bounds(range: impl RangeBounds<usize>) -> Range<usize> {
    let start = match range.start_bound() {
        Bound::Included(&i) => i,
        Bound::Excluded(&i) => i.saturating_add(1),
        Bound::Unbounded => 0,
    };
    let end = match range.end_bound() {
        Bound::Included(&i) => i.saturating_add(1),
        Bound::Excluded(&i) => i,
        Bound::Unbounded => usize::MAX,
    };
    start..end
}
2352
/// Break a range into fixed-size chunks, starting from the end and moving towards the start.
///
/// While the chunks are yielded in reverse order, from `end` to `start`, each individual chunk is
/// in the usual ascending order. That is, the first chunk ends with `end` and the last chunk starts
/// with `start`.
///
/// Note that unlike [`range_chunks`], which accepts any range and yields an (effectively) infinite
/// iterator if the range has no upper bound, this function requires there to be a defined upper
/// bound, otherwise we don't know where the reversed iterator should _start_. The `end` bound given
/// here is inclusive; i.e. the end of the first chunk yielded by the stream will be exactly `end`.
///
/// Bound adjustments saturate instead of overflowing. If `end == usize::MAX`, the chunks stop just
/// short of `usize::MAX`, the same limit an unbounded range gets elsewhere in this module. An
/// `Excluded(usize::MAX)` start yields nothing. A start beyond `end`, or a `chunk_size` of 0, also
/// yields nothing.
fn range_chunks_rev(
    start: Bound<usize>,
    end: usize,
    chunk_size: usize,
) -> impl Iterator<Item = Range<usize>> {
    // Transform the start bound to be inclusive.
    let start = match start {
        Bound::Included(i) => i,
        Bound::Excluded(i) => i.saturating_add(1),
        Bound::Unbounded => 0,
    };
    // Transform the end bound to be exclusive. Saturating: a plain `end + 1` at `usize::MAX` panics
    // in debug builds and wraps to 0 (silently yielding nothing) in release builds.
    let mut end = end.saturating_add(1);

    std::iter::from_fn(move || {
        let chunk_start = max(start, end.saturating_sub(chunk_size));
        if end <= chunk_start {
            return None;
        }

        let chunk = chunk_start..end;
        end = chunk_start;
        Some(chunk)
    })
}
2388
/// Extension for converting a [`Result`] into an [`Option`] while logging the error.
trait ResultExt<T, E> {
    /// Convert `Ok(t)` into `Some(t)`; for `Err(err)`, log the error and return `None`.
    ///
    /// Intended for local-storage loads whose failure is not fatal because the object can still be
    /// fetched; the logged message says as much.
    fn ok_or_trace(self) -> Option<T>
    where
        E: Display;
}
2394
2395impl<T, E> ResultExt<T, E> for Result<T, E> {
2396    fn ok_or_trace(self) -> Option<T>
2397    where
2398        E: Display,
2399    {
2400        match self {
2401            Ok(t) => Some(t),
2402            Err(err) => {
2403                tracing::info!(
2404                    "error loading resource from local storage, will try to fetch: {err:#}"
2405                );
2406                None
2407            },
2408        }
2409    }
2410}
2411
2412#[derive(Debug)]
2413struct ScannerMetrics {
2414    /// Whether a scan is currently running (1) or not (0).
2415    running: Box<dyn Gauge>,
2416    /// The current number that is running.
2417    current_scan: Box<dyn Gauge>,
2418    /// Number of blocks processed in the current scan.
2419    scanned_blocks: Box<dyn Gauge>,
2420    /// Number of VID entries processed in the current scan.
2421    scanned_vid: Box<dyn Gauge>,
2422    /// The number of missing blocks discovered and not yet resolved in the current scan.
2423    missing_blocks: Box<dyn Gauge>,
2424    /// The number of missing VID entries discovered and not yet resolved in the current scan.
2425    missing_vid: Box<dyn Gauge>,
2426}
2427
2428impl ScannerMetrics {
2429    fn new(metrics: &PrometheusMetrics) -> Self {
2430        let group = metrics.subgroup("scanner".into());
2431        Self {
2432            running: group.create_gauge("running".into(), None),
2433            current_scan: group.create_gauge("current".into(), None),
2434            scanned_blocks: group.create_gauge("scanned_blocks".into(), None),
2435            scanned_vid: group.create_gauge("scanned_vid".into(), None),
2436            missing_blocks: group.create_gauge("missing_blocks".into(), None),
2437            missing_vid: group.create_gauge("missing_vid".into(), None),
2438        }
2439    }
2440}
2441
2442#[derive(Debug)]
2443struct AggregatorMetrics {
2444    /// The block height for which aggregate statistics are currently available.
2445    height: Box<dyn Gauge>,
2446}
2447
2448impl AggregatorMetrics {
2449    fn new(metrics: &PrometheusMetrics) -> Self {
2450        let group = metrics.subgroup("aggregator".into());
2451        Self {
2452            height: group.create_gauge("height".into(), None),
2453        }
2454    }
2455}
2456
2457#[derive(Debug)]
2458struct SyncStatusMetrics {
2459    current_range_start: Box<dyn Gauge>,
2460    current_range_end: Box<dyn Gauge>,
2461    current_start_time: Box<dyn Gauge>,
2462    avg_rate: Box<dyn Histogram>,
2463    ranges_scanned: Box<dyn Counter>,
2464    running: Box<dyn Gauge>,
2465}
2466
2467impl SyncStatusMetrics {
2468    fn new(metrics: &PrometheusMetrics, size: usize) -> Self {
2469        let group = metrics.subgroup("sync_status".into());
2470        group.create_gauge("range_size".into(), None).set(size);
2471
2472        Self {
2473            current_range_start: group.create_gauge("current_range_start".into(), None),
2474            current_range_end: group.create_gauge("current_range_end".into(), None),
2475            current_start_time: group
2476                .create_gauge("current_range_start_time".into(), Some("s".into())),
2477            avg_rate: group
2478                .create_histogram("avg_time_per_block_scanned".into(), Some("ms".into())),
2479            ranges_scanned: group.create_counter("ranges_scanned".into(), None),
2480            running: group.create_gauge("running".into(), None),
2481        }
2482    }
2483
2484    fn start_range(&self, range: &Range<usize>) -> SyncStatusRangeMetrics<'_> {
2485        let start = Utc::now();
2486        self.current_range_start.set(range.start);
2487        self.current_range_end.set(range.end);
2488        self.current_start_time.set(start.timestamp() as usize);
2489        self.running.set(1);
2490        SyncStatusRangeMetrics {
2491            size: range.end - range.start,
2492            start,
2493            metrics: self,
2494        }
2495    }
2496}
2497
2498#[must_use]
2499#[derive(Debug)]
2500struct SyncStatusRangeMetrics<'a> {
2501    size: usize,
2502    start: DateTime<Utc>,
2503    metrics: &'a SyncStatusMetrics,
2504}
2505
2506impl<'a> SyncStatusRangeMetrics<'a> {
2507    fn end(self) {
2508        let elapsed = Utc::now() - self.start;
2509        self.metrics
2510            .avg_rate
2511            .add_point((elapsed.num_milliseconds() as f64) / (self.size as f64));
2512        self.metrics.ranges_scanned.add(1);
2513        self.metrics.running.set(0);
2514    }
2515}
2516
2517#[derive(Debug)]
2518struct CachedSyncStatus {
2519    last_updated: Instant,
2520    ttl: Duration,
2521    cached: Option<SyncStatusQueryData>,
2522}
2523
2524impl CachedSyncStatus {
2525    fn new(ttl: Duration) -> Self {
2526        Self {
2527            last_updated: Instant::now(),
2528            ttl,
2529            cached: None,
2530        }
2531    }
2532
2533    /// Return the cached sync status, if present and fresh.
2534    fn try_get(&self) -> Option<&SyncStatusQueryData> {
2535        if self.last_updated.elapsed() > self.ttl {
2536            // Cached value is stale.
2537            return None;
2538        }
2539        self.cached.as_ref()
2540    }
2541
2542    /// Refresh the cache with an updated value.
2543    fn update(&mut self, value: SyncStatusQueryData) {
2544        self.last_updated = Instant::now();
2545        self.cached = Some(value);
2546    }
2547}
2548
2549/// Turn a fallible passive fetch future into an infallible "fetch".
2550///
2551/// Basically, we ignore failures due to a channel sender being dropped, which should never happen.
2552fn passive<T>(
2553    req: impl Debug + Send + 'static,
2554    fut: impl Future<Output = Option<T>> + Send + 'static,
2555) -> Fetch<T>
2556where
2557    T: Send + 'static,
2558{
2559    Fetch::Pending(
2560        fut.then(move |opt| async move {
2561            match opt {
2562                Some(t) => t,
2563                None => {
2564                    // If `passive_fetch` returns `None`, it means the notifier was dropped without
2565                    // ever sending a notification. In this case, the correct behavior is actually
2566                    // to block forever (unless the `Fetch` itself is dropped), since the semantics
2567                    // of `Fetch` are to never fail. This is analogous to fetching an object which
2568                    // doesn't actually exist: the `Fetch` will never return.
2569                    //
2570                    // However, for ease of debugging, and since this is never expected to happen in
2571                    // normal usage, we panic instead. This should only happen in two cases:
2572                    // * The server was shut down (dropping the notifier) without cleaning up some
2573                    //   background tasks. This will not affect runtime behavior, but should be
2574                    //   fixed if it happens.
2575                    // * There is a very unexpected runtime bug resulting in the notifier being
2576                    //   dropped. If this happens, things are very broken in any case, and it is
2577                    //   better to panic loudly than simply block forever.
2578                    panic!("notifier dropped without satisfying request {req:?}");
2579                },
2580            }
2581        })
2582        .boxed(),
2583    )
2584}
2585
2586/// Get the result of the first future to return `Some`, if either do.
2587async fn select_some<T>(
2588    a: impl Future<Output = Option<T>> + Unpin,
2589    b: impl Future<Output = Option<T>> + Unpin,
2590) -> Option<T> {
2591    match future::select(a, b).await {
2592        // If the first future resolves with `Some`, immediately return the result.
2593        Either::Left((Some(a), _)) => Some(a),
2594        Either::Right((Some(b), _)) => Some(b),
2595
2596        // If the first future resolves with `None`, wait for the result of the second future.
2597        Either::Left((None, b)) => b.await,
2598        Either::Right((None, a)) => a.await,
2599    }
2600}
2601
2602#[cfg(test)]
2603mod test {
2604    use hotshot_example_types::node_types::TEST_VERSIONS;
2605
2606    use super::*;
2607    use crate::{
2608        data_source::{
2609            sql::testing::TmpDb,
2610            storage::{SqlStorage, StorageConnectionType},
2611        },
2612        fetching::provider::NoFetching,
2613        testing::{consensus::MockSqlDataSource, mocks::MockTypes},
2614    };
2615
2616    #[test]
2617    fn test_range_chunks() {
2618        // Inclusive bounds, partial last chunk.
2619        assert_eq!(
2620            range_chunks(0..=4, 2).collect::<Vec<_>>(),
2621            [0..2, 2..4, 4..5]
2622        );
2623
2624        // Inclusive bounds, complete last chunk.
2625        assert_eq!(
2626            range_chunks(0..=5, 2).collect::<Vec<_>>(),
2627            [0..2, 2..4, 4..6]
2628        );
2629
2630        // Exclusive bounds, partial last chunk.
2631        assert_eq!(
2632            range_chunks(0..5, 2).collect::<Vec<_>>(),
2633            [0..2, 2..4, 4..5]
2634        );
2635
2636        // Exclusive bounds, complete last chunk.
2637        assert_eq!(
2638            range_chunks(0..6, 2).collect::<Vec<_>>(),
2639            [0..2, 2..4, 4..6]
2640        );
2641
2642        // Unbounded.
2643        assert_eq!(
2644            range_chunks(0.., 2).take(5).collect::<Vec<_>>(),
2645            [0..2, 2..4, 4..6, 6..8, 8..10]
2646        );
2647    }
2648
2649    #[test]
2650    fn test_range_chunks_aligned() {
2651        #![allow(clippy::single_range_in_vec_init)]
2652
2653        // Aligned first chunk, partial last chunk.
2654        assert_eq!(
2655            range_chunks_aligned(2..5, 2).collect::<Vec<_>>(),
2656            [2..4, 4..5]
2657        );
2658
2659        // Misaligned first chunk, complete last chunk.
2660        assert_eq!(
2661            range_chunks_aligned(1..4, 2).collect::<Vec<_>>(),
2662            [1..2, 2..4]
2663        );
2664
2665        // Incomplete chunk.
2666        assert_eq!(range_chunks_aligned(1..3, 10).collect::<Vec<_>>(), [1..3]);
2667
2668        // Unbounded.
2669        assert_eq!(
2670            range_chunks_aligned(1.., 2).take(5).collect::<Vec<_>>(),
2671            [1..2, 2..4, 4..6, 6..8, 8..10]
2672        );
2673    }
2674
2675    #[test]
2676    fn test_range_chunks_rev() {
2677        // Inclusive bounds, partial last chunk.
2678        assert_eq!(
2679            range_chunks_rev(Bound::Included(0), 4, 2).collect::<Vec<_>>(),
2680            [3..5, 1..3, 0..1]
2681        );
2682
2683        // Inclusive bounds, complete last chunk.
2684        assert_eq!(
2685            range_chunks_rev(Bound::Included(0), 5, 2).collect::<Vec<_>>(),
2686            [4..6, 2..4, 0..2]
2687        );
2688
2689        // Exclusive bounds, partial last chunk.
2690        assert_eq!(
2691            range_chunks_rev(Bound::Excluded(0), 5, 2).collect::<Vec<_>>(),
2692            [4..6, 2..4, 1..2]
2693        );
2694
2695        // Exclusive bounds, complete last chunk.
2696        assert_eq!(
2697            range_chunks_rev(Bound::Excluded(0), 4, 2).collect::<Vec<_>>(),
2698            [3..5, 1..3]
2699        );
2700    }
2701
2702    async fn test_sync_status(chunk_size: usize, present_ranges: &[(usize, usize)]) {
2703        let block_height = present_ranges.last().unwrap().1;
2704        let storage = TmpDb::init().await;
2705        let db = SqlStorage::connect(storage.config(), StorageConnectionType::Query)
2706            .await
2707            .unwrap();
2708        let ds = MockSqlDataSource::builder(db, NoFetching)
2709            .with_sync_status_chunk_size(chunk_size)
2710            .with_sync_status_ttl(Duration::ZERO)
2711            .build()
2712            .await
2713            .unwrap();
2714
2715        // Generate some mock leaves to insert.
2716        let mut leaves: Vec<LeafQueryData<MockTypes>> = vec![
2717            LeafQueryData::<MockTypes>::genesis(
2718                &Default::default(),
2719                &Default::default(),
2720                TEST_VERSIONS.test,
2721            )
2722            .await,
2723        ];
2724        for i in 1..block_height {
2725            let mut leaf = leaves[i - 1].clone();
2726            leaf.leaf.block_header_mut().block_number = i as u64;
2727            leaves.push(leaf);
2728        }
2729
2730        // Set up.
2731        {
2732            let mut tx = ds.write().await.unwrap();
2733
2734            for &(start, end) in present_ranges {
2735                for leaf in &leaves[start..end] {
2736                    tracing::info!(height = leaf.height(), "insert leaf");
2737                    tx.insert_leaf(leaf).await.unwrap();
2738                }
2739            }
2740
2741            if present_ranges[0].0 > 0 {
2742                tx.save_pruned_height((present_ranges[0].0 - 1) as u64)
2743                    .await
2744                    .unwrap();
2745            }
2746
2747            tx.commit().await.unwrap();
2748        }
2749
2750        let sync_status = ds.sync_status().await.unwrap().leaves;
2751
2752        // Verify missing.
2753        let present: usize = present_ranges.iter().map(|(start, end)| end - start).sum();
2754        assert_eq!(
2755            sync_status.missing,
2756            block_height - present - present_ranges[0].0
2757        );
2758
2759        // Verify ranges.
2760        let mut ranges = sync_status.ranges.into_iter();
2761        let mut prev = 0;
2762        for &(start, end) in present_ranges {
2763            if start != prev {
2764                let range = ranges.next().unwrap();
2765                assert_eq!(
2766                    range,
2767                    SyncStatusRange {
2768                        start: prev,
2769                        end: start,
2770                        status: if prev == 0 {
2771                            SyncStatus::Pruned
2772                        } else {
2773                            SyncStatus::Missing
2774                        },
2775                    }
2776                );
2777            }
2778            let range = ranges.next().unwrap();
2779            assert_eq!(
2780                range,
2781                SyncStatusRange {
2782                    start,
2783                    end,
2784                    status: SyncStatus::Present,
2785                }
2786            );
2787            prev = end;
2788        }
2789
2790        if prev != block_height {
2791            let range = ranges.next().unwrap();
2792            assert_eq!(
2793                range,
2794                SyncStatusRange {
2795                    start: prev,
2796                    end: block_height,
2797                    status: SyncStatus::Missing,
2798                }
2799            );
2800        }
2801
2802        assert_eq!(ranges.next(), None);
2803    }
2804
2805    #[tokio::test]
2806    #[test_log::test]
2807    async fn test_sync_status_multiple_chunks() {
2808        test_sync_status(10, &[(0, 1), (3, 5), (8, 10)]).await;
2809    }
2810
2811    #[tokio::test]
2812    #[test_log::test]
2813    async fn test_sync_status_multiple_chunks_present_range_overlapping_chunk() {
2814        test_sync_status(5, &[(1, 4)]).await;
2815    }
2816
2817    #[tokio::test]
2818    #[test_log::test]
2819    async fn test_sync_status_multiple_chunks_missing_range_overlapping_chunk() {
2820        test_sync_status(5, &[(0, 1), (4, 5)]).await;
2821    }
2822
2823    #[tokio::test]
2824    #[test_log::test]
2825    async fn test_load_range_incomplete() {
2826        let storage = TmpDb::init().await;
2827        let db = SqlStorage::connect(storage.config(), StorageConnectionType::Query)
2828            .await
2829            .unwrap();
2830        {
2831            let mut tx = db.write().await.unwrap();
2832            tx.insert_leaf(
2833                &LeafQueryData::<MockTypes>::genesis(
2834                    &Default::default(),
2835                    &Default::default(),
2836                    TEST_VERSIONS.test,
2837                )
2838                .await,
2839            )
2840            .await
2841            .unwrap();
2842            tx.insert_block(
2843                &BlockQueryData::<MockTypes>::genesis(
2844                    &Default::default(),
2845                    &Default::default(),
2846                    TEST_VERSIONS.test.base,
2847                )
2848                .await,
2849            )
2850            .await
2851            .unwrap();
2852            tx.insert_vid(
2853                &VidCommonQueryData::<MockTypes>::genesis(
2854                    &Default::default(),
2855                    &Default::default(),
2856                    TEST_VERSIONS.test.base,
2857                )
2858                .await,
2859                None,
2860            )
2861            .await
2862            .unwrap();
2863            tx.commit().await.unwrap();
2864        }
2865
2866        let mut tx = db.read().await.unwrap();
2867        let req = RangeRequest { start: 0, end: 100 };
2868
2869        let err = <NonEmptyRange<BlockQueryData<MockTypes>>>::load(&mut tx, req)
2870            .await
2871            .unwrap_err();
2872        tracing::info!("loading partial block range failed as expected: {err:#}");
2873        assert!(matches!(err, QueryError::Missing));
2874
2875        let err =
2876            <NonEmptyRange<LeafQueryData<MockTypes>> as Fetchable<MockTypes>>::load(&mut tx, req)
2877                .await
2878                .unwrap_err();
2879        tracing::info!("loading partial leaf range failed as expected: {err:#}");
2880        assert!(matches!(err, QueryError::Missing));
2881
2882        let err = <NonEmptyRange<VidCommonQueryData<MockTypes>>>::load(&mut tx, req)
2883            .await
2884            .unwrap_err();
2885        tracing::info!("loading partial VID common range failed as expected: {err:#}");
2886        assert!(matches!(err, QueryError::Missing));
2887    }
2888}