// hotshot_query_service/fetching.rs
// Copyright (c) 2022 Espresso Systems (espressosys.com)
// This file is part of the HotShot Query Service library.
//
// This program is free software: you can redistribute it and/or modify it under the terms of the GNU
// General Public License as published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
// This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without
// even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
// General Public License for more details.
// You should have received a copy of the GNU General Public License along with this program. If not,
// see <https://www.gnu.org/licenses/>.

//! Fetching missing data from remote providers.
//!
//! This module provides a mechanism to fetch data that is missing from this query service's storage
//! from a remote data availability provider. [`Fetcher`] can be used to handle concurrent requests
//! for data, ensuring that each distinct resource is only fetched once at a time.
//!
//! Fetching is ultimately dispatched to a [`Provider`], which implements fetching for a specific
//! type of resource from a specific source. The [`provider`] module contains built-in
//! implementations of [`Provider`] for various data availability sources.
use std::{
    collections::{BTreeSet, HashMap, hash_map::Entry},
    fmt::Debug,
    sync::Arc,
    time::Duration,
};

use anyhow::ensure;
use async_lock::{Mutex, Semaphore};
use backoff::{ExponentialBackoff, backoff::Backoff};
use derivative::Derivative;
use derive_more::Into;
use serde::{Deserialize, Serialize};
use tokio::{spawn, time::sleep};

// Submodules implementing the resource-request and data-provider abstractions.
pub mod provider;
pub mod request;

// Re-export the core fetching traits for convenient downstream use.
pub use provider::Provider;
pub use request::Request;

use crate::types::HeightIndexed;

/// A callback to process the result of a request.
///
/// Sometimes, we may fetch the same object for multiple purposes, so a request may have more than
/// one callback registered. For example, we may fetch a leaf for its own sake and also to
/// reconstruct a block. Or, we may fetch the same payload for two different blocks. In both of
/// these cases, there are two objects that must be processed and stored after the fetch completes.
///
/// In these cases, we only want one task to actually fetch the resource, but there may be several
/// unrelated actions to take after the resource is fetched. This trait allows us to identify a
/// callback, so that when the task that actually fetched the resource completes, it will run one
/// instance of each distinct callback which was registered. Callbacks will run in the order
/// determined by `Ord`.
// `trait_variant` generates a second trait, `Callback`, identical to `LocalCallback` except that
// it additionally requires `Send` bounds, suitable for use in spawned tasks (see `spawn_fetch`).
#[trait_variant::make(Callback: Send)]
pub trait LocalCallback<T>: Debug + Ord {
    /// Consume the callback, processing the fetched `response`.
    async fn run(self, response: T);
}

/// Management of concurrent requests to fetch resources.
#[derive(Derivative)]
#[derivative(Clone(bound = ""), Debug(bound = ""))]
pub struct Fetcher<T, C> {
    /// Requests currently being fetched, each mapped to the set of callbacks to run on completion.
    ///
    /// A request with an entry here is "claimed": concurrent requests for the same resource only
    /// register additional callbacks instead of fetching it again.
    #[derivative(Debug = "ignore")]
    in_progress: Arc<Mutex<HashMap<T, BTreeSet<C>>>>,
    /// Retry policy template; each fetch task clones and resets it before use.
    backoff: ExponentialBackoff,
    /// Semaphore rate-limiting the number of concurrent fetch requests.
    permit: Arc<Semaphore>,
}

74impl<T, C> Fetcher<T, C> {
75 pub fn new(permit: Arc<Semaphore>, backoff: ExponentialBackoff) -> Self {
76 Self {
77 in_progress: Default::default(),
78 permit,
79 backoff,
80 }
81 }
82}
83
impl<T, C> Fetcher<T, C> {
    /// Fetch a resource, if it is not already being fetched.
    ///
    /// This function will spawn a new task to fetch the resource in the background, using callbacks
    /// to process the fetched resource upon success. If the resource is already being fetched, the
    /// spawned task will terminate without fetching the resource, but only after registering the
    /// provided callbacks to be executed by the existing fetching task upon completion, as long as
    /// there are not already equivalent callbacks registered.
    ///
    /// We spawn a (short-lived) task even if the resource is already being fetched, because the
    /// check that the resource is being fetched requires an exclusive lock, and we do not want to
    /// block the caller, which might be on the critical path of request handling.
    ///
    /// Note that while callbacks are allowed to be async, they are executed sequentially while an
    /// exclusive lock is held, and thus they should not take too long to run or block indefinitely.
    ///
    /// The spawned task will continue trying to fetch the object until it succeeds, so it is the
    /// caller's responsibility only to use this method for resources which are known to exist and
    /// be fetchable by `provider`.
    pub fn spawn_fetch<Types>(
        &self,
        req: T,
        provider: impl Provider<Types, T> + 'static,
        callbacks: impl IntoIterator<Item = C> + Send + 'static,
    ) where
        T: Request<Types> + 'static,
        C: Callback<T::Response> + 'static,
    {
        // Clone the shared state into the spawned task; `self` itself is not moved.
        let in_progress = self.in_progress.clone();
        let permit = self.permit.clone();
        let mut backoff = self.backoff.clone();

        spawn(async move {
            tracing::info!("spawned active fetch for {req:?}");

            // Check if the requested object is already being fetched. If not, take a lock on it so
            // we are the only ones to fetch this particular object.
            //
            // NOTE(review): `req` is used again after being moved into `entry` below, so `T` is
            // evidently `Copy` (presumably required by the `Request` trait — confirm in
            // `request.rs`).
            {
                let mut in_progress = in_progress.lock().await;
                match in_progress.entry(req) {
                    Entry::Occupied(mut e) => {
                        // If the object is already being fetched, add our callback for the fetching
                        // task to execute upon completion.
                        e.get_mut().extend(callbacks);
                        tracing::info!(?req, callbacks = ?e.get(), "resource is already being fetched");
                        return;
                    },
                    Entry::Vacant(e) => {
                        // If the object is not being fetched, we will register our own callback and
                        // then fetch it ourselves.
                        e.insert(callbacks.into_iter().collect());
                    },
                }
            }

            // Now we are responsible for fetching the object, reach out to the provider.
            backoff.reset();
            // If the backoff policy yields no initial interval, fall back to a 1s retry delay.
            let mut delay = backoff.next_backoff().unwrap_or(Duration::from_secs(1));
            let res = loop {
                // Acquire a permit from the semaphore to rate limit the number of concurrent fetch requests
                let permit = permit.acquire().await;
                if let Some(res) = provider.fetch(req).await {
                    // Breaking out of the loop drops the permit guard, releasing our semaphore
                    // slot before we take the `in_progress` lock below.
                    break res;
                }

                // We only fetch objects which are known to exist, so we should eventually succeed
                // in fetching if we retry enough. For example, we may be fetching a block from a
                // peer who hasn't received the block yet.
                //
                // To understand why it is ok to retry indefinitely, think about manual
                // intervention: if we don't retry, or retry with a limit, we may require manual
                // intervention whenever a query service fails to fetch a resource that should exist
                // and stops retrying, since it now may never receive that resource. With indefinite
                // fetching, we require manual intervention only when active fetches are
                // accumulating because a peer which _should_ have the resource isn't providing it.
                // In this case, we would require manual intervention on the peer anyways.
                tracing::warn!("failed to fetch {req:?}, will retry in {delay:?}");
                // Release the permit before sleeping so other fetch tasks can make progress
                // while we back off.
                drop(permit);
                sleep(delay).await;

                // Once the policy is exhausted (`next_backoff` returns `None`), keep retrying at
                // the last delay rather than giving up.
                if let Some(next_delay) = backoff.next_backoff() {
                    delay = next_delay;
                }
            };

            // Done fetching, remove our lock on the object and execute all callbacks.
            //
            // We will keep this lock the whole time we are running the callbacks. We can't release
            // it earlier because we can't allow another task to register a callback after we have
            // taken the list of callbacks that we will execute. We also don't want to allow any new
            // fetches until we have executed the callbacks, because one of the callbacks may store
            // some resource that obviates the need for another fetch.
            //
            // The callbacks may acquire arbitrary locks from this task, while we already hold the
            // lock on `in_progress`. This is fine because we are always running in a freshly
            // spawned task. Therefore we know that this task holds no locks _before_ acquiring
            // `in_progress`, and so it is safe to acquire any lock _after_ acquiring `in_progress`.
            let mut in_progress = in_progress.lock().await;
            let callbacks = in_progress.remove(&req).unwrap_or_default();
            for callback in callbacks {
                callback.run(res.clone()).await;
            }
        });
    }
}

/// Added type safety for objects which are fetched in batches.
///
/// A [`NonEmptyRange`] has a similar interface as a [`Vec`], but it enforces, via the methods with
/// which it can be constructed, that the data it contains is always
/// * at least one object of type `T`
/// * a contiguous range of objects ordered by increasing [`height`](HeightIndexed::height).
// The derived `Into` (from `derive_more`) converts back into the underlying `Vec<T>`.
#[derive(Clone, Debug, Into, Deserialize, Serialize, PartialEq, Eq)]
#[serde(
    // Important: use `try_from` when deserializing so that we perform the necessary invariant
    // checks.
    try_from = "Vec<T>",
    into = "Vec<T>",
    bound(
        deserialize = "T: HeightIndexed + serde::de::DeserializeOwned",
        serialize = "T: Clone + Serialize"
    )
)]
pub struct NonEmptyRange<T>(Vec<T>);

209impl<T> NonEmptyRange<T>
210where
211 T: HeightIndexed,
212{
213 /// Construct a [`NonEmptyRange`] from a sequence of elements.
214 ///
215 /// # Errors
216 ///
217 /// This constructor will fail if the given sequence is empty, or if its elements do not
218 /// represent a contiguous range by height.
219 pub fn new(elems: impl IntoIterator<Item = T>) -> anyhow::Result<Self> {
220 elems.into_iter().collect::<Vec<_>>().try_into()
221 }
222
223 /// The inclusive lower bound of the range of heights of objects in this [`NonEmptyRange`].
224 pub fn start(&self) -> u64 {
225 self.0[0].height()
226 }
227
228 /// The exclusive upper bound of the range of heights of objects in this [`NonEmptyRange`].
229 pub fn end(&self) -> u64 {
230 self.start() + (self.len() as u64)
231 }
232
233 /// The number of objects in this [`NonEmptyRange`].
234 pub fn len(&self) -> usize {
235 self.0.len()
236 }
237
238 /// Whether the [`NonEmptyRange`] is empty.
239 ///
240 /// This function always returns `false`. It is included only because it is idiomatically
241 /// paired with [`Self::len`], as demanded by Clippy.
242 pub fn is_empty(&self) -> bool {
243 false
244 }
245
246 pub fn iter(&self) -> std::slice::Iter<'_, T> {
247 self.0.iter()
248 }
249
250 /// Convert a range of objects into an equivalent range of sub-objects with the same heights.
251 pub(crate) fn as_ref_cloned<U>(&self) -> NonEmptyRange<U>
252 where
253 T: AsRef<U>,
254 U: Clone,
255 {
256 NonEmptyRange(self.0.iter().map(|t| t.as_ref().clone()).collect())
257 }
258}
259
260impl<T> TryFrom<Vec<T>> for NonEmptyRange<T>
261where
262 T: HeightIndexed,
263{
264 type Error = anyhow::Error;
265
266 fn try_from(elems: Vec<T>) -> Result<Self, Self::Error> {
267 ensure!(
268 !elems.is_empty(),
269 "cannot construct a non-empty range from an empty vector"
270 );
271 for (x, y) in elems.iter().zip(&elems[1..]) {
272 ensure!(
273 x.height() + 1 == y.height(),
274 "cannot construct a non-empty range from a non-contiguous vector"
275 );
276 }
277 Ok(Self(elems))
278 }
279}
280
281impl<T> PartialEq<Vec<T>> for NonEmptyRange<T>
282where
283 T: PartialEq,
284{
285 fn eq(&self, other: &Vec<T>) -> bool {
286 self.0.eq(other)
287 }
288}
289
290impl<T> IntoIterator for NonEmptyRange<T> {
291 type IntoIter = <Vec<T> as IntoIterator>::IntoIter;
292 type Item = T;
293
294 fn into_iter(self) -> Self::IntoIter {
295 self.0.into_iter()
296 }
297}
298
299impl<'a, T> IntoIterator for &'a NonEmptyRange<T> {
300 type IntoIter = <&'a Vec<T> as IntoIterator>::IntoIter;
301 type Item = &'a T;
302
303 fn into_iter(self) -> Self::IntoIter {
304 self.0.iter()
305 }
306}
307
308impl<T> AsRef<[T]> for NonEmptyRange<T> {
309 fn as_ref(&self) -> &[T] {
310 self.0.as_ref()
311 }
312}