hotshot_query_service/fetching/provider/query_service.rs

1// Copyright (c) 2022 Espresso Systems (espressosys.com)
2// This file is part of the HotShot Query Service library.
3//
4// This program is free software: you can redistribute it and/or modify it under the terms of the GNU
5// General Public License as published by the Free Software Foundation, either version 3 of the
6// License, or (at your option) any later version.
7// This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without
8// even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
9// General Public License for more details.
10// You should have received a copy of the GNU General Public License along with this program. If not,
11// see <https://www.gnu.org/licenses/>.
12
13use std::fmt::Debug;
14
15use anyhow::{Context, ensure};
16use async_trait::async_trait;
17use committable::Committable;
18use futures::{TryFutureExt, future::try_join_all};
19use hotshot_types::{
20    data::{VidCommitment, VidCommon, ns_table},
21    traits::{EncodeBytes, node_implementation::NodeType},
22    vid::{
23        advz::{ADVZScheme, advz_scheme},
24        avidm::{AvidMScheme, init_avidm_param},
25        avidm_gf2::AvidmGf2Scheme,
26    },
27};
28use jf_advz::VidScheme;
29use surf_disco::{Client, Url};
30use vbs::version::StaticVersionType;
31
32use super::Provider;
33use crate::{
34    Error, Payload,
35    availability::{BlockQueryData, LeafQueryData, VidCommonQueryData},
36    fetching::{
37        NonEmptyRange,
38        request::{
39            BlockRangeRequest, LeafRangeRequest, LeafRequest, PayloadRequest, RangeRequest,
40            VidCommonRangeRequest, VidCommonRequest,
41        },
42    },
43    types::HeightIndexed,
44};
45
/// Data availability provider backed by another instance of this query service.
///
/// This fetcher implements the [`Provider`] interface by querying the REST API provided by another
/// instance of this query service to try and retrieve missing objects.
#[derive(Clone, Debug)]
pub struct QueryServiceProvider<Ver: StaticVersionType> {
    // REST client bound to the peer's base URL; `Ver` pins the static API version on the wire.
    client: Client<Error, Ver>,
}
54
55impl<Ver: StaticVersionType> QueryServiceProvider<Ver> {
56    pub fn new(url: Url, _: Ver) -> Self {
57        Self {
58            client: Client::new(url),
59        }
60    }
61}
62
63impl<Ver: StaticVersionType> QueryServiceProvider<Ver> {
64    pub async fn fetch_payload<Types: NodeType>(
65        &self,
66        req: PayloadRequest,
67    ) -> anyhow::Result<Payload<Types>> {
68        let req_hash = req.0;
69
70        // Fetch the payload and the VID common data. We need the common data to recompute the VID
71        // commitment, to ensure the payload we received is consistent with the commitment we
72        // requested.
73        let block = self
74            .client
75            .get::<BlockQueryData<Types>>(&format!("availability/block/payload-hash/{req_hash}"))
76            .send()
77            .await
78            .context("fetching block")?;
79        let common = self
80            .client
81            .get::<VidCommonQueryData<Types>>(&format!(
82                "availability/vid/common/payload-hash/{req_hash}",
83            ))
84            .send()
85            .await
86            .context("fetching VID common")?;
87
88        let comm =
89            recompute_payload_commitment(&block, &common).context("computing VID commitment")?;
90        ensure!(
91            comm == req_hash,
92            "VID commitment {comm} does not match request"
93        );
94
95        Ok(block.payload)
96    }
97
98    pub async fn fetch_payload_range<Types: NodeType>(
99        &self,
100        req: BlockRangeRequest,
101    ) -> anyhow::Result<NonEmptyRange<BlockQueryData<Types>>> {
102        let req = RangeRequest::from(req);
103
104        // Fetch the payload and the VID common data. We need the common data to recompute the VID
105        // commitment, to ensure the payloads we received are consistent with the expected
106        // commitments.
107        let blocks = self
108            .client
109            .get::<NonEmptyRange<BlockQueryData<Types>>>(&format!(
110                "availability/block/{}/{}",
111                req.start, req.end
112            ))
113            .send()
114            .await
115            .context("fetching blocks")?;
116        let common = self
117            .fetch_vid_common_range_with_fallback(req.start, req.end)
118            .await?;
119
120        ensure!(
121            blocks.start() == req.start && blocks.end() == req.end,
122            "wrong block range ({}..{})",
123            blocks.start(),
124            blocks.end()
125        );
126        ensure!(
127            common.start() == req.start && common.end() == req.end,
128            "wrong VID common range (expected {}..{})",
129            common.start(),
130            common.end()
131        );
132
133        let commits = blocks
134            .iter()
135            .zip(&common)
136            .map(|(block, common)| recompute_payload_commitment(block, common))
137            .collect::<Result<Vec<VidCommitment>, _>>()
138            .context("computing VID commitments")?;
139        let hash = RangeRequest::hash_payloads(commits.iter().copied());
140        ensure!(
141            hash == req.expected_hash,
142            "server returned blocks with wrong payload hash ({hash}, commits {commits:?})",
143        );
144
145        Ok(blocks)
146    }
147
148    fn handle_result<R: Debug, T>(&self, req: R, res: anyhow::Result<T>) -> Option<T> {
149        match res {
150            Ok(res) => Some(res),
151            Err(err) => {
152                tracing::warn!(upstream = %self.client.base_url(), ?req, "failed to fetch: {err:#}");
153                None
154            },
155        }
156    }
157}
158
159#[async_trait]
160impl<Types, Ver: StaticVersionType> Provider<Types, PayloadRequest> for QueryServiceProvider<Ver>
161where
162    Types: NodeType,
163{
164    async fn fetch(&self, req: PayloadRequest) -> Option<Payload<Types>> {
165        self.handle_result(req, self.fetch_payload::<Types>(req).await)
166    }
167}
168
169#[async_trait]
170impl<Types, Ver: StaticVersionType> Provider<Types, BlockRangeRequest> for QueryServiceProvider<Ver>
171where
172    Types: NodeType,
173{
174    async fn fetch(&self, req: BlockRangeRequest) -> Option<NonEmptyRange<BlockQueryData<Types>>> {
175        self.handle_result(req, self.fetch_payload_range::<Types>(req).await)
176    }
177}
178
/// Recompute the VID commitment of `block`'s payload using the scheme implied by `common`.
///
/// The VID scheme version is selected by the variant of the `VidCommon` data: ADVZ (`V0`),
/// AVID-M (`V1`), or AVID-M over GF(2) (`V2`). Callers compare the result against the commitment
/// they requested, so a dishonest peer cannot substitute a different payload.
fn recompute_payload_commitment<Types>(
    block: &BlockQueryData<Types>,
    common: &VidCommonQueryData<Types>,
) -> anyhow::Result<VidCommitment>
where
    Types: NodeType,
{
    match common.common() {
        VidCommon::V0(common) => {
            // The ADVZ commitment depends on the number of storage nodes recorded in the
            // common data, so reconstruct the scheme with that parameter.
            let num_storage_nodes = ADVZScheme::get_num_storage_nodes(common) as usize;
            let bytes = block.payload().encode();
            advz_scheme(num_storage_nodes)
                .commit_only(bytes)
                .map(VidCommitment::V0)
                .context("failed to compute VID commitment (V0)")
        },
        VidCommon::V1(common) => {
            let bytes = block.payload().encode();
            // AVID-M parameters are derived from the total weight in the common data.
            let avidm_param = init_avidm_param(common.total_weights).context(format!(
                "failed to initialize AVIDM params. total_weight={}",
                common.total_weights
            ))?;
            let metadata = block.metadata().encode();
            AvidMScheme::commit(
                &avidm_param,
                &bytes,
                ns_table::parse_ns_table(bytes.len(), &metadata),
            )
            .map(VidCommitment::V1)
            .map_err(anyhow::Error::msg)
            .context("failed to compute AVIDM commitment")
        },
        VidCommon::V2(common) => {
            let bytes = block.payload().encode();
            let metadata = block.metadata().encode();
            AvidmGf2Scheme::commit(
                &common.param,
                &bytes,
                ns_table::parse_ns_table(bytes.len(), &metadata),
            )
            // `commit` returns a tuple; only the first element (the commitment) is needed here.
            .map(|(commit, _)| VidCommitment::V2(commit))
            .map_err(anyhow::Error::msg)
            .context("failed to compute AvidmGf2 commitment")
        },
    }
}
225
impl<Ver: StaticVersionType> QueryServiceProvider<Ver> {
    /// Fetch a single leaf by height and verify it against the request.
    ///
    /// Checks that the returned leaf has the requested height, the expected leaf hash, and the
    /// expected QC commitment before accepting it.
    pub async fn fetch_leaf<Types: NodeType>(
        &self,
        req: LeafRequest<Types>,
    ) -> anyhow::Result<LeafQueryData<Types>> {
        let leaf = self
            .client
            .get::<LeafQueryData<Types>>(&format!("availability/leaf/{}", req.height))
            .send()
            .await
            .context("fetching leaf")?;

        ensure!(
            leaf.height() == req.height,
            "received leaf with the wrong height ({})",
            leaf.height(),
        );
        ensure!(
            leaf.hash() == req.expected_leaf,
            "received leaf with the wrong hash ({})",
            leaf.hash()
        );
        ensure!(
            leaf.qc().commit() == req.expected_qc,
            "received leaf with the wrong QC ({})",
            leaf.qc().commit()
        );

        Ok(leaf)
    }

    /// Fetch a contiguous range of leaves and verify the hash chain.
    ///
    /// The request pins the hash and QC of the last leaf in the range; walking the returned
    /// chain backwards, each leaf must match the expectation derived from its child, so the
    /// whole range is authenticated by `req.last_leaf`/`req.last_qc`.
    pub async fn fetch_leaf_range<Types: NodeType>(
        &self,
        req: LeafRangeRequest<Types>,
    ) -> anyhow::Result<NonEmptyRange<LeafQueryData<Types>>> {
        let leaves = self
            .client
            .get::<NonEmptyRange<LeafQueryData<Types>>>(&format!(
                "availability/leaf/{}/{}",
                req.start, req.end
            ))
            .send()
            .await
            .context("fetching leaf chain")?;

        ensure!(
            leaves.start() == req.start && leaves.end() == req.end,
            "server returned wrong range of leaves ({}..{})",
            leaves.start(),
            leaves.end()
        );

        // Verify hash chaining.
        let mut expected_leaf = req.last_leaf;
        let mut expected_qc = req.last_qc;
        for leaf in leaves.iter().rev() {
            let leaf_hash = leaf.hash();
            let qc_hash = leaf.qc().commit();
            ensure!(
                leaf_hash == expected_leaf,
                "received leaf {} with wrong hash {leaf_hash}",
                leaf.height(),
            );
            ensure!(
                qc_hash == expected_qc,
                "received leaf {} with wrong QC {qc_hash}",
                leaf.height()
            );
            // Each leaf commits to its parent, so derive the expectation for the next (older)
            // leaf in the iteration from this one.
            expected_leaf = leaf.leaf().parent_commitment();
            expected_qc = leaf.leaf().justify_qc().commit();
        }

        Ok(leaves)
    }
}
301
302#[async_trait]
303impl<Types, Ver: StaticVersionType> Provider<Types, LeafRequest<Types>>
304    for QueryServiceProvider<Ver>
305where
306    Types: NodeType,
307{
308    async fn fetch(&self, req: LeafRequest<Types>) -> Option<LeafQueryData<Types>> {
309        self.handle_result(req, self.fetch_leaf(req).await)
310    }
311}
312
313#[async_trait]
314impl<Types, Ver: StaticVersionType> Provider<Types, LeafRangeRequest<Types>>
315    for QueryServiceProvider<Ver>
316where
317    Types: NodeType,
318{
319    async fn fetch(
320        &self,
321        req: LeafRangeRequest<Types>,
322    ) -> Option<NonEmptyRange<LeafQueryData<Types>>> {
323        self.handle_result(req, self.fetch_leaf_range(req).await)
324    }
325}
326
impl<Ver: StaticVersionType> QueryServiceProvider<Ver> {
    /// Fetch the VID common data for a single payload, identified by its commitment.
    ///
    /// Verifies that the returned common data is consistent with the requested payload hash
    /// before accepting it.
    pub async fn fetch_vid_common<Types: NodeType>(
        &self,
        req: VidCommonRequest,
    ) -> anyhow::Result<VidCommon> {
        let res = self
            .client
            .get::<VidCommonQueryData<Types>>(&format!(
                "availability/vid/common/payload-hash/{}",
                req.0
            ))
            .send()
            .await
            .context("fetching VID common")?;

        ensure!(
            res.common().is_consistent(&req.0),
            "inconsistent VID common data {:?}",
            res.common,
        );
        Ok(res.common)
    }

    /// Fetch VID common data for heights `start..end`, falling back to per-height requests.
    ///
    /// Old versions of the upstream query service do not expose the ranged endpoint; when the
    /// server answers "No route matches", each object in the range is fetched individually and
    /// the results are reassembled into a range. No verification is done here — callers are
    /// responsible for checking the returned data against their request.
    async fn fetch_vid_common_range_with_fallback<Types: NodeType>(
        &self,
        start: u64,
        end: u64,
    ) -> anyhow::Result<NonEmptyRange<VidCommonQueryData<Types>>> {
        let res = self
            .client
            .get::<NonEmptyRange<VidCommonQueryData<Types>>>(&format!(
                "availability/vid/common/{start}/{end}",
            ))
            .send()
            .await;
        match res {
            Ok(common) => Ok(common),
            Err(Error::Custom { message, .. }) if message.contains("No route matches") => {
                // Old versions of the upstream query service do not support the ranged VID common
                // endpoint. Fall back to fetching each object individually.
                tracing::info!(
                    start,
                    end,
                    "server does not support ranged VID fetching, falling back to individual \
                     fetches"
                );
                // Fetch all heights concurrently; fail fast if any single fetch fails.
                let common = try_join_all((start..end).map(|i| {
                    self.client
                        .get::<VidCommonQueryData<Types>>(&format!("availability/vid/common/{i}"))
                        .send()
                        .map_err(move |err| {
                            anyhow::Error::new(err).context(format!("fetching VID common {i}"))
                        })
                }))
                .await?;
                NonEmptyRange::new(common)
                    .context("converting individually fetched VID common into range")
            },
            Err(err) => Err(err).context("fetching VID common range"),
        }
    }

    /// Fetch and verify VID common data for a whole range of heights.
    ///
    /// Checks that the server returned exactly the requested range, that each entry is
    /// internally consistent with its claimed payload commitment, and that the aggregate hash
    /// of the commitments matches `req.expected_hash`.
    pub async fn fetch_vid_common_range<Types: NodeType>(
        &self,
        req: VidCommonRangeRequest,
    ) -> anyhow::Result<NonEmptyRange<VidCommonQueryData<Types>>> {
        let req = RangeRequest::from(req);
        let common = self
            .fetch_vid_common_range_with_fallback(req.start, req.end)
            .await?;

        ensure!(
            common.start() == req.start && common.end() == req.end,
            "server returned wrong VID common ({}..{})",
            common.start(),
            common.end()
        );

        let commits = common
            .iter()
            .map(|common| {
                // Check that the given `VidCommon` matches the claimed payload commitment.
                ensure!(
                    common.common().is_consistent(&common.payload_hash()),
                    "server returned VID common with inconsistent commitment {common:?}"
                );

                // Yield the payload commitment for hashing, to check that the full sequence of payloads
                // is consistent with the request.
                Ok(common.payload_hash())
            })
            .collect::<anyhow::Result<Vec<_>>>()?;
        let hash = RangeRequest::hash_payloads(commits.iter().copied());
        ensure!(
            hash == req.expected_hash,
            "server returned wrong VID common (hash {hash}, commits {commits:?})"
        );

        Ok(common)
    }
}
428
429#[async_trait]
430impl<Types, Ver: StaticVersionType> Provider<Types, VidCommonRequest> for QueryServiceProvider<Ver>
431where
432    Types: NodeType,
433{
434    async fn fetch(&self, req: VidCommonRequest) -> Option<VidCommon> {
435        self.handle_result(req, self.fetch_vid_common::<Types>(req).await)
436    }
437}
438
439#[async_trait]
440impl<Types, Ver: StaticVersionType> Provider<Types, VidCommonRangeRequest>
441    for QueryServiceProvider<Ver>
442where
443    Types: NodeType,
444{
445    async fn fetch(
446        &self,
447        req: VidCommonRangeRequest,
448    ) -> Option<NonEmptyRange<VidCommonQueryData<Types>>> {
449        self.handle_result(req, self.fetch_vid_common_range::<Types>(req).await)
450    }
451}
452
453// These tests run the `postgres` Docker image, which doesn't work on Windows.
454#[cfg(all(test, not(target_os = "windows")))]
455mod test {
456    use std::{future::IntoFuture, time::Duration};
457
458    use committable::{Commitment, Committable};
459    use futures::{
460        future::{FutureExt, join},
461        stream::StreamExt,
462    };
463    // generic-array 0.14.x is deprecated, but VidCommitment requires this version
464    // for From<Output<H>> impl in jf-merkle-tree
465    #[allow(deprecated)]
466    use generic_array::GenericArray;
467    use hotshot_example_types::node_types::{EpochVersion, TEST_VERSIONS};
468    use rand::RngCore;
469    use test_utils::reserve_tcp_port;
470    use tide_disco::{Api, App, error::ServerError};
471    use toml::toml;
472    use vbs::version::StaticVersion;
473
474    use super::*;
475    use crate::{
476        ApiState,
477        api::load_api,
478        availability::{
479            self, AvailabilityDataSource, BlockId, BlockInfo, BlockQueryData, BlockWithTransaction,
480            Fetch, UpdateAvailabilityData, define_api,
481        },
482        data_source::{
483            AvailabilityProvider, FetchingDataSource, Transaction, VersionedDataSource,
484            sql::{self, SqlDataSource},
485            storage::{
486                AvailabilityStorage, SqlStorage, StorageConnectionType, UpdateAvailabilityStorage,
487                fail_storage::{FailStorage, FailableAction},
488                pruning::{PrunedHeightStorage, PrunerCfg},
489                sql::testing::TmpDb,
490            },
491        },
492        fetching::provider::{NoFetching, Provider as ProviderTrait, TestProvider},
493        node::data_source::NodeDataSource,
494        task::BackgroundTask,
495        testing::{
496            consensus::{MockDataSource, MockNetwork},
497            mocks::{MockBase, MockTypes, mock_transaction},
498            sleep,
499        },
500        types::HeightIndexed,
501    };
502
    // The provider under test, wrapped in `TestProvider` so tests can block/unblock requests.
    type Provider = TestProvider<QueryServiceProvider<MockBase>>;
    // The same provider, speaking the epoch-enabled API version.
    type EpochProvider = TestProvider<QueryServiceProvider<EpochVersion>>;
505
506    fn ignore<T>(_: T) {}
507
508    /// Build a data source suitable for this suite of tests.
509    async fn builder<P: AvailabilityProvider<MockTypes> + Clone>(
510        db: &TmpDb,
511        provider: &P,
512    ) -> sql::Builder<MockTypes, P> {
513        db.config()
514            .builder((*provider).clone())
515            .await
516            .unwrap()
517            // We disable proactive fetching for these tests, since we are intending to test on
518            // demand fetching, and proactive fetching could lead to false successes.
519            .disable_proactive_fetching()
520    }
521
522    /// A data source suitable for this suite of tests, with the default options.
523    async fn data_source<P: AvailabilityProvider<MockTypes> + Clone>(
524        db: &TmpDb,
525        provider: &P,
526    ) -> SqlDataSource<MockTypes, P> {
527        builder(db, provider).await.build().await.unwrap()
528    }
529
    /// End-to-end test of on-demand fetching.
    ///
    /// A passive node (receiving no events from consensus) must fill in missing leaves, blocks,
    /// payloads, and VID common data by actively fetching them from a peer's REST API. Also
    /// verifies that requests which cannot be validated locally do _not_ trigger a fetch, and
    /// that fetching a block implicitly persists its corresponding leaf.
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_fetch_on_request() {
        // Create the consensus network.
        let mut network = MockNetwork::<MockDataSource>::init().await;

        // Start a web server that the non-consensus node can use to fetch blocks.
        let port = reserve_tcp_port().unwrap();
        let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
        app.register_module(
            "availability",
            define_api(
                &Default::default(),
                MockBase::instance(),
                "1.0.0".parse().unwrap(),
            )
            .unwrap(),
        )
        .unwrap();
        network.spawn(
            "server",
            app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
        );

        // Start a data source which is not receiving events from consensus, only from a peer.
        let db = TmpDb::init().await;
        let provider = Provider::new(QueryServiceProvider::new(
            format!("http://localhost:{port}").parse().unwrap(),
            MockBase::instance(),
        ));
        let data_source = data_source(&db, &provider).await;

        // Start consensus.
        network.start().await;

        // Wait until the block height reaches 6. This gives us the genesis block, one additional
        // block at the end, and then one block to play around with fetching each type of resource:
        // * Leaf
        // * Block
        // * Payload
        // * VID common
        let leaves = network.data_source().subscribe_leaves(1).await;
        let leaves = leaves.take(5).collect::<Vec<_>>().await;
        let test_leaf = &leaves[0];
        let test_block = &leaves[1];
        let test_payload = &leaves[2];
        let test_common = &leaves[3];

        // Make requests for missing data that should _not_ trigger an active fetch:
        tracing::info!("requesting unfetchable resources");
        let mut fetches = vec![];
        // * An unknown leaf hash.
        fetches.push(data_source.get_leaf(test_leaf.hash()).await.map(ignore));
        // * An unknown leaf height.
        fetches.push(
            data_source
                .get_leaf(test_leaf.height() as usize)
                .await
                .map(ignore),
        );
        // * An unknown block hash.
        fetches.push(
            data_source
                .get_block(test_block.block_hash())
                .await
                .map(ignore),
        );
        fetches.push(
            data_source
                .get_payload(test_payload.block_hash())
                .await
                .map(ignore),
        );
        fetches.push(
            data_source
                .get_vid_common(test_common.block_hash())
                .await
                .map(ignore),
        );
        // * An unknown block height.
        fetches.push(
            data_source
                .get_block(test_block.height() as usize)
                .await
                .map(ignore),
        );
        fetches.push(
            data_source
                .get_payload(test_payload.height() as usize)
                .await
                .map(ignore),
        );
        fetches.push(
            data_source
                .get_vid_common(test_common.height() as usize)
                .await
                .map(ignore),
        );
        // * Genesis VID common (no VID for genesis)
        fetches.push(data_source.get_vid_common(0).await.map(ignore));
        // * An unknown transaction.
        fetches.push(
            data_source
                .get_block_containing_transaction(mock_transaction(vec![]).commit())
                .await
                .map(ignore),
        );

        // Even if we give data extra time to propagate, these requests will not resolve, since we
        // didn't trigger any active fetches.
        sleep(Duration::from_secs(1)).await;
        for (i, fetch) in fetches.into_iter().enumerate() {
            tracing::info!("checking fetch {i} is unresolved");
            fetch.try_resolve().unwrap_err();
        }

        // Now we will actually fetch the missing data. First, since our node is not really
        // connected to consensus, we need to give it a leaf after the range of interest so it
        // learns about the correct block height. We will temporarily lock requests to the provider
        // so that we can verify that without the provider, the node does _not_ get the data.
        provider.block().await;
        data_source
            .append(leaves.last().cloned().unwrap().into())
            .await
            .unwrap();

        tracing::info!("requesting fetchable resources");
        let req_leaf = data_source.get_leaf(test_leaf.height() as usize).await;
        let req_block = data_source.get_block(test_block.height() as usize).await;
        let req_payload = data_source
            .get_payload(test_payload.height() as usize)
            .await;
        let req_common = data_source
            .get_vid_common(test_common.height() as usize)
            .await;

        // Give the requests some extra time to complete, and check that they still haven't
        // resolved, since the provider is blocked. This just ensures the integrity of the test by
        // checking the node didn't mysteriously get the block from somewhere else, so that when we
        // unblock the provider and the node finally gets the block, we know it came from the
        // provider.
        sleep(Duration::from_secs(1)).await;
        req_leaf.try_resolve().unwrap_err();
        req_block.try_resolve().unwrap_err();
        req_payload.try_resolve().unwrap_err();
        req_common.try_resolve().unwrap_err();

        // Unblock the request and see that we eventually receive the data.
        provider.unblock().await;
        let leaf = data_source
            .get_leaf(test_leaf.height() as usize)
            .await
            .await;
        let block = data_source
            .get_block(test_block.height() as usize)
            .await
            .await;
        let payload = data_source
            .get_payload(test_payload.height() as usize)
            .await
            .await;
        let common = data_source
            .get_vid_common(test_common.height() as usize)
            .await
            .await;
        {
            // Verify the data.
            let truth = network.data_source();
            assert_eq!(
                leaf,
                truth.get_leaf(test_leaf.height() as usize).await.await
            );
            assert_eq!(
                block,
                truth.get_block(test_block.height() as usize).await.await
            );
            assert_eq!(
                payload,
                truth
                    .get_payload(test_payload.height() as usize)
                    .await
                    .await
            );
            assert_eq!(
                common,
                truth
                    .get_vid_common(test_common.height() as usize)
                    .await
                    .await
            );
        }

        // Fetching the block and payload should have also fetched the corresponding leaves, since
        // we have an invariant that we should not store a block in the database without its
        // corresponding leaf and header. Thus we should be able to get the leaves even if the
        // provider is blocked.
        provider.block().await;
        for leaf in [test_block, test_payload] {
            tracing::info!("fetching existing leaf {}", leaf.height());
            let fetched_leaf = data_source.get_leaf(leaf.height() as usize).await.await;
            assert_eq!(*leaf, fetched_leaf);
        }

        // On the other hand, fetching the block corresponding to `leaf` _will_ trigger a fetch,
        // since fetching a leaf does not necessarily fetch the corresponding block. We can fetch by
        // hash now, since the presence of the corresponding leaf allows us to confirm that a block
        // with this hash exists, and trigger a fetch for it.
        tracing::info!("fetching block by hash");
        provider.unblock().await;
        {
            let block = data_source.get_block(test_leaf.block_hash()).await.await;
            assert_eq!(block.hash(), leaf.block_hash());
        }

        // Test a similar scenario, but with payload instead of block: we are aware of
        // `leaves.last()` but not the corresponding payload, but we can fetch that payload by block
        // hash.
        tracing::info!("fetching payload by hash");
        {
            let leaf = leaves.last().unwrap();
            let payload = data_source.get_payload(leaf.block_hash()).await.await;
            assert_eq!(payload.height(), leaf.height());
            assert_eq!(payload.block_hash(), leaf.block_hash());
            assert_eq!(payload.hash(), leaf.payload_hash());
        }
    }
755
756    #[tokio::test(flavor = "multi_thread")]
757    async fn test_fetch_on_request_epoch_version() {
758        // This test verifies that our provider can handle fetching things by their hashes,
759        // specifically focused on epoch version transitions
760        tracing::info!("Starting test_fetch_on_request_epoch_version");
761
762        // Create the consensus network.
763        let mut network = MockNetwork::<MockDataSource>::init().await;
764
765        // Start a web server that the non-consensus node can use to fetch blocks.
766        let port = reserve_tcp_port().unwrap();
767        let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
768        app.register_module(
769            "availability",
770            define_api(
771                &Default::default(),
772                EpochVersion::instance(),
773                "1.0.0".parse().unwrap(),
774            )
775            .unwrap(),
776        )
777        .unwrap();
778        network.spawn(
779            "server",
780            app.serve(format!("0.0.0.0:{port}"), EpochVersion::instance()),
781        );
782
783        // Start a data source which is not receiving events from consensus, only from a peer.
784        // Use our special test provider that handles epoch version transitions
785        let db = TmpDb::init().await;
786        let provider = EpochProvider::new(QueryServiceProvider::new(
787            format!("http://localhost:{port}").parse().unwrap(),
788            EpochVersion::instance(),
789        ));
790        let data_source = data_source(&db, &provider).await;
791
792        // Start consensus.
793        network.start().await;
794
795        // Wait until the block height reaches 6. This gives us the genesis block, one additional
796        // block at the end, and then one block to play around with fetching each type of resource:
797        // * Leaf
798        // * Block
799        // * Payload
800        // * VID common
801        let leaves = network.data_source().subscribe_leaves(1).await;
802        let leaves = leaves.take(5).collect::<Vec<_>>().await;
803        let test_leaf = &leaves[0];
804        let test_block = &leaves[1];
805        let test_payload = &leaves[2];
806        let test_common = &leaves[3];
807
808        // Make requests for missing data that should _not_ trigger an active fetch:
809        let mut fetches = vec![];
810        // * An unknown leaf hash.
811        fetches.push(data_source.get_leaf(test_leaf.hash()).await.map(ignore));
812        // * An unknown leaf height.
813        fetches.push(
814            data_source
815                .get_leaf(test_leaf.height() as usize)
816                .await
817                .map(ignore),
818        );
819        // * An unknown block hash.
820        fetches.push(
821            data_source
822                .get_block(test_block.block_hash())
823                .await
824                .map(ignore),
825        );
826        fetches.push(
827            data_source
828                .get_payload(test_payload.block_hash())
829                .await
830                .map(ignore),
831        );
832        fetches.push(
833            data_source
834                .get_vid_common(test_common.block_hash())
835                .await
836                .map(ignore),
837        );
838        // * An unknown block height.
839        fetches.push(
840            data_source
841                .get_block(test_block.height() as usize)
842                .await
843                .map(ignore),
844        );
845        fetches.push(
846            data_source
847                .get_payload(test_payload.height() as usize)
848                .await
849                .map(ignore),
850        );
851        fetches.push(
852            data_source
853                .get_vid_common(test_common.height() as usize)
854                .await
855                .map(ignore),
856        );
857        // * Genesis VID common (no VID for genesis)
858        fetches.push(data_source.get_vid_common(0).await.map(ignore));
859        // * An unknown transaction.
860        fetches.push(
861            data_source
862                .get_block_containing_transaction(mock_transaction(vec![]).commit())
863                .await
864                .map(ignore),
865        );
866
867        // Even if we give data extra time to propagate, these requests will not resolve, since we
868        // didn't trigger any active fetches.
869        sleep(Duration::from_secs(1)).await;
870        for (i, fetch) in fetches.into_iter().enumerate() {
871            tracing::info!("checking fetch {i} is unresolved");
872            fetch.try_resolve().unwrap_err();
873        }
874
875        // Now we will actually fetch the missing data. First, since our node is not really
876        // connected to consensus, we need to give it a leaf after the range of interest so it
877        // learns about the correct block height. We will temporarily lock requests to the provider
878        // so that we can verify that without the provider, the node does _not_ get the data.
879        provider.block().await;
880        data_source
881            .append(leaves.last().cloned().unwrap().into())
882            .await
883            .unwrap();
884
885        let req_leaf = data_source.get_leaf(test_leaf.height() as usize).await;
886        let req_block = data_source.get_block(test_block.height() as usize).await;
887        let req_payload = data_source
888            .get_payload(test_payload.height() as usize)
889            .await;
890        let req_common = data_source
891            .get_vid_common(test_common.height() as usize)
892            .await;
893
894        // Give the requests some extra time to complete, and check that they still haven't
895        // resolved, since the provider is blocked. This just ensures the integrity of the test by
896        // checking the node didn't mysteriously get the block from somewhere else, so that when we
897        // unblock the provider and the node finally gets the block, we know it came from the
898        // provider.
899        sleep(Duration::from_secs(1)).await;
900        req_leaf.try_resolve().unwrap_err();
901        req_block.try_resolve().unwrap_err();
902        req_payload.try_resolve().unwrap_err();
903        req_common.try_resolve().unwrap_err();
904
905        // Unblock the request and see that we eventually receive the data.
906        provider.unblock().await;
907        let leaf = data_source
908            .get_leaf(test_leaf.height() as usize)
909            .await
910            .await;
911        let block = data_source
912            .get_block(test_block.height() as usize)
913            .await
914            .await;
915        let payload = data_source
916            .get_payload(test_payload.height() as usize)
917            .await
918            .await;
919        let common = data_source
920            .get_vid_common(test_common.height() as usize)
921            .await
922            .await;
923        {
924            // Verify the data.
925            let truth = network.data_source();
926            assert_eq!(
927                leaf,
928                truth.get_leaf(test_leaf.height() as usize).await.await
929            );
930            assert_eq!(
931                block,
932                truth.get_block(test_block.height() as usize).await.await
933            );
934            assert_eq!(
935                payload,
936                truth
937                    .get_payload(test_payload.height() as usize)
938                    .await
939                    .await
940            );
941            assert_eq!(
942                common,
943                truth
944                    .get_vid_common(test_common.height() as usize)
945                    .await
946                    .await
947            );
948        }
949
950        // Fetching the block and payload should have also fetched the corresponding leaves, since
951        // we have an invariant that we should not store a block in the database without its
952        // corresponding leaf and header. Thus we should be able to get the leaves even if the
953        // provider is blocked.
954        provider.block().await;
955        for leaf in [test_block, test_payload] {
956            tracing::info!("fetching existing leaf {}", leaf.height());
957            let fetched_leaf = data_source.get_leaf(leaf.height() as usize).await.await;
958            assert_eq!(*leaf, fetched_leaf);
959        }
960
961        // On the other hand, fetching the block corresponding to `leaf` _will_ trigger a fetch,
962        // since fetching a leaf does not necessarily fetch the corresponding block. We can fetch by
963        // hash now, since the presence of the corresponding leaf allows us to confirm that a block
964        // with this hash exists, and trigger a fetch for it.
965        provider.unblock().await;
966        {
967            let block = data_source.get_block(test_leaf.block_hash()).await.await;
968            assert_eq!(block.hash(), leaf.block_hash());
969        }
970
971        // Test a similar scenario, but with payload instead of block: we are aware of
972        // `leaves.last()` but not the corresponding payload, but we can fetch that payload by block
973        // hash.
974        {
975            let leaf = leaves.last().unwrap();
976            let payload = data_source.get_payload(leaf.block_hash()).await.await;
977            assert_eq!(payload.height(), leaf.height());
978            assert_eq!(payload.block_hash(), leaf.block_hash());
979            assert_eq!(payload.hash(), leaf.payload_hash());
980        }
981
982        // Add more debug logs throughout the test
983        tracing::info!("Test completed successfully!");
984    }
985
986    #[test_log::test(tokio::test(flavor = "multi_thread"))]
987    async fn test_fetch_block_and_leaf_concurrently() {
988        // Create the consensus network.
989        let mut network = MockNetwork::<MockDataSource>::init().await;
990
991        // Start a web server that the non-consensus node can use to fetch blocks.
992        let port = reserve_tcp_port().unwrap();
993        let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
994        app.register_module(
995            "availability",
996            define_api(
997                &Default::default(),
998                MockBase::instance(),
999                "1.0.0".parse().unwrap(),
1000            )
1001            .unwrap(),
1002        )
1003        .unwrap();
1004        network.spawn(
1005            "server",
1006            app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
1007        );
1008
1009        // Start a data source which is not receiving events from consensus, only from a peer.
1010        let db = TmpDb::init().await;
1011        let provider = Provider::new(QueryServiceProvider::new(
1012            format!("http://localhost:{port}").parse().unwrap(),
1013            MockBase::instance(),
1014        ));
1015        let data_source = data_source(&db, &provider).await;
1016
1017        // Start consensus.
1018        network.start().await;
1019
1020        // Wait until the block height reaches 3. This gives us the genesis block, one additional
1021        // block at the end, and then one block that we can use to test fetching.
1022        let leaves = network.data_source().subscribe_leaves(1).await;
1023        let leaves = leaves.take(2).collect::<Vec<_>>().await;
1024        let test_leaf = &leaves[0];
1025
1026        // Tell the node about a leaf after the one of interest so it learns about the block height.
1027        data_source.append(leaves[1].clone().into()).await.unwrap();
1028
1029        // Fetch a leaf and the corresponding block at the same time. This will result in two tasks
1030        // trying to fetch the same leaf, but one should win and notify the other, which ultimately
1031        // ends up not fetching anything.
1032        let (leaf, block) = join(
1033            data_source
1034                .get_leaf(test_leaf.height() as usize)
1035                .await
1036                .into_future(),
1037            data_source
1038                .get_block(test_leaf.height() as usize)
1039                .await
1040                .into_future(),
1041        )
1042        .await;
1043        assert_eq!(leaf, *test_leaf);
1044        assert_eq!(leaf.header(), block.header());
1045    }
1046
1047    #[test_log::test(tokio::test(flavor = "multi_thread"))]
1048    async fn test_fetch_different_blocks_same_payload() {
1049        // Create the consensus network.
1050        let mut network = MockNetwork::<MockDataSource>::init().await;
1051
1052        // Start a web server that the non-consensus node can use to fetch blocks.
1053        let port = reserve_tcp_port().unwrap();
1054        let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
1055        app.register_module(
1056            "availability",
1057            define_api(
1058                &Default::default(),
1059                MockBase::instance(),
1060                "1.0.0".parse().unwrap(),
1061            )
1062            .unwrap(),
1063        )
1064        .unwrap();
1065        network.spawn(
1066            "server",
1067            app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
1068        );
1069
1070        // Start a data source which is not receiving events from consensus, only from a peer.
1071        let db = TmpDb::init().await;
1072        let provider = Provider::new(QueryServiceProvider::new(
1073            format!("http://localhost:{port}").parse().unwrap(),
1074            MockBase::instance(),
1075        ));
1076        let data_source = data_source(&db, &provider).await;
1077
1078        // Start consensus.
1079        network.start().await;
1080
1081        // Wait until the block height reaches 4. This gives us the genesis block, one additional
1082        // block at the end, and then two blocks that we can use to test fetching.
1083        let leaves = network.data_source().subscribe_leaves(1).await;
1084        let leaves = leaves.take(4).collect::<Vec<_>>().await;
1085
1086        // Tell the node about a leaf after the range of interest so it learns about the block
1087        // height.
1088        data_source
1089            .append(leaves.last().cloned().unwrap().into())
1090            .await
1091            .unwrap();
1092
1093        // All the blocks here are empty, so they have the same payload:
1094        assert_eq!(leaves[0].payload_hash(), leaves[1].payload_hash());
1095        // If we fetch both blocks at the same time, we can check that we haven't broken anything
1096        // with whatever optimizations we add to deduplicate payload fetching.
1097        let (block1, block2) = join(
1098            data_source
1099                .get_block(leaves[0].height() as usize)
1100                .await
1101                .into_future(),
1102            data_source
1103                .get_block(leaves[1].height() as usize)
1104                .await
1105                .into_future(),
1106        )
1107        .await;
1108        assert_eq!(block1.header(), leaves[0].header());
1109        assert_eq!(block2.header(), leaves[1].header());
1110    }
1111
1112    #[test_log::test(tokio::test(flavor = "multi_thread"))]
1113    async fn test_fetch_stream() {
1114        // Create the consensus network.
1115        let mut network = MockNetwork::<MockDataSource>::init().await;
1116
1117        // Start a web server that the non-consensus node can use to fetch blocks.
1118        let port = reserve_tcp_port().unwrap();
1119        let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
1120        app.register_module(
1121            "availability",
1122            define_api(
1123                &Default::default(),
1124                MockBase::instance(),
1125                "1.0.0".parse().unwrap(),
1126            )
1127            .unwrap(),
1128        )
1129        .unwrap();
1130        network.spawn(
1131            "server",
1132            app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
1133        );
1134
1135        // Start a data source which is not receiving events from consensus, only from a peer.
1136        let db = TmpDb::init().await;
1137        let provider = Provider::new(QueryServiceProvider::new(
1138            format!("http://localhost:{port}").parse().unwrap(),
1139            MockBase::instance(),
1140        ));
1141        let data_source = data_source(&db, &provider).await;
1142
1143        // Start consensus.
1144        network.start().await;
1145
1146        // Subscribe to objects from the future.
1147        let blocks = data_source.subscribe_blocks(0).await;
1148        let leaves = data_source.subscribe_leaves(0).await;
1149        let common = data_source.subscribe_vid_common(0).await;
1150
1151        // Wait for a few blocks to be finalized.
1152        let finalized_leaves = network.data_source().subscribe_leaves(0).await;
1153        let finalized_leaves = finalized_leaves.take(5).collect::<Vec<_>>().await;
1154
1155        // Tell the node about a leaf after the range of interest so it learns about the block
1156        // height.
1157        data_source
1158            .append(finalized_leaves.last().cloned().unwrap().into())
1159            .await
1160            .unwrap();
1161
1162        // Check the subscriptions.
1163        let blocks = blocks.take(5).collect::<Vec<_>>().await;
1164        let leaves = leaves.take(5).collect::<Vec<_>>().await;
1165        let common = common.take(5).collect::<Vec<_>>().await;
1166        for i in 0..5 {
1167            tracing::info!("checking block {i}");
1168            assert_eq!(leaves[i], finalized_leaves[i]);
1169            assert_eq!(blocks[i].header(), finalized_leaves[i].header());
1170            assert_eq!(common[i], data_source.get_vid_common(i).await.await);
1171        }
1172    }
1173
1174    #[test_log::test(tokio::test(flavor = "multi_thread"))]
1175    async fn test_fetch_range_start() {
1176        // Create the consensus network.
1177        let mut network = MockNetwork::<MockDataSource>::init().await;
1178
1179        // Start a web server that the non-consensus node can use to fetch blocks.
1180        let port = reserve_tcp_port().unwrap();
1181        let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
1182        app.register_module(
1183            "availability",
1184            define_api(
1185                &Default::default(),
1186                MockBase::instance(),
1187                "1.0.0".parse().unwrap(),
1188            )
1189            .unwrap(),
1190        )
1191        .unwrap();
1192        network.spawn(
1193            "server",
1194            app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
1195        );
1196
1197        // Start a data source which is not receiving events from consensus, only from a peer.
1198        let db = TmpDb::init().await;
1199        let provider = Provider::new(QueryServiceProvider::new(
1200            format!("http://localhost:{port}").parse().unwrap(),
1201            MockBase::instance(),
1202        ));
1203        let data_source = data_source(&db, &provider).await;
1204
1205        // Start consensus.
1206        network.start().await;
1207
1208        // Wait for a few blocks to be finalized.
1209        let finalized_leaves = network.data_source().subscribe_leaves(0).await;
1210        let finalized_leaves = finalized_leaves.take(5).collect::<Vec<_>>().await;
1211
1212        // Tell the node about a leaf after the range of interest (so it learns about the block
1213        // height) and one in the middle of the range, to test what happens when data is missing
1214        // from the beginning of the range but other data is available.
1215        let mut tx = data_source.write().await.unwrap();
1216        tx.insert_leaf(&finalized_leaves[2]).await.unwrap();
1217        tx.insert_leaf(&finalized_leaves[4]).await.unwrap();
1218        tx.commit().await.unwrap();
1219
1220        // Get the whole range of leaves.
1221        let leaves = data_source
1222            .get_leaf_range(..5)
1223            .await
1224            .then(Fetch::resolve)
1225            .collect::<Vec<_>>()
1226            .await;
1227        for i in 0..5 {
1228            tracing::info!("checking leaf {i}");
1229            assert_eq!(leaves[i], finalized_leaves[i]);
1230        }
1231    }
1232
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn fetch_transaction() {
        // Test that a subscription for a not-yet-sequenced transaction resolves
        // passively (without any active fetching) once a block containing the
        // transaction is appended to the data source.

        // Create the consensus network.
        let mut network = MockNetwork::<MockDataSource>::init().await;

        // Start a web server that the non-consensus node can use to fetch blocks.
        let port = reserve_tcp_port().unwrap();
        let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
        app.register_module(
            "availability",
            define_api(
                &Default::default(),
                MockBase::instance(),
                "1.0.0".parse().unwrap(),
            )
            .unwrap(),
        )
        .unwrap();
        network.spawn(
            "server",
            app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
        );

        // Start a data source which is not receiving events from consensus. We don't give it a
        // fetcher since transactions are always fetched passively anyways.
        let db = TmpDb::init().await;
        let data_source = data_source(&db, &NoFetching).await;

        // Subscribe to blocks.
        let mut leaves = network.data_source().subscribe_leaves(1).await;
        let mut blocks = network.data_source().subscribe_blocks(1).await;

        // Start consensus.
        network.start().await;

        // Subscribe to a transaction which hasn't been sequenced yet. This is completely passive
        // and works without a fetcher; we don't trigger fetches for transactions that we don't know
        // exist.
        let tx = mock_transaction(vec![1, 2, 3]);
        let fut = data_source
            .get_block_containing_transaction(tx.commit())
            .await;

        // Sequence the transaction.
        network.submit_transaction(tx.clone()).await;

        // Send blocks to the query service, the future will resolve as soon as it sees a block
        // containing the transaction.
        let block = loop {
            // Consume the leaf/block streams in lockstep: each iteration mirrors
            // one finalized leaf/block pair into the passive data source.
            let leaf = leaves.next().await.unwrap();
            let block = blocks.next().await.unwrap();

            data_source
                .append(BlockInfo::new(leaf, Some(block.clone()), None, None))
                .await
                .unwrap();

            // Stop once the appended block actually contains our transaction.
            if block.transaction_by_hash(tx.commit()).is_some() {
                break block;
            }
        };
        tracing::info!("transaction included in block {}", block.height());

        // The passive subscription should now resolve with the block we just appended.
        let fetched_tx = fut.await;
        assert_eq!(
            fetched_tx,
            BlockWithTransaction::with_hash(block, tx.commit()).unwrap()
        );

        // Future queries for this transaction resolve immediately.
        assert_eq!(
            fetched_tx,
            data_source
                .get_block_containing_transaction(tx.commit())
                .await
                .await
        );
    }
1311
1312    #[test_log::test(tokio::test(flavor = "multi_thread"))]
1313    async fn test_retry() {
1314        // Create the consensus network.
1315        let mut network = MockNetwork::<MockDataSource>::init().await;
1316
1317        // Start a web server that the non-consensus node can use to fetch blocks.
1318        let port = reserve_tcp_port().unwrap();
1319        let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
1320        app.register_module(
1321            "availability",
1322            define_api(
1323                &Default::default(),
1324                MockBase::instance(),
1325                "1.0.0".parse().unwrap(),
1326            )
1327            .unwrap(),
1328        )
1329        .unwrap();
1330        network.spawn(
1331            "server",
1332            app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
1333        );
1334
1335        // Start a data source which is not receiving events from consensus.
1336        let db = TmpDb::init().await;
1337        let provider = Provider::new(QueryServiceProvider::new(
1338            format!("http://localhost:{port}").parse().unwrap(),
1339            MockBase::instance(),
1340        ));
1341        let data_source = builder(&db, &provider)
1342            .await
1343            .with_max_retry_interval(Duration::from_secs(1))
1344            .build()
1345            .await
1346            .unwrap();
1347
1348        // Start consensus.
1349        network.start().await;
1350
1351        // Wait until the block height reaches 3. This gives us the genesis block, one additional
1352        // block at the end, and one block to try fetching.
1353        let leaves = network.data_source().subscribe_leaves(1).await;
1354        let leaves = leaves.take(2).collect::<Vec<_>>().await;
1355        let test_leaf = &leaves[0];
1356
1357        // Cause requests to fail temporarily, so we can test retries.
1358        provider.fail();
1359
1360        // Give the node a leaf after the range of interest so it learns about the correct block
1361        // height.
1362        data_source
1363            .append(leaves.last().cloned().unwrap().into())
1364            .await
1365            .unwrap();
1366
1367        tracing::info!("requesting leaf from failing providers");
1368        let fut = data_source.get_leaf(test_leaf.height() as usize).await;
1369
1370        // Wait a few retries and check that the request has not completed, since the provider is
1371        // failing.
1372        sleep(Duration::from_secs(5)).await;
1373        fut.try_resolve().unwrap_err();
1374
1375        // As soon as the provider recovers, the request can complete.
1376        provider.unfail();
1377        assert_eq!(
1378            data_source
1379                .get_leaf(test_leaf.height() as usize)
1380                .await
1381                .await,
1382            *test_leaf
1383        );
1384    }
1385
1386    // Uses deprecated generic-array 0.14.x via digest, required for VidCommitment compatibility
1387    #[allow(deprecated)]
1388    fn random_vid_commit() -> VidCommitment {
1389        let mut bytes = [0; 32];
1390        rand::thread_rng().fill_bytes(&mut bytes);
1391        VidCommitment::V0(GenericArray::from(bytes).into())
1392    }
1393
1394    fn random_leaf_request() -> LeafRequest<MockTypes> {
1395        let mut bytes = [0; 32];
1396        rand::thread_rng().fill_bytes(&mut bytes);
1397        LeafRequest {
1398            height: 1,
1399            expected_leaf: Commitment::from_raw(bytes),
1400            expected_qc: Commitment::from_raw(bytes),
1401        }
1402    }
1403
    // Serve an availability API that answers every request with well-formed but
    // incorrect (dummy) data. Used to verify that the provider validates
    // responses against the request instead of trusting the peer.
    async fn malicious_server(port: u16) {
        // Load the real availability API definition so routes and parameter
        // names match a genuine server.
        let mut api = load_api::<(), ServerError, MockBase>(
            None::<std::path::PathBuf>,
            include_str!("../../../api/availability.toml"),
            vec![],
        )
        .unwrap();

        api.get("get_leaf", move |req, _| {
            async move {
                let height = req.integer_param("height")?;

                // Respond with a leaf of the correct height, but with a dummy hash.
                let mut leaf = LeafQueryData::<MockTypes>::genesis(
                    &Default::default(),
                    &Default::default(),
                    TEST_VERSIONS.test,
                )
                .await;
                leaf.leaf.block_header_mut().block_number = height;
                // Keep the QC self-consistent so only the leaf hash itself
                // mismatches the client's request.
                leaf.qc.data.leaf_commit = leaf.hash();
                Ok(leaf)
            }
            .boxed()
        })
        .unwrap()
        .get("get_leaf_range", move |req, _| {
            async move {
                let start = req.integer_param("from")?;
                let end = req.integer_param("until")?;

                // Respond with the correct range, but using leaves with dummy data.
                let leaf = LeafQueryData::<MockTypes>::genesis(
                    &Default::default(),
                    &Default::default(),
                    TEST_VERSIONS.test,
                )
                .await;
                let leaves = (start..end)
                    .map(|i| {
                        let mut leaf = leaf.clone();
                        leaf.leaf.block_header_mut().block_number = i;
                        leaf.qc.data.leaf_commit = leaf.hash();
                        leaf
                    })
                    .collect::<Vec<_>>();

                Ok(leaves)
            }
            .boxed()
        })
        .unwrap()
        .get("get_block", move |_, _| {
            async move {
                // No matter what data we are asked for, always respond with dummy data.
                Ok(BlockQueryData::<MockTypes>::genesis(
                    &Default::default(),
                    &Default::default(),
                    TEST_VERSIONS.test.base,
                )
                .await)
            }
            .boxed()
        })
        .unwrap()
        .get("get_block_range", move |req, _| {
            async move {
                let start = req.integer_param("from")?;
                let end = req.integer_param("until")?;

                // Respond with the correct range, but using blocks with dummy data.
                let block = BlockQueryData::<MockTypes>::genesis(
                    &Default::default(),
                    &Default::default(),
                    TEST_VERSIONS.test.base,
                )
                .await;
                let blocks = (start..end)
                    .map(|i| {
                        let mut block = block.clone();
                        block.header.block_number = i;
                        block
                    })
                    .collect::<Vec<_>>();

                Ok(blocks)
            }
            .boxed()
        })
        .unwrap()
        .get("get_vid_common", move |_, _| {
            async move {
                // No matter what data we are asked for, always respond with dummy data.
                Ok(VidCommonQueryData::<MockTypes>::genesis(
                    &Default::default(),
                    &Default::default(),
                    TEST_VERSIONS.test.base,
                )
                .await)
            }
            .boxed()
        })
        .unwrap()
        .get("get_vid_common_range", move |req, _| {
            async move {
                let start = req.integer_param("from")?;
                let end = req.integer_param("until")?;

                // Respond with the correct range, but using VID with dummy data.
                let vid = VidCommonQueryData::<MockTypes>::genesis(
                    &Default::default(),
                    &Default::default(),
                    TEST_VERSIONS.test.base,
                )
                .await;
                let vids = (start..end)
                    .map(|i| {
                        let mut vid = vid.clone();
                        vid.height = i;
                        vid
                    })
                    .collect::<Vec<_>>();

                Ok(vids)
            }
            .boxed()
        })
        .unwrap();

        let mut app = App::<(), ServerError>::with_state(());
        app.register_module("availability", api).unwrap();
        // Serve until the test ends; shutdown errors are irrelevant here.
        app.serve(format!("0.0.0.0:{port}"), MockBase::instance())
            .await
            .ok();
    }
1539
    // Security test: every fetch method must validate the response against the request.
    // `malicious_server` responds with well-formed but *mismatched* objects, so each fetch
    // below must fail with a validation error rather than silently accepting bad data.
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_fetch_from_malicious_server() {
        let port = reserve_tcp_port().unwrap();
        let _server = BackgroundTask::spawn("malicious server", malicious_server(port));

        // Point a provider directly at the malicious server instead of an honest peer.
        let provider = QueryServiceProvider::new(
            format!("http://localhost:{port}").parse().unwrap(),
            MockBase::instance(),
        );
        provider.client.connect(None).await;

        // Query for a random leaf, the server will respond with a different leaf, and we should
        // detect the error.
        tracing::info!("fetch leaf");
        let err = provider
            .fetch_leaf(random_leaf_request())
            .await
            .unwrap_err();
        assert!(err.to_string().contains("wrong hash"), "{err:#}");

        // Ranged leaf request.
        tracing::info!("fetch leaf range");
        let req = random_leaf_request();
        let err = provider
            .fetch_leaf_range(LeafRangeRequest {
                start: 0,
                end: req.height + 1,
                last_leaf: req.expected_leaf,
                last_qc: req.expected_qc,
            })
            .await
            .unwrap_err();
        assert!(err.to_string().contains("wrong hash"), "{err:#}");

        // Query for a random payload, the server will respond with a different payload, and we
        // should detect the error.
        tracing::info!("fetch payload");
        let err = provider
            .fetch_payload::<MockTypes>(PayloadRequest(random_vid_commit()))
            .await
            .unwrap_err();
        assert!(
            err.to_string().contains("does not match request"),
            "{err:#}"
        );

        // Payload range request.
        tracing::info!("fetch payload range");
        let err = provider
            .fetch_payload_range::<MockTypes>(BlockRangeRequest::from(RangeRequest {
                start: 0,
                end: 2,
                expected_hash: RangeRequest::hash_payloads([
                    random_vid_commit(),
                    random_vid_commit(),
                ]),
            }))
            .await
            .unwrap_err();
        assert!(err.to_string().contains("wrong payload hash"), "{err:#}");

        // Query for a random VID common, the server will respond with a different one, and we
        // should detect the error.
        tracing::info!("fetch VID");
        let err = provider
            .fetch_vid_common::<MockTypes>(VidCommonRequest(random_vid_commit()))
            .await
            .unwrap_err();
        assert!(
            err.to_string().contains("inconsistent VID common"),
            "{err:#}"
        );

        // VID range request.
        tracing::info!("fetch VID range");
        let err = provider
            .fetch_vid_common_range::<MockTypes>(VidCommonRangeRequest::from(RangeRequest {
                start: 0,
                end: 2,
                expected_hash: RangeRequest::hash_payloads([
                    random_vid_commit(),
                    random_vid_commit(),
                ]),
            }))
            .await
            .unwrap_err();
        assert!(err.to_string().contains("wrong VID common"), "{err:#}");
    }
1628
    // End-to-end test of archive recovery: a node that aggressively pruned all of its data is
    // restarted in archive mode and must re-fetch everything from a peer until fully synced,
    // with the pruner no longer deleting the recovered data.
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_archive_recovery() {
        // Create the consensus network.
        let mut network = MockNetwork::<MockDataSource>::init().await;

        // Start a web server that the non-consensus node can use to fetch blocks.
        let port = reserve_tcp_port().unwrap();
        let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
        app.register_module(
            "availability",
            define_api(
                &Default::default(),
                MockBase::instance(),
                "1.0.0".parse().unwrap(),
            )
            .unwrap(),
        )
        .unwrap();
        network.spawn(
            "server",
            app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
        );

        // Start a data source which is not receiving events from consensus, only from a peer. The
        // data source is at first configured to aggressively prune data.
        let db = TmpDb::init().await;
        let provider = Provider::new(QueryServiceProvider::new(
            format!("http://localhost:{port}").parse().unwrap(),
            MockBase::instance(),
        ));
        let mut data_source = db
            .config()
            .pruner_cfg(
                PrunerCfg::new()
                    // Zero retention + short interval: everything is eligible for pruning almost
                    // immediately, so the node ends up with no data.
                    .with_target_retention(Duration::from_secs(0))
                    .with_interval(Duration::from_secs(5)),
            )
            .unwrap()
            .builder(provider.clone())
            .await
            .unwrap()
            // Set a fast retry for failed operations. Occasionally storage operations will fail due
            // to conflicting write-mode transactions running concurrently. This is ok as they will
            // be retried. Having a fast retry interval speeds up the test.
            .with_min_retry_interval(Duration::from_millis(100))
            // Randomize retries a lot. This will temporarily separate competing write
            // transactions with high probability, so that one of them quickly gets exclusive access
            // to the database.
            .with_retry_randomization_factor(3.)
            .build()
            .await
            .unwrap();

        // Start consensus.
        network.start().await;

        // Wait until a few blocks are produced.
        let leaves = network.data_source().subscribe_leaves(1).await;
        let leaves = leaves.take(5).collect::<Vec<_>>().await;

        // The disconnected data source has no data yet, so it hasn't done any pruning.
        let pruned_height = data_source
            .read()
            .await
            .unwrap()
            .load_pruned_height()
            .await
            .unwrap();
        // Either None or 0 is acceptable, depending on whether or not the pruner has run yet.
        assert!(matches!(pruned_height, None | Some(0)), "{pruned_height:?}");

        // Send the last leaf to the disconnected data source so it learns about the height and
        // fetches the missing data.
        let last_leaf = leaves.last().unwrap();
        data_source.append(last_leaf.clone().into()).await.unwrap();

        // Trigger a fetch of each leaf so the database gets populated.
        for i in 1..=last_leaf.height() {
            tracing::info!(i, "fetching leaf");
            assert_eq!(
                data_source.get_leaf(i as usize).await.await,
                leaves[i as usize - 1]
            );
        }

        // After a bit of time, the pruner has run and deleted all the missing data we just fetched.
        loop {
            let pruned_height = data_source
                .read()
                .await
                .unwrap()
                .load_pruned_height()
                .await
                .unwrap();
            if pruned_height == Some(last_leaf.height()) {
                break;
            }
            tracing::info!(
                ?pruned_height,
                target_height = last_leaf.height(),
                "waiting for pruner to run"
            );
            sleep(Duration::from_secs(1)).await;
        }

        // Now close the data source and restart it with archive recovery.
        data_source = db
            .config()
            .archive()
            .builder(provider.clone())
            .await
            .unwrap()
            // Frequent proactive scans so the recovery happens quickly in the test.
            .with_proactive_interval(Duration::from_secs(1))
            .build()
            .await
            .unwrap();

        // Pruned height should be reset.
        let pruned_height = data_source
            .read()
            .await
            .unwrap()
            .load_pruned_height()
            .await
            .unwrap();
        assert_eq!(pruned_height, None);

        // The node has pruned all of its data including the latest block, so it's forgotten the
        // block height. We need to give it another leaf with some height so it will be willing to
        // fetch.
        data_source.append(last_leaf.clone().into()).await.unwrap();

        // Wait for the data to be restored. It should be restored by the next major scan.
        loop {
            let sync_status = data_source.sync_status().await.unwrap();
            if sync_status.is_fully_synced() {
                break;
            }
            tracing::info!(?sync_status, "waiting for node to sync");
            sleep(Duration::from_secs(1)).await;
        }

        // The node remains fully synced even after some time; no pruning.
        sleep(Duration::from_secs(3)).await;
        let sync_status = data_source.sync_status().await.unwrap();
        assert!(sync_status.is_fully_synced(), "{sync_status:#?}");
    }
1776
1777    #[derive(Clone, Copy, Debug)]
1778    enum FailureType {
1779        Begin,
1780        Write,
1781        Commit,
1782    }
1783
    // Shared body for the `test_fetch_storage_failure_on_*` tests: inject the given storage
    // failure, then check that (1) a fetch still resolves from the peer even though the result
    // cannot be persisted, (2) a second fetch triggers a new active fetch (nothing was stored),
    // and (3) once storage works, the object is finally served locally without fetching.
    async fn test_fetch_storage_failure_helper(failure: FailureType) {
        // Create the consensus network.
        let mut network = MockNetwork::<MockDataSource>::init().await;

        // Start a web server that the non-consensus node can use to fetch blocks.
        let port = reserve_tcp_port().unwrap();
        let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
        app.register_module(
            "availability",
            define_api(
                &Default::default(),
                MockBase::instance(),
                "1.0.0".parse().unwrap(),
            )
            .unwrap(),
        )
        .unwrap();
        network.spawn(
            "server",
            app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
        );

        // Start a data source which is not receiving events from consensus, only from a peer.
        let provider = Provider::new(QueryServiceProvider::new(
            format!("http://localhost:{port}").parse().unwrap(),
            MockBase::instance(),
        ));
        let db = TmpDb::init().await;
        // Wrap real SQL storage in a fail-injecting layer so we can force specific operations
        // to error on demand.
        let storage = FailStorage::from(
            SqlStorage::connect(db.config(), StorageConnectionType::Query)
                .await
                .unwrap(),
        );
        let data_source = FetchingDataSource::builder(storage, provider)
            .disable_proactive_fetching()
            .disable_aggregator()
            // Short retry timeout: storage failures are permanent in this test, so give up fast.
            .with_max_retry_interval(Duration::from_millis(100))
            .with_retry_timeout(Duration::from_secs(1))
            .build()
            .await
            .unwrap();

        // Start consensus.
        network.start().await;

        // Wait until a couple of blocks are produced.
        let leaves = network.data_source().subscribe_leaves(1).await;
        let leaves = leaves.take(2).collect::<Vec<_>>().await;

        // Send the last leaf to the disconnected data source so it learns about the height.
        let last_leaf = leaves.last().unwrap();
        let mut tx = data_source.write().await.unwrap();
        tx.insert_leaf(last_leaf).await.unwrap();
        tx.commit().await.unwrap();

        // Trigger a fetch of the first leaf; it should resolve even if we fail to store the leaf.
        tracing::info!("fetch with write failure");
        match failure {
            FailureType::Begin => {
                data_source
                    .as_ref()
                    .fail_begins_writable(FailableAction::Any)
                    .await
            },
            FailureType::Write => data_source.as_ref().fail_writes(FailableAction::Any).await,
            FailureType::Commit => data_source.as_ref().fail_commits(FailableAction::Any).await,
        }
        assert_eq!(leaves[0], data_source.get_leaf(1).await.await);
        data_source.as_ref().pass().await;

        // It is possible that the fetch above completes, notifies the subscriber,
        // and the fetch below quickly subscribes and gets notified by the same loop.
        // We add a delay here to avoid this situation.
        // This is not a bug, as being notified after subscribing is fine.
        sleep(Duration::from_secs(1)).await;

        // We can get the same leaf again, this will again trigger an active fetch since storage
        // failed the first time.
        tracing::info!("fetch with write success");
        let fetch = data_source.get_leaf(1).await;
        assert!(fetch.is_pending());
        assert_eq!(leaves[0], fetch.await);

        sleep(Duration::from_secs(1)).await;

        // Finally, we should have the leaf locally and not need to fetch it.
        tracing::info!("retrieve from storage");
        let fetch = data_source.get_leaf(1).await;
        assert_eq!(leaves[0], fetch.try_resolve().ok().unwrap());
    }
1874
    // Storage fails every writable-transaction begin; fetches must still resolve from the peer.
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_fetch_storage_failure_on_begin() {
        test_fetch_storage_failure_helper(FailureType::Begin).await;
    }
1879
    // Storage fails every write; fetches must still resolve from the peer.
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_fetch_storage_failure_on_write() {
        test_fetch_storage_failure_helper(FailureType::Write).await;
    }
1884
    // Storage fails every commit; fetches must still resolve from the peer.
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_fetch_storage_failure_on_commit() {
        test_fetch_storage_failure_helper(FailureType::Commit).await;
    }
1889
    // Shared body for the `test_fetch_storage_failure_retry_on_*` tests: inject a *one-shot*
    // storage failure of the given type and check that the store of a fetched object is retried
    // until it succeeds, so the object ends up in local storage despite the transient failure.
    async fn test_fetch_storage_failure_retry_helper(failure: FailureType) {
        // Create the consensus network.
        let mut network = MockNetwork::<MockDataSource>::init().await;

        // Start a web server that the non-consensus node can use to fetch blocks.
        let port = reserve_tcp_port().unwrap();
        let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
        app.register_module(
            "availability",
            define_api(
                &Default::default(),
                MockBase::instance(),
                "1.0.0".parse().unwrap(),
            )
            .unwrap(),
        )
        .unwrap();
        network.spawn(
            "server",
            app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
        );

        // Start a data source which is not receiving events from consensus, only from a peer.
        let provider = Provider::new(QueryServiceProvider::new(
            format!("http://localhost:{port}").parse().unwrap(),
            MockBase::instance(),
        ));
        let db = TmpDb::init().await;
        // Wrap real SQL storage in a fail-injecting layer so one operation can be failed.
        let storage = FailStorage::from(
            SqlStorage::connect(db.config(), StorageConnectionType::Query)
                .await
                .unwrap(),
        );
        let data_source = FetchingDataSource::builder(storage, provider)
            .disable_proactive_fetching()
            .disable_aggregator()
            // Fast retries keep the test short; the injected failure only happens once.
            .with_min_retry_interval(Duration::from_millis(100))
            .build()
            .await
            .unwrap();

        // Start consensus.
        network.start().await;

        // Wait until a couple of blocks are produced.
        let leaves = network.data_source().subscribe_leaves(1).await;
        let leaves = leaves.take(2).collect::<Vec<_>>().await;

        // Send the last leaf to the disconnected data source so it learns about the height.
        let last_leaf = leaves.last().unwrap();
        let mut tx = data_source.write().await.unwrap();
        tx.insert_leaf(last_leaf).await.unwrap();
        tx.commit().await.unwrap();

        // Trigger a fetch of the first leaf; it should retry until it successfully stores the leaf.
        tracing::info!("fetch with write failure");
        match failure {
            FailureType::Begin => {
                data_source
                    .as_ref()
                    .fail_one_begin_writable(FailableAction::Any)
                    .await
            },
            FailureType::Write => {
                data_source
                    .as_ref()
                    .fail_one_write(FailableAction::Any)
                    .await
            },
            FailureType::Commit => {
                data_source
                    .as_ref()
                    .fail_one_commit(FailableAction::Any)
                    .await
            },
        }
        assert_eq!(leaves[0], data_source.get_leaf(1).await.await);

        // Check that the leaf ended up in local storage.
        let mut tx = data_source.read().await.unwrap();
        assert_eq!(leaves[0], tx.get_leaf(1.into()).await.unwrap());
    }
1972
    // One writable-transaction begin fails; the store must be retried until it succeeds.
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_fetch_storage_failure_retry_on_begin() {
        test_fetch_storage_failure_retry_helper(FailureType::Begin).await;
    }
1977
    // One write fails; the store must be retried until it succeeds.
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_fetch_storage_failure_retry_on_write() {
        test_fetch_storage_failure_retry_helper(FailureType::Write).await;
    }
1982
    // One commit fails; the store must be retried until it succeeds.
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_fetch_storage_failure_retry_on_commit() {
        test_fetch_storage_failure_retry_helper(FailureType::Commit).await;
    }
1987
    // Check that merely appending a decided leaf (with no block or VID data) triggers the
    // data source to fetch the corresponding block and VID common from its peer.
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_fetch_on_decide() {
        // Create the consensus network.
        let mut network = MockNetwork::<MockDataSource>::init().await;

        // Start a web server that the non-consensus node can use to fetch blocks.
        let port = reserve_tcp_port().unwrap();
        let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
        app.register_module(
            "availability",
            define_api(
                &Default::default(),
                MockBase::instance(),
                "1.0.0".parse().unwrap(),
            )
            .unwrap(),
        )
        .unwrap();
        network.spawn(
            "server",
            app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
        );

        // Start a data source which is not receiving events from consensus.
        let db = TmpDb::init().await;
        let provider = Provider::new(QueryServiceProvider::new(
            format!("http://localhost:{port}").parse().unwrap(),
            MockBase::instance(),
        ));
        let data_source = builder(&db, &provider)
            .await
            .with_max_retry_interval(Duration::from_secs(1))
            .build()
            .await
            .unwrap();

        // Start consensus.
        network.start().await;

        // Wait until a block has been decided.
        let leaf = network
            .data_source()
            .subscribe_leaves(1)
            .await
            .next()
            .await
            .unwrap();

        // Give the node a decide containing the leaf but no additional information.
        tracing::info!("send decide event");
        data_source.append(leaf.clone().into()).await.unwrap();

        // We will eventually retrieve the corresponding block and VID common, triggered by seeing
        // the leaf.
        tracing::info!("wait");
        sleep(Duration::from_secs(5)).await;

        // Read the missing data directly from storage (via a database transaction), rather than
        // going through the data source, so that this request itself does not trigger a fetch.
        // Thus, this will only work if the data was already fetched, triggered by the leaf.
        let mut tx = data_source.read().await.unwrap();
        let id = BlockId::<MockTypes>::from(leaf.height() as usize);
        let block = tx.get_block(id).await.unwrap();
        let vid = tx.get_vid_common(id).await.unwrap();

        assert_eq!(block.hash(), leaf.block_hash());
        assert_eq!(vid.block_hash(), leaf.block_hash());
    }
2056
    // Check that a failure to begin a read-only transaction (used to decide whether an object
    // is fetchable) is retried rather than treated as "object unfetchable"; the fetch must
    // eventually resolve.
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_fetch_begin_failure() {
        // Create the consensus network.
        let mut network = MockNetwork::<MockDataSource>::init().await;

        // Start a web server that the non-consensus node can use to fetch blocks.
        let port = reserve_tcp_port().unwrap();
        let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
        app.register_module(
            "availability",
            define_api(
                &Default::default(),
                MockBase::instance(),
                "1.0.0".parse().unwrap(),
            )
            .unwrap(),
        )
        .unwrap();
        network.spawn(
            "server",
            app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
        );

        // Start a data source which is not receiving events from consensus, only from a peer.
        let provider = Provider::new(QueryServiceProvider::new(
            format!("http://localhost:{port}").parse().unwrap(),
            MockBase::instance(),
        ));
        let db = TmpDb::init().await;
        // Wrap real SQL storage in a fail-injecting layer so one operation can be failed.
        let storage = FailStorage::from(
            SqlStorage::connect(db.config(), StorageConnectionType::Query)
                .await
                .unwrap(),
        );
        let data_source = FetchingDataSource::builder(storage, provider)
            .disable_proactive_fetching()
            .disable_aggregator()
            .with_min_retry_interval(Duration::from_millis(100))
            .build()
            .await
            .unwrap();

        // Start consensus.
        network.start().await;

        // Wait until a couple of blocks are produced.
        let leaves = network.data_source().subscribe_leaves(1).await;
        let leaves = leaves.take(2).collect::<Vec<_>>().await;

        // Send the last leaf to the disconnected data source so it learns about the height.
        let last_leaf = leaves.last().unwrap();
        let mut tx = data_source.write().await.unwrap();
        tx.insert_leaf(last_leaf).await.unwrap();
        tx.commit().await.unwrap();

        // Trigger a fetch of the first leaf; it should retry until it is able to determine
        // the leaf is fetchable and trigger the fetch.
        tracing::info!("fetch with transaction failure");
        data_source
            .as_ref()
            .fail_one_begin_read_only(FailableAction::Any)
            .await;
        assert_eq!(leaves[0], data_source.get_leaf(1).await.await);
    }
2121
    // Check that a transient failure to load a header from storage during an active block fetch
    // is retried, rather than being misinterpreted as "header missing" (which would make the
    // block appear unfetchable by hash).
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_fetch_load_failure_block() {
        // Create the consensus network.
        let mut network = MockNetwork::<MockDataSource>::init().await;

        // Start a web server that the non-consensus node can use to fetch blocks.
        let port = reserve_tcp_port().unwrap();
        let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
        app.register_module(
            "availability",
            define_api(
                &Default::default(),
                MockBase::instance(),
                "1.0.0".parse().unwrap(),
            )
            .unwrap(),
        )
        .unwrap();
        network.spawn(
            "server",
            app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
        );

        // Start a data source which is not receiving events from consensus, only from a peer.
        let provider = Provider::new(QueryServiceProvider::new(
            format!("http://localhost:{port}").parse().unwrap(),
            MockBase::instance(),
        ));
        let db = TmpDb::init().await;
        // Wrap real SQL storage in a fail-injecting layer so one read can be failed.
        let storage = FailStorage::from(
            SqlStorage::connect(db.config(), StorageConnectionType::Query)
                .await
                .unwrap(),
        );
        let data_source = FetchingDataSource::builder(storage, provider)
            .disable_proactive_fetching()
            .disable_aggregator()
            .with_min_retry_interval(Duration::from_millis(100))
            .build()
            .await
            .unwrap();

        // Start consensus.
        network.start().await;

        // Wait until a block is produced.
        let mut leaves = network.data_source().subscribe_leaves(1).await;
        let leaf = leaves.next().await.unwrap();

        // Send the leaf to the disconnected data source, so the corresponding block becomes
        // fetchable.
        let mut tx = data_source.write().await.unwrap();
        tx.insert_leaf(&leaf).await.unwrap();
        tx.commit().await.unwrap();

        // Trigger a fetch of the block by hash; it should retry until it is able to determine the
        // leaf is available, thus the block is fetchable, trigger the fetch.
        //
        // Failing only on the `get_header` call here hits an edge case which is only possible when
        // fetching blocks: we successfully determine that the object is not available locally and
        // that it might exist, so we actually call `active_fetch` to try and get it. If we then
        // fail to load the header and erroneously treat this as the header not being available, we
        // will give up and consider the object unfetchable (since the next step would be to fetch
        // the corresponding leaf, but we cannot do this with just a block hash).
        //
        // Thus, this test will only pass if we correctly retry the `active_fetch` until we are
        // successfully able to load the header from storage and determine that the block is
        // fetchable.
        tracing::info!("fetch with read failure");
        data_source
            .as_ref()
            .fail_one_read(FailableAction::GetHeader)
            .await;
        let fetch = data_source.get_block(leaf.block_hash()).await;

        // Give some time for a few reads to fail before letting them succeed.
        sleep(Duration::from_secs(2)).await;
        data_source.as_ref().pass().await;

        let block: BlockQueryData<MockTypes> = fetch.await;
        assert_eq!(block.hash(), leaf.block_hash());
    }
2204
    // Check that a transient read failure while loading a transaction (an object that cannot be
    // actively fetched) is retried, rather than being misinterpreted as the transaction being
    // missing — which would create a fetch that can never resolve.
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_fetch_load_failure_tx() {
        // Create the consensus network.
        let mut network = MockNetwork::<MockDataSource>::init().await;

        // Start a web server that the non-consensus node can use to fetch blocks.
        let port = reserve_tcp_port().unwrap();
        let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
        app.register_module(
            "availability",
            define_api(
                &Default::default(),
                MockBase::instance(),
                "1.0.0".parse().unwrap(),
            )
            .unwrap(),
        )
        .unwrap();
        network.spawn(
            "server",
            app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
        );

        // Start a data source which is not receiving events from consensus, only from a peer.
        let provider = Provider::new(QueryServiceProvider::new(
            format!("http://localhost:{port}").parse().unwrap(),
            MockBase::instance(),
        ));
        let db = TmpDb::init().await;
        // Wrap real SQL storage in a fail-injecting layer so one read can be failed.
        let storage = FailStorage::from(
            SqlStorage::connect(db.config(), StorageConnectionType::Query)
                .await
                .unwrap(),
        );
        let data_source = FetchingDataSource::builder(storage, provider)
            .disable_proactive_fetching()
            .disable_aggregator()
            .with_min_retry_interval(Duration::from_millis(100))
            .build()
            .await
            .unwrap();

        // Start consensus.
        network.start().await;

        // Wait until a transaction is sequenced.
        let tx = mock_transaction(vec![1, 2, 3]);
        network.submit_transaction(tx.clone()).await;
        let tx = network
            .data_source()
            .get_block_containing_transaction(tx.commit())
            .await
            .await;

        // Send the block containing the transaction to the disconnected data source.
        {
            let leaf = network
                .data_source()
                .get_leaf(tx.transaction.block_height() as usize)
                .await
                .await;
            let block = network
                .data_source()
                .get_block(tx.transaction.block_height() as usize)
                .await
                .await;
            let mut tx = data_source.write().await.unwrap();
            tx.insert_leaf(&leaf).await.unwrap();
            tx.insert_block(&block).await.unwrap();
            tx.commit().await.unwrap();
        }

        // Check that the transaction is there.
        tracing::info!("fetch success");
        assert_eq!(
            tx,
            data_source
                .get_block_containing_transaction(tx.transaction.hash())
                .await
                .await
        );

        // Fetch the transaction with storage failures.
        //
        // Failing only one read here hits an edge case that only exists for unfetchable objects
        // (e.g. transactions). This will cause the initial load of the transaction to fail, but,
        // if we erroneously treat this load failure as the transaction being missing, we will
        // succeed in calling `fetch`, since subsequent loads succeed. However, since a transaction
        // is not active-fetchable, no active fetch will actually be spawned, and this fetch will
        // never resolve.
        //
        // Thus, the test should only pass if we correctly retry the initial load until it succeeds
        // and we discover that the transaction doesn't need to be fetched at all.
        tracing::info!("fetch with read failure");
        data_source
            .as_ref()
            .fail_one_read(FailableAction::Any)
            .await;
        let fetch = data_source
            .get_block_containing_transaction(tx.transaction.hash())
            .await;

        assert_eq!(tx, fetch.await);
    }
2309
    // Check that a leaf subscription stream survives a failure to begin a read-only transaction:
    // the fetchability check is retried and the stream still yields every leaf.
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_stream_begin_failure() {
        // Create the consensus network.
        let mut network = MockNetwork::<MockDataSource>::init().await;

        // Start a web server that the non-consensus node can use to fetch blocks.
        let port = reserve_tcp_port().unwrap();
        let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
        app.register_module(
            "availability",
            define_api(
                &Default::default(),
                MockBase::instance(),
                "1.0.0".parse().unwrap(),
            )
            .unwrap(),
        )
        .unwrap();
        network.spawn(
            "server",
            app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
        );

        // Start a data source which is not receiving events from consensus, only from a peer.
        let provider = Provider::new(QueryServiceProvider::new(
            format!("http://localhost:{port}").parse().unwrap(),
            MockBase::instance(),
        ));
        let db = TmpDb::init().await;
        // Wrap real SQL storage in a fail-injecting layer so one operation can be failed.
        let storage = FailStorage::from(
            SqlStorage::connect(db.config(), StorageConnectionType::Query)
                .await
                .unwrap(),
        );
        let data_source = FetchingDataSource::builder(storage, provider)
            .disable_proactive_fetching()
            .disable_aggregator()
            .with_min_retry_interval(Duration::from_millis(100))
            // Small chunks so the 5-leaf stream spans multiple range queries.
            .with_range_chunk_size(3)
            .build()
            .await
            .unwrap();

        // Start consensus.
        network.start().await;

        // Wait until a few blocks are produced.
        let leaves = network.data_source().subscribe_leaves(1).await;
        let leaves = leaves.take(5).collect::<Vec<_>>().await;

        // Send the last leaf to the disconnected data source so it learns about the height.
        let last_leaf = leaves.last().unwrap();
        let mut tx = data_source.write().await.unwrap();
        tx.insert_leaf(last_leaf).await.unwrap();
        tx.commit().await.unwrap();

        // Stream the leaves; it should retry until it is able to determine each leaf is fetchable
        // and trigger the fetch.
        tracing::info!("stream with transaction failure");
        data_source
            .as_ref()
            .fail_one_begin_read_only(FailableAction::Any)
            .await;
        assert_eq!(
            leaves,
            data_source
                .subscribe_leaves(1)
                .await
                .take(5)
                .collect::<Vec<_>>()
                .await
        );
    }
2383
    // A block range stream must tolerate a window of storage read failures: failed loads
    // are retried (not treated as "object missing") until reads succeed, after which
    // every block in the range resolves.
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_stream_load_failure() {
        // Create the consensus network.
        let mut network = MockNetwork::<MockDataSource>::init().await;

        // Start a web server that the non-consensus node can use to fetch blocks.
        let port = reserve_tcp_port().unwrap();
        let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
        app.register_module(
            "availability",
            define_api(
                &Default::default(),
                MockBase::instance(),
                "1.0.0".parse().unwrap(),
            )
            .unwrap(),
        )
        .unwrap();
        network.spawn(
            "server",
            app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
        );

        // Start a data source which is not receiving events from consensus, only from a peer.
        let provider = Provider::new(QueryServiceProvider::new(
            format!("http://localhost:{port}").parse().unwrap(),
            MockBase::instance(),
        ));
        let db = TmpDb::init().await;
        // Wrap the SQL storage in `FailStorage` so read failures can be injected below.
        let storage = FailStorage::from(
            SqlStorage::connect(db.config(), StorageConnectionType::Query)
                .await
                .unwrap(),
        );
        // Short retry interval keeps the retry loop fast enough for the test's timing.
        let data_source = FetchingDataSource::builder(storage, provider)
            .disable_proactive_fetching()
            .disable_aggregator()
            .with_min_retry_interval(Duration::from_millis(100))
            .with_range_chunk_size(3)
            .build()
            .await
            .unwrap();

        // Start consensus.
        network.start().await;

        // Wait until a few blocks are produced.
        let leaves = network.data_source().subscribe_leaves(1).await;
        let leaves = leaves.take(5).collect::<Vec<_>>().await;

        // Send the last leaf to the disconnected data source, so the blocks become fetchable.
        let last_leaf = leaves.last().unwrap();
        let mut tx = data_source.write().await.unwrap();
        tx.insert_leaf(last_leaf).await.unwrap();
        tx.commit().await.unwrap();

        // Stream the blocks with a period of database failures.
        tracing::info!("stream with read failure");
        data_source.as_ref().fail_reads(FailableAction::Any).await;
        let fetches = data_source
            .get_block_range(1..=5)
            .await
            .collect::<Vec<_>>()
            .await;

        // Give some time for a few reads to fail before letting them succeed.
        sleep(Duration::from_secs(2)).await;
        data_source.as_ref().pass().await;

        // Each fetch should eventually resolve to the block matching the corresponding leaf.
        for (leaf, fetch) in leaves.iter().zip(fetches) {
            let block: BlockQueryData<MockTypes> = fetch.await;
            assert_eq!(block.hash(), leaf.block_hash());
        }
    }
2458
    /// Selects which metadata stream `test_metadata_stream_begin_failure_helper` exercises.
    enum MetadataType {
        // Stream payload metadata (`subscribe_payload_metadata`).
        Payload,
        // Stream VID common metadata (`subscribe_vid_common_metadata`).
        Vid,
    }
2463
    // Shared body for the metadata-stream begin-failure tests. A metadata stream must
    // tolerate a window of failures when opening read-only storage transactions: it
    // should keep retrying until transactions succeed and every object resolves.
    async fn test_metadata_stream_begin_failure_helper(stream: MetadataType) {
        // Create the consensus network.
        let mut network = MockNetwork::<MockDataSource>::init().await;

        // Start a web server that the non-consensus node can use to fetch blocks.
        let port = reserve_tcp_port().unwrap();
        let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
        app.register_module(
            "availability",
            define_api(
                &Default::default(),
                MockBase::instance(),
                "1.0.0".parse().unwrap(),
            )
            .unwrap(),
        )
        .unwrap();
        network.spawn(
            "server",
            app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
        );

        // Start a data source which is not receiving events from consensus, only from a peer.
        let provider = Provider::new(QueryServiceProvider::new(
            format!("http://localhost:{port}").parse().unwrap(),
            MockBase::instance(),
        ));
        let db = TmpDb::init().await;
        // Wrap the SQL storage in `FailStorage` so transaction failures can be injected below.
        let storage = FailStorage::from(
            SqlStorage::connect(db.config(), StorageConnectionType::Query)
                .await
                .unwrap(),
        );
        let data_source = FetchingDataSource::builder(storage, provider)
            .disable_proactive_fetching()
            .disable_aggregator()
            .with_min_retry_interval(Duration::from_millis(100))
            .with_range_chunk_size(3)
            .build()
            .await
            .unwrap();

        // Start consensus.
        network.start().await;

        // Wait until a few blocks are produced.
        let leaves = network.data_source().subscribe_leaves(1).await;
        let leaves = leaves.take(3).collect::<Vec<_>>().await;

        // Send the last leaf to the disconnected data source, so the blocks become fetchable.
        let last_leaf = leaves.last().unwrap();
        let mut tx = data_source.write().await.unwrap();
        tx.insert_leaf(last_leaf).await.unwrap();
        tx.commit().await.unwrap();

        // Send the first object to the disconnected data source, so we hit all the cases:
        // * leaf present but not full object (from the last leaf)
        // * full object present but inaccessible due to storage failures (first object)
        // * nothing present (middle object)
        let leaf = network.data_source().get_leaf(1).await.await;
        let block = network.data_source().get_block(1).await.await;
        let vid = network.data_source().get_vid_common(1).await.await;
        data_source
            .append(BlockInfo::new(leaf, Some(block), Some(vid), None))
            .await
            .unwrap();

        // Stream the objects with a period of database failures.
        tracing::info!("stream with transaction failure");
        data_source
            .as_ref()
            .fail_begins_read_only(FailableAction::Any)
            .await;
        match stream {
            MetadataType::Payload => {
                let payloads = data_source.subscribe_payload_metadata(1).await.take(3);

                // Give some time for a few reads to fail before letting them succeed.
                sleep(Duration::from_secs(2)).await;
                tracing::info!("stop failing transactions");
                data_source.as_ref().pass().await;

                let payloads = payloads.collect::<Vec<_>>().await;
                for (leaf, payload) in leaves.iter().zip(payloads) {
                    assert_eq!(payload.block_hash, leaf.block_hash());
                }
            },
            MetadataType::Vid => {
                let vids = data_source.subscribe_vid_common_metadata(1).await.take(3);

                // Give some time for a few reads to fail before letting them succeed.
                sleep(Duration::from_secs(2)).await;
                tracing::info!("stop failing transactions");
                data_source.as_ref().pass().await;

                let vids = vids.collect::<Vec<_>>().await;
                for (leaf, vid) in leaves.iter().zip(vids) {
                    assert_eq!(vid.block_hash, leaf.block_hash());
                }
            },
        }
    }
2566
    // Payload-metadata variant of the stream begin-failure test.
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_metadata_stream_begin_failure_payload() {
        test_metadata_stream_begin_failure_helper(MetadataType::Payload).await
    }
2571
    // VID-common-metadata variant of the stream begin-failure test.
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_metadata_stream_begin_failure_vid() {
        test_metadata_stream_begin_failure_helper(MetadataType::Vid).await
    }
2576
2577    #[tokio::test(flavor = "multi_thread")]
2578    #[test_log::test]
2579    async fn test_ranged_fetch() {
2580        // Create the consensus network.
2581        let mut network = MockNetwork::<MockDataSource>::init().await;
2582
2583        // Start a web server that the non-consensus node can use to fetch blocks.
2584        let port = reserve_tcp_port().unwrap();
2585        let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
2586        app.register_module(
2587            "availability",
2588            define_api(
2589                &Default::default(),
2590                MockBase::instance(),
2591                "1.0.0".parse().unwrap(),
2592            )
2593            .unwrap(),
2594        )
2595        .unwrap();
2596        network.spawn(
2597            "server",
2598            app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
2599        );
2600
2601        // Start consensus.
2602        network.start().await;
2603
2604        // Wait for a few blocks to be produced.
2605        let leaves = network
2606            .data_source()
2607            .subscribe_leaves(0)
2608            .await
2609            .take(5)
2610            .collect::<Vec<_>>()
2611            .await;
2612        let blocks = network
2613            .data_source()
2614            .subscribe_blocks(0)
2615            .await
2616            .take(5)
2617            .collect::<Vec<_>>()
2618            .await;
2619        let vid = network
2620            .data_source()
2621            .subscribe_vid_common(0)
2622            .await
2623            .take(5)
2624            .collect::<Vec<_>>()
2625            .await;
2626
2627        // Connect a fetching provider.
2628        let provider = QueryServiceProvider::new(
2629            format!("http://localhost:{port}").parse().unwrap(),
2630            MockBase::instance(),
2631        );
2632
2633        // Make ranged requests.
2634        tracing::info!("fetch leaf range");
2635        assert_eq!(
2636            provider
2637                .fetch(LeafRangeRequest {
2638                    start: 0,
2639                    end: 5,
2640                    last_leaf: leaves[4].hash(),
2641                    last_qc: leaves[4].qc().commit(),
2642                })
2643                .await
2644                .unwrap(),
2645            leaves
2646        );
2647        let headers = NonEmptyRange::new(leaves.iter().map(|leaf| leaf.header().clone())).unwrap();
2648        tracing::info!(?headers, "fetch block range");
2649        assert_eq!(
2650            ProviderTrait::<MockTypes, _>::fetch(
2651                &provider,
2652                BlockRangeRequest::from(RangeRequest::from_headers::<MockTypes>(&headers))
2653            )
2654            .await
2655            .unwrap(),
2656            blocks
2657        );
2658        tracing::info!(?headers, "fetch VID common range");
2659        assert_eq!(
2660            ProviderTrait::<MockTypes, _>::fetch(
2661                &provider,
2662                VidCommonRangeRequest::from(RangeRequest::from_headers::<MockTypes>(&headers))
2663            )
2664            .await
2665            .unwrap(),
2666            vid
2667        );
2668    }
2669
2670    /// A test server which does not support ranged VID common requests.
2671    async fn old_server(port: u16) {
2672        let mut api = Api::<(), availability::Error, StaticVersion<1, 0>>::new(toml! {
2673            [route.get_vid_common]
2674            PATH = ["vid/common/:height"]
2675            ":height" = "Integer"
2676        })
2677        .unwrap();
2678
2679        api.get("get_vid_common", move |req, _| {
2680            async move {
2681                let mut common = VidCommonQueryData::<MockTypes>::genesis(
2682                    &Default::default(),
2683                    &Default::default(),
2684                    TEST_VERSIONS.test.base,
2685                )
2686                .await;
2687                common.height = req.integer_param("height")?;
2688                Ok(common)
2689            }
2690            .boxed()
2691        })
2692        .unwrap();
2693
2694        let mut app = App::<(), Error>::with_state(());
2695        app.register_module("availability", api).unwrap();
2696        app.serve(format!("0.0.0.0:{port}"), MockBase::instance())
2697            .await
2698            .ok();
2699    }
2700
2701    #[tokio::test]
2702    #[test_log::test]
2703    async fn test_vid_common_fallback() {
2704        let port = reserve_tcp_port().unwrap();
2705        let _server = BackgroundTask::spawn("old server", old_server(port));
2706        let provider = QueryServiceProvider::new(
2707            format!("http://localhost:{port}").parse().unwrap(),
2708            StaticVersion::<1, 0>::instance(),
2709        );
2710
2711        // First fetch a range of VID common one by one, to get a ground truth.
2712        let common = try_join_all((0..5).map(|i| {
2713            provider
2714                .client
2715                .get::<VidCommonQueryData<MockTypes>>(&format!("availability/vid/common/{i}"))
2716                .send()
2717        }))
2718        .await
2719        .unwrap();
2720
2721        // Now fetch the whole thing as a range.
2722        let expected_hash =
2723            RangeRequest::hash_payloads(common.iter().map(|common| common.payload_hash));
2724        let req = RangeRequest {
2725            start: 0,
2726            end: 5,
2727            expected_hash,
2728        };
2729        assert_eq!(
2730            common.as_slice(),
2731            provider
2732                .fetch_vid_common_range(req.into())
2733                .await
2734                .unwrap()
2735                .as_ref()
2736        );
2737    }
2738}