Skip to main content

hotshot_query_service/fetching/provider/
query_service.rs

1// Copyright (c) 2022 Espresso Systems (espressosys.com)
2// This file is part of the HotShot Query Service library.
3//
4// This program is free software: you can redistribute it and/or modify it under the terms of the GNU
5// General Public License as published by the Free Software Foundation, either version 3 of the
6// License, or (at your option) any later version.
7// This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without
8// even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
9// General Public License for more details.
10// You should have received a copy of the GNU General Public License along with this program. If not,
11// see <https://www.gnu.org/licenses/>.
12
13use std::fmt::Debug;
14
15use anyhow::{Context, ensure};
16use async_trait::async_trait;
17use futures::{TryFutureExt, future::try_join_all};
18use hotshot_types::{data::VidCommon, traits::node_implementation::NodeType};
19use surf_disco::{Client, Url};
20use vbs::version::StaticVersionType;
21
22use super::Provider;
23use crate::{
24    Error, Payload,
25    availability::{BlockQueryData, Certificate2, LeafQueryData, VidCommonQueryData},
26    fetching::{
27        NonEmptyRange,
28        request::{
29            BlockRangeRequest, Certificate2Request, LeafRangeRequest, LeafRequest, PayloadRequest,
30            VidCommonRangeRequest, VidCommonRequest,
31        },
32    },
33    types::HeightIndexed,
34};
35
36/// Data availability provider backed by another instance of this query service.
37///
38/// This fetcher implements the [`Provider`] interface by querying the REST API provided by another
39/// instance of this query service to try and retrieve missing objects.
40///
41/// This provider trusts the external query service is connected to: it does not verify the
42/// responses it receives. Instantiating this provider adds a trust assumption not only on the
43/// external service, but on the correctness of this node's local TLS configuration, etc.: anything
44/// needed to ensure that the HTTP client is connected to the intended, trusted server.
45///
46/// To avoid adding additional trust dependencies, do not use this provider; instead, use a provider
47/// implementation that verifies the data it receives, for example using a light client for the
48/// protocol.
49#[derive(Clone, Debug)]
50pub struct TrustedQueryServiceProvider<Ver: StaticVersionType> {
51    client: Client<Error, Ver>,
52}
53
54impl<Ver: StaticVersionType> TrustedQueryServiceProvider<Ver> {
55    pub fn new(url: Url, _: Ver) -> Self {
56        Self {
57            client: Client::new(url),
58        }
59    }
60}
61
62impl<Ver: StaticVersionType> TrustedQueryServiceProvider<Ver> {
63    pub async fn fetch_payload<Types: NodeType>(
64        &self,
65        req: PayloadRequest,
66    ) -> anyhow::Result<Payload<Types>> {
67        let block = self
68            .client
69            .get::<BlockQueryData<Types>>(&format!("availability/block/payload-hash/{}", req.0))
70            .send()
71            .await
72            .context("fetching block")?;
73
74        Ok(block.payload)
75    }
76
77    pub async fn fetch_payload_range<Types: NodeType>(
78        &self,
79        req: BlockRangeRequest,
80    ) -> anyhow::Result<NonEmptyRange<BlockQueryData<Types>>> {
81        let blocks = self
82            .client
83            .get::<NonEmptyRange<BlockQueryData<Types>>>(&format!(
84                "availability/block/{}/{}",
85                req.start, req.end
86            ))
87            .send()
88            .await
89            .context("fetching blocks")?;
90
91        ensure!(
92            blocks.start() == req.start && blocks.end() == req.end,
93            "wrong block range ({}..{})",
94            blocks.start(),
95            blocks.end()
96        );
97
98        Ok(blocks)
99    }
100
101    fn handle_result<R: Debug, T>(&self, req: R, res: anyhow::Result<T>) -> Option<T> {
102        match res {
103            Ok(res) => Some(res),
104            Err(err) => {
105                tracing::warn!(upstream = %self.client.base_url(), ?req, "failed to fetch: {err:#}");
106                None
107            },
108        }
109    }
110}
111
112#[async_trait]
113impl<Types, Ver: StaticVersionType> Provider<Types, PayloadRequest>
114    for TrustedQueryServiceProvider<Ver>
115where
116    Types: NodeType,
117{
118    async fn fetch(&self, req: PayloadRequest) -> Option<Payload<Types>> {
119        self.handle_result(req, self.fetch_payload::<Types>(req).await)
120    }
121}
122
123#[async_trait]
124impl<Types, Ver: StaticVersionType> Provider<Types, BlockRangeRequest>
125    for TrustedQueryServiceProvider<Ver>
126where
127    Types: NodeType,
128{
129    async fn fetch(&self, req: BlockRangeRequest) -> Option<NonEmptyRange<BlockQueryData<Types>>> {
130        self.handle_result(req, self.fetch_payload_range::<Types>(req).await)
131    }
132}
133
134impl<Ver: StaticVersionType> TrustedQueryServiceProvider<Ver> {
135    pub async fn fetch_leaf<Types: NodeType>(
136        &self,
137        req: LeafRequest,
138    ) -> anyhow::Result<LeafQueryData<Types>> {
139        let leaf = self
140            .client
141            .get::<LeafQueryData<Types>>(&format!("availability/leaf/{}", req.height))
142            .send()
143            .await
144            .context("fetching leaf")?;
145
146        ensure!(
147            leaf.height() == req.height,
148            "received leaf with the wrong height ({})",
149            leaf.height(),
150        );
151
152        Ok(leaf)
153    }
154
155    pub async fn fetch_leaf_range<Types: NodeType>(
156        &self,
157        req: LeafRangeRequest,
158    ) -> anyhow::Result<NonEmptyRange<LeafQueryData<Types>>> {
159        let leaves = self
160            .client
161            .get::<NonEmptyRange<LeafQueryData<Types>>>(&format!(
162                "availability/leaf/{}/{}",
163                req.start, req.end
164            ))
165            .send()
166            .await
167            .context("fetching leaf chain")?;
168
169        ensure!(
170            leaves.start() == req.start && leaves.end() == req.end,
171            "server returned wrong range of leaves ({}..{})",
172            leaves.start(),
173            leaves.end()
174        );
175
176        Ok(leaves)
177    }
178}
179
180#[async_trait]
181impl<Types, Ver: StaticVersionType> Provider<Types, LeafRequest>
182    for TrustedQueryServiceProvider<Ver>
183where
184    Types: NodeType,
185{
186    async fn fetch(&self, req: LeafRequest) -> Option<LeafQueryData<Types>> {
187        self.handle_result(req, self.fetch_leaf(req).await)
188    }
189}
190
191#[async_trait]
192impl<Types, Ver: StaticVersionType> Provider<Types, LeafRangeRequest>
193    for TrustedQueryServiceProvider<Ver>
194where
195    Types: NodeType,
196{
197    async fn fetch(&self, req: LeafRangeRequest) -> Option<NonEmptyRange<LeafQueryData<Types>>> {
198        self.handle_result(req, self.fetch_leaf_range(req).await)
199    }
200}
201
202impl<Ver: StaticVersionType> TrustedQueryServiceProvider<Ver> {
203    /// Fetch a cert2 from a peer at the given height.
204    pub async fn fetch_cert2<Types: NodeType>(
205        &self,
206        height: u64,
207    ) -> anyhow::Result<Option<Certificate2<Types>>> {
208        self.client
209            .get(&format!("availability/cert2/{height}"))
210            .send()
211            .await
212            .context("fetching cert2")
213    }
214}
215
216#[async_trait]
217impl<Types, Ver: StaticVersionType> Provider<Types, Certificate2Request>
218    for TrustedQueryServiceProvider<Ver>
219where
220    Types: NodeType,
221{
222    async fn fetch(&self, req: Certificate2Request) -> Option<Option<Certificate2<Types>>> {
223        self.handle_result(req, self.fetch_cert2(req.height).await)
224    }
225}
226
227impl<Ver: StaticVersionType> TrustedQueryServiceProvider<Ver> {
228    pub async fn fetch_vid_common<Types: NodeType>(
229        &self,
230        req: VidCommonRequest,
231    ) -> anyhow::Result<VidCommon> {
232        let res = self
233            .client
234            .get::<VidCommonQueryData<Types>>(&format!(
235                "availability/vid/common/payload-hash/{}",
236                req.0
237            ))
238            .send()
239            .await
240            .context("fetching VID common")?;
241
242        Ok(res.common)
243    }
244
245    async fn fetch_vid_common_range_with_fallback<Types: NodeType>(
246        &self,
247        start: u64,
248        end: u64,
249    ) -> anyhow::Result<NonEmptyRange<VidCommonQueryData<Types>>> {
250        let res = self
251            .client
252            .get::<NonEmptyRange<VidCommonQueryData<Types>>>(&format!(
253                "availability/vid/common/{start}/{end}",
254            ))
255            .send()
256            .await;
257        match res {
258            Ok(common) => Ok(common),
259            Err(Error::Custom { message, .. }) if message.contains("No route matches") => {
260                // Old versions of the upstream query service do not support the ranged VID common
261                // endpoint. Fall back to fetching each object individually.
262                tracing::info!(
263                    start,
264                    end,
265                    "server does not support ranged VID fetching, falling back to individual \
266                     fetches"
267                );
268                let common = try_join_all((start..end).map(|i| {
269                    self.client
270                        .get::<VidCommonQueryData<Types>>(&format!("availability/vid/common/{i}"))
271                        .send()
272                        .map_err(move |err| {
273                            anyhow::Error::new(err).context(format!("fetching VID common {i}"))
274                        })
275                }))
276                .await?;
277                NonEmptyRange::new(common)
278                    .context("converting individually fetched VID common into range")
279            },
280            Err(err) => Err(err).context("fetching VID common range"),
281        }
282    }
283
284    pub async fn fetch_vid_common_range<Types: NodeType>(
285        &self,
286        req: VidCommonRangeRequest,
287    ) -> anyhow::Result<NonEmptyRange<VidCommonQueryData<Types>>> {
288        let common = self
289            .fetch_vid_common_range_with_fallback(req.start, req.end)
290            .await?;
291
292        ensure!(
293            common.start() == req.start && common.end() == req.end,
294            "server returned wrong VID common ({}..{})",
295            common.start(),
296            common.end()
297        );
298
299        Ok(common)
300    }
301}
302
303#[async_trait]
304impl<Types, Ver: StaticVersionType> Provider<Types, VidCommonRequest>
305    for TrustedQueryServiceProvider<Ver>
306where
307    Types: NodeType,
308{
309    async fn fetch(&self, req: VidCommonRequest) -> Option<VidCommon> {
310        self.handle_result(req, self.fetch_vid_common::<Types>(req).await)
311    }
312}
313
314#[async_trait]
315impl<Types, Ver: StaticVersionType> Provider<Types, VidCommonRangeRequest>
316    for TrustedQueryServiceProvider<Ver>
317where
318    Types: NodeType,
319{
320    async fn fetch(
321        &self,
322        req: VidCommonRangeRequest,
323    ) -> Option<NonEmptyRange<VidCommonQueryData<Types>>> {
324        self.handle_result(req, self.fetch_vid_common_range::<Types>(req).await)
325    }
326}
327
328// These tests run the `postgres` Docker image, which doesn't work on Windows.
329#[cfg(all(test, not(target_os = "windows")))]
330mod test {
331    use std::{future::IntoFuture, time::Duration};
332
333    use committable::Committable;
334    use futures::{
335        future::{FutureExt, join},
336        stream::StreamExt,
337    };
338    use hotshot_example_types::node_types::{EpochVersion, TEST_VERSIONS};
339    use test_utils::reserve_tcp_port;
340    use tide_disco::{Api, App};
341    use toml::toml;
342    use vbs::version::StaticVersion;
343
344    use super::*;
345    use crate::{
346        ApiState,
347        availability::{
348            self, AvailabilityDataSource, BlockId, BlockInfo, BlockQueryData, BlockWithTransaction,
349            Fetch, UpdateAvailabilityData, define_api,
350        },
351        data_source::{
352            AvailabilityProvider, FetchingDataSource, Transaction, VersionedDataSource,
353            sql::{self, SqlDataSource},
354            storage::{
355                AvailabilityStorage, SqlStorage, StorageConnectionType, UpdateAvailabilityStorage,
356                fail_storage::{FailStorage, FailableAction},
357                pruning::{PrunedHeightStorage, PrunerCfg},
358                sql::testing::TmpDb,
359            },
360        },
361        fetching::provider::{NoFetching, Provider as ProviderTrait, TestProvider},
362        node::data_source::NodeDataSource,
363        task::BackgroundTask,
364        testing::{
365            consensus::{MockDataSource, MockNetwork},
366            mocks::{MockBase, MockTypes, mock_transaction},
367            sleep,
368        },
369        types::HeightIndexed,
370    };
371
372    type Provider = TestProvider<TrustedQueryServiceProvider<MockBase>>;
373    type EpochProvider = TestProvider<TrustedQueryServiceProvider<EpochVersion>>;
374
375    fn ignore<T>(_: T) {}
376
377    /// Build a data source suitable for this suite of tests.
378    async fn builder<P: AvailabilityProvider<MockTypes> + Clone>(
379        db: &TmpDb,
380        provider: &P,
381    ) -> sql::Builder<MockTypes, P> {
382        db.config()
383            .builder((*provider).clone())
384            .await
385            .unwrap()
386            // We disable proactive fetching for these tests, since we are intending to test on
387            // demand fetching, and proactive fetching could lead to false successes.
388            .disable_proactive_fetching()
389    }
390
391    /// A data source suitable for this suite of tests, with the default options.
392    async fn data_source<P: AvailabilityProvider<MockTypes> + Clone>(
393        db: &TmpDb,
394        provider: &P,
395    ) -> SqlDataSource<MockTypes, P> {
396        builder(db, provider).await.build().await.unwrap()
397    }
398
399    #[test_log::test(tokio::test(flavor = "multi_thread"))]
400    async fn test_fetch_on_request() {
401        // Create the consensus network.
402        let mut network = MockNetwork::<MockDataSource>::init().await;
403
404        // Start a web server that the non-consensus node can use to fetch blocks.
405        let port = reserve_tcp_port().unwrap();
406        let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
407        app.register_module(
408            "availability",
409            define_api(
410                &Default::default(),
411                MockBase::instance(),
412                "1.0.0".parse().unwrap(),
413            )
414            .unwrap(),
415        )
416        .unwrap();
417        network.spawn(
418            "server",
419            app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
420        );
421
422        // Start a data source which is not receiving events from consensus, only from a peer.
423        let db = TmpDb::init().await;
424        let provider = Provider::new(TrustedQueryServiceProvider::new(
425            format!("http://localhost:{port}").parse().unwrap(),
426            MockBase::instance(),
427        ));
428        let data_source = data_source(&db, &provider).await;
429
430        // Start consensus.
431        network.start().await;
432
433        // Wait until the block height reaches 6. This gives us the genesis block, one additional
434        // block at the end, and then one block to play around with fetching each type of resource:
435        // * Leaf
436        // * Block
437        // * Payload
438        // * VID common
439        let leaves = network.data_source().subscribe_leaves(1).await;
440        let leaves = leaves.take(5).collect::<Vec<_>>().await;
441        let test_leaf = &leaves[0];
442        let test_block = &leaves[1];
443        let test_payload = &leaves[2];
444        let test_common = &leaves[3];
445
446        // Make requests for missing data that should _not_ trigger an active fetch:
447        tracing::info!("requesting unfetchable resources");
448        let mut fetches = vec![];
449        // * An unknown leaf hash.
450        fetches.push(data_source.get_leaf(test_leaf.hash()).await.map(ignore));
451        // * An unknown leaf height.
452        fetches.push(
453            data_source
454                .get_leaf(test_leaf.height() as usize)
455                .await
456                .map(ignore),
457        );
458        // * An unknown block hash.
459        fetches.push(
460            data_source
461                .get_block(test_block.block_hash())
462                .await
463                .map(ignore),
464        );
465        fetches.push(
466            data_source
467                .get_payload(test_payload.block_hash())
468                .await
469                .map(ignore),
470        );
471        fetches.push(
472            data_source
473                .get_vid_common(test_common.block_hash())
474                .await
475                .map(ignore),
476        );
477        // * An unknown block height.
478        fetches.push(
479            data_source
480                .get_block(test_block.height() as usize)
481                .await
482                .map(ignore),
483        );
484        fetches.push(
485            data_source
486                .get_payload(test_payload.height() as usize)
487                .await
488                .map(ignore),
489        );
490        fetches.push(
491            data_source
492                .get_vid_common(test_common.height() as usize)
493                .await
494                .map(ignore),
495        );
496        // * Genesis VID common (no VID for genesis)
497        fetches.push(data_source.get_vid_common(0).await.map(ignore));
498        // * An unknown transaction.
499        fetches.push(
500            data_source
501                .get_block_containing_transaction(mock_transaction(vec![]).commit())
502                .await
503                .map(ignore),
504        );
505
506        // Even if we give data extra time to propagate, these requests will not resolve, since we
507        // didn't trigger any active fetches.
508        sleep(Duration::from_secs(1)).await;
509        for (i, fetch) in fetches.into_iter().enumerate() {
510            tracing::info!("checking fetch {i} is unresolved");
511            fetch.try_resolve().unwrap_err();
512        }
513
514        // Now we will actually fetch the missing data. First, since our node is not really
515        // connected to consensus, we need to give it a leaf after the range of interest so it
516        // learns about the correct block height. We will temporarily lock requests to the provider
517        // so that we can verify that without the provider, the node does _not_ get the data.
518        provider.block().await;
519        data_source
520            .append(leaves.last().cloned().unwrap().into())
521            .await
522            .unwrap();
523
524        tracing::info!("requesting fetchable resources");
525        let req_leaf = data_source.get_leaf(test_leaf.height() as usize).await;
526        let req_block = data_source.get_block(test_block.height() as usize).await;
527        let req_payload = data_source
528            .get_payload(test_payload.height() as usize)
529            .await;
530        let req_common = data_source
531            .get_vid_common(test_common.height() as usize)
532            .await;
533
534        // Give the requests some extra time to complete, and check that they still haven't
535        // resolved, since the provider is blocked. This just ensures the integrity of the test by
536        // checking the node didn't mysteriously get the block from somewhere else, so that when we
537        // unblock the provider and the node finally gets the block, we know it came from the
538        // provider.
539        sleep(Duration::from_secs(1)).await;
540        req_leaf.try_resolve().unwrap_err();
541        req_block.try_resolve().unwrap_err();
542        req_payload.try_resolve().unwrap_err();
543        req_common.try_resolve().unwrap_err();
544
545        // Unblock the request and see that we eventually receive the data.
546        provider.unblock().await;
547        let leaf = data_source
548            .get_leaf(test_leaf.height() as usize)
549            .await
550            .await;
551        let block = data_source
552            .get_block(test_block.height() as usize)
553            .await
554            .await;
555        let payload = data_source
556            .get_payload(test_payload.height() as usize)
557            .await
558            .await;
559        let common = data_source
560            .get_vid_common(test_common.height() as usize)
561            .await
562            .await;
563        {
564            // Verify the data.
565            let truth = network.data_source();
566            assert_eq!(
567                leaf,
568                truth.get_leaf(test_leaf.height() as usize).await.await
569            );
570            assert_eq!(
571                block,
572                truth.get_block(test_block.height() as usize).await.await
573            );
574            assert_eq!(
575                payload,
576                truth
577                    .get_payload(test_payload.height() as usize)
578                    .await
579                    .await
580            );
581            assert_eq!(
582                common,
583                truth
584                    .get_vid_common(test_common.height() as usize)
585                    .await
586                    .await
587            );
588        }
589
590        // Fetching the block and payload should have also fetched the corresponding leaves, since
591        // we have an invariant that we should not store a block in the database without its
592        // corresponding leaf and header. Thus we should be able to get the leaves even if the
593        // provider is blocked.
594        provider.block().await;
595        for leaf in [test_block, test_payload] {
596            tracing::info!("fetching existing leaf {}", leaf.height());
597            let fetched_leaf = data_source.get_leaf(leaf.height() as usize).await.await;
598            assert_eq!(*leaf, fetched_leaf);
599        }
600
601        // On the other hand, fetching the block corresponding to `leaf` _will_ trigger a fetch,
602        // since fetching a leaf does not necessarily fetch the corresponding block. We can fetch by
603        // hash now, since the presence of the corresponding leaf allows us to confirm that a block
604        // with this hash exists, and trigger a fetch for it.
605        tracing::info!("fetching block by hash");
606        provider.unblock().await;
607        {
608            let block = data_source.get_block(test_leaf.block_hash()).await.await;
609            assert_eq!(block.hash(), leaf.block_hash());
610        }
611
612        // Test a similar scenario, but with payload instead of block: we are aware of
613        // `leaves.last()` but not the corresponding payload, but we can fetch that payload by block
614        // hash.
615        tracing::info!("fetching payload by hash");
616        {
617            let leaf = leaves.last().unwrap();
618            let payload = data_source.get_payload(leaf.block_hash()).await.await;
619            assert_eq!(payload.height(), leaf.height());
620            assert_eq!(payload.block_hash(), leaf.block_hash());
621            assert_eq!(payload.hash(), leaf.payload_hash());
622        }
623    }
624
625    #[tokio::test(flavor = "multi_thread")]
626    async fn test_fetch_on_request_epoch_version() {
627        // This test verifies that our provider can handle fetching things by their hashes,
628        // specifically focused on epoch version transitions
629        tracing::info!("Starting test_fetch_on_request_epoch_version");
630
631        // Create the consensus network.
632        let mut network = MockNetwork::<MockDataSource>::init().await;
633
634        // Start a web server that the non-consensus node can use to fetch blocks.
635        let port = reserve_tcp_port().unwrap();
636        let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
637        app.register_module(
638            "availability",
639            define_api(
640                &Default::default(),
641                EpochVersion::instance(),
642                "1.0.0".parse().unwrap(),
643            )
644            .unwrap(),
645        )
646        .unwrap();
647        network.spawn(
648            "server",
649            app.serve(format!("0.0.0.0:{port}"), EpochVersion::instance()),
650        );
651
652        // Start a data source which is not receiving events from consensus, only from a peer.
653        // Use our special test provider that handles epoch version transitions
654        let db = TmpDb::init().await;
655        let provider = EpochProvider::new(TrustedQueryServiceProvider::new(
656            format!("http://localhost:{port}").parse().unwrap(),
657            EpochVersion::instance(),
658        ));
659        let data_source = data_source(&db, &provider).await;
660
661        // Start consensus.
662        network.start().await;
663
664        // Wait until the block height reaches 6. This gives us the genesis block, one additional
665        // block at the end, and then one block to play around with fetching each type of resource:
666        // * Leaf
667        // * Block
668        // * Payload
669        // * VID common
670        let leaves = network.data_source().subscribe_leaves(1).await;
671        let leaves = leaves.take(5).collect::<Vec<_>>().await;
672        let test_leaf = &leaves[0];
673        let test_block = &leaves[1];
674        let test_payload = &leaves[2];
675        let test_common = &leaves[3];
676
677        // Make requests for missing data that should _not_ trigger an active fetch:
678        let mut fetches = vec![];
679        // * An unknown leaf hash.
680        fetches.push(data_source.get_leaf(test_leaf.hash()).await.map(ignore));
681        // * An unknown leaf height.
682        fetches.push(
683            data_source
684                .get_leaf(test_leaf.height() as usize)
685                .await
686                .map(ignore),
687        );
688        // * An unknown block hash.
689        fetches.push(
690            data_source
691                .get_block(test_block.block_hash())
692                .await
693                .map(ignore),
694        );
695        fetches.push(
696            data_source
697                .get_payload(test_payload.block_hash())
698                .await
699                .map(ignore),
700        );
701        fetches.push(
702            data_source
703                .get_vid_common(test_common.block_hash())
704                .await
705                .map(ignore),
706        );
707        // * An unknown block height.
708        fetches.push(
709            data_source
710                .get_block(test_block.height() as usize)
711                .await
712                .map(ignore),
713        );
714        fetches.push(
715            data_source
716                .get_payload(test_payload.height() as usize)
717                .await
718                .map(ignore),
719        );
720        fetches.push(
721            data_source
722                .get_vid_common(test_common.height() as usize)
723                .await
724                .map(ignore),
725        );
726        // * Genesis VID common (no VID for genesis)
727        fetches.push(data_source.get_vid_common(0).await.map(ignore));
728        // * An unknown transaction.
729        fetches.push(
730            data_source
731                .get_block_containing_transaction(mock_transaction(vec![]).commit())
732                .await
733                .map(ignore),
734        );
735
736        // Even if we give data extra time to propagate, these requests will not resolve, since we
737        // didn't trigger any active fetches.
738        sleep(Duration::from_secs(1)).await;
739        for (i, fetch) in fetches.into_iter().enumerate() {
740            tracing::info!("checking fetch {i} is unresolved");
741            fetch.try_resolve().unwrap_err();
742        }
743
744        // Now we will actually fetch the missing data. First, since our node is not really
745        // connected to consensus, we need to give it a leaf after the range of interest so it
746        // learns about the correct block height. We will temporarily lock requests to the provider
747        // so that we can verify that without the provider, the node does _not_ get the data.
748        provider.block().await;
749        data_source
750            .append(leaves.last().cloned().unwrap().into())
751            .await
752            .unwrap();
753
754        let req_leaf = data_source.get_leaf(test_leaf.height() as usize).await;
755        let req_block = data_source.get_block(test_block.height() as usize).await;
756        let req_payload = data_source
757            .get_payload(test_payload.height() as usize)
758            .await;
759        let req_common = data_source
760            .get_vid_common(test_common.height() as usize)
761            .await;
762
763        // Give the requests some extra time to complete, and check that they still haven't
764        // resolved, since the provider is blocked. This just ensures the integrity of the test by
765        // checking the node didn't mysteriously get the block from somewhere else, so that when we
766        // unblock the provider and the node finally gets the block, we know it came from the
767        // provider.
768        sleep(Duration::from_secs(1)).await;
769        req_leaf.try_resolve().unwrap_err();
770        req_block.try_resolve().unwrap_err();
771        req_payload.try_resolve().unwrap_err();
772        req_common.try_resolve().unwrap_err();
773
774        // Unblock the request and see that we eventually receive the data.
775        provider.unblock().await;
776        let leaf = data_source
777            .get_leaf(test_leaf.height() as usize)
778            .await
779            .await;
780        let block = data_source
781            .get_block(test_block.height() as usize)
782            .await
783            .await;
784        let payload = data_source
785            .get_payload(test_payload.height() as usize)
786            .await
787            .await;
788        let common = data_source
789            .get_vid_common(test_common.height() as usize)
790            .await
791            .await;
792        {
793            // Verify the data.
794            let truth = network.data_source();
795            assert_eq!(
796                leaf,
797                truth.get_leaf(test_leaf.height() as usize).await.await
798            );
799            assert_eq!(
800                block,
801                truth.get_block(test_block.height() as usize).await.await
802            );
803            assert_eq!(
804                payload,
805                truth
806                    .get_payload(test_payload.height() as usize)
807                    .await
808                    .await
809            );
810            assert_eq!(
811                common,
812                truth
813                    .get_vid_common(test_common.height() as usize)
814                    .await
815                    .await
816            );
817        }
818
819        // Fetching the block and payload should have also fetched the corresponding leaves, since
820        // we have an invariant that we should not store a block in the database without its
821        // corresponding leaf and header. Thus we should be able to get the leaves even if the
822        // provider is blocked.
823        provider.block().await;
824        for leaf in [test_block, test_payload] {
825            tracing::info!("fetching existing leaf {}", leaf.height());
826            let fetched_leaf = data_source.get_leaf(leaf.height() as usize).await.await;
827            assert_eq!(*leaf, fetched_leaf);
828        }
829
830        // On the other hand, fetching the block corresponding to `leaf` _will_ trigger a fetch,
831        // since fetching a leaf does not necessarily fetch the corresponding block. We can fetch by
832        // hash now, since the presence of the corresponding leaf allows us to confirm that a block
833        // with this hash exists, and trigger a fetch for it.
834        provider.unblock().await;
835        {
836            let block = data_source.get_block(test_leaf.block_hash()).await.await;
837            assert_eq!(block.hash(), leaf.block_hash());
838        }
839
840        // Test a similar scenario, but with payload instead of block: we are aware of
841        // `leaves.last()` but not the corresponding payload, but we can fetch that payload by block
842        // hash.
843        {
844            let leaf = leaves.last().unwrap();
845            let payload = data_source.get_payload(leaf.block_hash()).await.await;
846            assert_eq!(payload.height(), leaf.height());
847            assert_eq!(payload.block_hash(), leaf.block_hash());
848            assert_eq!(payload.hash(), leaf.payload_hash());
849        }
850
851        // Add more debug logs throughout the test
852        tracing::info!("Test completed successfully!");
853    }
854
855    #[test_log::test(tokio::test(flavor = "multi_thread"))]
856    async fn test_fetch_block_and_leaf_concurrently() {
857        // Create the consensus network.
858        let mut network = MockNetwork::<MockDataSource>::init().await;
859
860        // Start a web server that the non-consensus node can use to fetch blocks.
861        let port = reserve_tcp_port().unwrap();
862        let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
863        app.register_module(
864            "availability",
865            define_api(
866                &Default::default(),
867                MockBase::instance(),
868                "1.0.0".parse().unwrap(),
869            )
870            .unwrap(),
871        )
872        .unwrap();
873        network.spawn(
874            "server",
875            app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
876        );
877
878        // Start a data source which is not receiving events from consensus, only from a peer.
879        let db = TmpDb::init().await;
880        let provider = Provider::new(TrustedQueryServiceProvider::new(
881            format!("http://localhost:{port}").parse().unwrap(),
882            MockBase::instance(),
883        ));
884        let data_source = data_source(&db, &provider).await;
885
886        // Start consensus.
887        network.start().await;
888
889        // Wait until the block height reaches 3. This gives us the genesis block, one additional
890        // block at the end, and then one block that we can use to test fetching.
891        let leaves = network.data_source().subscribe_leaves(1).await;
892        let leaves = leaves.take(2).collect::<Vec<_>>().await;
893        let test_leaf = &leaves[0];
894
895        // Tell the node about a leaf after the one of interest so it learns about the block height.
896        data_source.append(leaves[1].clone().into()).await.unwrap();
897
898        // Fetch a leaf and the corresponding block at the same time. This will result in two tasks
899        // trying to fetch the same leaf, but one should win and notify the other, which ultimately
900        // ends up not fetching anything.
901        let (leaf, block) = join(
902            data_source
903                .get_leaf(test_leaf.height() as usize)
904                .await
905                .into_future(),
906            data_source
907                .get_block(test_leaf.height() as usize)
908                .await
909                .into_future(),
910        )
911        .await;
912        assert_eq!(leaf, *test_leaf);
913        assert_eq!(leaf.header(), block.header());
914    }
915
916    #[test_log::test(tokio::test(flavor = "multi_thread"))]
917    async fn test_fetch_different_blocks_same_payload() {
918        // Create the consensus network.
919        let mut network = MockNetwork::<MockDataSource>::init().await;
920
921        // Start a web server that the non-consensus node can use to fetch blocks.
922        let port = reserve_tcp_port().unwrap();
923        let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
924        app.register_module(
925            "availability",
926            define_api(
927                &Default::default(),
928                MockBase::instance(),
929                "1.0.0".parse().unwrap(),
930            )
931            .unwrap(),
932        )
933        .unwrap();
934        network.spawn(
935            "server",
936            app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
937        );
938
939        // Start a data source which is not receiving events from consensus, only from a peer.
940        let db = TmpDb::init().await;
941        let provider = Provider::new(TrustedQueryServiceProvider::new(
942            format!("http://localhost:{port}").parse().unwrap(),
943            MockBase::instance(),
944        ));
945        let data_source = data_source(&db, &provider).await;
946
947        // Start consensus.
948        network.start().await;
949
950        // Wait until the block height reaches 4. This gives us the genesis block, one additional
951        // block at the end, and then two blocks that we can use to test fetching.
952        let leaves = network.data_source().subscribe_leaves(1).await;
953        let leaves = leaves.take(4).collect::<Vec<_>>().await;
954
955        // Tell the node about a leaf after the range of interest so it learns about the block
956        // height.
957        data_source
958            .append(leaves.last().cloned().unwrap().into())
959            .await
960            .unwrap();
961
962        // All the blocks here are empty, so they have the same payload:
963        assert_eq!(leaves[0].payload_hash(), leaves[1].payload_hash());
964        // If we fetch both blocks at the same time, we can check that we haven't broken anything
965        // with whatever optimizations we add to deduplicate payload fetching.
966        let (block1, block2) = join(
967            data_source
968                .get_block(leaves[0].height() as usize)
969                .await
970                .into_future(),
971            data_source
972                .get_block(leaves[1].height() as usize)
973                .await
974                .into_future(),
975        )
976        .await;
977        assert_eq!(block1.header(), leaves[0].header());
978        assert_eq!(block2.header(), leaves[1].header());
979    }
980
981    #[test_log::test(tokio::test(flavor = "multi_thread"))]
982    async fn test_fetch_stream() {
983        // Create the consensus network.
984        let mut network = MockNetwork::<MockDataSource>::init().await;
985
986        // Start a web server that the non-consensus node can use to fetch blocks.
987        let port = reserve_tcp_port().unwrap();
988        let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
989        app.register_module(
990            "availability",
991            define_api(
992                &Default::default(),
993                MockBase::instance(),
994                "1.0.0".parse().unwrap(),
995            )
996            .unwrap(),
997        )
998        .unwrap();
999        network.spawn(
1000            "server",
1001            app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
1002        );
1003
1004        // Start a data source which is not receiving events from consensus, only from a peer.
1005        let db = TmpDb::init().await;
1006        let provider = Provider::new(TrustedQueryServiceProvider::new(
1007            format!("http://localhost:{port}").parse().unwrap(),
1008            MockBase::instance(),
1009        ));
1010        let data_source = data_source(&db, &provider).await;
1011
1012        // Start consensus.
1013        network.start().await;
1014
1015        // Subscribe to objects from the future.
1016        let blocks = data_source.subscribe_blocks(0).await;
1017        let leaves = data_source.subscribe_leaves(0).await;
1018        let common = data_source.subscribe_vid_common(0).await;
1019
1020        // Wait for a few blocks to be finalized.
1021        let finalized_leaves = network.data_source().subscribe_leaves(0).await;
1022        let finalized_leaves = finalized_leaves.take(5).collect::<Vec<_>>().await;
1023
1024        // Tell the node about a leaf after the range of interest so it learns about the block
1025        // height.
1026        data_source
1027            .append(finalized_leaves.last().cloned().unwrap().into())
1028            .await
1029            .unwrap();
1030
1031        // Check the subscriptions.
1032        let blocks = blocks.take(5).collect::<Vec<_>>().await;
1033        let leaves = leaves.take(5).collect::<Vec<_>>().await;
1034        let common = common.take(5).collect::<Vec<_>>().await;
1035        for i in 0..5 {
1036            tracing::info!("checking block {i}");
1037            assert_eq!(leaves[i], finalized_leaves[i]);
1038            assert_eq!(blocks[i].header(), finalized_leaves[i].header());
1039            assert_eq!(common[i], data_source.get_vid_common(i).await.await);
1040        }
1041    }
1042
1043    #[test_log::test(tokio::test(flavor = "multi_thread"))]
1044    async fn test_fetch_range_start() {
1045        // Create the consensus network.
1046        let mut network = MockNetwork::<MockDataSource>::init().await;
1047
1048        // Start a web server that the non-consensus node can use to fetch blocks.
1049        let port = reserve_tcp_port().unwrap();
1050        let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
1051        app.register_module(
1052            "availability",
1053            define_api(
1054                &Default::default(),
1055                MockBase::instance(),
1056                "1.0.0".parse().unwrap(),
1057            )
1058            .unwrap(),
1059        )
1060        .unwrap();
1061        network.spawn(
1062            "server",
1063            app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
1064        );
1065
1066        // Start a data source which is not receiving events from consensus, only from a peer.
1067        let db = TmpDb::init().await;
1068        let provider = Provider::new(TrustedQueryServiceProvider::new(
1069            format!("http://localhost:{port}").parse().unwrap(),
1070            MockBase::instance(),
1071        ));
1072        let data_source = data_source(&db, &provider).await;
1073
1074        // Start consensus.
1075        network.start().await;
1076
1077        // Wait for a few blocks to be finalized.
1078        let finalized_leaves = network.data_source().subscribe_leaves(0).await;
1079        let finalized_leaves = finalized_leaves.take(5).collect::<Vec<_>>().await;
1080
1081        // Tell the node about a leaf after the range of interest (so it learns about the block
1082        // height) and one in the middle of the range, to test what happens when data is missing
1083        // from the beginning of the range but other data is available.
1084        let mut tx = data_source.write().await.unwrap();
1085        tx.insert_leaf(&finalized_leaves[2]).await.unwrap();
1086        tx.insert_leaf(&finalized_leaves[4]).await.unwrap();
1087        tx.commit().await.unwrap();
1088
1089        // Get the whole range of leaves.
1090        let leaves = data_source
1091            .get_leaf_range(..5)
1092            .await
1093            .then(Fetch::resolve)
1094            .collect::<Vec<_>>()
1095            .await;
1096        for i in 0..5 {
1097            tracing::info!("checking leaf {i}");
1098            assert_eq!(leaves[i], finalized_leaves[i]);
1099        }
1100    }
1101
1102    #[test_log::test(tokio::test(flavor = "multi_thread"))]
1103    async fn fetch_transaction() {
1104        // Create the consensus network.
1105        let mut network = MockNetwork::<MockDataSource>::init().await;
1106
1107        // Start a web server that the non-consensus node can use to fetch blocks.
1108        let port = reserve_tcp_port().unwrap();
1109        let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
1110        app.register_module(
1111            "availability",
1112            define_api(
1113                &Default::default(),
1114                MockBase::instance(),
1115                "1.0.0".parse().unwrap(),
1116            )
1117            .unwrap(),
1118        )
1119        .unwrap();
1120        network.spawn(
1121            "server",
1122            app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
1123        );
1124
1125        // Start a data source which is not receiving events from consensus. We don't give it a
1126        // fetcher since transactions are always fetched passively anyways.
1127        let db = TmpDb::init().await;
1128        let data_source = data_source(&db, &NoFetching).await;
1129
1130        // Subscribe to blocks.
1131        let mut leaves = network.data_source().subscribe_leaves(1).await;
1132        let mut blocks = network.data_source().subscribe_blocks(1).await;
1133
1134        // Start consensus.
1135        network.start().await;
1136
1137        // Subscribe to a transaction which hasn't been sequenced yet. This is completely passive
1138        // and works without a fetcher; we don't trigger fetches for transactions that we don't know
1139        // exist.
1140        let tx = mock_transaction(vec![1, 2, 3]);
1141        let fut = data_source
1142            .get_block_containing_transaction(tx.commit())
1143            .await;
1144
1145        // Sequence the transaction.
1146        network.submit_transaction(tx.clone()).await;
1147
1148        // Send blocks to the query service, the future will resolve as soon as it sees a block
1149        // containing the transaction.
1150        let block = loop {
1151            let leaf = leaves.next().await.unwrap();
1152            let block = blocks.next().await.unwrap();
1153
1154            data_source
1155                .append(BlockInfo::new(leaf, Some(block.clone()), None, None))
1156                .await
1157                .unwrap();
1158
1159            if block.transaction_by_hash(tx.commit()).is_some() {
1160                break block;
1161            }
1162        };
1163        tracing::info!("transaction included in block {}", block.height());
1164
1165        let fetched_tx = fut.await;
1166        assert_eq!(
1167            fetched_tx,
1168            BlockWithTransaction::with_hash(block, tx.commit()).unwrap()
1169        );
1170
1171        // Future queries for this transaction resolve immediately.
1172        assert_eq!(
1173            fetched_tx,
1174            data_source
1175                .get_block_containing_transaction(tx.commit())
1176                .await
1177                .await
1178        );
1179    }
1180
1181    #[test_log::test(tokio::test(flavor = "multi_thread"))]
1182    async fn test_retry() {
1183        // Create the consensus network.
1184        let mut network = MockNetwork::<MockDataSource>::init().await;
1185
1186        // Start a web server that the non-consensus node can use to fetch blocks.
1187        let port = reserve_tcp_port().unwrap();
1188        let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
1189        app.register_module(
1190            "availability",
1191            define_api(
1192                &Default::default(),
1193                MockBase::instance(),
1194                "1.0.0".parse().unwrap(),
1195            )
1196            .unwrap(),
1197        )
1198        .unwrap();
1199        network.spawn(
1200            "server",
1201            app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
1202        );
1203
1204        // Start a data source which is not receiving events from consensus.
1205        let db = TmpDb::init().await;
1206        let provider = Provider::new(TrustedQueryServiceProvider::new(
1207            format!("http://localhost:{port}").parse().unwrap(),
1208            MockBase::instance(),
1209        ));
1210        let data_source = builder(&db, &provider)
1211            .await
1212            .with_max_retry_interval(Duration::from_secs(1))
1213            .build()
1214            .await
1215            .unwrap();
1216
1217        // Start consensus.
1218        network.start().await;
1219
1220        // Wait until the block height reaches 3. This gives us the genesis block, one additional
1221        // block at the end, and one block to try fetching.
1222        let leaves = network.data_source().subscribe_leaves(1).await;
1223        let leaves = leaves.take(2).collect::<Vec<_>>().await;
1224        let test_leaf = &leaves[0];
1225
1226        // Cause requests to fail temporarily, so we can test retries.
1227        provider.fail();
1228
1229        // Give the node a leaf after the range of interest so it learns about the correct block
1230        // height.
1231        data_source
1232            .append(leaves.last().cloned().unwrap().into())
1233            .await
1234            .unwrap();
1235
1236        tracing::info!("requesting leaf from failing providers");
1237        let fut = data_source.get_leaf(test_leaf.height() as usize).await;
1238
1239        // Wait a few retries and check that the request has not completed, since the provider is
1240        // failing.
1241        sleep(Duration::from_secs(5)).await;
1242        fut.try_resolve().unwrap_err();
1243
1244        // As soon as the provider recovers, the request can complete.
1245        provider.unfail();
1246        assert_eq!(
1247            data_source
1248                .get_leaf(test_leaf.height() as usize)
1249                .await
1250                .await,
1251            *test_leaf
1252        );
1253    }
1254
1255    #[test_log::test(tokio::test(flavor = "multi_thread"))]
1256    async fn test_archive_recovery() {
1257        // Create the consensus network.
1258        let mut network = MockNetwork::<MockDataSource>::init().await;
1259
1260        // Start a web server that the non-consensus node can use to fetch blocks.
1261        let port = reserve_tcp_port().unwrap();
1262        let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
1263        app.register_module(
1264            "availability",
1265            define_api(
1266                &Default::default(),
1267                MockBase::instance(),
1268                "1.0.0".parse().unwrap(),
1269            )
1270            .unwrap(),
1271        )
1272        .unwrap();
1273        network.spawn(
1274            "server",
1275            app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
1276        );
1277
1278        // Start a data source which is not receiving events from consensus, only from a peer. The
1279        // data source is at first configured to aggressively prune data.
1280        let db = TmpDb::init().await;
1281        let provider = Provider::new(TrustedQueryServiceProvider::new(
1282            format!("http://localhost:{port}").parse().unwrap(),
1283            MockBase::instance(),
1284        ));
1285        let mut data_source = db
1286            .config()
1287            .pruner_cfg(
1288                PrunerCfg::new()
1289                    .with_target_retention(Duration::from_secs(0))
1290                    .with_interval(Duration::from_secs(5)),
1291            )
1292            .unwrap()
1293            .builder(provider.clone())
1294            .await
1295            .unwrap()
1296            // Set a fast retry for failed operations. Occasionally storage operations will fail due
1297            // to conflicting write-mode transactions running concurrently. This is ok as they will
1298            // be retried. Having a fast retry interval speeds up the test.
1299            .with_min_retry_interval(Duration::from_millis(100))
1300            // Randomize retries a lot. This will temporarlly separate competing transactions write
1301            // transactions with high probability, so that one of them quickly gets exclusive access
1302            // to the database.
1303            .with_retry_randomization_factor(3.)
1304            .build()
1305            .await
1306            .unwrap();
1307
1308        // Start consensus.
1309        network.start().await;
1310
1311        // Wait until a few blocks are produced.
1312        let leaves = network.data_source().subscribe_leaves(1).await;
1313        let leaves = leaves.take(5).collect::<Vec<_>>().await;
1314
1315        // The disconnected data source has no data yet, so it hasn't done any pruning.
1316        let pruned_height = data_source
1317            .read()
1318            .await
1319            .unwrap()
1320            .load_pruned_height()
1321            .await
1322            .unwrap();
1323        // Either None or 0 is acceptable, depending on whether or not the prover has run yet.
1324        assert!(matches!(pruned_height, None | Some(0)), "{pruned_height:?}");
1325
1326        // Send the last leaf to the disconnected data source so it learns about the height and
1327        // fetches the missing data.
1328        let last_leaf = leaves.last().unwrap();
1329        data_source.append(last_leaf.clone().into()).await.unwrap();
1330
1331        // Trigger a fetch of each leaf so the database gets populated.
1332        for i in 1..=last_leaf.height() {
1333            tracing::info!(i, "fetching leaf");
1334            assert_eq!(
1335                data_source.get_leaf(i as usize).await.await,
1336                leaves[i as usize - 1]
1337            );
1338        }
1339
1340        // After a bit of time, the pruner has run and deleted all the missing data we just fetched.
1341        loop {
1342            let pruned_height = data_source
1343                .read()
1344                .await
1345                .unwrap()
1346                .load_pruned_height()
1347                .await
1348                .unwrap();
1349            if pruned_height == Some(last_leaf.height()) {
1350                break;
1351            }
1352            tracing::info!(
1353                ?pruned_height,
1354                target_height = last_leaf.height(),
1355                "waiting for pruner to run"
1356            );
1357            sleep(Duration::from_secs(1)).await;
1358        }
1359
1360        // Now close the data source and restart it with archive recovery.
1361        data_source = db
1362            .config()
1363            .archive()
1364            .builder(provider.clone())
1365            .await
1366            .unwrap()
1367            .with_proactive_interval(Duration::from_secs(1))
1368            .with_sync_status_ttl(Duration::from_secs(1))
1369            .build()
1370            .await
1371            .unwrap();
1372
1373        // Pruned height should be reset.
1374        let pruned_height = data_source
1375            .read()
1376            .await
1377            .unwrap()
1378            .load_pruned_height()
1379            .await
1380            .unwrap();
1381        assert_eq!(pruned_height, None);
1382
1383        // The node has pruned all of it's data including the latest block, so it's forgotten the
1384        // block height. We need to give it another leaf with some height so it will be willing to
1385        // fetch.
1386        data_source.append(last_leaf.clone().into()).await.unwrap();
1387
1388        // Wait for the data to be restored. It should be restored by the next major scan.
1389        loop {
1390            let sync_status = data_source.sync_status().await.unwrap();
1391            if sync_status.is_fully_synced() {
1392                break;
1393            }
1394            tracing::info!(?sync_status, "waiting for node to sync");
1395            sleep(Duration::from_secs(1)).await;
1396        }
1397
1398        // The node remains fully synced even after some time; no pruning.
1399        sleep(Duration::from_secs(3)).await;
1400        let sync_status = data_source.sync_status().await.unwrap();
1401        assert!(sync_status.is_fully_synced(), "{sync_status:#?}");
1402    }
1403
1404    #[derive(Clone, Copy, Debug)]
1405    enum FailureType {
1406        Begin,
1407        Write,
1408        Commit,
1409    }
1410
1411    async fn test_fetch_storage_failure_helper(failure: FailureType) {
1412        // Create the consensus network.
1413        let mut network = MockNetwork::<MockDataSource>::init().await;
1414
1415        // Start a web server that the non-consensus node can use to fetch blocks.
1416        let port = reserve_tcp_port().unwrap();
1417        let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
1418        app.register_module(
1419            "availability",
1420            define_api(
1421                &Default::default(),
1422                MockBase::instance(),
1423                "1.0.0".parse().unwrap(),
1424            )
1425            .unwrap(),
1426        )
1427        .unwrap();
1428        network.spawn(
1429            "server",
1430            app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
1431        );
1432
1433        // Start a data source which is not receiving events from consensus, only from a peer.
1434        let provider = Provider::new(TrustedQueryServiceProvider::new(
1435            format!("http://localhost:{port}").parse().unwrap(),
1436            MockBase::instance(),
1437        ));
1438        let db = TmpDb::init().await;
1439        let storage = FailStorage::from(
1440            SqlStorage::connect(db.config(), StorageConnectionType::Query)
1441                .await
1442                .unwrap(),
1443        );
1444        let data_source = FetchingDataSource::builder(storage, provider)
1445            .disable_proactive_fetching()
1446            .disable_aggregator()
1447            .with_max_retry_interval(Duration::from_millis(100))
1448            .with_retry_timeout(Duration::from_secs(1))
1449            .build()
1450            .await
1451            .unwrap();
1452
1453        // Start consensus.
1454        network.start().await;
1455
1456        // Wait until a couple of blocks are produced.
1457        let leaves = network.data_source().subscribe_leaves(1).await;
1458        let leaves = leaves.take(2).collect::<Vec<_>>().await;
1459
1460        // Send the last leaf to the disconnected data source so it learns about the height.
1461        let last_leaf = leaves.last().unwrap();
1462        let mut tx = data_source.write().await.unwrap();
1463        tx.insert_leaf(last_leaf).await.unwrap();
1464        tx.commit().await.unwrap();
1465
1466        // Trigger a fetch of the first leaf; it should resolve even if we fail to store the leaf.
1467        tracing::info!("fetch with write failure");
1468        match failure {
1469            FailureType::Begin => {
1470                data_source
1471                    .as_ref()
1472                    .fail_begins_writable(FailableAction::Any)
1473                    .await
1474            },
1475            FailureType::Write => data_source.as_ref().fail_writes(FailableAction::Any).await,
1476            FailureType::Commit => data_source.as_ref().fail_commits(FailableAction::Any).await,
1477        }
1478        assert_eq!(leaves[0], data_source.get_leaf(1).await.await);
1479        data_source.as_ref().pass().await;
1480
1481        // It is possible that the fetch above completes, notifies the subscriber,
1482        // and the fetch below quickly subscribes and gets notified by the same loop.
1483        // We add a delay here to avoid this situation.
1484        // This is not a bug, as being notified after subscribing is fine.
1485        sleep(Duration::from_secs(1)).await;
1486
1487        // We can get the same leaf again, this will again trigger an active fetch since storage
1488        // failed the first time.
1489        tracing::info!("fetch with write success");
1490        let fetch = data_source.get_leaf(1).await;
1491        assert!(fetch.is_pending());
1492        assert_eq!(leaves[0], fetch.await);
1493
1494        sleep(Duration::from_secs(1)).await;
1495
1496        // Finally, we should have the leaf locally and not need to fetch it.
1497        tracing::info!("retrieve from storage");
1498        let fetch = data_source.get_leaf(1).await;
1499        assert_eq!(leaves[0], fetch.try_resolve().ok().unwrap());
1500    }
1501
1502    #[test_log::test(tokio::test(flavor = "multi_thread"))]
1503    async fn test_fetch_storage_failure_on_begin() {
1504        test_fetch_storage_failure_helper(FailureType::Begin).await;
1505    }
1506
1507    #[test_log::test(tokio::test(flavor = "multi_thread"))]
1508    async fn test_fetch_storage_failure_on_write() {
1509        test_fetch_storage_failure_helper(FailureType::Write).await;
1510    }
1511
1512    #[test_log::test(tokio::test(flavor = "multi_thread"))]
1513    async fn test_fetch_storage_failure_on_commit() {
1514        test_fetch_storage_failure_helper(FailureType::Commit).await;
1515    }
1516
1517    async fn test_fetch_storage_failure_retry_helper(failure: FailureType) {
1518        // Create the consensus network.
1519        let mut network = MockNetwork::<MockDataSource>::init().await;
1520
1521        // Start a web server that the non-consensus node can use to fetch blocks.
1522        let port = reserve_tcp_port().unwrap();
1523        let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
1524        app.register_module(
1525            "availability",
1526            define_api(
1527                &Default::default(),
1528                MockBase::instance(),
1529                "1.0.0".parse().unwrap(),
1530            )
1531            .unwrap(),
1532        )
1533        .unwrap();
1534        network.spawn(
1535            "server",
1536            app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
1537        );
1538
1539        // Start a data source which is not receiving events from consensus, only from a peer.
1540        let provider = Provider::new(TrustedQueryServiceProvider::new(
1541            format!("http://localhost:{port}").parse().unwrap(),
1542            MockBase::instance(),
1543        ));
1544        let db = TmpDb::init().await;
1545        let storage = FailStorage::from(
1546            SqlStorage::connect(db.config(), StorageConnectionType::Query)
1547                .await
1548                .unwrap(),
1549        );
1550        let data_source = FetchingDataSource::builder(storage, provider)
1551            .disable_proactive_fetching()
1552            .disable_aggregator()
1553            .with_min_retry_interval(Duration::from_millis(100))
1554            .build()
1555            .await
1556            .unwrap();
1557
1558        // Start consensus.
1559        network.start().await;
1560
1561        // Wait until a couple of blocks are produced.
1562        let leaves = network.data_source().subscribe_leaves(1).await;
1563        let leaves = leaves.take(2).collect::<Vec<_>>().await;
1564
1565        // Send the last leaf to the disconnected data source so it learns about the height.
1566        let last_leaf = leaves.last().unwrap();
1567        let mut tx = data_source.write().await.unwrap();
1568        tx.insert_leaf(last_leaf).await.unwrap();
1569        tx.commit().await.unwrap();
1570
1571        // Trigger a fetch of the first leaf; it should retry until it successfully stores the leaf.
1572        tracing::info!("fetch with write failure");
1573        match failure {
1574            FailureType::Begin => {
1575                data_source
1576                    .as_ref()
1577                    .fail_one_begin_writable(FailableAction::Any)
1578                    .await
1579            },
1580            FailureType::Write => {
1581                data_source
1582                    .as_ref()
1583                    .fail_one_write(FailableAction::Any)
1584                    .await
1585            },
1586            FailureType::Commit => {
1587                data_source
1588                    .as_ref()
1589                    .fail_one_commit(FailableAction::Any)
1590                    .await
1591            },
1592        }
1593        assert_eq!(leaves[0], data_source.get_leaf(1).await.await);
1594
1595        // Check that the leaf ended up in local storage.
1596        let mut tx = data_source.read().await.unwrap();
1597        assert_eq!(leaves[0], tx.get_leaf(1.into()).await.unwrap());
1598    }
1599
1600    #[test_log::test(tokio::test(flavor = "multi_thread"))]
1601    async fn test_fetch_storage_failure_retry_on_begin() {
1602        test_fetch_storage_failure_retry_helper(FailureType::Begin).await;
1603    }
1604
1605    #[test_log::test(tokio::test(flavor = "multi_thread"))]
1606    async fn test_fetch_storage_failure_retry_on_write() {
1607        test_fetch_storage_failure_retry_helper(FailureType::Write).await;
1608    }
1609
1610    #[test_log::test(tokio::test(flavor = "multi_thread"))]
1611    async fn test_fetch_storage_failure_retry_on_commit() {
1612        test_fetch_storage_failure_retry_helper(FailureType::Commit).await;
1613    }
1614
1615    #[test_log::test(tokio::test(flavor = "multi_thread"))]
1616    async fn test_fetch_on_decide() {
1617        // Create the consensus network.
1618        let mut network = MockNetwork::<MockDataSource>::init().await;
1619
1620        // Start a web server that the non-consensus node can use to fetch blocks.
1621        let port = reserve_tcp_port().unwrap();
1622        let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
1623        app.register_module(
1624            "availability",
1625            define_api(
1626                &Default::default(),
1627                MockBase::instance(),
1628                "1.0.0".parse().unwrap(),
1629            )
1630            .unwrap(),
1631        )
1632        .unwrap();
1633        network.spawn(
1634            "server",
1635            app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
1636        );
1637
1638        // Start a data source which is not receiving events from consensus.
1639        let db = TmpDb::init().await;
1640        let provider = Provider::new(TrustedQueryServiceProvider::new(
1641            format!("http://localhost:{port}").parse().unwrap(),
1642            MockBase::instance(),
1643        ));
1644        let data_source = builder(&db, &provider)
1645            .await
1646            .with_max_retry_interval(Duration::from_secs(1))
1647            .build()
1648            .await
1649            .unwrap();
1650
1651        // Start consensus.
1652        network.start().await;
1653
1654        // Wait until a block has been decided.
1655        let leaf = network
1656            .data_source()
1657            .subscribe_leaves(1)
1658            .await
1659            .next()
1660            .await
1661            .unwrap();
1662
1663        // Give the node a decide containing the leaf but no additional information.
1664        tracing::info!("send decide event");
1665        data_source.append(leaf.clone().into()).await.unwrap();
1666
1667        // We will eventually retrieve the corresponding block and VID common, triggered by seeing
1668        // the leaf.
1669        tracing::info!("wait");
1670        sleep(Duration::from_secs(5)).await;
1671
1672        // Read the missing data directly from storage (via a database transaction), rather than
1673        // going through the data source, so that this request itself does not trigger a fetch.
1674        // Thus, this will only work if the data was already fetched, triggered by the leaf.
1675        let mut tx = data_source.read().await.unwrap();
1676        let id = BlockId::<MockTypes>::from(leaf.height() as usize);
1677        let block = tx.get_block(id).await.unwrap();
1678        let vid = tx.get_vid_common(id).await.unwrap();
1679
1680        assert_eq!(block.hash(), leaf.block_hash());
1681        assert_eq!(vid.block_hash(), leaf.block_hash());
1682    }
1683
1684    #[test_log::test(tokio::test(flavor = "multi_thread"))]
1685    async fn test_fetch_begin_failure() {
1686        // Create the consensus network.
1687        let mut network = MockNetwork::<MockDataSource>::init().await;
1688
1689        // Start a web server that the non-consensus node can use to fetch blocks.
1690        let port = reserve_tcp_port().unwrap();
1691        let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
1692        app.register_module(
1693            "availability",
1694            define_api(
1695                &Default::default(),
1696                MockBase::instance(),
1697                "1.0.0".parse().unwrap(),
1698            )
1699            .unwrap(),
1700        )
1701        .unwrap();
1702        network.spawn(
1703            "server",
1704            app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
1705        );
1706
1707        // Start a data source which is not receiving events from consensus, only from a peer.
1708        let provider = Provider::new(TrustedQueryServiceProvider::new(
1709            format!("http://localhost:{port}").parse().unwrap(),
1710            MockBase::instance(),
1711        ));
1712        let db = TmpDb::init().await;
1713        let storage = FailStorage::from(
1714            SqlStorage::connect(db.config(), StorageConnectionType::Query)
1715                .await
1716                .unwrap(),
1717        );
1718        let data_source = FetchingDataSource::builder(storage, provider)
1719            .disable_proactive_fetching()
1720            .disable_aggregator()
1721            .with_min_retry_interval(Duration::from_millis(100))
1722            .build()
1723            .await
1724            .unwrap();
1725
1726        // Start consensus.
1727        network.start().await;
1728
1729        // Wait until a couple of blocks are produced.
1730        let leaves = network.data_source().subscribe_leaves(1).await;
1731        let leaves = leaves.take(2).collect::<Vec<_>>().await;
1732
1733        // Send the last leaf to the disconnected data source so it learns about the height.
1734        let last_leaf = leaves.last().unwrap();
1735        let mut tx = data_source.write().await.unwrap();
1736        tx.insert_leaf(last_leaf).await.unwrap();
1737        tx.commit().await.unwrap();
1738
1739        // Trigger a fetch of the first leaf; it should retry until it is able to determine
1740        // the leaf is fetchable and trigger the fetch.
1741        tracing::info!("fetch with transaction failure");
1742        data_source
1743            .as_ref()
1744            .fail_one_begin_read_only(FailableAction::Any)
1745            .await;
1746        assert_eq!(leaves[0], data_source.get_leaf(1).await.await);
1747    }
1748
1749    #[test_log::test(tokio::test(flavor = "multi_thread"))]
1750    async fn test_fetch_load_failure_block() {
1751        // Create the consensus network.
1752        let mut network = MockNetwork::<MockDataSource>::init().await;
1753
1754        // Start a web server that the non-consensus node can use to fetch blocks.
1755        let port = reserve_tcp_port().unwrap();
1756        let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
1757        app.register_module(
1758            "availability",
1759            define_api(
1760                &Default::default(),
1761                MockBase::instance(),
1762                "1.0.0".parse().unwrap(),
1763            )
1764            .unwrap(),
1765        )
1766        .unwrap();
1767        network.spawn(
1768            "server",
1769            app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
1770        );
1771
1772        // Start a data source which is not receiving events from consensus, only from a peer.
1773        let provider = Provider::new(TrustedQueryServiceProvider::new(
1774            format!("http://localhost:{port}").parse().unwrap(),
1775            MockBase::instance(),
1776        ));
1777        let db = TmpDb::init().await;
1778        let storage = FailStorage::from(
1779            SqlStorage::connect(db.config(), StorageConnectionType::Query)
1780                .await
1781                .unwrap(),
1782        );
1783        let data_source = FetchingDataSource::builder(storage, provider)
1784            .disable_proactive_fetching()
1785            .disable_aggregator()
1786            .with_min_retry_interval(Duration::from_millis(100))
1787            .build()
1788            .await
1789            .unwrap();
1790
1791        // Start consensus.
1792        network.start().await;
1793
1794        // Wait until a block is produced.
1795        let mut leaves = network.data_source().subscribe_leaves(1).await;
1796        let leaf = leaves.next().await.unwrap();
1797
1798        // Send the leaf to the disconnected data source, so the corresponding block becomes
1799        // fetchable.
1800        let mut tx = data_source.write().await.unwrap();
1801        tx.insert_leaf(&leaf).await.unwrap();
1802        tx.commit().await.unwrap();
1803
1804        // Trigger a fetch of the block by hash; it should retry until it is able to determine the
1805        // leaf is available, thus the block is fetchable, trigger the fetch.
1806        //
1807        // Failing only on the `get_header` call here hits an edge case which is only possible when
1808        // fetching blocks: we successfully determine that the object is not available locally and
1809        // that it might exist, so we actually call `active_fetch` to try and get it. If we then
1810        // fail to load the header and erroneously treat this as the header not being available, we
1811        // will give up and consider the object unfetchable (since the next step would be to fetch
1812        // the corresponding leaf, but we cannot do this with just a block hash).
1813        //
1814        // Thus, this test will only pass if we correctly retry the `active_fetch` until we are
1815        // successfully able to load the header from storage and determine that the block is
1816        // fetchable.
1817        tracing::info!("fetch with read failure");
1818        data_source
1819            .as_ref()
1820            .fail_one_read(FailableAction::GetHeader)
1821            .await;
1822        let fetch = data_source.get_block(leaf.block_hash()).await;
1823
1824        // Give some time for a few reads to fail before letting them succeed.
1825        sleep(Duration::from_secs(2)).await;
1826        data_source.as_ref().pass().await;
1827
1828        let block: BlockQueryData<MockTypes> = fetch.await;
1829        assert_eq!(block.hash(), leaf.block_hash());
1830    }
1831
1832    #[test_log::test(tokio::test(flavor = "multi_thread"))]
1833    async fn test_fetch_load_failure_tx() {
1834        // Create the consensus network.
1835        let mut network = MockNetwork::<MockDataSource>::init().await;
1836
1837        // Start a web server that the non-consensus node can use to fetch blocks.
1838        let port = reserve_tcp_port().unwrap();
1839        let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
1840        app.register_module(
1841            "availability",
1842            define_api(
1843                &Default::default(),
1844                MockBase::instance(),
1845                "1.0.0".parse().unwrap(),
1846            )
1847            .unwrap(),
1848        )
1849        .unwrap();
1850        network.spawn(
1851            "server",
1852            app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
1853        );
1854
1855        // Start a data source which is not receiving events from consensus, only from a peer.
1856        let provider = Provider::new(TrustedQueryServiceProvider::new(
1857            format!("http://localhost:{port}").parse().unwrap(),
1858            MockBase::instance(),
1859        ));
1860        let db = TmpDb::init().await;
1861        let storage = FailStorage::from(
1862            SqlStorage::connect(db.config(), StorageConnectionType::Query)
1863                .await
1864                .unwrap(),
1865        );
1866        let data_source = FetchingDataSource::builder(storage, provider)
1867            .disable_proactive_fetching()
1868            .disable_aggregator()
1869            .with_min_retry_interval(Duration::from_millis(100))
1870            .build()
1871            .await
1872            .unwrap();
1873
1874        // Start consensus.
1875        network.start().await;
1876
1877        // Wait until a transaction is sequenced.
1878        let tx = mock_transaction(vec![1, 2, 3]);
1879        network.submit_transaction(tx.clone()).await;
1880        let tx = network
1881            .data_source()
1882            .get_block_containing_transaction(tx.commit())
1883            .await
1884            .await;
1885
1886        // Send the block containing the transaction to the disconnected data source.
1887        {
1888            let leaf = network
1889                .data_source()
1890                .get_leaf(tx.transaction.block_height() as usize)
1891                .await
1892                .await;
1893            let block = network
1894                .data_source()
1895                .get_block(tx.transaction.block_height() as usize)
1896                .await
1897                .await;
1898            let mut tx = data_source.write().await.unwrap();
1899            tx.insert_leaf(&leaf).await.unwrap();
1900            tx.insert_block(&block).await.unwrap();
1901            tx.commit().await.unwrap();
1902        }
1903
1904        // Check that the transaction is there.
1905        tracing::info!("fetch success");
1906        assert_eq!(
1907            tx,
1908            data_source
1909                .get_block_containing_transaction(tx.transaction.hash())
1910                .await
1911                .await
1912        );
1913
1914        // Fetch the transaction with storage failures.
1915        //
1916        // Failing only one read here hits an edge case that only exists for unfetchable objects
1917        // (e.g. transactions). This will cause the initial aload of the transaction to fail, but,
1918        // if we erroneously treat this load failure as the transaction being missing, we will
1919        // succeed in calling `fetch`, since subsequent loads succeed. However, since a transaction
1920        // is not active-fetchable, no active fetch will actually be spawned, and this fetch will
1921        // never resolve.
1922        //
1923        // Thus, the test should only pass if we correctly retry the initial load until it succeeds
1924        // and we discover that the transaction doesn't need to be fetched at all.
1925        tracing::info!("fetch with read failure");
1926        data_source
1927            .as_ref()
1928            .fail_one_read(FailableAction::Any)
1929            .await;
1930        let fetch = data_source
1931            .get_block_containing_transaction(tx.transaction.hash())
1932            .await;
1933
1934        assert_eq!(tx, fetch.await);
1935    }
1936
1937    #[test_log::test(tokio::test(flavor = "multi_thread"))]
1938    async fn test_stream_begin_failure() {
1939        // Create the consensus network.
1940        let mut network = MockNetwork::<MockDataSource>::init().await;
1941
1942        // Start a web server that the non-consensus node can use to fetch blocks.
1943        let port = reserve_tcp_port().unwrap();
1944        let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
1945        app.register_module(
1946            "availability",
1947            define_api(
1948                &Default::default(),
1949                MockBase::instance(),
1950                "1.0.0".parse().unwrap(),
1951            )
1952            .unwrap(),
1953        )
1954        .unwrap();
1955        network.spawn(
1956            "server",
1957            app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
1958        );
1959
1960        // Start a data source which is not receiving events from consensus, only from a peer.
1961        let provider = Provider::new(TrustedQueryServiceProvider::new(
1962            format!("http://localhost:{port}").parse().unwrap(),
1963            MockBase::instance(),
1964        ));
1965        let db = TmpDb::init().await;
1966        let storage = FailStorage::from(
1967            SqlStorage::connect(db.config(), StorageConnectionType::Query)
1968                .await
1969                .unwrap(),
1970        );
1971        let data_source = FetchingDataSource::builder(storage, provider)
1972            .disable_proactive_fetching()
1973            .disable_aggregator()
1974            .with_min_retry_interval(Duration::from_millis(100))
1975            .with_range_chunk_size(3)
1976            .build()
1977            .await
1978            .unwrap();
1979
1980        // Start consensus.
1981        network.start().await;
1982
1983        // Wait until a few blocks are produced.
1984        let leaves = network.data_source().subscribe_leaves(1).await;
1985        let leaves = leaves.take(5).collect::<Vec<_>>().await;
1986
1987        // Send the last leaf to the disconnected data source so it learns about the height.
1988        let last_leaf = leaves.last().unwrap();
1989        let mut tx = data_source.write().await.unwrap();
1990        tx.insert_leaf(last_leaf).await.unwrap();
1991        tx.commit().await.unwrap();
1992
1993        // Stream the leaves; it should retry until it is able to determine each leaf is fetchable
1994        // and trigger the fetch.
1995        tracing::info!("stream with transaction failure");
1996        data_source
1997            .as_ref()
1998            .fail_one_begin_read_only(FailableAction::Any)
1999            .await;
2000        assert_eq!(
2001            leaves,
2002            data_source
2003                .subscribe_leaves(1)
2004                .await
2005                .take(5)
2006                .collect::<Vec<_>>()
2007                .await
2008        );
2009    }
2010
2011    #[test_log::test(tokio::test(flavor = "multi_thread"))]
2012    async fn test_stream_load_failure() {
2013        // Create the consensus network.
2014        let mut network = MockNetwork::<MockDataSource>::init().await;
2015
2016        // Start a web server that the non-consensus node can use to fetch blocks.
2017        let port = reserve_tcp_port().unwrap();
2018        let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
2019        app.register_module(
2020            "availability",
2021            define_api(
2022                &Default::default(),
2023                MockBase::instance(),
2024                "1.0.0".parse().unwrap(),
2025            )
2026            .unwrap(),
2027        )
2028        .unwrap();
2029        network.spawn(
2030            "server",
2031            app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
2032        );
2033
2034        // Start a data source which is not receiving events from consensus, only from a peer.
2035        let provider = Provider::new(TrustedQueryServiceProvider::new(
2036            format!("http://localhost:{port}").parse().unwrap(),
2037            MockBase::instance(),
2038        ));
2039        let db = TmpDb::init().await;
2040        let storage = FailStorage::from(
2041            SqlStorage::connect(db.config(), StorageConnectionType::Query)
2042                .await
2043                .unwrap(),
2044        );
2045        let data_source = FetchingDataSource::builder(storage, provider)
2046            .disable_proactive_fetching()
2047            .disable_aggregator()
2048            .with_min_retry_interval(Duration::from_millis(100))
2049            .with_range_chunk_size(3)
2050            .build()
2051            .await
2052            .unwrap();
2053
2054        // Start consensus.
2055        network.start().await;
2056
2057        // Wait until a few blocks are produced.
2058        let leaves = network.data_source().subscribe_leaves(1).await;
2059        let leaves = leaves.take(5).collect::<Vec<_>>().await;
2060
2061        // Send the last leaf to the disconnected data source, so the blocks becomes fetchable.
2062        let last_leaf = leaves.last().unwrap();
2063        let mut tx = data_source.write().await.unwrap();
2064        tx.insert_leaf(last_leaf).await.unwrap();
2065        tx.commit().await.unwrap();
2066
2067        // Stream the blocks with a period of database failures.
2068        tracing::info!("stream with read failure");
2069        data_source.as_ref().fail_reads(FailableAction::Any).await;
2070        let fetches = data_source
2071            .get_block_range(1..=5)
2072            .await
2073            .collect::<Vec<_>>()
2074            .await;
2075
2076        // Give some time for a few reads to fail before letting them succeed.
2077        sleep(Duration::from_secs(2)).await;
2078        data_source.as_ref().pass().await;
2079
2080        for (leaf, fetch) in leaves.iter().zip(fetches) {
2081            let block: BlockQueryData<MockTypes> = fetch.await;
2082            assert_eq!(block.hash(), leaf.block_hash());
2083        }
2084    }
2085
2086    enum MetadataType {
2087        Payload,
2088        Vid,
2089    }
2090
2091    async fn test_metadata_stream_begin_failure_helper(stream: MetadataType) {
2092        // Create the consensus network.
2093        let mut network = MockNetwork::<MockDataSource>::init().await;
2094
2095        // Start a web server that the non-consensus node can use to fetch blocks.
2096        let port = reserve_tcp_port().unwrap();
2097        let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
2098        app.register_module(
2099            "availability",
2100            define_api(
2101                &Default::default(),
2102                MockBase::instance(),
2103                "1.0.0".parse().unwrap(),
2104            )
2105            .unwrap(),
2106        )
2107        .unwrap();
2108        network.spawn(
2109            "server",
2110            app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
2111        );
2112
2113        // Start a data source which is not receiving events from consensus, only from a peer.
2114        let provider = Provider::new(TrustedQueryServiceProvider::new(
2115            format!("http://localhost:{port}").parse().unwrap(),
2116            MockBase::instance(),
2117        ));
2118        let db = TmpDb::init().await;
2119        let storage = FailStorage::from(
2120            SqlStorage::connect(db.config(), StorageConnectionType::Query)
2121                .await
2122                .unwrap(),
2123        );
2124        let data_source = FetchingDataSource::builder(storage, provider)
2125            .disable_proactive_fetching()
2126            .disable_aggregator()
2127            .with_min_retry_interval(Duration::from_millis(100))
2128            .with_range_chunk_size(3)
2129            .build()
2130            .await
2131            .unwrap();
2132
2133        // Start consensus.
2134        network.start().await;
2135
2136        // Wait until a few blocks are produced.
2137        let leaves = network.data_source().subscribe_leaves(1).await;
2138        let leaves = leaves.take(3).collect::<Vec<_>>().await;
2139
2140        // Send the last leaf to the disconnected data source, so the blocks becomes fetchable.
2141        let last_leaf = leaves.last().unwrap();
2142        let mut tx = data_source.write().await.unwrap();
2143        tx.insert_leaf(last_leaf).await.unwrap();
2144        tx.commit().await.unwrap();
2145
2146        // Send the first object to the disconnected data source, so we hit all the cases:
2147        // * leaf present but not full object (from the last leaf)
2148        // * full object present but inaccessible due to storage failures (first object)
2149        // * nothing present (middle object)
2150        let leaf = network.data_source().get_leaf(1).await.await;
2151        let block = network.data_source().get_block(1).await.await;
2152        let vid = network.data_source().get_vid_common(1).await.await;
2153        data_source
2154            .append(BlockInfo::new(leaf, Some(block), Some(vid), None))
2155            .await
2156            .unwrap();
2157
2158        // Stream the objects with a period of database failures.
2159        tracing::info!("stream with transaction failure");
2160        data_source
2161            .as_ref()
2162            .fail_begins_read_only(FailableAction::Any)
2163            .await;
2164        match stream {
2165            MetadataType::Payload => {
2166                let payloads = data_source.subscribe_payload_metadata(1).await.take(3);
2167
2168                // Give some time for a few reads to fail before letting them succeed.
2169                sleep(Duration::from_secs(2)).await;
2170                tracing::info!("stop failing transactions");
2171                data_source.as_ref().pass().await;
2172
2173                let payloads = payloads.collect::<Vec<_>>().await;
2174                for (leaf, payload) in leaves.iter().zip(payloads) {
2175                    assert_eq!(payload.block_hash, leaf.block_hash());
2176                }
2177            },
2178            MetadataType::Vid => {
2179                let vids = data_source.subscribe_vid_common_metadata(1).await.take(3);
2180
2181                // Give some time for a few reads to fail before letting them succeed.
2182                sleep(Duration::from_secs(2)).await;
2183                tracing::info!("stop failing transactions");
2184                data_source.as_ref().pass().await;
2185
2186                let vids = vids.collect::<Vec<_>>().await;
2187                for (leaf, vid) in leaves.iter().zip(vids) {
2188                    assert_eq!(vid.block_hash, leaf.block_hash());
2189                }
2190            },
2191        }
2192    }
2193
2194    #[test_log::test(tokio::test(flavor = "multi_thread"))]
2195    async fn test_metadata_stream_begin_failure_payload() {
2196        test_metadata_stream_begin_failure_helper(MetadataType::Payload).await
2197    }
2198
2199    #[test_log::test(tokio::test(flavor = "multi_thread"))]
2200    async fn test_metadata_stream_begin_failure_vid() {
2201        test_metadata_stream_begin_failure_helper(MetadataType::Vid).await
2202    }
2203
2204    #[tokio::test(flavor = "multi_thread")]
2205    #[test_log::test]
2206    async fn test_ranged_fetch() {
2207        // Create the consensus network.
2208        let mut network = MockNetwork::<MockDataSource>::init().await;
2209
2210        // Start a web server that the non-consensus node can use to fetch blocks.
2211        let port = reserve_tcp_port().unwrap();
2212        let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
2213        app.register_module(
2214            "availability",
2215            define_api(
2216                &Default::default(),
2217                MockBase::instance(),
2218                "1.0.0".parse().unwrap(),
2219            )
2220            .unwrap(),
2221        )
2222        .unwrap();
2223        network.spawn(
2224            "server",
2225            app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
2226        );
2227
2228        // Start consensus.
2229        network.start().await;
2230
2231        // Wait for a few blocks to be produced.
2232        let leaves = network
2233            .data_source()
2234            .subscribe_leaves(0)
2235            .await
2236            .take(5)
2237            .collect::<Vec<_>>()
2238            .await;
2239        let blocks = network
2240            .data_source()
2241            .subscribe_blocks(0)
2242            .await
2243            .take(5)
2244            .collect::<Vec<_>>()
2245            .await;
2246        let vid = network
2247            .data_source()
2248            .subscribe_vid_common(0)
2249            .await
2250            .take(5)
2251            .collect::<Vec<_>>()
2252            .await;
2253
2254        // Connect a fetching provider.
2255        let provider = TrustedQueryServiceProvider::new(
2256            format!("http://localhost:{port}").parse().unwrap(),
2257            MockBase::instance(),
2258        );
2259
2260        // Make ranged requests.
2261        tracing::info!("fetch leaf range");
2262        assert_eq!(
2263            provider
2264                .fetch(LeafRangeRequest { start: 0, end: 5 })
2265                .await
2266                .unwrap(),
2267            leaves
2268        );
2269        tracing::info!("fetch block range");
2270        assert_eq!(
2271            ProviderTrait::<MockTypes, _>::fetch(&provider, BlockRangeRequest { start: 0, end: 5 })
2272                .await
2273                .unwrap(),
2274            blocks
2275        );
2276        tracing::info!("fetch VID common range");
2277        assert_eq!(
2278            ProviderTrait::<MockTypes, _>::fetch(
2279                &provider,
2280                VidCommonRangeRequest { start: 0, end: 5 }
2281            )
2282            .await
2283            .unwrap(),
2284            vid
2285        );
2286    }
2287
2288    /// A test server which does not support ranged VID common requests.
2289    async fn old_server(port: u16) {
2290        let mut api = Api::<(), availability::Error, StaticVersion<1, 0>>::new(toml! {
2291            [route.get_vid_common]
2292            PATH = ["vid/common/:height"]
2293            ":height" = "Integer"
2294        })
2295        .unwrap();
2296
2297        api.get("get_vid_common", move |req, _| {
2298            async move {
2299                let mut common = VidCommonQueryData::<MockTypes>::genesis(
2300                    &Default::default(),
2301                    &Default::default(),
2302                    TEST_VERSIONS.test.base,
2303                )
2304                .await;
2305                common.height = req.integer_param("height")?;
2306                Ok(common)
2307            }
2308            .boxed()
2309        })
2310        .unwrap();
2311
2312        let mut app = App::<(), Error>::with_state(());
2313        app.register_module("availability", api).unwrap();
2314        app.serve(format!("0.0.0.0:{port}"), MockBase::instance())
2315            .await
2316            .ok();
2317    }
2318
2319    #[tokio::test]
2320    #[test_log::test]
2321    async fn test_vid_common_fallback() {
2322        let port = reserve_tcp_port().unwrap();
2323        let _server = BackgroundTask::spawn("old server", old_server(port));
2324        let provider = TrustedQueryServiceProvider::new(
2325            format!("http://localhost:{port}").parse().unwrap(),
2326            StaticVersion::<1, 0>::instance(),
2327        );
2328
2329        // First fetch a range of VID common one by one, to get a ground truth.
2330        let common = try_join_all((0..5).map(|i| {
2331            provider
2332                .client
2333                .get::<VidCommonQueryData<MockTypes>>(&format!("availability/vid/common/{i}"))
2334                .send()
2335        }))
2336        .await
2337        .unwrap();
2338
2339        // Now fetch the whole thing as a range.
2340        let req = VidCommonRangeRequest { start: 0, end: 5 };
2341        assert_eq!(
2342            common.as_slice(),
2343            provider.fetch_vid_common_range(req).await.unwrap().as_ref()
2344        );
2345    }
2346}