// hotshot_query_service/fetching.rs

// Copyright (c) 2022 Espresso Systems (espressosys.com)
// This file is part of the HotShot Query Service library.
//
// This program is free software: you can redistribute it and/or modify it under the terms of the GNU
// General Public License as published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
// This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without
// even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
// General Public License for more details.
// You should have received a copy of the GNU General Public License along with this program. If not,
// see <https://www.gnu.org/licenses/>.

//! Fetching missing data from remote providers.
//!
//! This module provides a mechanism to fetch data that is missing from this query service's storage
//! from a remote data availability provider. [`Fetcher`] can be used to handle concurrent requests
//! for data, ensuring that each distinct resource is only fetched once at a time.
//!
//! Fetching is ultimately dispatched to a [`Provider`], which implements fetching for a specific
//! type of resource from a specific source. The [`provider`] module contains built-in
//! implementations of [`Provider`] for various data availability sources.
use std::{
    collections::{BTreeSet, HashMap, hash_map::Entry},
    fmt::Debug,
    sync::Arc,
    time::Duration,
};

use anyhow::ensure;
use async_lock::{Mutex, Semaphore};
use backoff::{ExponentialBackoff, backoff::Backoff};
use derivative::Derivative;
use derive_more::Into;
use serde::{Deserialize, Serialize};
use tokio::{spawn, time::sleep};

pub mod provider;
pub mod request;

pub use provider::Provider;
pub use request::Request;

use crate::types::HeightIndexed;

/// A callback to process the result of a request.
///
/// Sometimes, we may fetch the same object for multiple purposes, so a request may have more than
/// one callback registered. For example, we may fetch a leaf for its own sake and also to
/// reconstruct a block. Or, we may fetch the same payload for two different blocks. In both of
/// these cases, there are two objects that must be processed and stored after the fetch completes.
///
/// In these cases, we only want one task to actually fetch the resource, but there may be several
/// unrelated actions to take after the resource is fetched. This trait allows us to identify a
/// callback, so that when the task that actually fetched the resource completes, it will run one
/// instance of each distinct callback which was registered. Callbacks will run in the order
/// determined by `Ord`.
#[trait_variant::make(Callback: Send)]
pub trait LocalCallback<T>: Debug + Ord {
    /// Process the fetched response, consuming this callback.
    async fn run(self, response: T);
}
/// Management of concurrent requests to fetch resources.
#[derive(Derivative)]
#[derivative(Clone(bound = ""), Debug(bound = ""))]
pub struct Fetcher<T, C> {
    /// Requests currently being fetched, mapped to the set of callbacks to run on completion.
    ///
    /// An entry in this map acts as a lock on the corresponding resource: only the task which
    /// inserted the entry actually performs the fetch; other tasks merely register callbacks.
    // `Debug = "ignore"` because callbacks (`C`) may not have a useful `Debug` representation,
    // and dumping the whole map would be noisy.
    #[derivative(Debug = "ignore")]
    in_progress: Arc<Mutex<HashMap<T, BTreeSet<C>>>>,
    /// Backoff policy used to delay retries after a failed fetch attempt.
    backoff: ExponentialBackoff,
    /// Semaphore rate-limiting the number of concurrent fetch requests.
    permit: Arc<Semaphore>,
}
74impl<T, C> Fetcher<T, C> {
75    pub fn new(permit: Arc<Semaphore>, backoff: ExponentialBackoff) -> Self {
76        Self {
77            in_progress: Default::default(),
78            permit,
79            backoff,
80        }
81    }
82}
83
impl<T, C> Fetcher<T, C> {
    /// Fetch a resource, if it is not already being fetched.
    ///
    /// This function will spawn a new task to fetch the resource in the background, using callbacks
    /// to process the fetched resource upon success. If the resource is already being fetched, the
    /// spawned task will terminate without fetching the resource, but only after registering the
    /// provided callbacks to be executed by the existing fetching task upon completion, as long as
    /// there are not already equivalent callbacks registered.
    ///
    /// We spawn a (short-lived) task even if the resource is already being fetched, because the
    /// check that the resource is being fetched requires an exclusive lock, and we do not want to
    /// block the caller, which might be on the critical path of request handling.
    ///
    /// Note that while callbacks are allowed to be async, they are executed sequentially while an
    /// exclusive lock is held, and thus they should not take too long to run or block indefinitely.
    ///
    /// The spawned task will continue trying to fetch the object until it succeeds, so it is the
    /// caller's responsibility only to use this method for resources which are known to exist and
    /// be fetchable by `provider`.
    pub fn spawn_fetch<Types>(
        &self,
        req: T,
        provider: impl Provider<Types, T> + 'static,
        callbacks: impl IntoIterator<Item = C> + Send + 'static,
    ) where
        T: Request<Types> + 'static,
        C: Callback<T::Response> + 'static,
    {
        // Clone the shared state into the spawned task so it does not borrow `self`.
        // NOTE(review): `req` is used by value several times below, so `T: Request<Types>`
        // presumably implies `Copy` — confirm in the `request` module.
        let in_progress = self.in_progress.clone();
        let permit = self.permit.clone();
        let mut backoff = self.backoff.clone();

        spawn(async move {
            tracing::info!("spawned active fetch for {req:?}");

            // Check if the requested object is already being fetched. If not, take a lock on it so
            // we are the only ones to fetch this particular object.
            //
            // The inner scope ensures the `in_progress` lock is released before the (potentially
            // long-running) fetch loop below.
            {
                let mut in_progress = in_progress.lock().await;
                match in_progress.entry(req) {
                    Entry::Occupied(mut e) => {
                        // If the object is already being fetched, add our callback for the fetching
                        // task to execute upon completion.
                        e.get_mut().extend(callbacks);
                        tracing::info!(?req, callbacks = ?e.get(), "resource is already being fetched");
                        return;
                    },
                    Entry::Vacant(e) => {
                        // If the object is not being fetched, we will register our own callback and
                        // then fetch it ourselves.
                        e.insert(callbacks.into_iter().collect());
                    },
                }
            }

            // Now we are responsible for fetching the object, reach out to the provider.
            backoff.reset();
            let mut delay = backoff.next_backoff().unwrap_or(Duration::from_secs(1));
            let res = loop {
                // Acquire a permit from the semaphore to rate limit the number of concurrent fetch
                // requests. The guard is held for the duration of this fetch attempt; on success it
                // is dropped when the loop exits, on failure it is dropped explicitly before
                // sleeping, below, so we do not hold a permit while idle.
                let permit = permit.acquire().await;
                if let Some(res) = provider.fetch(req).await {
                    break res;
                }

                // We only fetch objects which are known to exist, so we should eventually succeed
                // in fetching if we retry enough. For example, we may be fetching a block from a
                // peer who hasn't received the block yet.
                //
                // To understand why it is ok to retry indefinitely, think about manual
                // intervention: if we don't retry, or retry with a limit, we may require manual
                // intervention whenever a query service fails to fetch a resource that should exist
                // and stops retrying, since it now may never receive that resource. With indefinite
                // fetching, we require manual intervention only when active fetches are
                // accumulating because a peer which _should_ have the resource isn't providing it.
                // In this case, we would require manual intervention on the peer anyways.
                tracing::warn!("failed to fetch {req:?}, will retry in {delay:?}");
                drop(permit);
                sleep(delay).await;

                // Increase the delay per the backoff policy; once the policy is exhausted
                // (`next_backoff()` returns `None`), keep retrying at the last delay.
                if let Some(next_delay) = backoff.next_backoff() {
                    delay = next_delay;
                }
            };

            // Done fetching, remove our lock on the object and execute all callbacks.
            //
            // We will keep this lock the whole time we are running the callbacks. We can't release
            // it earlier because we can't allow another task to register a callback after we have
            // taken the list of callbacks that we will execute. We also don't want to allow any new
            // fetches until we have executed the callbacks, because one of the callbacks may store
            // some resource that obviates the need for another fetch.
            //
            // The callbacks may acquire arbitrary locks from this task, while we already hold the
            // lock on `in_progress`. This is fine because we are always running in a freshly
            // spawned task. Therefore we know that this task holds no locks _before_ acquiring
            // `in_progress`, and so it is safe to acquire any lock _after_ acquiring `in_progress`.
            let mut in_progress = in_progress.lock().await;
            let callbacks = in_progress.remove(&req).unwrap_or_default();
            for callback in callbacks {
                callback.run(res.clone()).await;
            }
        });
    }
}
/// Added type safety for objects which are fetched in batches.
///
/// A [`NonEmptyRange`] has a similar interface as a [`Vec`], but it enforces, via the methods with
/// which it can be constructed, that the data it contains is always
/// * at least one object of type `T`
/// * a contiguous range of objects ordered by increasing [`height`](HeightIndexed::height).
#[derive(Clone, Debug, Into, Deserialize, Serialize, PartialEq, Eq)]
#[serde(
    // Important: use `try_from` when deserializing so that we perform the necessary invariant
    // checks.
    try_from = "Vec<T>",
    into = "Vec<T>",
    bound(
        deserialize = "T: HeightIndexed + serde::de::DeserializeOwned",
        serialize = "T: Clone + Serialize"
    )
)]
// The inner `Vec` is private so the invariants can only be established via `TryFrom`/`new`.
pub struct NonEmptyRange<T>(Vec<T>);
209impl<T> NonEmptyRange<T>
210where
211    T: HeightIndexed,
212{
213    /// Construct a [`NonEmptyRange`] from a sequence of elements.
214    ///
215    /// # Errors
216    ///
217    /// This constructor will fail if the given sequence is empty, or if its elements do not
218    /// represent a contiguous range by height.
219    pub fn new(elems: impl IntoIterator<Item = T>) -> anyhow::Result<Self> {
220        elems.into_iter().collect::<Vec<_>>().try_into()
221    }
222
223    /// The inclusive lower bound of the range of heights of objects in this [`NonEmptyRange`].
224    pub fn start(&self) -> u64 {
225        self.0[0].height()
226    }
227
228    /// The exclusive upper bound of the range of heights of objects in this [`NonEmptyRange`].
229    pub fn end(&self) -> u64 {
230        self.start() + (self.len() as u64)
231    }
232
233    /// The number of objects in this [`NonEmptyRange`].
234    pub fn len(&self) -> usize {
235        self.0.len()
236    }
237
238    /// Whether the [`NonEmptyRange`] is empty.
239    ///
240    /// This function always returns `false`. It is included only because it is idiomatically
241    /// paired with [`Self::len`], as demanded by Clippy.
242    pub fn is_empty(&self) -> bool {
243        false
244    }
245
246    pub fn iter(&self) -> std::slice::Iter<'_, T> {
247        self.0.iter()
248    }
249
250    /// Convert a range of objects into an equivalent range of sub-objects with the same heights.
251    pub(crate) fn as_ref_cloned<U>(&self) -> NonEmptyRange<U>
252    where
253        T: AsRef<U>,
254        U: Clone,
255    {
256        NonEmptyRange(self.0.iter().map(|t| t.as_ref().clone()).collect())
257    }
258}
259
260impl<T> TryFrom<Vec<T>> for NonEmptyRange<T>
261where
262    T: HeightIndexed,
263{
264    type Error = anyhow::Error;
265
266    fn try_from(elems: Vec<T>) -> Result<Self, Self::Error> {
267        ensure!(
268            !elems.is_empty(),
269            "cannot construct a non-empty range from an empty vector"
270        );
271        for (x, y) in elems.iter().zip(&elems[1..]) {
272            ensure!(
273                x.height() + 1 == y.height(),
274                "cannot construct a non-empty range from a non-contiguous vector"
275            );
276        }
277        Ok(Self(elems))
278    }
279}
280
281impl<T> PartialEq<Vec<T>> for NonEmptyRange<T>
282where
283    T: PartialEq,
284{
285    fn eq(&self, other: &Vec<T>) -> bool {
286        self.0.eq(other)
287    }
288}
289
290impl<T> IntoIterator for NonEmptyRange<T> {
291    type IntoIter = <Vec<T> as IntoIterator>::IntoIter;
292    type Item = T;
293
294    fn into_iter(self) -> Self::IntoIter {
295        self.0.into_iter()
296    }
297}
298
299impl<'a, T> IntoIterator for &'a NonEmptyRange<T> {
300    type IntoIter = <&'a Vec<T> as IntoIterator>::IntoIter;
301    type Item = &'a T;
302
303    fn into_iter(self) -> Self::IntoIter {
304        self.0.iter()
305    }
306}
307
308impl<T> AsRef<[T]> for NonEmptyRange<T> {
309    fn as_ref(&self) -> &[T] {
310        self.0.as_ref()
311    }
312}