1use std::fmt::Debug;
14
15use anyhow::{Context, ensure};
16use async_trait::async_trait;
17use futures::{TryFutureExt, future::try_join_all};
18use hotshot_types::{data::VidCommon, traits::node_implementation::NodeType};
19use surf_disco::{Client, Url};
20use vbs::version::StaticVersionType;
21
22use super::Provider;
23use crate::{
24 Error, Payload,
25 availability::{BlockQueryData, Certificate2, LeafQueryData, VidCommonQueryData},
26 fetching::{
27 NonEmptyRange,
28 request::{
29 BlockRangeRequest, Certificate2Request, LeafRangeRequest, LeafRequest, PayloadRequest,
30 VidCommonRangeRequest, VidCommonRequest,
31 },
32 },
33 types::HeightIndexed,
34};
35
/// Data provider that fetches from a remote query service over HTTP.
///
/// All requests go through a single [`surf_disco::Client`] whose wire format is selected by the
/// `Ver` type parameter.
///
/// NOTE(review): the name suggests that responses from the upstream are trusted; the fetch
/// methods in this file only sanity-check heights and ranges of what comes back — confirm that
/// no further verification is expected of this type.
#[derive(Clone, Debug)]
pub struct TrustedQueryServiceProvider<Ver: StaticVersionType> {
    /// HTTP client bound to the upstream query service's base URL.
    client: Client<Error, Ver>,
}
53
54impl<Ver: StaticVersionType> TrustedQueryServiceProvider<Ver> {
55 pub fn new(url: Url, _: Ver) -> Self {
56 Self {
57 client: Client::new(url),
58 }
59 }
60}
61
62impl<Ver: StaticVersionType> TrustedQueryServiceProvider<Ver> {
63 pub async fn fetch_payload<Types: NodeType>(
64 &self,
65 req: PayloadRequest,
66 ) -> anyhow::Result<Payload<Types>> {
67 let block = self
68 .client
69 .get::<BlockQueryData<Types>>(&format!("availability/block/payload-hash/{}", req.0))
70 .send()
71 .await
72 .context("fetching block")?;
73
74 Ok(block.payload)
75 }
76
77 pub async fn fetch_payload_range<Types: NodeType>(
78 &self,
79 req: BlockRangeRequest,
80 ) -> anyhow::Result<NonEmptyRange<BlockQueryData<Types>>> {
81 let blocks = self
82 .client
83 .get::<NonEmptyRange<BlockQueryData<Types>>>(&format!(
84 "availability/block/{}/{}",
85 req.start, req.end
86 ))
87 .send()
88 .await
89 .context("fetching blocks")?;
90
91 ensure!(
92 blocks.start() == req.start && blocks.end() == req.end,
93 "wrong block range ({}..{})",
94 blocks.start(),
95 blocks.end()
96 );
97
98 Ok(blocks)
99 }
100
101 fn handle_result<R: Debug, T>(&self, req: R, res: anyhow::Result<T>) -> Option<T> {
102 match res {
103 Ok(res) => Some(res),
104 Err(err) => {
105 tracing::warn!(upstream = %self.client.base_url(), ?req, "failed to fetch: {err:#}");
106 None
107 },
108 }
109 }
110}
111
112#[async_trait]
113impl<Types, Ver: StaticVersionType> Provider<Types, PayloadRequest>
114 for TrustedQueryServiceProvider<Ver>
115where
116 Types: NodeType,
117{
118 async fn fetch(&self, req: PayloadRequest) -> Option<Payload<Types>> {
119 self.handle_result(req, self.fetch_payload::<Types>(req).await)
120 }
121}
122
123#[async_trait]
124impl<Types, Ver: StaticVersionType> Provider<Types, BlockRangeRequest>
125 for TrustedQueryServiceProvider<Ver>
126where
127 Types: NodeType,
128{
129 async fn fetch(&self, req: BlockRangeRequest) -> Option<NonEmptyRange<BlockQueryData<Types>>> {
130 self.handle_result(req, self.fetch_payload_range::<Types>(req).await)
131 }
132}
133
134impl<Ver: StaticVersionType> TrustedQueryServiceProvider<Ver> {
135 pub async fn fetch_leaf<Types: NodeType>(
136 &self,
137 req: LeafRequest,
138 ) -> anyhow::Result<LeafQueryData<Types>> {
139 let leaf = self
140 .client
141 .get::<LeafQueryData<Types>>(&format!("availability/leaf/{}", req.height))
142 .send()
143 .await
144 .context("fetching leaf")?;
145
146 ensure!(
147 leaf.height() == req.height,
148 "received leaf with the wrong height ({})",
149 leaf.height(),
150 );
151
152 Ok(leaf)
153 }
154
155 pub async fn fetch_leaf_range<Types: NodeType>(
156 &self,
157 req: LeafRangeRequest,
158 ) -> anyhow::Result<NonEmptyRange<LeafQueryData<Types>>> {
159 let leaves = self
160 .client
161 .get::<NonEmptyRange<LeafQueryData<Types>>>(&format!(
162 "availability/leaf/{}/{}",
163 req.start, req.end
164 ))
165 .send()
166 .await
167 .context("fetching leaf chain")?;
168
169 ensure!(
170 leaves.start() == req.start && leaves.end() == req.end,
171 "server returned wrong range of leaves ({}..{})",
172 leaves.start(),
173 leaves.end()
174 );
175
176 Ok(leaves)
177 }
178}
179
180#[async_trait]
181impl<Types, Ver: StaticVersionType> Provider<Types, LeafRequest>
182 for TrustedQueryServiceProvider<Ver>
183where
184 Types: NodeType,
185{
186 async fn fetch(&self, req: LeafRequest) -> Option<LeafQueryData<Types>> {
187 self.handle_result(req, self.fetch_leaf(req).await)
188 }
189}
190
191#[async_trait]
192impl<Types, Ver: StaticVersionType> Provider<Types, LeafRangeRequest>
193 for TrustedQueryServiceProvider<Ver>
194where
195 Types: NodeType,
196{
197 async fn fetch(&self, req: LeafRangeRequest) -> Option<NonEmptyRange<LeafQueryData<Types>>> {
198 self.handle_result(req, self.fetch_leaf_range(req).await)
199 }
200}
201
202impl<Ver: StaticVersionType> TrustedQueryServiceProvider<Ver> {
203 pub async fn fetch_cert2<Types: NodeType>(
205 &self,
206 height: u64,
207 ) -> anyhow::Result<Option<Certificate2<Types>>> {
208 self.client
209 .get(&format!("availability/cert2/{height}"))
210 .send()
211 .await
212 .context("fetching cert2")
213 }
214}
215
216#[async_trait]
217impl<Types, Ver: StaticVersionType> Provider<Types, Certificate2Request>
218 for TrustedQueryServiceProvider<Ver>
219where
220 Types: NodeType,
221{
222 async fn fetch(&self, req: Certificate2Request) -> Option<Option<Certificate2<Types>>> {
223 self.handle_result(req, self.fetch_cert2(req.height).await)
224 }
225}
226
227impl<Ver: StaticVersionType> TrustedQueryServiceProvider<Ver> {
228 pub async fn fetch_vid_common<Types: NodeType>(
229 &self,
230 req: VidCommonRequest,
231 ) -> anyhow::Result<VidCommon> {
232 let res = self
233 .client
234 .get::<VidCommonQueryData<Types>>(&format!(
235 "availability/vid/common/payload-hash/{}",
236 req.0
237 ))
238 .send()
239 .await
240 .context("fetching VID common")?;
241
242 Ok(res.common)
243 }
244
245 async fn fetch_vid_common_range_with_fallback<Types: NodeType>(
246 &self,
247 start: u64,
248 end: u64,
249 ) -> anyhow::Result<NonEmptyRange<VidCommonQueryData<Types>>> {
250 let res = self
251 .client
252 .get::<NonEmptyRange<VidCommonQueryData<Types>>>(&format!(
253 "availability/vid/common/{start}/{end}",
254 ))
255 .send()
256 .await;
257 match res {
258 Ok(common) => Ok(common),
259 Err(Error::Custom { message, .. }) if message.contains("No route matches") => {
260 tracing::info!(
263 start,
264 end,
265 "server does not support ranged VID fetching, falling back to individual \
266 fetches"
267 );
268 let common = try_join_all((start..end).map(|i| {
269 self.client
270 .get::<VidCommonQueryData<Types>>(&format!("availability/vid/common/{i}"))
271 .send()
272 .map_err(move |err| {
273 anyhow::Error::new(err).context(format!("fetching VID common {i}"))
274 })
275 }))
276 .await?;
277 NonEmptyRange::new(common)
278 .context("converting individually fetched VID common into range")
279 },
280 Err(err) => Err(err).context("fetching VID common range"),
281 }
282 }
283
284 pub async fn fetch_vid_common_range<Types: NodeType>(
285 &self,
286 req: VidCommonRangeRequest,
287 ) -> anyhow::Result<NonEmptyRange<VidCommonQueryData<Types>>> {
288 let common = self
289 .fetch_vid_common_range_with_fallback(req.start, req.end)
290 .await?;
291
292 ensure!(
293 common.start() == req.start && common.end() == req.end,
294 "server returned wrong VID common ({}..{})",
295 common.start(),
296 common.end()
297 );
298
299 Ok(common)
300 }
301}
302
303#[async_trait]
304impl<Types, Ver: StaticVersionType> Provider<Types, VidCommonRequest>
305 for TrustedQueryServiceProvider<Ver>
306where
307 Types: NodeType,
308{
309 async fn fetch(&self, req: VidCommonRequest) -> Option<VidCommon> {
310 self.handle_result(req, self.fetch_vid_common::<Types>(req).await)
311 }
312}
313
314#[async_trait]
315impl<Types, Ver: StaticVersionType> Provider<Types, VidCommonRangeRequest>
316 for TrustedQueryServiceProvider<Ver>
317where
318 Types: NodeType,
319{
320 async fn fetch(
321 &self,
322 req: VidCommonRangeRequest,
323 ) -> Option<NonEmptyRange<VidCommonQueryData<Types>>> {
324 self.handle_result(req, self.fetch_vid_common_range::<Types>(req).await)
325 }
326}
327
328#[cfg(all(test, not(target_os = "windows")))]
330mod test {
331 use std::{future::IntoFuture, time::Duration};
332
333 use committable::Committable;
334 use futures::{
335 future::{FutureExt, join},
336 stream::StreamExt,
337 };
338 use hotshot_example_types::node_types::{EpochVersion, TEST_VERSIONS};
339 use test_utils::reserve_tcp_port;
340 use tide_disco::{Api, App};
341 use toml::toml;
342 use vbs::version::StaticVersion;
343
344 use super::*;
345 use crate::{
346 ApiState,
347 availability::{
348 self, AvailabilityDataSource, BlockId, BlockInfo, BlockQueryData, BlockWithTransaction,
349 Fetch, UpdateAvailabilityData, define_api,
350 },
351 data_source::{
352 AvailabilityProvider, FetchingDataSource, Transaction, VersionedDataSource,
353 sql::{self, SqlDataSource},
354 storage::{
355 AvailabilityStorage, SqlStorage, StorageConnectionType, UpdateAvailabilityStorage,
356 fail_storage::{FailStorage, FailableAction},
357 pruning::{PrunedHeightStorage, PrunerCfg},
358 sql::testing::TmpDb,
359 },
360 },
361 fetching::provider::{NoFetching, Provider as ProviderTrait, TestProvider},
362 node::data_source::NodeDataSource,
363 task::BackgroundTask,
364 testing::{
365 consensus::{MockDataSource, MockNetwork},
366 mocks::{MockBase, MockTypes, mock_transaction},
367 sleep,
368 },
369 types::HeightIndexed,
370 };
371
    /// Provider under test, wrapped in `TestProvider` and versioned as `MockBase`.
    ///
    /// This alias takes the name `Provider` inside the test module, which is why the trait of
    /// the same name is imported as `ProviderTrait`.
    type Provider = TestProvider<TrustedQueryServiceProvider<MockBase>>;
    /// Same wrapper as `Provider`, but with the client versioned as `EpochVersion`.
    type EpochProvider = TestProvider<TrustedQueryServiceProvider<EpochVersion>>;
374
375 fn ignore<T>(_: T) {}
376
377 async fn builder<P: AvailabilityProvider<MockTypes> + Clone>(
379 db: &TmpDb,
380 provider: &P,
381 ) -> sql::Builder<MockTypes, P> {
382 db.config()
383 .builder((*provider).clone())
384 .await
385 .unwrap()
386 .disable_proactive_fetching()
389 }
390
391 async fn data_source<P: AvailabilityProvider<MockTypes> + Clone>(
393 db: &TmpDb,
394 provider: &P,
395 ) -> SqlDataSource<MockTypes, P> {
396 builder(db, provider).await.build().await.unwrap()
397 }
398
    /// End-to-end check that resources missing from local storage are fetched from the
    /// upstream query service on request, for leaves, blocks, payloads and VID common.
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_fetch_on_request() {
        let mut network = MockNetwork::<MockDataSource>::init().await;

        // Serve the network's own data source over HTTP; this is the upstream that the
        // provider under test will query.
        let port = reserve_tcp_port().unwrap();
        let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
        app.register_module(
            "availability",
            define_api(
                &Default::default(),
                MockBase::instance(),
                "1.0.0".parse().unwrap(),
            )
            .unwrap(),
        )
        .unwrap();
        network.spawn(
            "server",
            app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
        );

        // Data source under test: a fresh temporary database fetching through `provider`.
        let db = TmpDb::init().await;
        let provider = Provider::new(TrustedQueryServiceProvider::new(
            format!("http://localhost:{port}").parse().unwrap(),
            MockBase::instance(),
        ));
        let data_source = data_source(&db, &provider).await;

        network.start().await;

        // Collect five leaves from the network, starting at height 1. Distinct leaves are used
        // for each kind of resource so the individual fetches do not overlap.
        let leaves = network.data_source().subscribe_leaves(1).await;
        let leaves = leaves.take(5).collect::<Vec<_>>().await;
        let test_leaf = &leaves[0];
        let test_block = &leaves[1];
        let test_payload = &leaves[2];
        let test_common = &leaves[3];

        // None of the requests below may resolve at this point (asserted after the sleep).
        // NOTE(review): presumably because the data source has not been told about any of
        // these objects yet — confirm.
        tracing::info!("requesting unfetchable resources");
        let mut fetches = vec![];
        // Leaf by hash.
        fetches.push(data_source.get_leaf(test_leaf.hash()).await.map(ignore));
        // Leaf by height.
        fetches.push(
            data_source
                .get_leaf(test_leaf.height() as usize)
                .await
                .map(ignore),
        );
        // Block, payload and VID common by block hash.
        fetches.push(
            data_source
                .get_block(test_block.block_hash())
                .await
                .map(ignore),
        );
        fetches.push(
            data_source
                .get_payload(test_payload.block_hash())
                .await
                .map(ignore),
        );
        fetches.push(
            data_source
                .get_vid_common(test_common.block_hash())
                .await
                .map(ignore),
        );
        // Block, payload and VID common by height.
        fetches.push(
            data_source
                .get_block(test_block.height() as usize)
                .await
                .map(ignore),
        );
        fetches.push(
            data_source
                .get_payload(test_payload.height() as usize)
                .await
                .map(ignore),
        );
        fetches.push(
            data_source
                .get_vid_common(test_common.height() as usize)
                .await
                .map(ignore),
        );
        // VID common at height 0.
        fetches.push(data_source.get_vid_common(0).await.map(ignore));
        // Block containing a transaction (an empty mock transaction).
        fetches.push(
            data_source
                .get_block_containing_transaction(mock_transaction(vec![]).commit())
                .await
                .map(ignore),
        );

        // Give any (unexpected) fetch a chance to complete, then check that none did.
        sleep(Duration::from_secs(1)).await;
        for (i, fetch) in fetches.into_iter().enumerate() {
            tracing::info!("checking fetch {i} is unresolved");
            fetch.try_resolve().unwrap_err();
        }

        // NOTE(review): `block()` presumably makes the test provider hold upstream requests
        // until `unblock()` — confirm against `TestProvider`.
        provider.block().await;
        // Hand the data source the most recent of the collected leaves.
        data_source
            .append(leaves.last().cloned().unwrap().into())
            .await
            .unwrap();

        tracing::info!("requesting fetchable resources");
        let req_leaf = data_source.get_leaf(test_leaf.height() as usize).await;
        let req_block = data_source.get_block(test_block.height() as usize).await;
        let req_payload = data_source
            .get_payload(test_payload.height() as usize)
            .await;
        let req_common = data_source
            .get_vid_common(test_common.height() as usize)
            .await;

        // The requests must still be pending after a second (the provider was blocked above).
        sleep(Duration::from_secs(1)).await;
        req_leaf.try_resolve().unwrap_err();
        req_block.try_resolve().unwrap_err();
        req_payload.try_resolve().unwrap_err();
        req_common.try_resolve().unwrap_err();

        // With the provider unblocked, each object must become available.
        provider.unblock().await;
        let leaf = data_source
            .get_leaf(test_leaf.height() as usize)
            .await
            .await;
        let block = data_source
            .get_block(test_block.height() as usize)
            .await
            .await;
        let payload = data_source
            .get_payload(test_payload.height() as usize)
            .await
            .await;
        let common = data_source
            .get_vid_common(test_common.height() as usize)
            .await
            .await;
        {
            // Compare every fetched object against what the network's data source reports.
            let truth = network.data_source();
            assert_eq!(
                leaf,
                truth.get_leaf(test_leaf.height() as usize).await.await
            );
            assert_eq!(
                block,
                truth.get_block(test_block.height() as usize).await.await
            );
            assert_eq!(
                payload,
                truth
                    .get_payload(test_payload.height() as usize)
                    .await
                    .await
            );
            assert_eq!(
                common,
                truth
                    .get_vid_common(test_common.height() as usize)
                    .await
                    .await
            );
        }

        // Block the provider again: the leaves at the heights of `test_block` and
        // `test_payload` must resolve regardless.
        // NOTE(review): presumably they were stored as a side effect of the block and payload
        // fetches above — confirm.
        provider.block().await;
        for leaf in [test_block, test_payload] {
            tracing::info!("fetching existing leaf {}", leaf.height());
            let fetched_leaf = data_source.get_leaf(leaf.height() as usize).await.await;
            assert_eq!(*leaf, fetched_leaf);
        }

        tracing::info!("fetching block by hash");
        provider.unblock().await;
        {
            // `leaf` here is the leaf fetched earlier at `test_leaf`'s height.
            let block = data_source.get_block(test_leaf.block_hash()).await.await;
            assert_eq!(block.hash(), leaf.block_hash());
        }

        tracing::info!("fetching payload by hash");
        {
            // Uses the last collected leaf, the one that was appended to the data source.
            let leaf = leaves.last().unwrap();
            let payload = data_source.get_payload(leaf.block_hash()).await.await;
            assert_eq!(payload.height(), leaf.height());
            assert_eq!(payload.block_hash(), leaf.block_hash());
            assert_eq!(payload.hash(), leaf.payload_hash());
        }
    }
624
625 #[tokio::test(flavor = "multi_thread")]
626 async fn test_fetch_on_request_epoch_version() {
627 tracing::info!("Starting test_fetch_on_request_epoch_version");
630
631 let mut network = MockNetwork::<MockDataSource>::init().await;
633
634 let port = reserve_tcp_port().unwrap();
636 let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
637 app.register_module(
638 "availability",
639 define_api(
640 &Default::default(),
641 EpochVersion::instance(),
642 "1.0.0".parse().unwrap(),
643 )
644 .unwrap(),
645 )
646 .unwrap();
647 network.spawn(
648 "server",
649 app.serve(format!("0.0.0.0:{port}"), EpochVersion::instance()),
650 );
651
652 let db = TmpDb::init().await;
655 let provider = EpochProvider::new(TrustedQueryServiceProvider::new(
656 format!("http://localhost:{port}").parse().unwrap(),
657 EpochVersion::instance(),
658 ));
659 let data_source = data_source(&db, &provider).await;
660
661 network.start().await;
663
664 let leaves = network.data_source().subscribe_leaves(1).await;
671 let leaves = leaves.take(5).collect::<Vec<_>>().await;
672 let test_leaf = &leaves[0];
673 let test_block = &leaves[1];
674 let test_payload = &leaves[2];
675 let test_common = &leaves[3];
676
677 let mut fetches = vec![];
679 fetches.push(data_source.get_leaf(test_leaf.hash()).await.map(ignore));
681 fetches.push(
683 data_source
684 .get_leaf(test_leaf.height() as usize)
685 .await
686 .map(ignore),
687 );
688 fetches.push(
690 data_source
691 .get_block(test_block.block_hash())
692 .await
693 .map(ignore),
694 );
695 fetches.push(
696 data_source
697 .get_payload(test_payload.block_hash())
698 .await
699 .map(ignore),
700 );
701 fetches.push(
702 data_source
703 .get_vid_common(test_common.block_hash())
704 .await
705 .map(ignore),
706 );
707 fetches.push(
709 data_source
710 .get_block(test_block.height() as usize)
711 .await
712 .map(ignore),
713 );
714 fetches.push(
715 data_source
716 .get_payload(test_payload.height() as usize)
717 .await
718 .map(ignore),
719 );
720 fetches.push(
721 data_source
722 .get_vid_common(test_common.height() as usize)
723 .await
724 .map(ignore),
725 );
726 fetches.push(data_source.get_vid_common(0).await.map(ignore));
728 fetches.push(
730 data_source
731 .get_block_containing_transaction(mock_transaction(vec![]).commit())
732 .await
733 .map(ignore),
734 );
735
736 sleep(Duration::from_secs(1)).await;
739 for (i, fetch) in fetches.into_iter().enumerate() {
740 tracing::info!("checking fetch {i} is unresolved");
741 fetch.try_resolve().unwrap_err();
742 }
743
744 provider.block().await;
749 data_source
750 .append(leaves.last().cloned().unwrap().into())
751 .await
752 .unwrap();
753
754 let req_leaf = data_source.get_leaf(test_leaf.height() as usize).await;
755 let req_block = data_source.get_block(test_block.height() as usize).await;
756 let req_payload = data_source
757 .get_payload(test_payload.height() as usize)
758 .await;
759 let req_common = data_source
760 .get_vid_common(test_common.height() as usize)
761 .await;
762
763 sleep(Duration::from_secs(1)).await;
769 req_leaf.try_resolve().unwrap_err();
770 req_block.try_resolve().unwrap_err();
771 req_payload.try_resolve().unwrap_err();
772 req_common.try_resolve().unwrap_err();
773
774 provider.unblock().await;
776 let leaf = data_source
777 .get_leaf(test_leaf.height() as usize)
778 .await
779 .await;
780 let block = data_source
781 .get_block(test_block.height() as usize)
782 .await
783 .await;
784 let payload = data_source
785 .get_payload(test_payload.height() as usize)
786 .await
787 .await;
788 let common = data_source
789 .get_vid_common(test_common.height() as usize)
790 .await
791 .await;
792 {
793 let truth = network.data_source();
795 assert_eq!(
796 leaf,
797 truth.get_leaf(test_leaf.height() as usize).await.await
798 );
799 assert_eq!(
800 block,
801 truth.get_block(test_block.height() as usize).await.await
802 );
803 assert_eq!(
804 payload,
805 truth
806 .get_payload(test_payload.height() as usize)
807 .await
808 .await
809 );
810 assert_eq!(
811 common,
812 truth
813 .get_vid_common(test_common.height() as usize)
814 .await
815 .await
816 );
817 }
818
819 provider.block().await;
824 for leaf in [test_block, test_payload] {
825 tracing::info!("fetching existing leaf {}", leaf.height());
826 let fetched_leaf = data_source.get_leaf(leaf.height() as usize).await.await;
827 assert_eq!(*leaf, fetched_leaf);
828 }
829
830 provider.unblock().await;
835 {
836 let block = data_source.get_block(test_leaf.block_hash()).await.await;
837 assert_eq!(block.hash(), leaf.block_hash());
838 }
839
840 {
844 let leaf = leaves.last().unwrap();
845 let payload = data_source.get_payload(leaf.block_hash()).await.await;
846 assert_eq!(payload.height(), leaf.height());
847 assert_eq!(payload.block_hash(), leaf.block_hash());
848 assert_eq!(payload.hash(), leaf.payload_hash());
849 }
850
851 tracing::info!("Test completed successfully!");
853 }
854
855 #[test_log::test(tokio::test(flavor = "multi_thread"))]
856 async fn test_fetch_block_and_leaf_concurrently() {
857 let mut network = MockNetwork::<MockDataSource>::init().await;
859
860 let port = reserve_tcp_port().unwrap();
862 let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
863 app.register_module(
864 "availability",
865 define_api(
866 &Default::default(),
867 MockBase::instance(),
868 "1.0.0".parse().unwrap(),
869 )
870 .unwrap(),
871 )
872 .unwrap();
873 network.spawn(
874 "server",
875 app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
876 );
877
878 let db = TmpDb::init().await;
880 let provider = Provider::new(TrustedQueryServiceProvider::new(
881 format!("http://localhost:{port}").parse().unwrap(),
882 MockBase::instance(),
883 ));
884 let data_source = data_source(&db, &provider).await;
885
886 network.start().await;
888
889 let leaves = network.data_source().subscribe_leaves(1).await;
892 let leaves = leaves.take(2).collect::<Vec<_>>().await;
893 let test_leaf = &leaves[0];
894
895 data_source.append(leaves[1].clone().into()).await.unwrap();
897
898 let (leaf, block) = join(
902 data_source
903 .get_leaf(test_leaf.height() as usize)
904 .await
905 .into_future(),
906 data_source
907 .get_block(test_leaf.height() as usize)
908 .await
909 .into_future(),
910 )
911 .await;
912 assert_eq!(leaf, *test_leaf);
913 assert_eq!(leaf.header(), block.header());
914 }
915
916 #[test_log::test(tokio::test(flavor = "multi_thread"))]
917 async fn test_fetch_different_blocks_same_payload() {
918 let mut network = MockNetwork::<MockDataSource>::init().await;
920
921 let port = reserve_tcp_port().unwrap();
923 let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
924 app.register_module(
925 "availability",
926 define_api(
927 &Default::default(),
928 MockBase::instance(),
929 "1.0.0".parse().unwrap(),
930 )
931 .unwrap(),
932 )
933 .unwrap();
934 network.spawn(
935 "server",
936 app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
937 );
938
939 let db = TmpDb::init().await;
941 let provider = Provider::new(TrustedQueryServiceProvider::new(
942 format!("http://localhost:{port}").parse().unwrap(),
943 MockBase::instance(),
944 ));
945 let data_source = data_source(&db, &provider).await;
946
947 network.start().await;
949
950 let leaves = network.data_source().subscribe_leaves(1).await;
953 let leaves = leaves.take(4).collect::<Vec<_>>().await;
954
955 data_source
958 .append(leaves.last().cloned().unwrap().into())
959 .await
960 .unwrap();
961
962 assert_eq!(leaves[0].payload_hash(), leaves[1].payload_hash());
964 let (block1, block2) = join(
967 data_source
968 .get_block(leaves[0].height() as usize)
969 .await
970 .into_future(),
971 data_source
972 .get_block(leaves[1].height() as usize)
973 .await
974 .into_future(),
975 )
976 .await;
977 assert_eq!(block1.header(), leaves[0].header());
978 assert_eq!(block2.header(), leaves[1].header());
979 }
980
981 #[test_log::test(tokio::test(flavor = "multi_thread"))]
982 async fn test_fetch_stream() {
983 let mut network = MockNetwork::<MockDataSource>::init().await;
985
986 let port = reserve_tcp_port().unwrap();
988 let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
989 app.register_module(
990 "availability",
991 define_api(
992 &Default::default(),
993 MockBase::instance(),
994 "1.0.0".parse().unwrap(),
995 )
996 .unwrap(),
997 )
998 .unwrap();
999 network.spawn(
1000 "server",
1001 app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
1002 );
1003
1004 let db = TmpDb::init().await;
1006 let provider = Provider::new(TrustedQueryServiceProvider::new(
1007 format!("http://localhost:{port}").parse().unwrap(),
1008 MockBase::instance(),
1009 ));
1010 let data_source = data_source(&db, &provider).await;
1011
1012 network.start().await;
1014
1015 let blocks = data_source.subscribe_blocks(0).await;
1017 let leaves = data_source.subscribe_leaves(0).await;
1018 let common = data_source.subscribe_vid_common(0).await;
1019
1020 let finalized_leaves = network.data_source().subscribe_leaves(0).await;
1022 let finalized_leaves = finalized_leaves.take(5).collect::<Vec<_>>().await;
1023
1024 data_source
1027 .append(finalized_leaves.last().cloned().unwrap().into())
1028 .await
1029 .unwrap();
1030
1031 let blocks = blocks.take(5).collect::<Vec<_>>().await;
1033 let leaves = leaves.take(5).collect::<Vec<_>>().await;
1034 let common = common.take(5).collect::<Vec<_>>().await;
1035 for i in 0..5 {
1036 tracing::info!("checking block {i}");
1037 assert_eq!(leaves[i], finalized_leaves[i]);
1038 assert_eq!(blocks[i].header(), finalized_leaves[i].header());
1039 assert_eq!(common[i], data_source.get_vid_common(i).await.await);
1040 }
1041 }
1042
1043 #[test_log::test(tokio::test(flavor = "multi_thread"))]
1044 async fn test_fetch_range_start() {
1045 let mut network = MockNetwork::<MockDataSource>::init().await;
1047
1048 let port = reserve_tcp_port().unwrap();
1050 let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
1051 app.register_module(
1052 "availability",
1053 define_api(
1054 &Default::default(),
1055 MockBase::instance(),
1056 "1.0.0".parse().unwrap(),
1057 )
1058 .unwrap(),
1059 )
1060 .unwrap();
1061 network.spawn(
1062 "server",
1063 app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
1064 );
1065
1066 let db = TmpDb::init().await;
1068 let provider = Provider::new(TrustedQueryServiceProvider::new(
1069 format!("http://localhost:{port}").parse().unwrap(),
1070 MockBase::instance(),
1071 ));
1072 let data_source = data_source(&db, &provider).await;
1073
1074 network.start().await;
1076
1077 let finalized_leaves = network.data_source().subscribe_leaves(0).await;
1079 let finalized_leaves = finalized_leaves.take(5).collect::<Vec<_>>().await;
1080
1081 let mut tx = data_source.write().await.unwrap();
1085 tx.insert_leaf(&finalized_leaves[2]).await.unwrap();
1086 tx.insert_leaf(&finalized_leaves[4]).await.unwrap();
1087 tx.commit().await.unwrap();
1088
1089 let leaves = data_source
1091 .get_leaf_range(..5)
1092 .await
1093 .then(Fetch::resolve)
1094 .collect::<Vec<_>>()
1095 .await;
1096 for i in 0..5 {
1097 tracing::info!("checking leaf {i}");
1098 assert_eq!(leaves[i], finalized_leaves[i]);
1099 }
1100 }
1101
    /// Checks that a pending request for the block containing a transaction resolves once a
    /// block with that transaction is appended, using a data source built with `NoFetching`.
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn fetch_transaction() {
        let mut network = MockNetwork::<MockDataSource>::init().await;

        // Serve the network's own data source over HTTP.
        let port = reserve_tcp_port().unwrap();
        let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
        app.register_module(
            "availability",
            define_api(
                &Default::default(),
                MockBase::instance(),
                "1.0.0".parse().unwrap(),
            )
            .unwrap(),
        )
        .unwrap();
        network.spawn(
            "server",
            app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
        );

        // The data source under test is built with `NoFetching` as its provider, so the only
        // data it receives in this test is what is appended explicitly below.
        let db = TmpDb::init().await;
        let data_source = data_source(&db, &NoFetching).await;

        // Subscribe before starting the network; both streams start at height 1.
        let mut leaves = network.data_source().subscribe_leaves(1).await;
        let mut blocks = network.data_source().subscribe_blocks(1).await;

        network.start().await;

        // Request the transaction before it has been submitted to the network.
        let tx = mock_transaction(vec![1, 2, 3]);
        let fut = data_source
            .get_block_containing_transaction(tx.commit())
            .await;

        network.submit_transaction(tx.clone()).await;

        // Feed leaf/block pairs from the network into the data source until one of the blocks
        // contains the transaction; that block is the loop's result.
        let block = loop {
            let leaf = leaves.next().await.unwrap();
            let block = blocks.next().await.unwrap();

            data_source
                .append(BlockInfo::new(leaf, Some(block.clone()), None, None))
                .await
                .unwrap();

            if block.transaction_by_hash(tx.commit()).is_some() {
                break block;
            }
        };
        tracing::info!("transaction included in block {}", block.height());

        // The request made before submission must now resolve to that block.
        let fetched_tx = fut.await;
        assert_eq!(
            fetched_tx,
            BlockWithTransaction::with_hash(block, tx.commit()).unwrap()
        );

        // A fresh request for the same transaction must yield the same result.
        assert_eq!(
            fetched_tx,
            data_source
                .get_block_containing_transaction(tx.commit())
                .await
                .await
        );
    }
1180
    /// Checks that a fetch stays unresolved while the provider is failing and that the object
    /// becomes available once the provider stops failing.
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_retry() {
        let mut network = MockNetwork::<MockDataSource>::init().await;

        // Serve the network's own data source over HTTP; this is the upstream that the
        // provider under test will query.
        let port = reserve_tcp_port().unwrap();
        let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
        app.register_module(
            "availability",
            define_api(
                &Default::default(),
                MockBase::instance(),
                "1.0.0".parse().unwrap(),
            )
            .unwrap(),
        )
        .unwrap();
        network.spawn(
            "server",
            app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
        );

        // Data source under test, with the maximum retry interval capped at one second.
        let db = TmpDb::init().await;
        let provider = Provider::new(TrustedQueryServiceProvider::new(
            format!("http://localhost:{port}").parse().unwrap(),
            MockBase::instance(),
        ));
        let data_source = builder(&db, &provider)
            .await
            .with_max_retry_interval(Duration::from_secs(1))
            .build()
            .await
            .unwrap();

        network.start().await;

        // Two leaves starting at height 1: the first is the fetch target, the last is the one
        // handed to the data source.
        let leaves = network.data_source().subscribe_leaves(1).await;
        let leaves = leaves.take(2).collect::<Vec<_>>().await;
        let test_leaf = &leaves[0];

        // NOTE(review): `fail()` presumably makes every provider request fail until `unfail()`
        // is called — confirm against `TestProvider`.
        provider.fail();

        data_source
            .append(leaves.last().cloned().unwrap().into())
            .await
            .unwrap();

        tracing::info!("requesting leaf from failing providers");
        let fut = data_source.get_leaf(test_leaf.height() as usize).await;

        // Several retry intervals later the request must still be pending.
        sleep(Duration::from_secs(5)).await;
        fut.try_resolve().unwrap_err();

        // Once the provider works again, a new request for the same leaf must resolve.
        provider.unfail();
        assert_eq!(
            data_source
                .get_leaf(test_leaf.height() as usize)
                .await
                .await,
            *test_leaf
        );
    }
1254
    /// Runs a data source with an aggressive pruner until its data has been pruned, then reopens
    /// the same database in archive mode and waits for it to report being fully synced again.
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_archive_recovery() {
        let mut network = MockNetwork::<MockDataSource>::init().await;

        // Serve the network's own data source over HTTP; this is the upstream that the
        // provider under test will query.
        let port = reserve_tcp_port().unwrap();
        let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
        app.register_module(
            "availability",
            define_api(
                &Default::default(),
                MockBase::instance(),
                "1.0.0".parse().unwrap(),
            )
            .unwrap(),
        )
        .unwrap();
        network.spawn(
            "server",
            app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
        );

        // First incarnation of the data source: pruner configured with a target retention of
        // zero and a five second interval. Unlike the `builder` helper, proactive fetching is
        // not disabled here.
        let db = TmpDb::init().await;
        let provider = Provider::new(TrustedQueryServiceProvider::new(
            format!("http://localhost:{port}").parse().unwrap(),
            MockBase::instance(),
        ));
        let mut data_source = db
            .config()
            .pruner_cfg(
                PrunerCfg::new()
                    .with_target_retention(Duration::from_secs(0))
                    .with_interval(Duration::from_secs(5)),
            )
            .unwrap()
            .builder(provider.clone())
            .await
            .unwrap()
            // NOTE(review): the short minimum retry interval and large randomization factor
            // look like they are meant to make fetch retries fast but spread out — confirm.
            .with_min_retry_interval(Duration::from_millis(100))
            .with_retry_randomization_factor(3.)
            .build()
            .await
            .unwrap();

        network.start().await;

        // Five leaves from the network, starting at height 1.
        let leaves = network.data_source().subscribe_leaves(1).await;
        let leaves = leaves.take(5).collect::<Vec<_>>().await;

        // Before any data is added, nothing (or at most height 0) may be recorded as pruned.
        let pruned_height = data_source
            .read()
            .await
            .unwrap()
            .load_pruned_height()
            .await
            .unwrap();
        assert!(matches!(pruned_height, None | Some(0)), "{pruned_height:?}");

        // Hand over only the last leaf, then check every leaf from height 1 up to it can be
        // obtained; `leaves` starts at height 1, hence the `- 1` when indexing.
        let last_leaf = leaves.last().unwrap();
        data_source.append(last_leaf.clone().into()).await.unwrap();

        for i in 1..=last_leaf.height() {
            tracing::info!(i, "fetching leaf");
            assert_eq!(
                data_source.get_leaf(i as usize).await.await,
                leaves[i as usize - 1]
            );
        }

        // Poll once per second until the recorded pruned height reaches the last leaf.
        loop {
            let pruned_height = data_source
                .read()
                .await
                .unwrap()
                .load_pruned_height()
                .await
                .unwrap();
            if pruned_height == Some(last_leaf.height()) {
                break;
            }
            tracing::info!(
                ?pruned_height,
                target_height = last_leaf.height(),
                "waiting for pruner to run"
            );
            sleep(Duration::from_secs(1)).await;
        }

        // Second incarnation over the same database, this time configured as an archive, with
        // a one second proactive interval and a one second sync status TTL.
        data_source = db
            .config()
            .archive()
            .builder(provider.clone())
            .await
            .unwrap()
            .with_proactive_interval(Duration::from_secs(1))
            .with_sync_status_ttl(Duration::from_secs(1))
            .build()
            .await
            .unwrap();

        // After reopening in archive mode, no pruned height may be reported any more.
        let pruned_height = data_source
            .read()
            .await
            .unwrap()
            .load_pruned_height()
            .await
            .unwrap();
        assert_eq!(pruned_height, None);

        // Hand the last leaf to the new data source as well.
        data_source.append(last_leaf.clone().into()).await.unwrap();

        // Poll once per second until the data source reports being fully synced.
        loop {
            let sync_status = data_source.sync_status().await.unwrap();
            if sync_status.is_fully_synced() {
                break;
            }
            tracing::info!(?sync_status, "waiting for node to sync");
            sleep(Duration::from_secs(1)).await;
        }

        // Check again a few seconds later (longer than the sync status TTL configured above)
        // that the status is still fully synced.
        sleep(Duration::from_secs(3)).await;
        let sync_status = data_source.sync_status().await.unwrap();
        assert!(sync_status.is_fully_synced(), "{sync_status:#?}");
    }
1403
/// Selects which storage operation a storage-failure test should make fail.
///
/// The helper functions taking this enum translate each variant into the corresponding
/// `fail_*` hook on the failable storage layer.
#[derive(Debug, Clone, Copy)]
enum FailureType {
    /// Inject the failure when a writable transaction is begun.
    Begin,
    /// Inject the failure on writes.
    Write,
    /// Inject the failure on commits.
    Commit,
}
1410
1411 async fn test_fetch_storage_failure_helper(failure: FailureType) {
1412 let mut network = MockNetwork::<MockDataSource>::init().await;
1414
1415 let port = reserve_tcp_port().unwrap();
1417 let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
1418 app.register_module(
1419 "availability",
1420 define_api(
1421 &Default::default(),
1422 MockBase::instance(),
1423 "1.0.0".parse().unwrap(),
1424 )
1425 .unwrap(),
1426 )
1427 .unwrap();
1428 network.spawn(
1429 "server",
1430 app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
1431 );
1432
1433 let provider = Provider::new(TrustedQueryServiceProvider::new(
1435 format!("http://localhost:{port}").parse().unwrap(),
1436 MockBase::instance(),
1437 ));
1438 let db = TmpDb::init().await;
1439 let storage = FailStorage::from(
1440 SqlStorage::connect(db.config(), StorageConnectionType::Query)
1441 .await
1442 .unwrap(),
1443 );
1444 let data_source = FetchingDataSource::builder(storage, provider)
1445 .disable_proactive_fetching()
1446 .disable_aggregator()
1447 .with_max_retry_interval(Duration::from_millis(100))
1448 .with_retry_timeout(Duration::from_secs(1))
1449 .build()
1450 .await
1451 .unwrap();
1452
1453 network.start().await;
1455
1456 let leaves = network.data_source().subscribe_leaves(1).await;
1458 let leaves = leaves.take(2).collect::<Vec<_>>().await;
1459
1460 let last_leaf = leaves.last().unwrap();
1462 let mut tx = data_source.write().await.unwrap();
1463 tx.insert_leaf(last_leaf).await.unwrap();
1464 tx.commit().await.unwrap();
1465
1466 tracing::info!("fetch with write failure");
1468 match failure {
1469 FailureType::Begin => {
1470 data_source
1471 .as_ref()
1472 .fail_begins_writable(FailableAction::Any)
1473 .await
1474 },
1475 FailureType::Write => data_source.as_ref().fail_writes(FailableAction::Any).await,
1476 FailureType::Commit => data_source.as_ref().fail_commits(FailableAction::Any).await,
1477 }
1478 assert_eq!(leaves[0], data_source.get_leaf(1).await.await);
1479 data_source.as_ref().pass().await;
1480
1481 sleep(Duration::from_secs(1)).await;
1486
1487 tracing::info!("fetch with write success");
1490 let fetch = data_source.get_leaf(1).await;
1491 assert!(fetch.is_pending());
1492 assert_eq!(leaves[0], fetch.await);
1493
1494 sleep(Duration::from_secs(1)).await;
1495
1496 tracing::info!("retrieve from storage");
1498 let fetch = data_source.get_leaf(1).await;
1499 assert_eq!(leaves[0], fetch.try_resolve().ok().unwrap());
1500 }
1501
1502 #[test_log::test(tokio::test(flavor = "multi_thread"))]
1503 async fn test_fetch_storage_failure_on_begin() {
1504 test_fetch_storage_failure_helper(FailureType::Begin).await;
1505 }
1506
1507 #[test_log::test(tokio::test(flavor = "multi_thread"))]
1508 async fn test_fetch_storage_failure_on_write() {
1509 test_fetch_storage_failure_helper(FailureType::Write).await;
1510 }
1511
1512 #[test_log::test(tokio::test(flavor = "multi_thread"))]
1513 async fn test_fetch_storage_failure_on_commit() {
1514 test_fetch_storage_failure_helper(FailureType::Commit).await;
1515 }
1516
1517 async fn test_fetch_storage_failure_retry_helper(failure: FailureType) {
1518 let mut network = MockNetwork::<MockDataSource>::init().await;
1520
1521 let port = reserve_tcp_port().unwrap();
1523 let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
1524 app.register_module(
1525 "availability",
1526 define_api(
1527 &Default::default(),
1528 MockBase::instance(),
1529 "1.0.0".parse().unwrap(),
1530 )
1531 .unwrap(),
1532 )
1533 .unwrap();
1534 network.spawn(
1535 "server",
1536 app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
1537 );
1538
1539 let provider = Provider::new(TrustedQueryServiceProvider::new(
1541 format!("http://localhost:{port}").parse().unwrap(),
1542 MockBase::instance(),
1543 ));
1544 let db = TmpDb::init().await;
1545 let storage = FailStorage::from(
1546 SqlStorage::connect(db.config(), StorageConnectionType::Query)
1547 .await
1548 .unwrap(),
1549 );
1550 let data_source = FetchingDataSource::builder(storage, provider)
1551 .disable_proactive_fetching()
1552 .disable_aggregator()
1553 .with_min_retry_interval(Duration::from_millis(100))
1554 .build()
1555 .await
1556 .unwrap();
1557
1558 network.start().await;
1560
1561 let leaves = network.data_source().subscribe_leaves(1).await;
1563 let leaves = leaves.take(2).collect::<Vec<_>>().await;
1564
1565 let last_leaf = leaves.last().unwrap();
1567 let mut tx = data_source.write().await.unwrap();
1568 tx.insert_leaf(last_leaf).await.unwrap();
1569 tx.commit().await.unwrap();
1570
1571 tracing::info!("fetch with write failure");
1573 match failure {
1574 FailureType::Begin => {
1575 data_source
1576 .as_ref()
1577 .fail_one_begin_writable(FailableAction::Any)
1578 .await
1579 },
1580 FailureType::Write => {
1581 data_source
1582 .as_ref()
1583 .fail_one_write(FailableAction::Any)
1584 .await
1585 },
1586 FailureType::Commit => {
1587 data_source
1588 .as_ref()
1589 .fail_one_commit(FailableAction::Any)
1590 .await
1591 },
1592 }
1593 assert_eq!(leaves[0], data_source.get_leaf(1).await.await);
1594
1595 let mut tx = data_source.read().await.unwrap();
1597 assert_eq!(leaves[0], tx.get_leaf(1.into()).await.unwrap());
1598 }
1599
1600 #[test_log::test(tokio::test(flavor = "multi_thread"))]
1601 async fn test_fetch_storage_failure_retry_on_begin() {
1602 test_fetch_storage_failure_retry_helper(FailureType::Begin).await;
1603 }
1604
1605 #[test_log::test(tokio::test(flavor = "multi_thread"))]
1606 async fn test_fetch_storage_failure_retry_on_write() {
1607 test_fetch_storage_failure_retry_helper(FailureType::Write).await;
1608 }
1609
1610 #[test_log::test(tokio::test(flavor = "multi_thread"))]
1611 async fn test_fetch_storage_failure_retry_on_commit() {
1612 test_fetch_storage_failure_retry_helper(FailureType::Commit).await;
1613 }
1614
1615 #[test_log::test(tokio::test(flavor = "multi_thread"))]
1616 async fn test_fetch_on_decide() {
1617 let mut network = MockNetwork::<MockDataSource>::init().await;
1619
1620 let port = reserve_tcp_port().unwrap();
1622 let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
1623 app.register_module(
1624 "availability",
1625 define_api(
1626 &Default::default(),
1627 MockBase::instance(),
1628 "1.0.0".parse().unwrap(),
1629 )
1630 .unwrap(),
1631 )
1632 .unwrap();
1633 network.spawn(
1634 "server",
1635 app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
1636 );
1637
1638 let db = TmpDb::init().await;
1640 let provider = Provider::new(TrustedQueryServiceProvider::new(
1641 format!("http://localhost:{port}").parse().unwrap(),
1642 MockBase::instance(),
1643 ));
1644 let data_source = builder(&db, &provider)
1645 .await
1646 .with_max_retry_interval(Duration::from_secs(1))
1647 .build()
1648 .await
1649 .unwrap();
1650
1651 network.start().await;
1653
1654 let leaf = network
1656 .data_source()
1657 .subscribe_leaves(1)
1658 .await
1659 .next()
1660 .await
1661 .unwrap();
1662
1663 tracing::info!("send decide event");
1665 data_source.append(leaf.clone().into()).await.unwrap();
1666
1667 tracing::info!("wait");
1670 sleep(Duration::from_secs(5)).await;
1671
1672 let mut tx = data_source.read().await.unwrap();
1676 let id = BlockId::<MockTypes>::from(leaf.height() as usize);
1677 let block = tx.get_block(id).await.unwrap();
1678 let vid = tx.get_vid_common(id).await.unwrap();
1679
1680 assert_eq!(block.hash(), leaf.block_hash());
1681 assert_eq!(vid.block_hash(), leaf.block_hash());
1682 }
1683
1684 #[test_log::test(tokio::test(flavor = "multi_thread"))]
1685 async fn test_fetch_begin_failure() {
1686 let mut network = MockNetwork::<MockDataSource>::init().await;
1688
1689 let port = reserve_tcp_port().unwrap();
1691 let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
1692 app.register_module(
1693 "availability",
1694 define_api(
1695 &Default::default(),
1696 MockBase::instance(),
1697 "1.0.0".parse().unwrap(),
1698 )
1699 .unwrap(),
1700 )
1701 .unwrap();
1702 network.spawn(
1703 "server",
1704 app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
1705 );
1706
1707 let provider = Provider::new(TrustedQueryServiceProvider::new(
1709 format!("http://localhost:{port}").parse().unwrap(),
1710 MockBase::instance(),
1711 ));
1712 let db = TmpDb::init().await;
1713 let storage = FailStorage::from(
1714 SqlStorage::connect(db.config(), StorageConnectionType::Query)
1715 .await
1716 .unwrap(),
1717 );
1718 let data_source = FetchingDataSource::builder(storage, provider)
1719 .disable_proactive_fetching()
1720 .disable_aggregator()
1721 .with_min_retry_interval(Duration::from_millis(100))
1722 .build()
1723 .await
1724 .unwrap();
1725
1726 network.start().await;
1728
1729 let leaves = network.data_source().subscribe_leaves(1).await;
1731 let leaves = leaves.take(2).collect::<Vec<_>>().await;
1732
1733 let last_leaf = leaves.last().unwrap();
1735 let mut tx = data_source.write().await.unwrap();
1736 tx.insert_leaf(last_leaf).await.unwrap();
1737 tx.commit().await.unwrap();
1738
1739 tracing::info!("fetch with transaction failure");
1742 data_source
1743 .as_ref()
1744 .fail_one_begin_read_only(FailableAction::Any)
1745 .await;
1746 assert_eq!(leaves[0], data_source.get_leaf(1).await.await);
1747 }
1748
1749 #[test_log::test(tokio::test(flavor = "multi_thread"))]
1750 async fn test_fetch_load_failure_block() {
1751 let mut network = MockNetwork::<MockDataSource>::init().await;
1753
1754 let port = reserve_tcp_port().unwrap();
1756 let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
1757 app.register_module(
1758 "availability",
1759 define_api(
1760 &Default::default(),
1761 MockBase::instance(),
1762 "1.0.0".parse().unwrap(),
1763 )
1764 .unwrap(),
1765 )
1766 .unwrap();
1767 network.spawn(
1768 "server",
1769 app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
1770 );
1771
1772 let provider = Provider::new(TrustedQueryServiceProvider::new(
1774 format!("http://localhost:{port}").parse().unwrap(),
1775 MockBase::instance(),
1776 ));
1777 let db = TmpDb::init().await;
1778 let storage = FailStorage::from(
1779 SqlStorage::connect(db.config(), StorageConnectionType::Query)
1780 .await
1781 .unwrap(),
1782 );
1783 let data_source = FetchingDataSource::builder(storage, provider)
1784 .disable_proactive_fetching()
1785 .disable_aggregator()
1786 .with_min_retry_interval(Duration::from_millis(100))
1787 .build()
1788 .await
1789 .unwrap();
1790
1791 network.start().await;
1793
1794 let mut leaves = network.data_source().subscribe_leaves(1).await;
1796 let leaf = leaves.next().await.unwrap();
1797
1798 let mut tx = data_source.write().await.unwrap();
1801 tx.insert_leaf(&leaf).await.unwrap();
1802 tx.commit().await.unwrap();
1803
1804 tracing::info!("fetch with read failure");
1818 data_source
1819 .as_ref()
1820 .fail_one_read(FailableAction::GetHeader)
1821 .await;
1822 let fetch = data_source.get_block(leaf.block_hash()).await;
1823
1824 sleep(Duration::from_secs(2)).await;
1826 data_source.as_ref().pass().await;
1827
1828 let block: BlockQueryData<MockTypes> = fetch.await;
1829 assert_eq!(block.hash(), leaf.block_hash());
1830 }
1831
1832 #[test_log::test(tokio::test(flavor = "multi_thread"))]
1833 async fn test_fetch_load_failure_tx() {
1834 let mut network = MockNetwork::<MockDataSource>::init().await;
1836
1837 let port = reserve_tcp_port().unwrap();
1839 let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
1840 app.register_module(
1841 "availability",
1842 define_api(
1843 &Default::default(),
1844 MockBase::instance(),
1845 "1.0.0".parse().unwrap(),
1846 )
1847 .unwrap(),
1848 )
1849 .unwrap();
1850 network.spawn(
1851 "server",
1852 app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
1853 );
1854
1855 let provider = Provider::new(TrustedQueryServiceProvider::new(
1857 format!("http://localhost:{port}").parse().unwrap(),
1858 MockBase::instance(),
1859 ));
1860 let db = TmpDb::init().await;
1861 let storage = FailStorage::from(
1862 SqlStorage::connect(db.config(), StorageConnectionType::Query)
1863 .await
1864 .unwrap(),
1865 );
1866 let data_source = FetchingDataSource::builder(storage, provider)
1867 .disable_proactive_fetching()
1868 .disable_aggregator()
1869 .with_min_retry_interval(Duration::from_millis(100))
1870 .build()
1871 .await
1872 .unwrap();
1873
1874 network.start().await;
1876
1877 let tx = mock_transaction(vec![1, 2, 3]);
1879 network.submit_transaction(tx.clone()).await;
1880 let tx = network
1881 .data_source()
1882 .get_block_containing_transaction(tx.commit())
1883 .await
1884 .await;
1885
1886 {
1888 let leaf = network
1889 .data_source()
1890 .get_leaf(tx.transaction.block_height() as usize)
1891 .await
1892 .await;
1893 let block = network
1894 .data_source()
1895 .get_block(tx.transaction.block_height() as usize)
1896 .await
1897 .await;
1898 let mut tx = data_source.write().await.unwrap();
1899 tx.insert_leaf(&leaf).await.unwrap();
1900 tx.insert_block(&block).await.unwrap();
1901 tx.commit().await.unwrap();
1902 }
1903
1904 tracing::info!("fetch success");
1906 assert_eq!(
1907 tx,
1908 data_source
1909 .get_block_containing_transaction(tx.transaction.hash())
1910 .await
1911 .await
1912 );
1913
1914 tracing::info!("fetch with read failure");
1926 data_source
1927 .as_ref()
1928 .fail_one_read(FailableAction::Any)
1929 .await;
1930 let fetch = data_source
1931 .get_block_containing_transaction(tx.transaction.hash())
1932 .await;
1933
1934 assert_eq!(tx, fetch.await);
1935 }
1936
1937 #[test_log::test(tokio::test(flavor = "multi_thread"))]
1938 async fn test_stream_begin_failure() {
1939 let mut network = MockNetwork::<MockDataSource>::init().await;
1941
1942 let port = reserve_tcp_port().unwrap();
1944 let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
1945 app.register_module(
1946 "availability",
1947 define_api(
1948 &Default::default(),
1949 MockBase::instance(),
1950 "1.0.0".parse().unwrap(),
1951 )
1952 .unwrap(),
1953 )
1954 .unwrap();
1955 network.spawn(
1956 "server",
1957 app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
1958 );
1959
1960 let provider = Provider::new(TrustedQueryServiceProvider::new(
1962 format!("http://localhost:{port}").parse().unwrap(),
1963 MockBase::instance(),
1964 ));
1965 let db = TmpDb::init().await;
1966 let storage = FailStorage::from(
1967 SqlStorage::connect(db.config(), StorageConnectionType::Query)
1968 .await
1969 .unwrap(),
1970 );
1971 let data_source = FetchingDataSource::builder(storage, provider)
1972 .disable_proactive_fetching()
1973 .disable_aggregator()
1974 .with_min_retry_interval(Duration::from_millis(100))
1975 .with_range_chunk_size(3)
1976 .build()
1977 .await
1978 .unwrap();
1979
1980 network.start().await;
1982
1983 let leaves = network.data_source().subscribe_leaves(1).await;
1985 let leaves = leaves.take(5).collect::<Vec<_>>().await;
1986
1987 let last_leaf = leaves.last().unwrap();
1989 let mut tx = data_source.write().await.unwrap();
1990 tx.insert_leaf(last_leaf).await.unwrap();
1991 tx.commit().await.unwrap();
1992
1993 tracing::info!("stream with transaction failure");
1996 data_source
1997 .as_ref()
1998 .fail_one_begin_read_only(FailableAction::Any)
1999 .await;
2000 assert_eq!(
2001 leaves,
2002 data_source
2003 .subscribe_leaves(1)
2004 .await
2005 .take(5)
2006 .collect::<Vec<_>>()
2007 .await
2008 );
2009 }
2010
2011 #[test_log::test(tokio::test(flavor = "multi_thread"))]
2012 async fn test_stream_load_failure() {
2013 let mut network = MockNetwork::<MockDataSource>::init().await;
2015
2016 let port = reserve_tcp_port().unwrap();
2018 let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
2019 app.register_module(
2020 "availability",
2021 define_api(
2022 &Default::default(),
2023 MockBase::instance(),
2024 "1.0.0".parse().unwrap(),
2025 )
2026 .unwrap(),
2027 )
2028 .unwrap();
2029 network.spawn(
2030 "server",
2031 app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
2032 );
2033
2034 let provider = Provider::new(TrustedQueryServiceProvider::new(
2036 format!("http://localhost:{port}").parse().unwrap(),
2037 MockBase::instance(),
2038 ));
2039 let db = TmpDb::init().await;
2040 let storage = FailStorage::from(
2041 SqlStorage::connect(db.config(), StorageConnectionType::Query)
2042 .await
2043 .unwrap(),
2044 );
2045 let data_source = FetchingDataSource::builder(storage, provider)
2046 .disable_proactive_fetching()
2047 .disable_aggregator()
2048 .with_min_retry_interval(Duration::from_millis(100))
2049 .with_range_chunk_size(3)
2050 .build()
2051 .await
2052 .unwrap();
2053
2054 network.start().await;
2056
2057 let leaves = network.data_source().subscribe_leaves(1).await;
2059 let leaves = leaves.take(5).collect::<Vec<_>>().await;
2060
2061 let last_leaf = leaves.last().unwrap();
2063 let mut tx = data_source.write().await.unwrap();
2064 tx.insert_leaf(last_leaf).await.unwrap();
2065 tx.commit().await.unwrap();
2066
2067 tracing::info!("stream with read failure");
2069 data_source.as_ref().fail_reads(FailableAction::Any).await;
2070 let fetches = data_source
2071 .get_block_range(1..=5)
2072 .await
2073 .collect::<Vec<_>>()
2074 .await;
2075
2076 sleep(Duration::from_secs(2)).await;
2078 data_source.as_ref().pass().await;
2079
2080 for (leaf, fetch) in leaves.iter().zip(fetches) {
2081 let block: BlockQueryData<MockTypes> = fetch.await;
2082 assert_eq!(block.hash(), leaf.block_hash());
2083 }
2084 }
2085
/// Selects which metadata stream the metadata-stream failure helper exercises.
///
/// Derives the same traits as `FailureType`, the other scenario selector in this module, so
/// that values can be copied freely and printed in logs or assertion messages.
#[derive(Clone, Copy, Debug)]
enum MetadataType {
    /// Exercise the payload metadata subscription.
    Payload,
    /// Exercise the VID common metadata subscription.
    Vid,
}
2090
2091 async fn test_metadata_stream_begin_failure_helper(stream: MetadataType) {
2092 let mut network = MockNetwork::<MockDataSource>::init().await;
2094
2095 let port = reserve_tcp_port().unwrap();
2097 let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
2098 app.register_module(
2099 "availability",
2100 define_api(
2101 &Default::default(),
2102 MockBase::instance(),
2103 "1.0.0".parse().unwrap(),
2104 )
2105 .unwrap(),
2106 )
2107 .unwrap();
2108 network.spawn(
2109 "server",
2110 app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
2111 );
2112
2113 let provider = Provider::new(TrustedQueryServiceProvider::new(
2115 format!("http://localhost:{port}").parse().unwrap(),
2116 MockBase::instance(),
2117 ));
2118 let db = TmpDb::init().await;
2119 let storage = FailStorage::from(
2120 SqlStorage::connect(db.config(), StorageConnectionType::Query)
2121 .await
2122 .unwrap(),
2123 );
2124 let data_source = FetchingDataSource::builder(storage, provider)
2125 .disable_proactive_fetching()
2126 .disable_aggregator()
2127 .with_min_retry_interval(Duration::from_millis(100))
2128 .with_range_chunk_size(3)
2129 .build()
2130 .await
2131 .unwrap();
2132
2133 network.start().await;
2135
2136 let leaves = network.data_source().subscribe_leaves(1).await;
2138 let leaves = leaves.take(3).collect::<Vec<_>>().await;
2139
2140 let last_leaf = leaves.last().unwrap();
2142 let mut tx = data_source.write().await.unwrap();
2143 tx.insert_leaf(last_leaf).await.unwrap();
2144 tx.commit().await.unwrap();
2145
2146 let leaf = network.data_source().get_leaf(1).await.await;
2151 let block = network.data_source().get_block(1).await.await;
2152 let vid = network.data_source().get_vid_common(1).await.await;
2153 data_source
2154 .append(BlockInfo::new(leaf, Some(block), Some(vid), None))
2155 .await
2156 .unwrap();
2157
2158 tracing::info!("stream with transaction failure");
2160 data_source
2161 .as_ref()
2162 .fail_begins_read_only(FailableAction::Any)
2163 .await;
2164 match stream {
2165 MetadataType::Payload => {
2166 let payloads = data_source.subscribe_payload_metadata(1).await.take(3);
2167
2168 sleep(Duration::from_secs(2)).await;
2170 tracing::info!("stop failing transactions");
2171 data_source.as_ref().pass().await;
2172
2173 let payloads = payloads.collect::<Vec<_>>().await;
2174 for (leaf, payload) in leaves.iter().zip(payloads) {
2175 assert_eq!(payload.block_hash, leaf.block_hash());
2176 }
2177 },
2178 MetadataType::Vid => {
2179 let vids = data_source.subscribe_vid_common_metadata(1).await.take(3);
2180
2181 sleep(Duration::from_secs(2)).await;
2183 tracing::info!("stop failing transactions");
2184 data_source.as_ref().pass().await;
2185
2186 let vids = vids.collect::<Vec<_>>().await;
2187 for (leaf, vid) in leaves.iter().zip(vids) {
2188 assert_eq!(vid.block_hash, leaf.block_hash());
2189 }
2190 },
2191 }
2192 }
2193
2194 #[test_log::test(tokio::test(flavor = "multi_thread"))]
2195 async fn test_metadata_stream_begin_failure_payload() {
2196 test_metadata_stream_begin_failure_helper(MetadataType::Payload).await
2197 }
2198
2199 #[test_log::test(tokio::test(flavor = "multi_thread"))]
2200 async fn test_metadata_stream_begin_failure_vid() {
2201 test_metadata_stream_begin_failure_helper(MetadataType::Vid).await
2202 }
2203
2204 #[tokio::test(flavor = "multi_thread")]
2205 #[test_log::test]
2206 async fn test_ranged_fetch() {
2207 let mut network = MockNetwork::<MockDataSource>::init().await;
2209
2210 let port = reserve_tcp_port().unwrap();
2212 let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
2213 app.register_module(
2214 "availability",
2215 define_api(
2216 &Default::default(),
2217 MockBase::instance(),
2218 "1.0.0".parse().unwrap(),
2219 )
2220 .unwrap(),
2221 )
2222 .unwrap();
2223 network.spawn(
2224 "server",
2225 app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
2226 );
2227
2228 network.start().await;
2230
2231 let leaves = network
2233 .data_source()
2234 .subscribe_leaves(0)
2235 .await
2236 .take(5)
2237 .collect::<Vec<_>>()
2238 .await;
2239 let blocks = network
2240 .data_source()
2241 .subscribe_blocks(0)
2242 .await
2243 .take(5)
2244 .collect::<Vec<_>>()
2245 .await;
2246 let vid = network
2247 .data_source()
2248 .subscribe_vid_common(0)
2249 .await
2250 .take(5)
2251 .collect::<Vec<_>>()
2252 .await;
2253
2254 let provider = TrustedQueryServiceProvider::new(
2256 format!("http://localhost:{port}").parse().unwrap(),
2257 MockBase::instance(),
2258 );
2259
2260 tracing::info!("fetch leaf range");
2262 assert_eq!(
2263 provider
2264 .fetch(LeafRangeRequest { start: 0, end: 5 })
2265 .await
2266 .unwrap(),
2267 leaves
2268 );
2269 tracing::info!("fetch block range");
2270 assert_eq!(
2271 ProviderTrait::<MockTypes, _>::fetch(&provider, BlockRangeRequest { start: 0, end: 5 })
2272 .await
2273 .unwrap(),
2274 blocks
2275 );
2276 tracing::info!("fetch VID common range");
2277 assert_eq!(
2278 ProviderTrait::<MockTypes, _>::fetch(
2279 &provider,
2280 VidCommonRangeRequest { start: 0, end: 5 }
2281 )
2282 .await
2283 .unwrap(),
2284 vid
2285 );
2286 }
2287
2288 async fn old_server(port: u16) {
2290 let mut api = Api::<(), availability::Error, StaticVersion<1, 0>>::new(toml! {
2291 [route.get_vid_common]
2292 PATH = ["vid/common/:height"]
2293 ":height" = "Integer"
2294 })
2295 .unwrap();
2296
2297 api.get("get_vid_common", move |req, _| {
2298 async move {
2299 let mut common = VidCommonQueryData::<MockTypes>::genesis(
2300 &Default::default(),
2301 &Default::default(),
2302 TEST_VERSIONS.test.base,
2303 )
2304 .await;
2305 common.height = req.integer_param("height")?;
2306 Ok(common)
2307 }
2308 .boxed()
2309 })
2310 .unwrap();
2311
2312 let mut app = App::<(), Error>::with_state(());
2313 app.register_module("availability", api).unwrap();
2314 app.serve(format!("0.0.0.0:{port}"), MockBase::instance())
2315 .await
2316 .ok();
2317 }
2318
2319 #[tokio::test]
2320 #[test_log::test]
2321 async fn test_vid_common_fallback() {
2322 let port = reserve_tcp_port().unwrap();
2323 let _server = BackgroundTask::spawn("old server", old_server(port));
2324 let provider = TrustedQueryServiceProvider::new(
2325 format!("http://localhost:{port}").parse().unwrap(),
2326 StaticVersion::<1, 0>::instance(),
2327 );
2328
2329 let common = try_join_all((0..5).map(|i| {
2331 provider
2332 .client
2333 .get::<VidCommonQueryData<MockTypes>>(&format!("availability/vid/common/{i}"))
2334 .send()
2335 }))
2336 .await
2337 .unwrap();
2338
2339 let req = VidCommonRangeRequest { start: 0, end: 5 };
2341 assert_eq!(
2342 common.as_slice(),
2343 provider.fetch_vid_common_range(req).await.unwrap().as_ref()
2344 );
2345 }
2346}