1use std::fmt::Debug;
14
15use anyhow::{Context, ensure};
16use async_trait::async_trait;
17use committable::Committable;
18use futures::{TryFutureExt, future::try_join_all};
19use hotshot_types::{
20 data::{VidCommitment, VidCommon, ns_table},
21 traits::{EncodeBytes, node_implementation::NodeType},
22 vid::{
23 advz::{ADVZScheme, advz_scheme},
24 avidm::{AvidMScheme, init_avidm_param},
25 avidm_gf2::AvidmGf2Scheme,
26 },
27};
28use jf_advz::VidScheme;
29use surf_disco::{Client, Url};
30use vbs::version::StaticVersionType;
31
32use super::Provider;
33use crate::{
34 Error, Payload,
35 availability::{BlockQueryData, LeafQueryData, VidCommonQueryData},
36 fetching::{
37 NonEmptyRange,
38 request::{
39 BlockRangeRequest, LeafRangeRequest, LeafRequest, PayloadRequest, RangeRequest,
40 VidCommonRangeRequest, VidCommonRequest,
41 },
42 },
43 types::HeightIndexed,
44};
45
/// Data provider backed by a remote query service.
///
/// Wraps a [`surf_disco::Client`] pointed at the base URL of the upstream service. The fetch
/// methods on this type issue `availability/...` requests through that client.
#[derive(Clone, Debug)]
pub struct QueryServiceProvider<Ver: StaticVersionType> {
    /// Client used for every request to the upstream query service.
    client: Client<Error, Ver>,
}
54
55impl<Ver: StaticVersionType> QueryServiceProvider<Ver> {
56 pub fn new(url: Url, _: Ver) -> Self {
57 Self {
58 client: Client::new(url),
59 }
60 }
61}
62
63impl<Ver: StaticVersionType> QueryServiceProvider<Ver> {
64 pub async fn fetch_payload<Types: NodeType>(
65 &self,
66 req: PayloadRequest,
67 ) -> anyhow::Result<Payload<Types>> {
68 let req_hash = req.0;
69
70 let block = self
74 .client
75 .get::<BlockQueryData<Types>>(&format!("availability/block/payload-hash/{req_hash}"))
76 .send()
77 .await
78 .context("fetching block")?;
79 let common = self
80 .client
81 .get::<VidCommonQueryData<Types>>(&format!(
82 "availability/vid/common/payload-hash/{req_hash}",
83 ))
84 .send()
85 .await
86 .context("fetching VID common")?;
87
88 let comm =
89 recompute_payload_commitment(&block, &common).context("computing VID commitment")?;
90 ensure!(
91 comm == req_hash,
92 "VID commitment {comm} does not match request"
93 );
94
95 Ok(block.payload)
96 }
97
98 pub async fn fetch_payload_range<Types: NodeType>(
99 &self,
100 req: BlockRangeRequest,
101 ) -> anyhow::Result<NonEmptyRange<BlockQueryData<Types>>> {
102 let req = RangeRequest::from(req);
103
104 let blocks = self
108 .client
109 .get::<NonEmptyRange<BlockQueryData<Types>>>(&format!(
110 "availability/block/{}/{}",
111 req.start, req.end
112 ))
113 .send()
114 .await
115 .context("fetching blocks")?;
116 let common = self
117 .fetch_vid_common_range_with_fallback(req.start, req.end)
118 .await?;
119
120 ensure!(
121 blocks.start() == req.start && blocks.end() == req.end,
122 "wrong block range ({}..{})",
123 blocks.start(),
124 blocks.end()
125 );
126 ensure!(
127 common.start() == req.start && common.end() == req.end,
128 "wrong VID common range (expected {}..{})",
129 common.start(),
130 common.end()
131 );
132
133 let commits = blocks
134 .iter()
135 .zip(&common)
136 .map(|(block, common)| recompute_payload_commitment(block, common))
137 .collect::<Result<Vec<VidCommitment>, _>>()
138 .context("computing VID commitments")?;
139 let hash = RangeRequest::hash_payloads(commits.iter().copied());
140 ensure!(
141 hash == req.expected_hash,
142 "server returned blocks with wrong payload hash ({hash}, commits {commits:?})",
143 );
144
145 Ok(blocks)
146 }
147
148 fn handle_result<R: Debug, T>(&self, req: R, res: anyhow::Result<T>) -> Option<T> {
149 match res {
150 Ok(res) => Some(res),
151 Err(err) => {
152 tracing::warn!(upstream = %self.client.base_url(), ?req, "failed to fetch: {err:#}");
153 None
154 },
155 }
156 }
157}
158
159#[async_trait]
160impl<Types, Ver: StaticVersionType> Provider<Types, PayloadRequest> for QueryServiceProvider<Ver>
161where
162 Types: NodeType,
163{
164 async fn fetch(&self, req: PayloadRequest) -> Option<Payload<Types>> {
165 self.handle_result(req, self.fetch_payload::<Types>(req).await)
166 }
167}
168
169#[async_trait]
170impl<Types, Ver: StaticVersionType> Provider<Types, BlockRangeRequest> for QueryServiceProvider<Ver>
171where
172 Types: NodeType,
173{
174 async fn fetch(&self, req: BlockRangeRequest) -> Option<NonEmptyRange<BlockQueryData<Types>>> {
175 self.handle_result(req, self.fetch_payload_range::<Types>(req).await)
176 }
177}
178
179fn recompute_payload_commitment<Types>(
180 block: &BlockQueryData<Types>,
181 common: &VidCommonQueryData<Types>,
182) -> anyhow::Result<VidCommitment>
183where
184 Types: NodeType,
185{
186 match common.common() {
187 VidCommon::V0(common) => {
188 let num_storage_nodes = ADVZScheme::get_num_storage_nodes(common) as usize;
189 let bytes = block.payload().encode();
190 advz_scheme(num_storage_nodes)
191 .commit_only(bytes)
192 .map(VidCommitment::V0)
193 .context("failed to compute VID commitment (V0)")
194 },
195 VidCommon::V1(common) => {
196 let bytes = block.payload().encode();
197 let avidm_param = init_avidm_param(common.total_weights).context(format!(
198 "failed to initialize AVIDM params. total_weight={}",
199 common.total_weights
200 ))?;
201 let metadata = block.metadata().encode();
202 AvidMScheme::commit(
203 &avidm_param,
204 &bytes,
205 ns_table::parse_ns_table(bytes.len(), &metadata),
206 )
207 .map(VidCommitment::V1)
208 .map_err(anyhow::Error::msg)
209 .context("failed to compute AVIDM commitment")
210 },
211 VidCommon::V2(common) => {
212 let bytes = block.payload().encode();
213 let metadata = block.metadata().encode();
214 AvidmGf2Scheme::commit(
215 &common.param,
216 &bytes,
217 ns_table::parse_ns_table(bytes.len(), &metadata),
218 )
219 .map(|(commit, _)| VidCommitment::V2(commit))
220 .map_err(anyhow::Error::msg)
221 .context("failed to compute AvidmGf2 commitment")
222 },
223 }
224}
225
226impl<Ver: StaticVersionType> QueryServiceProvider<Ver> {
227 pub async fn fetch_leaf<Types: NodeType>(
228 &self,
229 req: LeafRequest<Types>,
230 ) -> anyhow::Result<LeafQueryData<Types>> {
231 let leaf = self
232 .client
233 .get::<LeafQueryData<Types>>(&format!("availability/leaf/{}", req.height))
234 .send()
235 .await
236 .context("fetching leaf")?;
237
238 ensure!(
239 leaf.height() == req.height,
240 "received leaf with the wrong height ({})",
241 leaf.height(),
242 );
243 ensure!(
244 leaf.hash() == req.expected_leaf,
245 "received leaf with the wrong hash ({})",
246 leaf.hash()
247 );
248 ensure!(
249 leaf.qc().commit() == req.expected_qc,
250 "received leaf with the wrong QC ({})",
251 leaf.qc().commit()
252 );
253
254 Ok(leaf)
255 }
256
257 pub async fn fetch_leaf_range<Types: NodeType>(
258 &self,
259 req: LeafRangeRequest<Types>,
260 ) -> anyhow::Result<NonEmptyRange<LeafQueryData<Types>>> {
261 let leaves = self
262 .client
263 .get::<NonEmptyRange<LeafQueryData<Types>>>(&format!(
264 "availability/leaf/{}/{}",
265 req.start, req.end
266 ))
267 .send()
268 .await
269 .context("fetching leaf chain")?;
270
271 ensure!(
272 leaves.start() == req.start && leaves.end() == req.end,
273 "server returned wrong range of leaves ({}..{})",
274 leaves.start(),
275 leaves.end()
276 );
277
278 let mut expected_leaf = req.last_leaf;
280 let mut expected_qc = req.last_qc;
281 for leaf in leaves.iter().rev() {
282 let leaf_hash = leaf.hash();
283 let qc_hash = leaf.qc().commit();
284 ensure!(
285 leaf_hash == expected_leaf,
286 "received leaf {} with wrong hash {leaf_hash}",
287 leaf.height(),
288 );
289 ensure!(
290 qc_hash == expected_qc,
291 "received leaf {} with wrong QC {qc_hash}",
292 leaf.height()
293 );
294 expected_leaf = leaf.leaf().parent_commitment();
295 expected_qc = leaf.leaf().justify_qc().commit();
296 }
297
298 Ok(leaves)
299 }
300}
301
302#[async_trait]
303impl<Types, Ver: StaticVersionType> Provider<Types, LeafRequest<Types>>
304 for QueryServiceProvider<Ver>
305where
306 Types: NodeType,
307{
308 async fn fetch(&self, req: LeafRequest<Types>) -> Option<LeafQueryData<Types>> {
309 self.handle_result(req, self.fetch_leaf(req).await)
310 }
311}
312
313#[async_trait]
314impl<Types, Ver: StaticVersionType> Provider<Types, LeafRangeRequest<Types>>
315 for QueryServiceProvider<Ver>
316where
317 Types: NodeType,
318{
319 async fn fetch(
320 &self,
321 req: LeafRangeRequest<Types>,
322 ) -> Option<NonEmptyRange<LeafQueryData<Types>>> {
323 self.handle_result(req, self.fetch_leaf_range(req).await)
324 }
325}
326
327impl<Ver: StaticVersionType> QueryServiceProvider<Ver> {
328 pub async fn fetch_vid_common<Types: NodeType>(
329 &self,
330 req: VidCommonRequest,
331 ) -> anyhow::Result<VidCommon> {
332 let res = self
333 .client
334 .get::<VidCommonQueryData<Types>>(&format!(
335 "availability/vid/common/payload-hash/{}",
336 req.0
337 ))
338 .send()
339 .await
340 .context("fetching VID common")?;
341
342 ensure!(
343 res.common().is_consistent(&req.0),
344 "inconsistent VID common data {:?}",
345 res.common,
346 );
347 Ok(res.common)
348 }
349
350 async fn fetch_vid_common_range_with_fallback<Types: NodeType>(
351 &self,
352 start: u64,
353 end: u64,
354 ) -> anyhow::Result<NonEmptyRange<VidCommonQueryData<Types>>> {
355 let res = self
356 .client
357 .get::<NonEmptyRange<VidCommonQueryData<Types>>>(&format!(
358 "availability/vid/common/{start}/{end}",
359 ))
360 .send()
361 .await;
362 match res {
363 Ok(common) => Ok(common),
364 Err(Error::Custom { message, .. }) if message.contains("No route matches") => {
365 tracing::info!(
368 start,
369 end,
370 "server does not support ranged VID fetching, falling back to individual \
371 fetches"
372 );
373 let common = try_join_all((start..end).map(|i| {
374 self.client
375 .get::<VidCommonQueryData<Types>>(&format!("availability/vid/common/{i}"))
376 .send()
377 .map_err(move |err| {
378 anyhow::Error::new(err).context(format!("fetching VID common {i}"))
379 })
380 }))
381 .await?;
382 NonEmptyRange::new(common)
383 .context("converting individually fetched VID common into range")
384 },
385 Err(err) => Err(err).context("fetching VID common range"),
386 }
387 }
388
389 pub async fn fetch_vid_common_range<Types: NodeType>(
390 &self,
391 req: VidCommonRangeRequest,
392 ) -> anyhow::Result<NonEmptyRange<VidCommonQueryData<Types>>> {
393 let req = RangeRequest::from(req);
394 let common = self
395 .fetch_vid_common_range_with_fallback(req.start, req.end)
396 .await?;
397
398 ensure!(
399 common.start() == req.start && common.end() == req.end,
400 "server returned wrong VID common ({}..{})",
401 common.start(),
402 common.end()
403 );
404
405 let commits = common
406 .iter()
407 .map(|common| {
408 ensure!(
410 common.common().is_consistent(&common.payload_hash()),
411 "server returned VID common with inconsistent commitment {common:?}"
412 );
413
414 Ok(common.payload_hash())
417 })
418 .collect::<anyhow::Result<Vec<_>>>()?;
419 let hash = RangeRequest::hash_payloads(commits.iter().copied());
420 ensure!(
421 hash == req.expected_hash,
422 "server returned wrong VID common (hash {hash}, commits {commits:?})"
423 );
424
425 Ok(common)
426 }
427}
428
429#[async_trait]
430impl<Types, Ver: StaticVersionType> Provider<Types, VidCommonRequest> for QueryServiceProvider<Ver>
431where
432 Types: NodeType,
433{
434 async fn fetch(&self, req: VidCommonRequest) -> Option<VidCommon> {
435 self.handle_result(req, self.fetch_vid_common::<Types>(req).await)
436 }
437}
438
439#[async_trait]
440impl<Types, Ver: StaticVersionType> Provider<Types, VidCommonRangeRequest>
441 for QueryServiceProvider<Ver>
442where
443 Types: NodeType,
444{
445 async fn fetch(
446 &self,
447 req: VidCommonRangeRequest,
448 ) -> Option<NonEmptyRange<VidCommonQueryData<Types>>> {
449 self.handle_result(req, self.fetch_vid_common_range::<Types>(req).await)
450 }
451}
452
453#[cfg(all(test, not(target_os = "windows")))]
455mod test {
456 use std::{future::IntoFuture, time::Duration};
457
458 use committable::{Commitment, Committable};
459 use futures::{
460 future::{FutureExt, join},
461 stream::StreamExt,
462 };
463 #[allow(deprecated)]
466 use generic_array::GenericArray;
467 use hotshot_example_types::node_types::{EpochVersion, TEST_VERSIONS};
468 use rand::RngCore;
469 use test_utils::reserve_tcp_port;
470 use tide_disco::{Api, App, error::ServerError};
471 use toml::toml;
472 use vbs::version::StaticVersion;
473
474 use super::*;
475 use crate::{
476 ApiState,
477 api::load_api,
478 availability::{
479 self, AvailabilityDataSource, BlockId, BlockInfo, BlockQueryData, BlockWithTransaction,
480 Fetch, UpdateAvailabilityData, define_api,
481 },
482 data_source::{
483 AvailabilityProvider, FetchingDataSource, Transaction, VersionedDataSource,
484 sql::{self, SqlDataSource},
485 storage::{
486 AvailabilityStorage, SqlStorage, StorageConnectionType, UpdateAvailabilityStorage,
487 fail_storage::{FailStorage, FailableAction},
488 pruning::{PrunedHeightStorage, PrunerCfg},
489 sql::testing::TmpDb,
490 },
491 },
492 fetching::provider::{NoFetching, Provider as ProviderTrait, TestProvider},
493 node::data_source::NodeDataSource,
494 task::BackgroundTask,
495 testing::{
496 consensus::{MockDataSource, MockNetwork},
497 mocks::{MockBase, MockTypes, mock_transaction},
498 sleep,
499 },
500 types::HeightIndexed,
501 };
502
/// Query service provider speaking the `MockBase` version, wrapped in a `TestProvider` so the
/// tests can call `block`/`unblock`/`fail`/`unfail` on it.
type Provider = TestProvider<QueryServiceProvider<MockBase>>;
/// Same as [`Provider`], but speaking `EpochVersion`.
type EpochProvider = TestProvider<QueryServiceProvider<EpochVersion>>;
505
/// Consumes any value and returns `()`.
///
/// Used with `map` to erase the output type of a fetch so fetches of different types can be
/// stored in one collection.
fn ignore<T>(value: T) {
    drop(value);
}
507
508 async fn builder<P: AvailabilityProvider<MockTypes> + Clone>(
510 db: &TmpDb,
511 provider: &P,
512 ) -> sql::Builder<MockTypes, P> {
513 db.config()
514 .builder((*provider).clone())
515 .await
516 .unwrap()
517 .disable_proactive_fetching()
520 }
521
522 async fn data_source<P: AvailabilityProvider<MockTypes> + Clone>(
524 db: &TmpDb,
525 provider: &P,
526 ) -> SqlDataSource<MockTypes, P> {
527 builder(db, provider).await.build().await.unwrap()
528 }
529
/// Checks that objects are fetched from the upstream query service only once they become
/// fetchable, and that a blocked provider keeps the corresponding requests pending.
#[test_log::test(tokio::test(flavor = "multi_thread"))]
async fn test_fetch_on_request() {
    // The mock network's data source acts as the upstream source of truth.
    let mut network = MockNetwork::<MockDataSource>::init().await;

    // Serve the network's data source through the availability API on a reserved port.
    let port = reserve_tcp_port().unwrap();
    let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
    app.register_module(
        "availability",
        define_api(
            &Default::default(),
            MockBase::instance(),
            "1.0.0".parse().unwrap(),
        )
        .unwrap(),
    )
    .unwrap();
    network.spawn(
        "server",
        app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
    );

    // Data source under test: a fresh database that fetches from the server above.
    let db = TmpDb::init().await;
    let provider = Provider::new(QueryServiceProvider::new(
        format!("http://localhost:{port}").parse().unwrap(),
        MockBase::instance(),
    ));
    let data_source = data_source(&db, &provider).await;

    network.start().await;

    // Take five leaves from the network's stream; the first four are each used to exercise
    // a different kind of object.
    let leaves = network.data_source().subscribe_leaves(1).await;
    let leaves = leaves.take(5).collect::<Vec<_>>().await;
    let test_leaf = &leaves[0];
    let test_block = &leaves[1];
    let test_payload = &leaves[2];
    let test_common = &leaves[3];

    // Nothing has been appended to the data source under test yet. Every request issued
    // here is asserted below to still be unresolved after one second.
    tracing::info!("requesting unfetchable resources");
    let mut fetches = vec![];
    fetches.push(data_source.get_leaf(test_leaf.hash()).await.map(ignore));
    fetches.push(
        data_source
            .get_leaf(test_leaf.height() as usize)
            .await
            .map(ignore),
    );
    fetches.push(
        data_source
            .get_block(test_block.block_hash())
            .await
            .map(ignore),
    );
    fetches.push(
        data_source
            .get_payload(test_payload.block_hash())
            .await
            .map(ignore),
    );
    fetches.push(
        data_source
            .get_vid_common(test_common.block_hash())
            .await
            .map(ignore),
    );
    fetches.push(
        data_source
            .get_block(test_block.height() as usize)
            .await
            .map(ignore),
    );
    fetches.push(
        data_source
            .get_payload(test_payload.height() as usize)
            .await
            .map(ignore),
    );
    fetches.push(
        data_source
            .get_vid_common(test_common.height() as usize)
            .await
            .map(ignore),
    );
    fetches.push(data_source.get_vid_common(0).await.map(ignore));
    fetches.push(
        data_source
            .get_block_containing_transaction(mock_transaction(vec![]).commit())
            .await
            .map(ignore),
    );

    // Give any (unexpected) fetch time to complete before asserting nothing resolved.
    sleep(Duration::from_secs(1)).await;
    for (i, fetch) in fetches.into_iter().enumerate() {
        tracing::info!("checking fetch {i} is unresolved");
        fetch.try_resolve().unwrap_err();
    }

    // Block the provider, then tell the data source about the last leaf. The requests
    // below are asserted to stay pending for as long as the provider is blocked.
    provider.block().await;
    data_source
        .append(leaves.last().cloned().unwrap().into())
        .await
        .unwrap();

    tracing::info!("requesting fetchable resources");
    let req_leaf = data_source.get_leaf(test_leaf.height() as usize).await;
    let req_block = data_source.get_block(test_block.height() as usize).await;
    let req_payload = data_source
        .get_payload(test_payload.height() as usize)
        .await;
    let req_common = data_source
        .get_vid_common(test_common.height() as usize)
        .await;

    sleep(Duration::from_secs(1)).await;
    req_leaf.try_resolve().unwrap_err();
    req_block.try_resolve().unwrap_err();
    req_payload.try_resolve().unwrap_err();
    req_common.try_resolve().unwrap_err();

    // Once the provider is unblocked, the same objects must resolve and must equal what
    // the network's own data source returns.
    provider.unblock().await;
    let leaf = data_source
        .get_leaf(test_leaf.height() as usize)
        .await
        .await;
    let block = data_source
        .get_block(test_block.height() as usize)
        .await
        .await;
    let payload = data_source
        .get_payload(test_payload.height() as usize)
        .await
        .await;
    let common = data_source
        .get_vid_common(test_common.height() as usize)
        .await
        .await;
    {
        let truth = network.data_source();
        assert_eq!(
            leaf,
            truth.get_leaf(test_leaf.height() as usize).await.await
        );
        assert_eq!(
            block,
            truth.get_block(test_block.height() as usize).await.await
        );
        assert_eq!(
            payload,
            truth
                .get_payload(test_payload.height() as usize)
                .await
                .await
        );
        assert_eq!(
            common,
            truth
                .get_vid_common(test_common.height() as usize)
                .await
                .await
        );
    }

    // With the provider blocked again, the leaves at the heights of the block and payload
    // fetched above must still resolve.
    // NOTE(review): presumably they are served from local storage — confirm.
    provider.block().await;
    for leaf in [test_block, test_payload] {
        tracing::info!("fetching existing leaf {}", leaf.height());
        let fetched_leaf = data_source.get_leaf(leaf.height() as usize).await.await;
        assert_eq!(*leaf, fetched_leaf);
    }

    tracing::info!("fetching block by hash");
    provider.unblock().await;
    {
        // `leaf` here is the leaf fetched above at `test_leaf`'s height.
        let block = data_source.get_block(test_leaf.block_hash()).await.await;
        assert_eq!(block.hash(), leaf.block_hash());
    }

    tracing::info!("fetching payload by hash");
    {
        // Look the payload of the last leaf up by block hash and cross-check its identifiers.
        let leaf = leaves.last().unwrap();
        let payload = data_source.get_payload(leaf.block_hash()).await.await;
        assert_eq!(payload.height(), leaf.height());
        assert_eq!(payload.block_hash(), leaf.block_hash());
        assert_eq!(payload.hash(), leaf.payload_hash());
    }
}
755
756 #[tokio::test(flavor = "multi_thread")]
757 async fn test_fetch_on_request_epoch_version() {
758 tracing::info!("Starting test_fetch_on_request_epoch_version");
761
762 let mut network = MockNetwork::<MockDataSource>::init().await;
764
765 let port = reserve_tcp_port().unwrap();
767 let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
768 app.register_module(
769 "availability",
770 define_api(
771 &Default::default(),
772 EpochVersion::instance(),
773 "1.0.0".parse().unwrap(),
774 )
775 .unwrap(),
776 )
777 .unwrap();
778 network.spawn(
779 "server",
780 app.serve(format!("0.0.0.0:{port}"), EpochVersion::instance()),
781 );
782
783 let db = TmpDb::init().await;
786 let provider = EpochProvider::new(QueryServiceProvider::new(
787 format!("http://localhost:{port}").parse().unwrap(),
788 EpochVersion::instance(),
789 ));
790 let data_source = data_source(&db, &provider).await;
791
792 network.start().await;
794
795 let leaves = network.data_source().subscribe_leaves(1).await;
802 let leaves = leaves.take(5).collect::<Vec<_>>().await;
803 let test_leaf = &leaves[0];
804 let test_block = &leaves[1];
805 let test_payload = &leaves[2];
806 let test_common = &leaves[3];
807
808 let mut fetches = vec![];
810 fetches.push(data_source.get_leaf(test_leaf.hash()).await.map(ignore));
812 fetches.push(
814 data_source
815 .get_leaf(test_leaf.height() as usize)
816 .await
817 .map(ignore),
818 );
819 fetches.push(
821 data_source
822 .get_block(test_block.block_hash())
823 .await
824 .map(ignore),
825 );
826 fetches.push(
827 data_source
828 .get_payload(test_payload.block_hash())
829 .await
830 .map(ignore),
831 );
832 fetches.push(
833 data_source
834 .get_vid_common(test_common.block_hash())
835 .await
836 .map(ignore),
837 );
838 fetches.push(
840 data_source
841 .get_block(test_block.height() as usize)
842 .await
843 .map(ignore),
844 );
845 fetches.push(
846 data_source
847 .get_payload(test_payload.height() as usize)
848 .await
849 .map(ignore),
850 );
851 fetches.push(
852 data_source
853 .get_vid_common(test_common.height() as usize)
854 .await
855 .map(ignore),
856 );
857 fetches.push(data_source.get_vid_common(0).await.map(ignore));
859 fetches.push(
861 data_source
862 .get_block_containing_transaction(mock_transaction(vec![]).commit())
863 .await
864 .map(ignore),
865 );
866
867 sleep(Duration::from_secs(1)).await;
870 for (i, fetch) in fetches.into_iter().enumerate() {
871 tracing::info!("checking fetch {i} is unresolved");
872 fetch.try_resolve().unwrap_err();
873 }
874
875 provider.block().await;
880 data_source
881 .append(leaves.last().cloned().unwrap().into())
882 .await
883 .unwrap();
884
885 let req_leaf = data_source.get_leaf(test_leaf.height() as usize).await;
886 let req_block = data_source.get_block(test_block.height() as usize).await;
887 let req_payload = data_source
888 .get_payload(test_payload.height() as usize)
889 .await;
890 let req_common = data_source
891 .get_vid_common(test_common.height() as usize)
892 .await;
893
894 sleep(Duration::from_secs(1)).await;
900 req_leaf.try_resolve().unwrap_err();
901 req_block.try_resolve().unwrap_err();
902 req_payload.try_resolve().unwrap_err();
903 req_common.try_resolve().unwrap_err();
904
905 provider.unblock().await;
907 let leaf = data_source
908 .get_leaf(test_leaf.height() as usize)
909 .await
910 .await;
911 let block = data_source
912 .get_block(test_block.height() as usize)
913 .await
914 .await;
915 let payload = data_source
916 .get_payload(test_payload.height() as usize)
917 .await
918 .await;
919 let common = data_source
920 .get_vid_common(test_common.height() as usize)
921 .await
922 .await;
923 {
924 let truth = network.data_source();
926 assert_eq!(
927 leaf,
928 truth.get_leaf(test_leaf.height() as usize).await.await
929 );
930 assert_eq!(
931 block,
932 truth.get_block(test_block.height() as usize).await.await
933 );
934 assert_eq!(
935 payload,
936 truth
937 .get_payload(test_payload.height() as usize)
938 .await
939 .await
940 );
941 assert_eq!(
942 common,
943 truth
944 .get_vid_common(test_common.height() as usize)
945 .await
946 .await
947 );
948 }
949
950 provider.block().await;
955 for leaf in [test_block, test_payload] {
956 tracing::info!("fetching existing leaf {}", leaf.height());
957 let fetched_leaf = data_source.get_leaf(leaf.height() as usize).await.await;
958 assert_eq!(*leaf, fetched_leaf);
959 }
960
961 provider.unblock().await;
966 {
967 let block = data_source.get_block(test_leaf.block_hash()).await.await;
968 assert_eq!(block.hash(), leaf.block_hash());
969 }
970
971 {
975 let leaf = leaves.last().unwrap();
976 let payload = data_source.get_payload(leaf.block_hash()).await.await;
977 assert_eq!(payload.height(), leaf.height());
978 assert_eq!(payload.block_hash(), leaf.block_hash());
979 assert_eq!(payload.hash(), leaf.payload_hash());
980 }
981
982 tracing::info!("Test completed successfully!");
984 }
985
/// Checks that a leaf and a block at the same height can be fetched at the same time.
#[test_log::test(tokio::test(flavor = "multi_thread"))]
async fn test_fetch_block_and_leaf_concurrently() {
    // The mock network's data source acts as the upstream source of truth.
    let mut network = MockNetwork::<MockDataSource>::init().await;

    // Serve the network's data source through the availability API on a reserved port.
    let port = reserve_tcp_port().unwrap();
    let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
    app.register_module(
        "availability",
        define_api(
            &Default::default(),
            MockBase::instance(),
            "1.0.0".parse().unwrap(),
        )
        .unwrap(),
    )
    .unwrap();
    network.spawn(
        "server",
        app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
    );

    // Data source under test: a fresh database that fetches from the server above.
    let db = TmpDb::init().await;
    let provider = Provider::new(QueryServiceProvider::new(
        format!("http://localhost:{port}").parse().unwrap(),
        MockBase::instance(),
    ));
    let data_source = data_source(&db, &provider).await;

    network.start().await;

    // Two leaves are taken: the first is the object under test, the second is appended to
    // the data source under test.
    let leaves = network.data_source().subscribe_leaves(1).await;
    let leaves = leaves.take(2).collect::<Vec<_>>().await;
    let test_leaf = &leaves[0];

    data_source.append(leaves[1].clone().into()).await.unwrap();

    // Request the leaf and the block for the same height and drive both futures together.
    let (leaf, block) = join(
        data_source
            .get_leaf(test_leaf.height() as usize)
            .await
            .into_future(),
        data_source
            .get_block(test_leaf.height() as usize)
            .await
            .into_future(),
    )
    .await;
    assert_eq!(leaf, *test_leaf);
    assert_eq!(leaf.header(), block.header());
}
1046
/// Checks that two different blocks which share the same payload hash can be fetched
/// concurrently and each resolves to the block with the right header.
#[test_log::test(tokio::test(flavor = "multi_thread"))]
async fn test_fetch_different_blocks_same_payload() {
    // The mock network's data source acts as the upstream source of truth.
    let mut network = MockNetwork::<MockDataSource>::init().await;

    // Serve the network's data source through the availability API on a reserved port.
    let port = reserve_tcp_port().unwrap();
    let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
    app.register_module(
        "availability",
        define_api(
            &Default::default(),
            MockBase::instance(),
            "1.0.0".parse().unwrap(),
        )
        .unwrap(),
    )
    .unwrap();
    network.spawn(
        "server",
        app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
    );

    // Data source under test: a fresh database that fetches from the server above.
    let db = TmpDb::init().await;
    let provider = Provider::new(QueryServiceProvider::new(
        format!("http://localhost:{port}").parse().unwrap(),
        MockBase::instance(),
    ));
    let data_source = data_source(&db, &provider).await;

    network.start().await;

    let leaves = network.data_source().subscribe_leaves(1).await;
    let leaves = leaves.take(4).collect::<Vec<_>>().await;

    // Only the last leaf is appended; the earlier blocks have to be fetched.
    data_source
        .append(leaves.last().cloned().unwrap().into())
        .await
        .unwrap();

    // Precondition of the scenario: the first two leaves share a payload hash.
    assert_eq!(leaves[0].payload_hash(), leaves[1].payload_hash());
    let (block1, block2) = join(
        data_source
            .get_block(leaves[0].height() as usize)
            .await
            .into_future(),
        data_source
            .get_block(leaves[1].height() as usize)
            .await
            .into_future(),
    )
    .await;
    assert_eq!(block1.header(), leaves[0].header());
    assert_eq!(block2.header(), leaves[1].header());
}
1111
/// Checks that block, leaf and VID common subscriptions opened before any data exists yield
/// the objects finalized by the network.
#[test_log::test(tokio::test(flavor = "multi_thread"))]
async fn test_fetch_stream() {
    // The mock network's data source acts as the upstream source of truth.
    let mut network = MockNetwork::<MockDataSource>::init().await;

    // Serve the network's data source through the availability API on a reserved port.
    let port = reserve_tcp_port().unwrap();
    let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
    app.register_module(
        "availability",
        define_api(
            &Default::default(),
            MockBase::instance(),
            "1.0.0".parse().unwrap(),
        )
        .unwrap(),
    )
    .unwrap();
    network.spawn(
        "server",
        app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
    );

    // Data source under test: a fresh database that fetches from the server above.
    let db = TmpDb::init().await;
    let provider = Provider::new(QueryServiceProvider::new(
        format!("http://localhost:{port}").parse().unwrap(),
        MockBase::instance(),
    ));
    let data_source = data_source(&db, &provider).await;

    network.start().await;

    // Open the subscriptions on the data source under test before it has been given data.
    let blocks = data_source.subscribe_blocks(0).await;
    let leaves = data_source.subscribe_leaves(0).await;
    let common = data_source.subscribe_vid_common(0).await;

    // Reference data: the first five leaves finalized by the network.
    let finalized_leaves = network.data_source().subscribe_leaves(0).await;
    let finalized_leaves = finalized_leaves.take(5).collect::<Vec<_>>().await;

    // Only the last finalized leaf is appended to the data source under test.
    data_source
        .append(finalized_leaves.last().cloned().unwrap().into())
        .await
        .unwrap();

    // Each subscription must still yield all five objects, matching the reference data.
    let blocks = blocks.take(5).collect::<Vec<_>>().await;
    let leaves = leaves.take(5).collect::<Vec<_>>().await;
    let common = common.take(5).collect::<Vec<_>>().await;
    for i in 0..5 {
        tracing::info!("checking block {i}");
        assert_eq!(leaves[i], finalized_leaves[i]);
        assert_eq!(blocks[i].header(), finalized_leaves[i].header());
        assert_eq!(common[i], data_source.get_vid_common(i).await.await);
    }
}
1173
/// Checks that a leaf range query returns every leaf in the range, including those at the
/// start of the range that are not in local storage.
#[test_log::test(tokio::test(flavor = "multi_thread"))]
async fn test_fetch_range_start() {
    // The mock network's data source acts as the upstream source of truth.
    let mut network = MockNetwork::<MockDataSource>::init().await;

    // Serve the network's data source through the availability API on a reserved port.
    let port = reserve_tcp_port().unwrap();
    let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
    app.register_module(
        "availability",
        define_api(
            &Default::default(),
            MockBase::instance(),
            "1.0.0".parse().unwrap(),
        )
        .unwrap(),
    )
    .unwrap();
    network.spawn(
        "server",
        app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
    );

    // Data source under test: a fresh database that fetches from the server above.
    let db = TmpDb::init().await;
    let provider = Provider::new(QueryServiceProvider::new(
        format!("http://localhost:{port}").parse().unwrap(),
        MockBase::instance(),
    ));
    let data_source = data_source(&db, &provider).await;

    network.start().await;

    // Reference data: the first five leaves finalized by the network.
    let finalized_leaves = network.data_source().subscribe_leaves(0).await;
    let finalized_leaves = finalized_leaves.take(5).collect::<Vec<_>>().await;

    // Write only leaves 2 and 4 directly into storage, leaving 0, 1 and 3 missing.
    let mut tx = data_source.write().await.unwrap();
    tx.insert_leaf(&finalized_leaves[2]).await.unwrap();
    tx.insert_leaf(&finalized_leaves[4]).await.unwrap();
    tx.commit().await.unwrap();

    // Resolve every fetch in the range `..5` and compare with the reference data.
    let leaves = data_source
        .get_leaf_range(..5)
        .await
        .then(Fetch::resolve)
        .collect::<Vec<_>>()
        .await;
    for i in 0..5 {
        tracing::info!("checking leaf {i}");
        assert_eq!(leaves[i], finalized_leaves[i]);
    }
}
1232
/// Checks that a pending lookup of a block by transaction hash resolves once a block
/// containing that transaction is appended to the data source.
#[test_log::test(tokio::test(flavor = "multi_thread"))]
async fn fetch_transaction() {
    // The mock network's data source acts as the upstream source of truth.
    let mut network = MockNetwork::<MockDataSource>::init().await;

    // Serve the network's data source through the availability API on a reserved port.
    let port = reserve_tcp_port().unwrap();
    let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
    app.register_module(
        "availability",
        define_api(
            &Default::default(),
            MockBase::instance(),
            "1.0.0".parse().unwrap(),
        )
        .unwrap(),
    )
    .unwrap();
    network.spawn(
        "server",
        app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
    );

    // The data source under test uses the `NoFetching` provider, so everything it learns in
    // this test comes from the explicit `append` calls below.
    let db = TmpDb::init().await;
    let data_source = data_source(&db, &NoFetching).await;

    // Subscribe to the network's leaves and blocks before starting it.
    let mut leaves = network.data_source().subscribe_leaves(1).await;
    let mut blocks = network.data_source().subscribe_blocks(1).await;

    network.start().await;

    // Start waiting for the transaction before it has been submitted.
    let tx = mock_transaction(vec![1, 2, 3]);
    let fut = data_source
        .get_block_containing_transaction(tx.commit())
        .await;

    network.submit_transaction(tx.clone()).await;

    // Feed leaf/block pairs from the network into the data source until the block
    // containing the transaction has been appended.
    let block = loop {
        let leaf = leaves.next().await.unwrap();
        let block = blocks.next().await.unwrap();

        data_source
            .append(BlockInfo::new(leaf, Some(block.clone()), None, None))
            .await
            .unwrap();

        if block.transaction_by_hash(tx.commit()).is_some() {
            break block;
        }
    };
    tracing::info!("transaction included in block {}", block.height());

    // The fetch started earlier must resolve to that block and transaction.
    let fetched_tx = fut.await;
    assert_eq!(
        fetched_tx,
        BlockWithTransaction::with_hash(block, tx.commit()).unwrap()
    );

    // A fresh lookup must return the same result.
    assert_eq!(
        fetched_tx,
        data_source
            .get_block_containing_transaction(tx.commit())
            .await
            .await
    );
}
1311
/// Checks that a fetch stays pending while the provider is failing and succeeds once the
/// provider stops failing.
#[test_log::test(tokio::test(flavor = "multi_thread"))]
async fn test_retry() {
    // The mock network's data source acts as the upstream source of truth.
    let mut network = MockNetwork::<MockDataSource>::init().await;

    // Serve the network's data source through the availability API on a reserved port.
    let port = reserve_tcp_port().unwrap();
    let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
    app.register_module(
        "availability",
        define_api(
            &Default::default(),
            MockBase::instance(),
            "1.0.0".parse().unwrap(),
        )
        .unwrap(),
    )
    .unwrap();
    network.spawn(
        "server",
        app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
    );

    // Data source under test, configured with a maximum retry interval of one second.
    let db = TmpDb::init().await;
    let provider = Provider::new(QueryServiceProvider::new(
        format!("http://localhost:{port}").parse().unwrap(),
        MockBase::instance(),
    ));
    let data_source = builder(&db, &provider)
        .await
        .with_max_retry_interval(Duration::from_secs(1))
        .build()
        .await
        .unwrap();

    network.start().await;

    let leaves = network.data_source().subscribe_leaves(1).await;
    let leaves = leaves.take(2).collect::<Vec<_>>().await;
    let test_leaf = &leaves[0];

    // Put the provider into failing mode before the data source learns about the last leaf.
    provider.fail();

    data_source
        .append(leaves.last().cloned().unwrap().into())
        .await
        .unwrap();

    tracing::info!("requesting leaf from failing providers");
    let fut = data_source.get_leaf(test_leaf.height() as usize).await;

    // After five seconds with a failing provider the fetch must still be unresolved.
    sleep(Duration::from_secs(5)).await;
    fut.try_resolve().unwrap_err();

    // Once the provider stops failing, the leaf must be retrievable.
    provider.unfail();
    assert_eq!(
        data_source
            .get_leaf(test_leaf.height() as usize)
            .await
            .await,
        *test_leaf
    );
}
1385
1386 #[allow(deprecated)]
1388 fn random_vid_commit() -> VidCommitment {
1389 let mut bytes = [0; 32];
1390 rand::thread_rng().fill_bytes(&mut bytes);
1391 VidCommitment::V0(GenericArray::from(bytes).into())
1392 }
1393
1394 fn random_leaf_request() -> LeafRequest<MockTypes> {
1395 let mut bytes = [0; 32];
1396 rand::thread_rng().fill_bytes(&mut bytes);
1397 LeafRequest {
1398 height: 1,
1399 expected_leaf: Commitment::from_raw(bytes),
1400 expected_qc: Commitment::from_raw(bytes),
1401 }
1402 }
1403
1404 async fn malicious_server(port: u16) {
1405 let mut api = load_api::<(), ServerError, MockBase>(
1406 None::<std::path::PathBuf>,
1407 include_str!("../../../api/availability.toml"),
1408 vec![],
1409 )
1410 .unwrap();
1411
1412 api.get("get_leaf", move |req, _| {
1413 async move {
1414 let height = req.integer_param("height")?;
1415
1416 let mut leaf = LeafQueryData::<MockTypes>::genesis(
1418 &Default::default(),
1419 &Default::default(),
1420 TEST_VERSIONS.test,
1421 )
1422 .await;
1423 leaf.leaf.block_header_mut().block_number = height;
1424 leaf.qc.data.leaf_commit = leaf.hash();
1425 Ok(leaf)
1426 }
1427 .boxed()
1428 })
1429 .unwrap()
1430 .get("get_leaf_range", move |req, _| {
1431 async move {
1432 let start = req.integer_param("from")?;
1433 let end = req.integer_param("until")?;
1434
1435 let leaf = LeafQueryData::<MockTypes>::genesis(
1437 &Default::default(),
1438 &Default::default(),
1439 TEST_VERSIONS.test,
1440 )
1441 .await;
1442 let leaves = (start..end)
1443 .map(|i| {
1444 let mut leaf = leaf.clone();
1445 leaf.leaf.block_header_mut().block_number = i;
1446 leaf.qc.data.leaf_commit = leaf.hash();
1447 leaf
1448 })
1449 .collect::<Vec<_>>();
1450
1451 Ok(leaves)
1452 }
1453 .boxed()
1454 })
1455 .unwrap()
1456 .get("get_block", move |_, _| {
1457 async move {
1458 Ok(BlockQueryData::<MockTypes>::genesis(
1460 &Default::default(),
1461 &Default::default(),
1462 TEST_VERSIONS.test.base,
1463 )
1464 .await)
1465 }
1466 .boxed()
1467 })
1468 .unwrap()
1469 .get("get_block_range", move |req, _| {
1470 async move {
1471 let start = req.integer_param("from")?;
1472 let end = req.integer_param("until")?;
1473
1474 let block = BlockQueryData::<MockTypes>::genesis(
1476 &Default::default(),
1477 &Default::default(),
1478 TEST_VERSIONS.test.base,
1479 )
1480 .await;
1481 let blocks = (start..end)
1482 .map(|i| {
1483 let mut block = block.clone();
1484 block.header.block_number = i;
1485 block
1486 })
1487 .collect::<Vec<_>>();
1488
1489 Ok(blocks)
1490 }
1491 .boxed()
1492 })
1493 .unwrap()
1494 .get("get_vid_common", move |_, _| {
1495 async move {
1496 Ok(VidCommonQueryData::<MockTypes>::genesis(
1498 &Default::default(),
1499 &Default::default(),
1500 TEST_VERSIONS.test.base,
1501 )
1502 .await)
1503 }
1504 .boxed()
1505 })
1506 .unwrap()
1507 .get("get_vid_common_range", move |req, _| {
1508 async move {
1509 let start = req.integer_param("from")?;
1510 let end = req.integer_param("until")?;
1511
1512 let vid = VidCommonQueryData::<MockTypes>::genesis(
1514 &Default::default(),
1515 &Default::default(),
1516 TEST_VERSIONS.test.base,
1517 )
1518 .await;
1519 let vids = (start..end)
1520 .map(|i| {
1521 let mut vid = vid.clone();
1522 vid.height = i;
1523 vid
1524 })
1525 .collect::<Vec<_>>();
1526
1527 Ok(vids)
1528 }
1529 .boxed()
1530 })
1531 .unwrap();
1532
1533 let mut app = App::<(), ServerError>::with_state(());
1534 app.register_module("availability", api).unwrap();
1535 app.serve(format!("0.0.0.0:{port}"), MockBase::instance())
1536 .await
1537 .ok();
1538 }
1539
1540 #[test_log::test(tokio::test(flavor = "multi_thread"))]
1541 async fn test_fetch_from_malicious_server() {
1542 let port = reserve_tcp_port().unwrap();
1543 let _server = BackgroundTask::spawn("malicious server", malicious_server(port));
1544
1545 let provider = QueryServiceProvider::new(
1546 format!("http://localhost:{port}").parse().unwrap(),
1547 MockBase::instance(),
1548 );
1549 provider.client.connect(None).await;
1550
1551 tracing::info!("fetch leaf");
1554 let err = provider
1555 .fetch_leaf(random_leaf_request())
1556 .await
1557 .unwrap_err();
1558 assert!(err.to_string().contains("wrong hash"), "{err:#}");
1559
1560 tracing::info!("fetch leaf range");
1562 let req = random_leaf_request();
1563 let err = provider
1564 .fetch_leaf_range(LeafRangeRequest {
1565 start: 0,
1566 end: req.height + 1,
1567 last_leaf: req.expected_leaf,
1568 last_qc: req.expected_qc,
1569 })
1570 .await
1571 .unwrap_err();
1572 assert!(err.to_string().contains("wrong hash"), "{err:#}");
1573
1574 tracing::info!("fetch payload");
1577 let err = provider
1578 .fetch_payload::<MockTypes>(PayloadRequest(random_vid_commit()))
1579 .await
1580 .unwrap_err();
1581 assert!(
1582 err.to_string().contains("does not match request"),
1583 "{err:#}"
1584 );
1585
1586 tracing::info!("fetch payload range");
1588 let err = provider
1589 .fetch_payload_range::<MockTypes>(BlockRangeRequest::from(RangeRequest {
1590 start: 0,
1591 end: 2,
1592 expected_hash: RangeRequest::hash_payloads([
1593 random_vid_commit(),
1594 random_vid_commit(),
1595 ]),
1596 }))
1597 .await
1598 .unwrap_err();
1599 assert!(err.to_string().contains("wrong payload hash"), "{err:#}");
1600
1601 tracing::info!("fetch VID");
1604 let err = provider
1605 .fetch_vid_common::<MockTypes>(VidCommonRequest(random_vid_commit()))
1606 .await
1607 .unwrap_err();
1608 assert!(
1609 err.to_string().contains("inconsistent VID common"),
1610 "{err:#}"
1611 );
1612
1613 tracing::info!("fetch VID range");
1615 let err = provider
1616 .fetch_vid_common_range::<MockTypes>(VidCommonRangeRequest::from(RangeRequest {
1617 start: 0,
1618 end: 2,
1619 expected_hash: RangeRequest::hash_payloads([
1620 random_vid_commit(),
1621 random_vid_commit(),
1622 ]),
1623 }))
1624 .await
1625 .unwrap_err();
1626 assert!(err.to_string().contains("wrong VID common"), "{err:#}");
1627 }
1628
1629 #[test_log::test(tokio::test(flavor = "multi_thread"))]
1630 async fn test_archive_recovery() {
1631 let mut network = MockNetwork::<MockDataSource>::init().await;
1633
1634 let port = reserve_tcp_port().unwrap();
1636 let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
1637 app.register_module(
1638 "availability",
1639 define_api(
1640 &Default::default(),
1641 MockBase::instance(),
1642 "1.0.0".parse().unwrap(),
1643 )
1644 .unwrap(),
1645 )
1646 .unwrap();
1647 network.spawn(
1648 "server",
1649 app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
1650 );
1651
1652 let db = TmpDb::init().await;
1655 let provider = Provider::new(QueryServiceProvider::new(
1656 format!("http://localhost:{port}").parse().unwrap(),
1657 MockBase::instance(),
1658 ));
1659 let mut data_source = db
1660 .config()
1661 .pruner_cfg(
1662 PrunerCfg::new()
1663 .with_target_retention(Duration::from_secs(0))
1664 .with_interval(Duration::from_secs(5)),
1665 )
1666 .unwrap()
1667 .builder(provider.clone())
1668 .await
1669 .unwrap()
1670 .with_min_retry_interval(Duration::from_millis(100))
1674 .with_retry_randomization_factor(3.)
1678 .build()
1679 .await
1680 .unwrap();
1681
1682 network.start().await;
1684
1685 let leaves = network.data_source().subscribe_leaves(1).await;
1687 let leaves = leaves.take(5).collect::<Vec<_>>().await;
1688
1689 let pruned_height = data_source
1691 .read()
1692 .await
1693 .unwrap()
1694 .load_pruned_height()
1695 .await
1696 .unwrap();
1697 assert!(matches!(pruned_height, None | Some(0)), "{pruned_height:?}");
1699
1700 let last_leaf = leaves.last().unwrap();
1703 data_source.append(last_leaf.clone().into()).await.unwrap();
1704
1705 for i in 1..=last_leaf.height() {
1707 tracing::info!(i, "fetching leaf");
1708 assert_eq!(
1709 data_source.get_leaf(i as usize).await.await,
1710 leaves[i as usize - 1]
1711 );
1712 }
1713
1714 loop {
1716 let pruned_height = data_source
1717 .read()
1718 .await
1719 .unwrap()
1720 .load_pruned_height()
1721 .await
1722 .unwrap();
1723 if pruned_height == Some(last_leaf.height()) {
1724 break;
1725 }
1726 tracing::info!(
1727 ?pruned_height,
1728 target_height = last_leaf.height(),
1729 "waiting for pruner to run"
1730 );
1731 sleep(Duration::from_secs(1)).await;
1732 }
1733
1734 data_source = db
1736 .config()
1737 .archive()
1738 .builder(provider.clone())
1739 .await
1740 .unwrap()
1741 .with_proactive_interval(Duration::from_secs(1))
1742 .build()
1743 .await
1744 .unwrap();
1745
1746 let pruned_height = data_source
1748 .read()
1749 .await
1750 .unwrap()
1751 .load_pruned_height()
1752 .await
1753 .unwrap();
1754 assert_eq!(pruned_height, None);
1755
1756 data_source.append(last_leaf.clone().into()).await.unwrap();
1760
1761 loop {
1763 let sync_status = data_source.sync_status().await.unwrap();
1764 if sync_status.is_fully_synced() {
1765 break;
1766 }
1767 tracing::info!(?sync_status, "waiting for node to sync");
1768 sleep(Duration::from_secs(1)).await;
1769 }
1770
1771 sleep(Duration::from_secs(3)).await;
1773 let sync_status = data_source.sync_status().await.unwrap();
1774 assert!(sync_status.is_fully_synced(), "{sync_status:#?}");
1775 }
1776
/// Selects which kind of storage operation the storage-failure test helpers make fail.
#[derive(Debug, Clone, Copy)]
enum FailureType {
    /// Fail when beginning a writable transaction.
    Begin,
    /// Fail on writes.
    Write,
    /// Fail on commits.
    Commit,
}
1783
1784 async fn test_fetch_storage_failure_helper(failure: FailureType) {
1785 let mut network = MockNetwork::<MockDataSource>::init().await;
1787
1788 let port = reserve_tcp_port().unwrap();
1790 let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
1791 app.register_module(
1792 "availability",
1793 define_api(
1794 &Default::default(),
1795 MockBase::instance(),
1796 "1.0.0".parse().unwrap(),
1797 )
1798 .unwrap(),
1799 )
1800 .unwrap();
1801 network.spawn(
1802 "server",
1803 app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
1804 );
1805
1806 let provider = Provider::new(QueryServiceProvider::new(
1808 format!("http://localhost:{port}").parse().unwrap(),
1809 MockBase::instance(),
1810 ));
1811 let db = TmpDb::init().await;
1812 let storage = FailStorage::from(
1813 SqlStorage::connect(db.config(), StorageConnectionType::Query)
1814 .await
1815 .unwrap(),
1816 );
1817 let data_source = FetchingDataSource::builder(storage, provider)
1818 .disable_proactive_fetching()
1819 .disable_aggregator()
1820 .with_max_retry_interval(Duration::from_millis(100))
1821 .with_retry_timeout(Duration::from_secs(1))
1822 .build()
1823 .await
1824 .unwrap();
1825
1826 network.start().await;
1828
1829 let leaves = network.data_source().subscribe_leaves(1).await;
1831 let leaves = leaves.take(2).collect::<Vec<_>>().await;
1832
1833 let last_leaf = leaves.last().unwrap();
1835 let mut tx = data_source.write().await.unwrap();
1836 tx.insert_leaf(last_leaf).await.unwrap();
1837 tx.commit().await.unwrap();
1838
1839 tracing::info!("fetch with write failure");
1841 match failure {
1842 FailureType::Begin => {
1843 data_source
1844 .as_ref()
1845 .fail_begins_writable(FailableAction::Any)
1846 .await
1847 },
1848 FailureType::Write => data_source.as_ref().fail_writes(FailableAction::Any).await,
1849 FailureType::Commit => data_source.as_ref().fail_commits(FailableAction::Any).await,
1850 }
1851 assert_eq!(leaves[0], data_source.get_leaf(1).await.await);
1852 data_source.as_ref().pass().await;
1853
1854 sleep(Duration::from_secs(1)).await;
1859
1860 tracing::info!("fetch with write success");
1863 let fetch = data_source.get_leaf(1).await;
1864 assert!(fetch.is_pending());
1865 assert_eq!(leaves[0], fetch.await);
1866
1867 sleep(Duration::from_secs(1)).await;
1868
1869 tracing::info!("retrieve from storage");
1871 let fetch = data_source.get_leaf(1).await;
1872 assert_eq!(leaves[0], fetch.try_resolve().ok().unwrap());
1873 }
1874
1875 #[test_log::test(tokio::test(flavor = "multi_thread"))]
1876 async fn test_fetch_storage_failure_on_begin() {
1877 test_fetch_storage_failure_helper(FailureType::Begin).await;
1878 }
1879
1880 #[test_log::test(tokio::test(flavor = "multi_thread"))]
1881 async fn test_fetch_storage_failure_on_write() {
1882 test_fetch_storage_failure_helper(FailureType::Write).await;
1883 }
1884
1885 #[test_log::test(tokio::test(flavor = "multi_thread"))]
1886 async fn test_fetch_storage_failure_on_commit() {
1887 test_fetch_storage_failure_helper(FailureType::Commit).await;
1888 }
1889
1890 async fn test_fetch_storage_failure_retry_helper(failure: FailureType) {
1891 let mut network = MockNetwork::<MockDataSource>::init().await;
1893
1894 let port = reserve_tcp_port().unwrap();
1896 let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
1897 app.register_module(
1898 "availability",
1899 define_api(
1900 &Default::default(),
1901 MockBase::instance(),
1902 "1.0.0".parse().unwrap(),
1903 )
1904 .unwrap(),
1905 )
1906 .unwrap();
1907 network.spawn(
1908 "server",
1909 app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
1910 );
1911
1912 let provider = Provider::new(QueryServiceProvider::new(
1914 format!("http://localhost:{port}").parse().unwrap(),
1915 MockBase::instance(),
1916 ));
1917 let db = TmpDb::init().await;
1918 let storage = FailStorage::from(
1919 SqlStorage::connect(db.config(), StorageConnectionType::Query)
1920 .await
1921 .unwrap(),
1922 );
1923 let data_source = FetchingDataSource::builder(storage, provider)
1924 .disable_proactive_fetching()
1925 .disable_aggregator()
1926 .with_min_retry_interval(Duration::from_millis(100))
1927 .build()
1928 .await
1929 .unwrap();
1930
1931 network.start().await;
1933
1934 let leaves = network.data_source().subscribe_leaves(1).await;
1936 let leaves = leaves.take(2).collect::<Vec<_>>().await;
1937
1938 let last_leaf = leaves.last().unwrap();
1940 let mut tx = data_source.write().await.unwrap();
1941 tx.insert_leaf(last_leaf).await.unwrap();
1942 tx.commit().await.unwrap();
1943
1944 tracing::info!("fetch with write failure");
1946 match failure {
1947 FailureType::Begin => {
1948 data_source
1949 .as_ref()
1950 .fail_one_begin_writable(FailableAction::Any)
1951 .await
1952 },
1953 FailureType::Write => {
1954 data_source
1955 .as_ref()
1956 .fail_one_write(FailableAction::Any)
1957 .await
1958 },
1959 FailureType::Commit => {
1960 data_source
1961 .as_ref()
1962 .fail_one_commit(FailableAction::Any)
1963 .await
1964 },
1965 }
1966 assert_eq!(leaves[0], data_source.get_leaf(1).await.await);
1967
1968 let mut tx = data_source.read().await.unwrap();
1970 assert_eq!(leaves[0], tx.get_leaf(1.into()).await.unwrap());
1971 }
1972
1973 #[test_log::test(tokio::test(flavor = "multi_thread"))]
1974 async fn test_fetch_storage_failure_retry_on_begin() {
1975 test_fetch_storage_failure_retry_helper(FailureType::Begin).await;
1976 }
1977
1978 #[test_log::test(tokio::test(flavor = "multi_thread"))]
1979 async fn test_fetch_storage_failure_retry_on_write() {
1980 test_fetch_storage_failure_retry_helper(FailureType::Write).await;
1981 }
1982
1983 #[test_log::test(tokio::test(flavor = "multi_thread"))]
1984 async fn test_fetch_storage_failure_retry_on_commit() {
1985 test_fetch_storage_failure_retry_helper(FailureType::Commit).await;
1986 }
1987
1988 #[test_log::test(tokio::test(flavor = "multi_thread"))]
1989 async fn test_fetch_on_decide() {
1990 let mut network = MockNetwork::<MockDataSource>::init().await;
1992
1993 let port = reserve_tcp_port().unwrap();
1995 let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
1996 app.register_module(
1997 "availability",
1998 define_api(
1999 &Default::default(),
2000 MockBase::instance(),
2001 "1.0.0".parse().unwrap(),
2002 )
2003 .unwrap(),
2004 )
2005 .unwrap();
2006 network.spawn(
2007 "server",
2008 app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
2009 );
2010
2011 let db = TmpDb::init().await;
2013 let provider = Provider::new(QueryServiceProvider::new(
2014 format!("http://localhost:{port}").parse().unwrap(),
2015 MockBase::instance(),
2016 ));
2017 let data_source = builder(&db, &provider)
2018 .await
2019 .with_max_retry_interval(Duration::from_secs(1))
2020 .build()
2021 .await
2022 .unwrap();
2023
2024 network.start().await;
2026
2027 let leaf = network
2029 .data_source()
2030 .subscribe_leaves(1)
2031 .await
2032 .next()
2033 .await
2034 .unwrap();
2035
2036 tracing::info!("send decide event");
2038 data_source.append(leaf.clone().into()).await.unwrap();
2039
2040 tracing::info!("wait");
2043 sleep(Duration::from_secs(5)).await;
2044
2045 let mut tx = data_source.read().await.unwrap();
2049 let id = BlockId::<MockTypes>::from(leaf.height() as usize);
2050 let block = tx.get_block(id).await.unwrap();
2051 let vid = tx.get_vid_common(id).await.unwrap();
2052
2053 assert_eq!(block.hash(), leaf.block_hash());
2054 assert_eq!(vid.block_hash(), leaf.block_hash());
2055 }
2056
2057 #[test_log::test(tokio::test(flavor = "multi_thread"))]
2058 async fn test_fetch_begin_failure() {
2059 let mut network = MockNetwork::<MockDataSource>::init().await;
2061
2062 let port = reserve_tcp_port().unwrap();
2064 let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
2065 app.register_module(
2066 "availability",
2067 define_api(
2068 &Default::default(),
2069 MockBase::instance(),
2070 "1.0.0".parse().unwrap(),
2071 )
2072 .unwrap(),
2073 )
2074 .unwrap();
2075 network.spawn(
2076 "server",
2077 app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
2078 );
2079
2080 let provider = Provider::new(QueryServiceProvider::new(
2082 format!("http://localhost:{port}").parse().unwrap(),
2083 MockBase::instance(),
2084 ));
2085 let db = TmpDb::init().await;
2086 let storage = FailStorage::from(
2087 SqlStorage::connect(db.config(), StorageConnectionType::Query)
2088 .await
2089 .unwrap(),
2090 );
2091 let data_source = FetchingDataSource::builder(storage, provider)
2092 .disable_proactive_fetching()
2093 .disable_aggregator()
2094 .with_min_retry_interval(Duration::from_millis(100))
2095 .build()
2096 .await
2097 .unwrap();
2098
2099 network.start().await;
2101
2102 let leaves = network.data_source().subscribe_leaves(1).await;
2104 let leaves = leaves.take(2).collect::<Vec<_>>().await;
2105
2106 let last_leaf = leaves.last().unwrap();
2108 let mut tx = data_source.write().await.unwrap();
2109 tx.insert_leaf(last_leaf).await.unwrap();
2110 tx.commit().await.unwrap();
2111
2112 tracing::info!("fetch with transaction failure");
2115 data_source
2116 .as_ref()
2117 .fail_one_begin_read_only(FailableAction::Any)
2118 .await;
2119 assert_eq!(leaves[0], data_source.get_leaf(1).await.await);
2120 }
2121
2122 #[test_log::test(tokio::test(flavor = "multi_thread"))]
2123 async fn test_fetch_load_failure_block() {
2124 let mut network = MockNetwork::<MockDataSource>::init().await;
2126
2127 let port = reserve_tcp_port().unwrap();
2129 let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
2130 app.register_module(
2131 "availability",
2132 define_api(
2133 &Default::default(),
2134 MockBase::instance(),
2135 "1.0.0".parse().unwrap(),
2136 )
2137 .unwrap(),
2138 )
2139 .unwrap();
2140 network.spawn(
2141 "server",
2142 app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
2143 );
2144
2145 let provider = Provider::new(QueryServiceProvider::new(
2147 format!("http://localhost:{port}").parse().unwrap(),
2148 MockBase::instance(),
2149 ));
2150 let db = TmpDb::init().await;
2151 let storage = FailStorage::from(
2152 SqlStorage::connect(db.config(), StorageConnectionType::Query)
2153 .await
2154 .unwrap(),
2155 );
2156 let data_source = FetchingDataSource::builder(storage, provider)
2157 .disable_proactive_fetching()
2158 .disable_aggregator()
2159 .with_min_retry_interval(Duration::from_millis(100))
2160 .build()
2161 .await
2162 .unwrap();
2163
2164 network.start().await;
2166
2167 let mut leaves = network.data_source().subscribe_leaves(1).await;
2169 let leaf = leaves.next().await.unwrap();
2170
2171 let mut tx = data_source.write().await.unwrap();
2174 tx.insert_leaf(&leaf).await.unwrap();
2175 tx.commit().await.unwrap();
2176
2177 tracing::info!("fetch with read failure");
2191 data_source
2192 .as_ref()
2193 .fail_one_read(FailableAction::GetHeader)
2194 .await;
2195 let fetch = data_source.get_block(leaf.block_hash()).await;
2196
2197 sleep(Duration::from_secs(2)).await;
2199 data_source.as_ref().pass().await;
2200
2201 let block: BlockQueryData<MockTypes> = fetch.await;
2202 assert_eq!(block.hash(), leaf.block_hash());
2203 }
2204
2205 #[test_log::test(tokio::test(flavor = "multi_thread"))]
2206 async fn test_fetch_load_failure_tx() {
2207 let mut network = MockNetwork::<MockDataSource>::init().await;
2209
2210 let port = reserve_tcp_port().unwrap();
2212 let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
2213 app.register_module(
2214 "availability",
2215 define_api(
2216 &Default::default(),
2217 MockBase::instance(),
2218 "1.0.0".parse().unwrap(),
2219 )
2220 .unwrap(),
2221 )
2222 .unwrap();
2223 network.spawn(
2224 "server",
2225 app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
2226 );
2227
2228 let provider = Provider::new(QueryServiceProvider::new(
2230 format!("http://localhost:{port}").parse().unwrap(),
2231 MockBase::instance(),
2232 ));
2233 let db = TmpDb::init().await;
2234 let storage = FailStorage::from(
2235 SqlStorage::connect(db.config(), StorageConnectionType::Query)
2236 .await
2237 .unwrap(),
2238 );
2239 let data_source = FetchingDataSource::builder(storage, provider)
2240 .disable_proactive_fetching()
2241 .disable_aggregator()
2242 .with_min_retry_interval(Duration::from_millis(100))
2243 .build()
2244 .await
2245 .unwrap();
2246
2247 network.start().await;
2249
2250 let tx = mock_transaction(vec![1, 2, 3]);
2252 network.submit_transaction(tx.clone()).await;
2253 let tx = network
2254 .data_source()
2255 .get_block_containing_transaction(tx.commit())
2256 .await
2257 .await;
2258
2259 {
2261 let leaf = network
2262 .data_source()
2263 .get_leaf(tx.transaction.block_height() as usize)
2264 .await
2265 .await;
2266 let block = network
2267 .data_source()
2268 .get_block(tx.transaction.block_height() as usize)
2269 .await
2270 .await;
2271 let mut tx = data_source.write().await.unwrap();
2272 tx.insert_leaf(&leaf).await.unwrap();
2273 tx.insert_block(&block).await.unwrap();
2274 tx.commit().await.unwrap();
2275 }
2276
2277 tracing::info!("fetch success");
2279 assert_eq!(
2280 tx,
2281 data_source
2282 .get_block_containing_transaction(tx.transaction.hash())
2283 .await
2284 .await
2285 );
2286
2287 tracing::info!("fetch with read failure");
2299 data_source
2300 .as_ref()
2301 .fail_one_read(FailableAction::Any)
2302 .await;
2303 let fetch = data_source
2304 .get_block_containing_transaction(tx.transaction.hash())
2305 .await;
2306
2307 assert_eq!(tx, fetch.await);
2308 }
2309
2310 #[test_log::test(tokio::test(flavor = "multi_thread"))]
2311 async fn test_stream_begin_failure() {
2312 let mut network = MockNetwork::<MockDataSource>::init().await;
2314
2315 let port = reserve_tcp_port().unwrap();
2317 let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
2318 app.register_module(
2319 "availability",
2320 define_api(
2321 &Default::default(),
2322 MockBase::instance(),
2323 "1.0.0".parse().unwrap(),
2324 )
2325 .unwrap(),
2326 )
2327 .unwrap();
2328 network.spawn(
2329 "server",
2330 app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
2331 );
2332
2333 let provider = Provider::new(QueryServiceProvider::new(
2335 format!("http://localhost:{port}").parse().unwrap(),
2336 MockBase::instance(),
2337 ));
2338 let db = TmpDb::init().await;
2339 let storage = FailStorage::from(
2340 SqlStorage::connect(db.config(), StorageConnectionType::Query)
2341 .await
2342 .unwrap(),
2343 );
2344 let data_source = FetchingDataSource::builder(storage, provider)
2345 .disable_proactive_fetching()
2346 .disable_aggregator()
2347 .with_min_retry_interval(Duration::from_millis(100))
2348 .with_range_chunk_size(3)
2349 .build()
2350 .await
2351 .unwrap();
2352
2353 network.start().await;
2355
2356 let leaves = network.data_source().subscribe_leaves(1).await;
2358 let leaves = leaves.take(5).collect::<Vec<_>>().await;
2359
2360 let last_leaf = leaves.last().unwrap();
2362 let mut tx = data_source.write().await.unwrap();
2363 tx.insert_leaf(last_leaf).await.unwrap();
2364 tx.commit().await.unwrap();
2365
2366 tracing::info!("stream with transaction failure");
2369 data_source
2370 .as_ref()
2371 .fail_one_begin_read_only(FailableAction::Any)
2372 .await;
2373 assert_eq!(
2374 leaves,
2375 data_source
2376 .subscribe_leaves(1)
2377 .await
2378 .take(5)
2379 .collect::<Vec<_>>()
2380 .await
2381 );
2382 }
2383
2384 #[test_log::test(tokio::test(flavor = "multi_thread"))]
2385 async fn test_stream_load_failure() {
2386 let mut network = MockNetwork::<MockDataSource>::init().await;
2388
2389 let port = reserve_tcp_port().unwrap();
2391 let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
2392 app.register_module(
2393 "availability",
2394 define_api(
2395 &Default::default(),
2396 MockBase::instance(),
2397 "1.0.0".parse().unwrap(),
2398 )
2399 .unwrap(),
2400 )
2401 .unwrap();
2402 network.spawn(
2403 "server",
2404 app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
2405 );
2406
2407 let provider = Provider::new(QueryServiceProvider::new(
2409 format!("http://localhost:{port}").parse().unwrap(),
2410 MockBase::instance(),
2411 ));
2412 let db = TmpDb::init().await;
2413 let storage = FailStorage::from(
2414 SqlStorage::connect(db.config(), StorageConnectionType::Query)
2415 .await
2416 .unwrap(),
2417 );
2418 let data_source = FetchingDataSource::builder(storage, provider)
2419 .disable_proactive_fetching()
2420 .disable_aggregator()
2421 .with_min_retry_interval(Duration::from_millis(100))
2422 .with_range_chunk_size(3)
2423 .build()
2424 .await
2425 .unwrap();
2426
2427 network.start().await;
2429
2430 let leaves = network.data_source().subscribe_leaves(1).await;
2432 let leaves = leaves.take(5).collect::<Vec<_>>().await;
2433
2434 let last_leaf = leaves.last().unwrap();
2436 let mut tx = data_source.write().await.unwrap();
2437 tx.insert_leaf(last_leaf).await.unwrap();
2438 tx.commit().await.unwrap();
2439
2440 tracing::info!("stream with read failure");
2442 data_source.as_ref().fail_reads(FailableAction::Any).await;
2443 let fetches = data_source
2444 .get_block_range(1..=5)
2445 .await
2446 .collect::<Vec<_>>()
2447 .await;
2448
2449 sleep(Duration::from_secs(2)).await;
2451 data_source.as_ref().pass().await;
2452
2453 for (leaf, fetch) in leaves.iter().zip(fetches) {
2454 let block: BlockQueryData<MockTypes> = fetch.await;
2455 assert_eq!(block.hash(), leaf.block_hash());
2456 }
2457 }
2458
/// Selects which metadata stream the metadata-stream test helper subscribes to.
///
/// Derives the same traits as the sibling `FailureType` selector so that values can be copied
/// freely and printed in assertion or log output.
#[derive(Clone, Copy, Debug)]
enum MetadataType {
    /// Use the payload metadata stream.
    Payload,
    /// Use the VID common metadata stream.
    Vid,
}
2463
2464 async fn test_metadata_stream_begin_failure_helper(stream: MetadataType) {
2465 let mut network = MockNetwork::<MockDataSource>::init().await;
2467
2468 let port = reserve_tcp_port().unwrap();
2470 let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
2471 app.register_module(
2472 "availability",
2473 define_api(
2474 &Default::default(),
2475 MockBase::instance(),
2476 "1.0.0".parse().unwrap(),
2477 )
2478 .unwrap(),
2479 )
2480 .unwrap();
2481 network.spawn(
2482 "server",
2483 app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
2484 );
2485
2486 let provider = Provider::new(QueryServiceProvider::new(
2488 format!("http://localhost:{port}").parse().unwrap(),
2489 MockBase::instance(),
2490 ));
2491 let db = TmpDb::init().await;
2492 let storage = FailStorage::from(
2493 SqlStorage::connect(db.config(), StorageConnectionType::Query)
2494 .await
2495 .unwrap(),
2496 );
2497 let data_source = FetchingDataSource::builder(storage, provider)
2498 .disable_proactive_fetching()
2499 .disable_aggregator()
2500 .with_min_retry_interval(Duration::from_millis(100))
2501 .with_range_chunk_size(3)
2502 .build()
2503 .await
2504 .unwrap();
2505
2506 network.start().await;
2508
2509 let leaves = network.data_source().subscribe_leaves(1).await;
2511 let leaves = leaves.take(3).collect::<Vec<_>>().await;
2512
2513 let last_leaf = leaves.last().unwrap();
2515 let mut tx = data_source.write().await.unwrap();
2516 tx.insert_leaf(last_leaf).await.unwrap();
2517 tx.commit().await.unwrap();
2518
2519 let leaf = network.data_source().get_leaf(1).await.await;
2524 let block = network.data_source().get_block(1).await.await;
2525 let vid = network.data_source().get_vid_common(1).await.await;
2526 data_source
2527 .append(BlockInfo::new(leaf, Some(block), Some(vid), None))
2528 .await
2529 .unwrap();
2530
2531 tracing::info!("stream with transaction failure");
2533 data_source
2534 .as_ref()
2535 .fail_begins_read_only(FailableAction::Any)
2536 .await;
2537 match stream {
2538 MetadataType::Payload => {
2539 let payloads = data_source.subscribe_payload_metadata(1).await.take(3);
2540
2541 sleep(Duration::from_secs(2)).await;
2543 tracing::info!("stop failing transactions");
2544 data_source.as_ref().pass().await;
2545
2546 let payloads = payloads.collect::<Vec<_>>().await;
2547 for (leaf, payload) in leaves.iter().zip(payloads) {
2548 assert_eq!(payload.block_hash, leaf.block_hash());
2549 }
2550 },
2551 MetadataType::Vid => {
2552 let vids = data_source.subscribe_vid_common_metadata(1).await.take(3);
2553
2554 sleep(Duration::from_secs(2)).await;
2556 tracing::info!("stop failing transactions");
2557 data_source.as_ref().pass().await;
2558
2559 let vids = vids.collect::<Vec<_>>().await;
2560 for (leaf, vid) in leaves.iter().zip(vids) {
2561 assert_eq!(vid.block_hash, leaf.block_hash());
2562 }
2563 },
2564 }
2565 }
2566
2567 #[test_log::test(tokio::test(flavor = "multi_thread"))]
2568 async fn test_metadata_stream_begin_failure_payload() {
2569 test_metadata_stream_begin_failure_helper(MetadataType::Payload).await
2570 }
2571
2572 #[test_log::test(tokio::test(flavor = "multi_thread"))]
2573 async fn test_metadata_stream_begin_failure_vid() {
2574 test_metadata_stream_begin_failure_helper(MetadataType::Vid).await
2575 }
2576
2577 #[tokio::test(flavor = "multi_thread")]
2578 #[test_log::test]
2579 async fn test_ranged_fetch() {
2580 let mut network = MockNetwork::<MockDataSource>::init().await;
2582
2583 let port = reserve_tcp_port().unwrap();
2585 let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
2586 app.register_module(
2587 "availability",
2588 define_api(
2589 &Default::default(),
2590 MockBase::instance(),
2591 "1.0.0".parse().unwrap(),
2592 )
2593 .unwrap(),
2594 )
2595 .unwrap();
2596 network.spawn(
2597 "server",
2598 app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
2599 );
2600
2601 network.start().await;
2603
2604 let leaves = network
2606 .data_source()
2607 .subscribe_leaves(0)
2608 .await
2609 .take(5)
2610 .collect::<Vec<_>>()
2611 .await;
2612 let blocks = network
2613 .data_source()
2614 .subscribe_blocks(0)
2615 .await
2616 .take(5)
2617 .collect::<Vec<_>>()
2618 .await;
2619 let vid = network
2620 .data_source()
2621 .subscribe_vid_common(0)
2622 .await
2623 .take(5)
2624 .collect::<Vec<_>>()
2625 .await;
2626
2627 let provider = QueryServiceProvider::new(
2629 format!("http://localhost:{port}").parse().unwrap(),
2630 MockBase::instance(),
2631 );
2632
2633 tracing::info!("fetch leaf range");
2635 assert_eq!(
2636 provider
2637 .fetch(LeafRangeRequest {
2638 start: 0,
2639 end: 5,
2640 last_leaf: leaves[4].hash(),
2641 last_qc: leaves[4].qc().commit(),
2642 })
2643 .await
2644 .unwrap(),
2645 leaves
2646 );
2647 let headers = NonEmptyRange::new(leaves.iter().map(|leaf| leaf.header().clone())).unwrap();
2648 tracing::info!(?headers, "fetch block range");
2649 assert_eq!(
2650 ProviderTrait::<MockTypes, _>::fetch(
2651 &provider,
2652 BlockRangeRequest::from(RangeRequest::from_headers::<MockTypes>(&headers))
2653 )
2654 .await
2655 .unwrap(),
2656 blocks
2657 );
2658 tracing::info!(?headers, "fetch VID common range");
2659 assert_eq!(
2660 ProviderTrait::<MockTypes, _>::fetch(
2661 &provider,
2662 VidCommonRangeRequest::from(RangeRequest::from_headers::<MockTypes>(&headers))
2663 )
2664 .await
2665 .unwrap(),
2666 vid
2667 );
2668 }
2669
2670 async fn old_server(port: u16) {
2672 let mut api = Api::<(), availability::Error, StaticVersion<1, 0>>::new(toml! {
2673 [route.get_vid_common]
2674 PATH = ["vid/common/:height"]
2675 ":height" = "Integer"
2676 })
2677 .unwrap();
2678
2679 api.get("get_vid_common", move |req, _| {
2680 async move {
2681 let mut common = VidCommonQueryData::<MockTypes>::genesis(
2682 &Default::default(),
2683 &Default::default(),
2684 TEST_VERSIONS.test.base,
2685 )
2686 .await;
2687 common.height = req.integer_param("height")?;
2688 Ok(common)
2689 }
2690 .boxed()
2691 })
2692 .unwrap();
2693
2694 let mut app = App::<(), Error>::with_state(());
2695 app.register_module("availability", api).unwrap();
2696 app.serve(format!("0.0.0.0:{port}"), MockBase::instance())
2697 .await
2698 .ok();
2699 }
2700
2701 #[tokio::test]
2702 #[test_log::test]
2703 async fn test_vid_common_fallback() {
2704 let port = reserve_tcp_port().unwrap();
2705 let _server = BackgroundTask::spawn("old server", old_server(port));
2706 let provider = QueryServiceProvider::new(
2707 format!("http://localhost:{port}").parse().unwrap(),
2708 StaticVersion::<1, 0>::instance(),
2709 );
2710
2711 let common = try_join_all((0..5).map(|i| {
2713 provider
2714 .client
2715 .get::<VidCommonQueryData<MockTypes>>(&format!("availability/vid/common/{i}"))
2716 .send()
2717 }))
2718 .await
2719 .unwrap();
2720
2721 let expected_hash =
2723 RangeRequest::hash_payloads(common.iter().map(|common| common.payload_hash));
2724 let req = RangeRequest {
2725 start: 0,
2726 end: 5,
2727 expected_hash,
2728 };
2729 assert_eq!(
2730 common.as_slice(),
2731 provider
2732 .fetch_vid_common_range(req.into())
2733 .await
2734 .unwrap()
2735 .as_ref()
2736 );
2737 }
2738}