Skip to main content

hotshot_query_service/fetching/provider/
any.rs

1// Copyright (c) 2022 Espresso Systems (espressosys.com)
2// This file is part of the HotShot Query Service library.
3//
4// This program is free software: you can redistribute it and/or modify it under the terms of the GNU
5// General Public License as published by the Free Software Foundation, either version 3 of the
6// License, or (at your option) any later version.
7// This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without
8// even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
9// General Public License for more details.
10// You should have received a copy of the GNU General Public License along with this program. If not,
11// see <https://www.gnu.org/licenses/>.
12
13use std::{fmt::Debug, sync::Arc};
14
15use async_trait::async_trait;
16use derivative::Derivative;
17use hotshot_types::{data::VidCommon, traits::node_implementation::NodeType};
18
19use super::{Provider, Request};
20use crate::{
21    Payload,
22    availability::{BlockQueryData, Certificate2, LeafQueryData, VidCommonQueryData},
23    data_source::AvailabilityProvider,
24    fetching::{
25        NonEmptyRange,
26        request::{
27            BlockRangeRequest, Certificate2Request, LeafRangeRequest, LeafRequest, PayloadRequest,
28            VidCommonRangeRequest, VidCommonRequest,
29        },
30    },
31};
32
33/// Blanket trait combining [`Debug`] and [`Provider`].
34///
35/// This is necessary to create a fetcher trait object (`dyn Provider`, see [`PayloadProvider`] and
36/// [`LeafProvider`]) which also implements [`Debug`], since trait objects can only have one
37/// non-auto trait bound.
38trait DebugProvider<Types, T>: Provider<Types, T> + Debug
39where
40    Types: NodeType,
41    T: Request<Types>,
42{
43}
44
45impl<Types, T, P> DebugProvider<Types, T> for P
46where
47    Types: NodeType,
48    T: Request<Types>,
49    P: Provider<Types, T> + Debug,
50{
51}
52
// Shared, type-erased provider handles, one alias per kind of request. Each alias is an `Arc`
// around a `dyn DebugProvider` for the named request type, so providers of different concrete
// types can be stored side by side in the same list.

// Handles `PayloadRequest`.
type PayloadProvider<Types> = Arc<dyn DebugProvider<Types, PayloadRequest>>;
// Handles `BlockRangeRequest`.
type PayloadRangeProvider<Types> = Arc<dyn DebugProvider<Types, BlockRangeRequest>>;
// Handles `LeafRequest`.
type LeafProvider<Types> = Arc<dyn DebugProvider<Types, LeafRequest>>;
// Handles `LeafRangeRequest`.
type LeafRangeProvider<Types> = Arc<dyn DebugProvider<Types, LeafRangeRequest>>;
// Handles `VidCommonRequest`.
type VidCommonProvider<Types> = Arc<dyn DebugProvider<Types, VidCommonRequest>>;
// Handles `VidCommonRangeRequest`.
type VidCommonRangeProvider<Types> = Arc<dyn DebugProvider<Types, VidCommonRangeRequest>>;
// Handles `Certificate2Request`.
type Cert2Provider<Types> = Arc<dyn DebugProvider<Types, Certificate2Request>>;
60
61/// Adaptor combining multiple data availability providers.
62///
63/// This provider adaptor implements the [`Provider`](super::Provider) protocol by fetching
64/// requested objects from several different underlying providers. If any of the underlying sources
65/// have the object, the request will eventually succeed.
66///
67/// This can be used to combine multiple instances of the same kind of provider, like using
68/// [`TrustedQueryServiceProvider`](super::TrustedQueryServiceProvider) to request objects from a
69/// number of different query services. It can also be used to search different kinds of data
70/// providers for the same object, like searching for a block both in another instance of the query
71/// service and in the HotShot DA committee. Finally, [`AnyProvider`] can be used to combine a
72/// provider which only provides blocks and one which only provides leaves into a provider which
73/// provides both, and thus can be used as a provider for the availability API module.
74///
75/// # Examples
76///
77/// Fetching from multiple query services, for resiliency.
78///
79/// ```
80/// # use vbs::version::StaticVersionType;
81/// # use hotshot_types::traits::node_implementation::NodeType;
82/// # async fn doc<Types>() -> anyhow::Result<()>
83/// # where
84/// #   Types: NodeType,
85/// # {
86/// use hotshot_query_service::{
87///     fetching::provider::{AnyProvider, TrustedQueryServiceProvider},
88///     testing::mocks::MockBase,
89/// };
90///
91/// let qs1 = TrustedQueryServiceProvider::new("https://backup.query-service.1".parse()?, MockBase::instance());
92/// let qs2 = TrustedQueryServiceProvider::new("https://backup.query-service.2".parse()?, MockBase::instance());
93/// let provider = AnyProvider::<Types>::default()
94///     .with_provider(qs1)
95///     .with_provider(qs2);
96/// # Ok(())
97/// # }
98/// ```
99#[derive(Derivative)]
100#[derivative(Clone(bound = ""), Debug(bound = ""), Default(bound = ""))]
101pub struct AnyProvider<Types>
102where
103    Types: NodeType,
104{
105    payload_providers: Vec<PayloadProvider<Types>>,
106    payload_range_providers: Vec<PayloadRangeProvider<Types>>,
107    leaf_providers: Vec<LeafProvider<Types>>,
108    leaf_range_providers: Vec<LeafRangeProvider<Types>>,
109    vid_common_providers: Vec<VidCommonProvider<Types>>,
110    vid_common_range_providers: Vec<VidCommonRangeProvider<Types>>,
111    cert2_providers: Vec<Cert2Provider<Types>>,
112}
113
114#[async_trait]
115impl<Types> Provider<Types, PayloadRequest> for AnyProvider<Types>
116where
117    Types: NodeType,
118{
119    async fn fetch(&self, req: PayloadRequest) -> Option<Payload<Types>> {
120        any_fetch(&self.payload_providers, req).await
121    }
122}
123
124#[async_trait]
125impl<Types> Provider<Types, BlockRangeRequest> for AnyProvider<Types>
126where
127    Types: NodeType,
128{
129    async fn fetch(&self, req: BlockRangeRequest) -> Option<NonEmptyRange<BlockQueryData<Types>>> {
130        any_fetch(&self.payload_range_providers, req).await
131    }
132}
133
134#[async_trait]
135impl<Types> Provider<Types, LeafRequest> for AnyProvider<Types>
136where
137    Types: NodeType,
138{
139    async fn fetch(&self, req: LeafRequest) -> Option<LeafQueryData<Types>> {
140        any_fetch(&self.leaf_providers, req).await
141    }
142}
143
144#[async_trait]
145impl<Types> Provider<Types, LeafRangeRequest> for AnyProvider<Types>
146where
147    Types: NodeType,
148{
149    async fn fetch(&self, req: LeafRangeRequest) -> Option<NonEmptyRange<LeafQueryData<Types>>> {
150        any_fetch(&self.leaf_range_providers, req).await
151    }
152}
153
154#[async_trait]
155impl<Types> Provider<Types, VidCommonRequest> for AnyProvider<Types>
156where
157    Types: NodeType,
158{
159    async fn fetch(&self, req: VidCommonRequest) -> Option<VidCommon> {
160        any_fetch(&self.vid_common_providers, req).await
161    }
162}
163
164#[async_trait]
165impl<Types> Provider<Types, VidCommonRangeRequest> for AnyProvider<Types>
166where
167    Types: NodeType,
168{
169    async fn fetch(
170        &self,
171        req: VidCommonRangeRequest,
172    ) -> Option<NonEmptyRange<VidCommonQueryData<Types>>> {
173        any_fetch(&self.vid_common_range_providers, req).await
174    }
175}
176
177#[async_trait]
178impl<Types> Provider<Types, Certificate2Request> for AnyProvider<Types>
179where
180    Types: NodeType,
181{
182    async fn fetch(&self, req: Certificate2Request) -> Option<Option<Certificate2<Types>>> {
183        any_fetch(&self.cert2_providers, req).await
184    }
185}
186
187impl<Types> AnyProvider<Types>
188where
189    Types: NodeType,
190{
191    /// Add a sub-provider which fetches both blocks and leaves.
192    pub fn with_provider<P>(mut self, provider: P) -> Self
193    where
194        P: AvailabilityProvider<Types> + Debug + 'static,
195    {
196        let provider = Arc::new(provider);
197        self.payload_providers.push(provider.clone());
198        self.payload_range_providers.push(provider.clone());
199        self.leaf_providers.push(provider.clone());
200        self.leaf_range_providers.push(provider.clone());
201        self.vid_common_providers.push(provider.clone());
202        self.vid_common_range_providers.push(provider.clone());
203        self.cert2_providers.push(provider);
204        self
205    }
206
207    /// Add a sub-provider which fetches blocks.
208    pub fn with_block_provider<P>(mut self, provider: P) -> Self
209    where
210        P: Provider<Types, PayloadRequest> + Debug + 'static,
211    {
212        self.payload_providers.push(Arc::new(provider));
213        self
214    }
215
216    /// Add a sub-provider which fetches block ranges.
217    pub fn with_block_range_provider<P>(mut self, provider: P) -> Self
218    where
219        P: Provider<Types, BlockRangeRequest> + Debug + 'static,
220    {
221        self.payload_range_providers.push(Arc::new(provider));
222        self
223    }
224
225    /// Add a sub-provider which fetches leaves.
226    pub fn with_leaf_provider<P>(mut self, provider: P) -> Self
227    where
228        P: Provider<Types, LeafRequest> + Debug + 'static,
229    {
230        self.leaf_providers.push(Arc::new(provider));
231        self
232    }
233
234    /// Add a sub-provider which fetches leaf ranges.
235    pub fn with_leaf_range_provider<P>(mut self, provider: P) -> Self
236    where
237        P: Provider<Types, LeafRangeRequest> + Debug + 'static,
238    {
239        self.leaf_range_providers.push(Arc::new(provider));
240        self
241    }
242
243    /// Add a sub-provider which fetches VID common data.
244    pub fn with_vid_common_provider<P>(mut self, provider: P) -> Self
245    where
246        P: Provider<Types, VidCommonRequest> + Debug + 'static,
247    {
248        self.vid_common_providers.push(Arc::new(provider));
249        self
250    }
251
252    /// Add a sub-provider which fetches VID common ranges.
253    pub fn with_vid_common_range_provider<P>(mut self, provider: P) -> Self
254    where
255        P: Provider<Types, VidCommonRangeRequest> + Debug + 'static,
256    {
257        self.vid_common_range_providers.push(Arc::new(provider));
258        self
259    }
260}
261
262async fn any_fetch<Types, P, T>(providers: &[Arc<P>], req: T) -> Option<T::Response>
263where
264    Types: NodeType,
265    P: Provider<Types, T> + Debug + ?Sized,
266    T: Request<Types>,
267{
268    // There's a policy question of how to decide when to try each fetcher: all in parallel, in
269    // serial, or a combination. For now, we do the simplest thing of trying each in order, in
270    // serial. This has the best performance in the common case when we succeed on the first
271    // fetcher: low latency, and no undue burden on the other providers. However, a more complicated
272    // strategy where we slowly ramp up the parallelism as more and more requests fail may provide
273    // better worst-case latency.
274    for (i, p) in providers.iter().enumerate() {
275        match p.fetch(req).await {
276            Some(obj) => return Some(obj),
277            None => {
278                tracing::debug!(
279                    "failed to fetch {req:?} from provider {i}/{}: {p:?}",
280                    providers.len()
281                );
282                continue;
283            },
284        }
285    }
286
287    tracing::warn!(
288        "failed to fetch {req:?} from all {} providers",
289        providers.len()
290    );
291
292    None
293}
294
// The tests below depend on the `postgres` Docker image, which is not usable on Windows.
#[cfg(all(test, not(target_os = "windows")))]
mod test {
    use futures::stream::StreamExt;
    use test_utils::reserve_tcp_port;
    use tide_disco::App;
    use vbs::version::StaticVersionType;

    use super::*;
    use crate::{
        ApiState, Error,
        availability::{AvailabilityDataSource, UpdateAvailabilityData, define_api},
        data_source::storage::sql::testing::TmpDb,
        fetching::provider::{NoFetching, TrustedQueryServiceProvider},
        task::BackgroundTask,
        testing::{
            consensus::{MockDataSource, MockNetwork},
            mocks::{MockBase, MockTypes},
        },
        types::HeightIndexed,
    };

    type Provider = AnyProvider<MockTypes>;

    /// A fetch must still succeed when the first sub-provider in the list cannot supply anything
    /// and only a later one can.
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_fetch_first_provider_fails() {
        // Set up the consensus network.
        let mut network = MockNetwork::<MockDataSource>::init().await;

        // Serve the availability API over HTTP, backed by the network's data source, so that the
        // non-consensus node has a peer to fetch from.
        let port = reserve_tcp_port().unwrap();
        let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
        let availability_api = define_api(
            &Default::default(),
            MockBase::instance(),
            "1.0.0".parse().unwrap(),
        )
        .unwrap();
        app.register_module("availability", availability_api)
            .unwrap();
        let listen_addr = format!("0.0.0.0:{port}");
        let _server =
            BackgroundTask::spawn("server", app.serve(listen_addr, MockBase::instance()));

        // Build a data source which never hears from consensus directly. Its provider list starts
        // with `NoFetching`, followed by the peer query service started above.
        let db = TmpDb::init().await;
        let peer = TrustedQueryServiceProvider::new(
            format!("http://localhost:{port}").parse().unwrap(),
            MockBase::instance(),
        );
        let provider = Provider::default()
            .with_provider(NoFetching)
            .with_provider(peer);
        let data_source = db.config().connect(provider.clone()).await.unwrap();

        // Kick off consensus.
        network.start().await;

        // Collect three leaves starting from height 1, i.e. wait for the block height to reach 4:
        // the genesis block, one block to fetch a leaf for, one block to fetch a payload for, and
        // one trailing block.
        let leaf_stream = network.data_source().subscribe_leaves(1).await;
        let leaves: Vec<_> = leaf_stream.take(3).collect().await;
        let test_leaf = &leaves[0];
        let test_payload = &leaves[1];

        // Hand the node the trailing leaf, which lies past the range of interest, so that it
        // learns about the correct block height.
        let trailing_leaf = leaves.last().cloned().unwrap();
        data_source.append(trailing_leaf.into()).await.unwrap();

        tracing::info!("requesting leaf from multiple providers");
        let leaf_height = test_leaf.height() as usize;
        let leaf_fetch = data_source.get_leaf(leaf_height).await;
        let leaf = leaf_fetch.await;
        assert_eq!(leaf, *test_leaf);

        tracing::info!("requesting payload from multiple providers");
        let payload_height = test_payload.height() as usize;
        let payload_fetch = data_source.get_payload(payload_height).await;
        let payload = payload_fetch.await;
        assert_eq!(payload.height(), test_payload.height());
        assert_eq!(payload.block_hash(), test_payload.block_hash());
        assert_eq!(payload.hash(), test_payload.payload_hash());
    }
}