1use std::{path::PathBuf, time::Duration};
30
31use futures::{FutureExt, StreamExt, TryFutureExt, TryStreamExt};
32use hotshot_types::{
33 data::{Leaf, Leaf2, QuorumProposal, VidCommitment, VidCommon},
34 simple_certificate::QuorumCertificate,
35 traits::node_implementation::NodeType,
36};
37use serde::{Deserialize, Serialize};
38use snafu::OptionExt;
39use tide_disco::{Api, RequestParams, StatusCode, api::ApiError, method::ReadState};
40use vbs::version::StaticVersionType;
41
42use crate::{Header, Payload, api::load_api, types::HeightIndexed};
43
44pub(crate) mod data_source;
45mod fetch;
46pub(crate) mod query_data;
47pub use data_source::*;
48pub use fetch::Fetch;
49pub use hotshot_query_service_types::availability::Error;
50pub use query_data::*;
51
/// Configuration for the availability API module built by `define_api`.
#[derive(Debug)]
pub struct Options {
    /// Optional path handed to `load_api` together with the built-in `availability.toml`
    /// specification.
    ///
    /// NOTE(review): presumably this overrides the built-in specification when set; confirm
    /// against `load_api`.
    pub api_path: Option<PathBuf>,

    /// Timeout applied to each individual fetch; when a fetch does not resolve within this time,
    /// the handler returns a fetch error instead of a result.
    pub fetch_timeout: Duration,

    /// Additional TOML values passed through to `load_api` when the API is constructed.
    pub extensions: Vec<toml::Value>,

    /// Maximum size (`until - from`) of a range query for leaves and VID common data.
    pub small_object_range_limit: usize,

    /// Maximum size (`until - from`) of a range query for headers, blocks, payloads and block
    /// summaries.
    pub large_object_range_limit: usize,
}
87
88impl Default for Options {
89 fn default() -> Self {
90 Self {
91 api_path: None,
92 fetch_timeout: Duration::from_millis(500),
93 extensions: vec![],
94 large_object_range_limit: 100,
95 small_object_range_limit: 500,
96 }
97 }
98}
99
/// A legacy-format leaf (`Leaf`) together with its legacy quorum certificate.
///
/// This is the response type produced by the leaf endpoints when the API is registered with
/// major version 0 (see `downgrade_leaf_query_data`).
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
#[serde(bound = "")]
pub struct Leaf1QueryData<Types: NodeType> {
    /// The leaf in its legacy representation.
    pub(crate) leaf: Leaf<Types>,
    /// The quorum certificate stored alongside the leaf, in its legacy representation.
    pub(crate) qc: QuorumCertificate<Types>,
}
106
107impl<Types: NodeType> Leaf1QueryData<Types> {
108 pub fn new(leaf: Leaf<Types>, qc: QuorumCertificate<Types>) -> Self {
109 Self { leaf, qc }
110 }
111
112 pub fn leaf(&self) -> &Leaf<Types> {
113 &self.leaf
114 }
115
116 pub fn qc(&self) -> &QuorumCertificate<Types> {
117 &self.qc
118 }
119}
120
121fn downgrade_leaf<Types: NodeType>(leaf2: Leaf2<Types>) -> Leaf<Types> {
122 let quorum_proposal = QuorumProposal {
128 block_header: leaf2.block_header().clone(),
129 view_number: leaf2.view_number(),
130 justify_qc: leaf2.justify_qc().to_qc(),
131 upgrade_certificate: leaf2.upgrade_certificate(),
132 proposal_certificate: None,
133 };
134 let mut leaf = Leaf::from_quorum_proposal(&quorum_proposal);
135 if let Some(payload) = leaf2.block_payload() {
136 leaf.fill_block_payload_unchecked(payload);
137 }
138 leaf
139}
140
141fn downgrade_leaf_query_data<Types: NodeType>(leaf: LeafQueryData<Types>) -> Leaf1QueryData<Types> {
142 Leaf1QueryData {
143 leaf: downgrade_leaf(leaf.leaf),
144 qc: leaf.qc.to_qc(),
145 }
146}
147
148async fn get_leaf_handler<Types, State>(
149 req: tide_disco::RequestParams,
150 state: &State,
151 timeout: Duration,
152) -> Result<LeafQueryData<Types>, Error>
153where
154 State: 'static + Send + Sync + ReadState,
155 <State as ReadState>::State: Send + Sync + AvailabilityDataSource<Types>,
156 Types: NodeType,
157 Header<Types>: QueryableHeader<Types>,
158 Payload<Types>: QueryablePayload<Types>,
159{
160 let id = match req.opt_integer_param("height")? {
161 Some(height) => LeafId::Number(height),
162 None => LeafId::Hash(req.blob_param("hash")?),
163 };
164 let fetch = state.read(|state| state.get_leaf(id).boxed()).await;
165 fetch.with_timeout(timeout).await.context(FetchLeafSnafu {
166 resource: id.to_string(),
167 })
168}
169
170async fn get_leaf_range_handler<Types, State>(
171 req: tide_disco::RequestParams,
172 state: &State,
173 timeout: Duration,
174 small_object_range_limit: usize,
175) -> Result<Vec<LeafQueryData<Types>>, Error>
176where
177 State: 'static + Send + Sync + ReadState,
178 <State as ReadState>::State: Send + Sync + AvailabilityDataSource<Types>,
179 Types: NodeType,
180 Header<Types>: QueryableHeader<Types>,
181 Payload<Types>: QueryablePayload<Types>,
182{
183 let from = req.integer_param::<_, usize>("from")?;
184 let until = req.integer_param("until")?;
185 enforce_range_limit(from, until, small_object_range_limit)?;
186
187 let leaves = state
188 .read(|state| state.get_leaf_range(from..until).boxed())
189 .await;
190 leaves
191 .enumerate()
192 .then(|(index, fetch)| async move {
193 fetch.with_timeout(timeout).await.context(FetchLeafSnafu {
194 resource: (index + from).to_string(),
195 })
196 })
197 .try_collect::<Vec<_>>()
198 .await
199}
200
201fn downgrade_vid_common_query_data<Types: NodeType>(
202 data: VidCommonQueryData<Types>,
203) -> Option<ADVZCommonQueryData<Types>> {
204 let VidCommonQueryData {
205 height,
206 block_hash,
207 payload_hash: VidCommitment::V0(payload_hash),
208 common: VidCommon::V0(common),
209 } = data
210 else {
211 return None;
212 };
213 Some(ADVZCommonQueryData {
214 height,
215 block_hash,
216 payload_hash,
217 common,
218 })
219}
220
221async fn get_vid_common_handler<Types, State>(
222 req: tide_disco::RequestParams,
223 state: &State,
224 timeout: Duration,
225) -> Result<VidCommonQueryData<Types>, Error>
226where
227 State: 'static + Send + Sync + ReadState,
228 <State as ReadState>::State: Send + Sync + AvailabilityDataSource<Types>,
229 Types: NodeType,
230 Header<Types>: QueryableHeader<Types>,
231 Payload<Types>: QueryablePayload<Types>,
232{
233 let id = if let Some(height) = req.opt_integer_param("height")? {
234 BlockId::Number(height)
235 } else if let Some(hash) = req.opt_blob_param("hash")? {
236 BlockId::Hash(hash)
237 } else {
238 BlockId::PayloadHash(req.blob_param("payload-hash")?)
239 };
240 let fetch = state.read(|state| state.get_vid_common(id).boxed()).await;
241 fetch.with_timeout(timeout).await.context(FetchBlockSnafu {
242 resource: id.to_string(),
243 })
244}
245
246async fn get_vid_common_range_handler<Types, State>(
247 req: tide_disco::RequestParams,
248 state: &State,
249 limit: usize,
250 timeout: Duration,
251) -> Result<Vec<VidCommonQueryData<Types>>, Error>
252where
253 State: 'static + Send + Sync + ReadState,
254 <State as ReadState>::State: Send + Sync + AvailabilityDataSource<Types>,
255 Types: NodeType,
256 Header<Types>: QueryableHeader<Types>,
257 Payload<Types>: QueryablePayload<Types>,
258{
259 let from = req.integer_param("from")?;
260 let until = req.integer_param("until")?;
261 enforce_range_limit(from, until, limit)?;
262
263 let vid = state
264 .read(|state| state.get_vid_common_range(from..until).boxed())
265 .await;
266 vid.enumerate()
267 .then(|(index, fetch)| async move {
268 fetch.with_timeout(timeout).await.context(FetchBlockSnafu {
269 resource: (index + from).to_string(),
270 })
271 })
272 .try_collect::<Vec<_>>()
273 .await
274}
275
276pub fn define_api<State, Types: NodeType, Ver: StaticVersionType + 'static>(
277 options: &Options,
278 _: Ver,
279 api_ver: semver::Version,
280) -> Result<Api<State, Error, Ver>, ApiError>
281where
282 State: 'static + Send + Sync + ReadState,
283 <State as ReadState>::State: Send + Sync + AvailabilityDataSource<Types>,
284 Header<Types>: QueryableHeader<Types>,
285 Payload<Types>: QueryablePayload<Types>,
286{
287 let mut api = load_api::<State, Error, Ver>(
288 options.api_path.as_ref(),
289 include_str!("../api/availability.toml"),
290 options.extensions.clone(),
291 )?;
292 let timeout = options.fetch_timeout;
293 let small_object_range_limit = options.small_object_range_limit;
294 let large_object_range_limit = options.large_object_range_limit;
295
296 api.with_version(api_ver.clone());
297
298 if api_ver.major == 0 {
306 api.at("get_leaf", move |req, state| {
307 get_leaf_handler(req, state, timeout)
308 .map(|res| res.map(downgrade_leaf_query_data))
309 .boxed()
310 })?;
311
312 api.at("get_leaf_range", move |req, state| {
313 get_leaf_range_handler(req, state, timeout, small_object_range_limit)
314 .map(|res| {
315 res.map(|r| {
316 r.into_iter()
317 .map(downgrade_leaf_query_data)
318 .collect::<Vec<Leaf1QueryData<_>>>()
319 })
320 })
321 .boxed()
322 })?;
323
324 api.stream("stream_leaves", move |req, state| {
325 async move {
326 let height = req.integer_param("height")?;
327 state
328 .read(|state| {
329 async move {
330 Ok(state
331 .subscribe_leaves(height)
332 .await
333 .map(|leaf| Ok(downgrade_leaf_query_data(leaf))))
334 }
335 .boxed()
336 })
337 .await
338 }
339 .try_flatten_stream()
340 .boxed()
341 })?;
342 } else {
343 api.at("get_leaf", move |req, state| {
344 get_leaf_handler(req, state, timeout).boxed()
345 })?;
346
347 api.at("get_leaf_range", move |req, state| {
348 get_leaf_range_handler(req, state, timeout, small_object_range_limit).boxed()
349 })?;
350
351 api.stream("stream_leaves", move |req, state| {
352 async move {
353 let height = req.integer_param("height")?;
354 state
355 .read(|state| {
356 async move { Ok(state.subscribe_leaves(height).await.map(Ok)) }.boxed()
357 })
358 .await
359 }
360 .try_flatten_stream()
361 .boxed()
362 })?;
363 }
364
365 if api_ver.major == 0 {
368 api.at("get_vid_common", move |req, state| {
369 get_vid_common_handler(req, state, timeout)
370 .map(|r| match r {
371 Ok(data) => downgrade_vid_common_query_data(data).ok_or(Error::Custom {
372 message: "Incompatible VID version.".to_string(),
373 status: StatusCode::BAD_REQUEST,
374 }),
375 Err(e) => Err(e),
376 })
377 .boxed()
378 })?
379 .at("get_vid_common_range", move |req, state| {
380 get_vid_common_range_handler(req, state, small_object_range_limit, timeout)
381 .map(|r| match r {
382 Ok(data) => data
383 .into_iter()
384 .map(downgrade_vid_common_query_data)
385 .collect::<Option<Vec<_>>>()
386 .ok_or(Error::Custom {
387 message: "Incompatible VID version.".to_string(),
388 status: StatusCode::BAD_REQUEST,
389 }),
390 Err(e) => Err(e),
391 })
392 .boxed()
393 })?
394 .stream("stream_vid_common", move |req, state| {
395 async move {
396 let height = req.integer_param("height")?;
397 state
398 .read(|state| {
399 async move {
400 Ok(state.subscribe_vid_common(height).await.map(|data| {
401 downgrade_vid_common_query_data(data).ok_or(Error::Custom {
402 message: "Incompatible VID version.".to_string(),
403 status: StatusCode::BAD_REQUEST,
404 })
405 }))
406 }
407 .boxed()
408 })
409 .await
410 }
411 .try_flatten_stream()
412 .boxed()
413 })?;
414 } else {
415 api.at("get_vid_common", move |req, state| {
416 get_vid_common_handler(req, state, timeout).boxed().boxed()
417 })?
418 .at("get_vid_common_range", move |req, state| {
419 get_vid_common_range_handler(req, state, small_object_range_limit, timeout)
420 .boxed()
421 .boxed()
422 })?
423 .stream("stream_vid_common", move |req, state| {
424 async move {
425 let height = req.integer_param("height")?;
426 state
427 .read(|state| {
428 async move { Ok(state.subscribe_vid_common(height).await.map(Ok)) }.boxed()
429 })
430 .await
431 }
432 .try_flatten_stream()
433 .boxed()
434 })?;
435 }
436
437 api.at("get_header", move |req, state| {
438 async move {
439 let id = if let Some(height) = req.opt_integer_param("height")? {
440 BlockId::Number(height)
441 } else if let Some(hash) = req.opt_blob_param("hash")? {
442 BlockId::Hash(hash)
443 } else {
444 BlockId::PayloadHash(req.blob_param("payload-hash")?)
445 };
446 let fetch = state.read(|state| state.get_header(id).boxed()).await;
447 fetch.with_timeout(timeout).await.context(FetchHeaderSnafu {
448 resource: id.to_string(),
449 })
450 }
451 .boxed()
452 })?
453 .at("get_header_range", move |req, state| {
454 async move {
455 let from = req.integer_param::<_, usize>("from")?;
456 let until = req.integer_param::<_, usize>("until")?;
457 enforce_range_limit(from, until, large_object_range_limit)?;
458
459 let headers = state
460 .read(|state| state.get_header_range(from..until).boxed())
461 .await;
462 headers
463 .enumerate()
464 .then(|(index, fetch)| async move {
465 fetch.with_timeout(timeout).await.context(FetchHeaderSnafu {
466 resource: (index + from).to_string(),
467 })
468 })
469 .try_collect::<Vec<_>>()
470 .await
471 }
472 .boxed()
473 })?
474 .stream("stream_headers", move |req, state| {
475 async move {
476 let height = req.integer_param("height")?;
477 state
478 .read(|state| {
479 async move { Ok(state.subscribe_headers(height).await.map(Ok)) }.boxed()
480 })
481 .await
482 }
483 .try_flatten_stream()
484 .boxed()
485 })?
486 .at("get_block", move |req, state| {
487 async move {
488 let id = if let Some(height) = req.opt_integer_param("height")? {
489 BlockId::Number(height)
490 } else if let Some(hash) = req.opt_blob_param("hash")? {
491 BlockId::Hash(hash)
492 } else {
493 BlockId::PayloadHash(req.blob_param("payload-hash")?)
494 };
495 let fetch = state.read(|state| state.get_block(id).boxed()).await;
496 fetch.with_timeout(timeout).await.context(FetchBlockSnafu {
497 resource: id.to_string(),
498 })
499 }
500 .boxed()
501 })?
502 .at("get_block_range", move |req, state| {
503 async move {
504 let from = req.integer_param::<_, usize>("from")?;
505 let until = req.integer_param("until")?;
506 enforce_range_limit(from, until, large_object_range_limit)?;
507
508 let blocks = state
509 .read(|state| state.get_block_range(from..until).boxed())
510 .await;
511 blocks
512 .enumerate()
513 .then(|(index, fetch)| async move {
514 fetch.with_timeout(timeout).await.context(FetchBlockSnafu {
515 resource: (index + from).to_string(),
516 })
517 })
518 .try_collect::<Vec<_>>()
519 .await
520 }
521 .boxed()
522 })?
523 .stream("stream_blocks", move |req, state| {
524 async move {
525 let height = req.integer_param("height")?;
526 state
527 .read(|state| {
528 async move { Ok(state.subscribe_blocks(height).await.map(Ok)) }.boxed()
529 })
530 .await
531 }
532 .try_flatten_stream()
533 .boxed()
534 })?
535 .at("get_payload", move |req, state| {
536 async move {
537 let id = if let Some(height) = req.opt_integer_param("height")? {
538 BlockId::Number(height)
539 } else if let Some(hash) = req.opt_blob_param("hash")? {
540 BlockId::PayloadHash(hash)
541 } else {
542 BlockId::Hash(req.blob_param("block-hash")?)
543 };
544 let fetch = state.read(|state| state.get_payload(id).boxed()).await;
545 fetch.with_timeout(timeout).await.context(FetchBlockSnafu {
546 resource: id.to_string(),
547 })
548 }
549 .boxed()
550 })?
551 .at("get_payload_range", move |req, state| {
552 async move {
553 let from = req.integer_param::<_, usize>("from")?;
554 let until = req.integer_param("until")?;
555 enforce_range_limit(from, until, large_object_range_limit)?;
556
557 let payloads = state
558 .read(|state| state.get_payload_range(from..until).boxed())
559 .await;
560 payloads
561 .enumerate()
562 .then(|(index, fetch)| async move {
563 fetch.with_timeout(timeout).await.context(FetchBlockSnafu {
564 resource: (index + from).to_string(),
565 })
566 })
567 .try_collect::<Vec<_>>()
568 .await
569 }
570 .boxed()
571 })?
572 .stream("stream_payloads", move |req, state| {
573 async move {
574 let height = req.integer_param("height")?;
575 state
576 .read(|state| {
577 async move { Ok(state.subscribe_payloads(height).await.map(Ok)) }.boxed()
578 })
579 .await
580 }
581 .try_flatten_stream()
582 .boxed()
583 })?
584 .at("get_transaction_proof", move |req, state| {
585 async move {
586 let tx = get_transaction(req, state, timeout).await?;
587 let height = tx.block.height();
588 let vid = state
589 .read(|state| state.get_vid_common(height as usize))
590 .await
591 .with_timeout(timeout)
592 .await
593 .context(FetchBlockSnafu {
594 resource: height.to_string(),
595 })?;
596 let proof = tx.block.transaction_proof(&vid, &tx.index).context(
597 InvalidTransactionIndexSnafu {
598 height,
599 index: tx.transaction.index(),
600 },
601 )?;
602 Ok(TransactionWithProofQueryData::new(tx.transaction, proof))
603 }
604 .boxed()
605 })?
606 .at("get_transaction", move |req, state| {
607 async move { Ok(get_transaction(req, state, timeout).await?.transaction) }.boxed()
608 })?
609 .stream("stream_transactions", move |req, state| {
610 async move {
611 let height = req.integer_param::<_, usize>("height")?;
612
613 let namespace: Option<i64> = req
614 .opt_integer_param::<_, usize>("namespace")?
615 .map(|i| {
616 i.try_into().map_err(|err| Error::Custom {
617 message: format!(
618 "Invalid 'namespace': could not convert usize to i64: {err}"
619 ),
620 status: StatusCode::BAD_REQUEST,
621 })
622 })
623 .transpose()?;
624
625 state
626 .read(|state| {
627 async move {
628 Ok(state
629 .subscribe_blocks(height)
630 .await
631 .map(move |block| {
632 let transactions = block.enumerate().enumerate();
633 let header = block.header();
634 let filtered_txs = transactions
635 .filter_map(|(i, (index, _tx))| {
636 if let Some(requested_ns) = namespace {
637 let ns_id = QueryableHeader::<Types>::namespace_id(
638 header,
639 &index.ns_index,
640 )?;
641
642 if ns_id.into() != requested_ns {
643 return None;
644 }
645 }
646
647 let tx = block.transaction(&index)?;
648 TransactionQueryData::new(tx, &block, &index, i as u64)
649 })
650 .collect::<Vec<_>>();
651
652 futures::stream::iter(filtered_txs.into_iter().map(Ok))
653 })
654 .flatten())
655 }
656 .boxed()
657 })
658 .await
659 }
660 .try_flatten_stream()
661 .boxed()
662 })?
663 .at("get_block_summary", move |req, state| {
664 async move {
665 let id: usize = req.integer_param("height")?;
666
667 let fetch = state.read(|state| state.get_block(id).boxed()).await;
668 fetch
669 .with_timeout(timeout)
670 .await
671 .context(FetchBlockSnafu {
672 resource: id.to_string(),
673 })
674 .map(BlockSummaryQueryData::from)
675 }
676 .boxed()
677 })?
678 .at("get_block_summary_range", move |req, state| {
679 async move {
680 let from: usize = req.integer_param("from")?;
681 let until: usize = req.integer_param("until")?;
682 enforce_range_limit(from, until, large_object_range_limit)?;
683
684 let blocks = state
685 .read(|state| state.get_block_range(from..until).boxed())
686 .await;
687 let result: Vec<BlockSummaryQueryData<Types>> = blocks
688 .enumerate()
689 .then(|(index, fetch)| async move {
690 fetch.with_timeout(timeout).await.context(FetchBlockSnafu {
691 resource: (index + from).to_string(),
692 })
693 })
694 .map(|result| result.map(BlockSummaryQueryData::from))
695 .try_collect()
696 .await?;
697
698 Ok(result)
699 }
700 .boxed()
701 })?
702 .at("get_limits", move |_req, _state| {
703 async move {
704 Ok(Limits {
705 small_object_range_limit,
706 large_object_range_limit,
707 })
708 }
709 .boxed()
710 })?
711 .get("get_cert2", |req, state| {
712 async move {
713 let height: u64 = req.integer_param("height")?;
714 state.get_cert2(height).await.map_err(|err| err.into())
715 }
716 .boxed()
717 })?;
718 Ok(api)
719}
720
721fn enforce_range_limit(from: usize, until: usize, limit: usize) -> Result<(), Error> {
722 if until.saturating_sub(from) > limit {
723 return Err(Error::RangeLimit { from, until, limit });
724 }
725 Ok(())
726}
727
728async fn get_transaction<Types, State>(
729 req: RequestParams,
730 state: &State,
731 timeout: Duration,
732) -> Result<BlockWithTransaction<Types>, Error>
733where
734 Types: NodeType,
735 Header<Types>: QueryableHeader<Types>,
736 Payload<Types>: QueryablePayload<Types>,
737 State: 'static + Send + Sync + ReadState,
738 <State as ReadState>::State: Send + Sync + AvailabilityDataSource<Types>,
739{
740 match req.opt_blob_param("hash")? {
741 Some(hash) => state
742 .read(|state| state.get_block_containing_transaction(hash).boxed())
743 .await
744 .with_timeout(timeout)
745 .await
746 .context(FetchTransactionSnafu {
747 resource: hash.to_string(),
748 }),
749 None => {
750 let height: u64 = req.integer_param("height")?;
751 let fetch = state
752 .read(|state| state.get_block(height as usize).boxed())
753 .await;
754 let block = fetch.with_timeout(timeout).await.context(FetchBlockSnafu {
755 resource: height.to_string(),
756 })?;
757 let i: u64 = req.integer_param("index")?;
758 let index = block
759 .payload()
760 .nth(block.metadata(), i as usize)
761 .context(InvalidTransactionIndexSnafu { height, index: i })?;
762 let transaction = block
763 .transaction(&index)
764 .context(InvalidTransactionIndexSnafu { height, index: i })?;
765 let transaction = TransactionQueryData::new(transaction, &block, &index, i)
766 .context(InvalidTransactionIndexSnafu { height, index: i })?;
767 Ok(BlockWithTransaction {
768 transaction,
769 block,
770 index,
771 })
772 },
773 }
774}
775
776#[cfg(test)]
777mod test {
778 use std::{fmt::Debug, time::Duration};
779
780 use async_lock::RwLock;
781 use committable::Committable;
782 use futures::future::FutureExt;
783 use hotshot_example_types::node_types::TEST_VERSIONS;
784 use hotshot_types::{data::Leaf2, simple_certificate::QuorumCertificate2};
785 use serde::de::DeserializeOwned;
786 use surf_disco::{Client, Error as _};
787 use tempfile::TempDir;
788 use test_utils::reserve_tcp_port;
789 use tide_disco::App;
790 use toml::toml;
791
792 use super::*;
793 use crate::{
794 ApiState, Error, Header,
795 data_source::{ExtensibleDataSource, VersionedDataSource, storage::AvailabilityStorage},
796 status::StatusDataSource,
797 task::BackgroundTask,
798 testing::{
799 consensus::{MockDataSource, MockNetwork, MockSqlDataSource},
800 mocks::{MOCK_UPGRADE, MockBase, MockHeader, MockPayload, MockTypes, mock_transaction},
801 },
802 types::HeightIndexed,
803 };
804
805 async fn get_non_empty_blocks(
807 client: &Client<Error, MockBase>,
808 ) -> (
809 u64,
810 Vec<(LeafQueryData<MockTypes>, BlockQueryData<MockTypes>)>,
811 ) {
812 let mut blocks = vec![];
813 for i in 1.. {
815 match client
816 .get::<BlockQueryData<MockTypes>>(&format!("block/{i}"))
817 .send()
818 .await
819 {
820 Ok(block) => {
821 if !block.is_empty() {
822 let leaf = client.get(&format!("leaf/{i}")).send().await.unwrap();
823 blocks.push((leaf, block));
824 }
825 },
826 Err(Error::Availability {
827 source: super::Error::FetchBlock { .. },
828 }) => {
829 tracing::info!(
830 "found end of ledger at height {i}, non-empty blocks are {blocks:?}",
831 );
832 return (i, blocks);
833 },
834 Err(err) => panic!("unexpected error {err}"),
835 }
836 }
837 unreachable!()
838 }
839
    /// Cross-checks the availability endpoints against each other for a sample of heights
    /// below `height`.
    ///
    /// For each sampled height, every way of addressing the same object (by height, by block
    /// hash, by payload hash) must return consistent data, transaction proofs must verify, and
    /// each range endpoint queried for `0..i` must return exactly `i` objects. Panics on the
    /// first mismatch or failed request.
    async fn validate(client: &Client<Error, MockBase>, height: u64) {
        for i in 0..height {
            // Only a handful of heights are checked: the first two, the midpoint and the last.
            if ![0, 1, height / 2, height - 1].contains(&i) {
                continue;
            }
            tracing::info!("validate block {i}/{height}");

            // The leaf must be retrievable both by height and by its own hash.
            let leaf: LeafQueryData<MockTypes> =
                client.get(&format!("leaf/{i}")).send().await.unwrap();
            assert_eq!(leaf.height(), i);
            assert_eq!(
                leaf,
                client
                    .get(&format!("leaf/hash/{}", leaf.hash()))
                    .send()
                    .await
                    .unwrap()
            );

            // The block, its header and its payload must agree across height and hash lookups.
            let block: BlockQueryData<MockTypes> =
                client.get(&format!("block/{i}")).send().await.unwrap();
            let expected_payload = PayloadQueryData::from(block.clone());
            assert_eq!(leaf.block_hash(), block.hash());
            assert_eq!(block.height(), i);
            assert_eq!(
                block,
                client
                    .get(&format!("block/hash/{}", block.hash()))
                    .send()
                    .await
                    .unwrap()
            );
            assert_eq!(
                *block.header(),
                client.get(&format!("header/{i}")).send().await.unwrap()
            );
            assert_eq!(
                *block.header(),
                client
                    .get(&format!("header/hash/{}", block.hash()))
                    .send()
                    .await
                    .unwrap()
            );
            assert_eq!(
                expected_payload,
                client.get(&format!("payload/{i}")).send().await.unwrap(),
            );
            assert_eq!(
                expected_payload,
                client
                    .get(&format!("payload/block-hash/{}", block.hash()))
                    .send()
                    .await
                    .unwrap(),
            );
            // VID common data must describe the same block.
            let common: VidCommonQueryData<MockTypes> = client
                .get(&format!("vid/common/{}", block.height()))
                .send()
                .await
                .unwrap();
            assert_eq!(common.height(), block.height());
            assert_eq!(common.block_hash(), block.hash());
            assert_eq!(common.payload_hash(), block.payload_hash());
            assert_eq!(
                common,
                client
                    .get(&format!("vid/common/hash/{}", block.hash()))
                    .send()
                    .await
                    .unwrap()
            );

            // The summary endpoint must match a summary derived locally from the full block.
            let block_summary = client
                .get(&format!("block/summary/{i}"))
                .send()
                .await
                .unwrap();
            assert_eq!(
                BlockSummaryQueryData::<MockTypes>::from(block.clone()),
                block_summary,
            );
            assert_eq!(block_summary.header(), block.header());
            assert_eq!(block_summary.hash(), block.hash());
            assert_eq!(block_summary.size(), block.size());
            assert_eq!(block_summary.num_transactions(), block.num_transactions());

            let block_summaries: Vec<BlockSummaryQueryData<MockTypes>> = client
                .get(&format!("block/summaries/{}/{}", 0, i))
                .send()
                .await
                .unwrap();
            assert_eq!(block_summaries.len() as u64, i);

            // Lookups by payload hash are compared field by field (payload, payload commitment,
            // common data) rather than as whole objects.
            // NOTE(review): presumably because several blocks can share a payload hash — confirm.
            assert_eq!(
                block.payload(),
                client
                    .get::<BlockQueryData<MockTypes>>(&format!(
                        "block/payload-hash/{}",
                        block.payload_hash()
                    ))
                    .send()
                    .await
                    .unwrap()
                    .payload()
            );
            assert_eq!(
                block.payload_hash(),
                client
                    .get::<Header<MockTypes>>(&format!(
                        "header/payload-hash/{}",
                        block.payload_hash()
                    ))
                    .send()
                    .await
                    .unwrap()
                    .payload_commitment
            );
            assert_eq!(
                block.payload(),
                client
                    .get::<PayloadQueryData<MockTypes>>(&format!(
                        "payload/hash/{}",
                        block.payload_hash()
                    ))
                    .send()
                    .await
                    .unwrap()
                    .data(),
            );
            assert_eq!(
                common.common(),
                client
                    .get::<VidCommonQueryData<MockTypes>>(&format!(
                        "vid/common/payload-hash/{}",
                        block.payload_hash()
                    ))
                    .send()
                    .await
                    .unwrap()
                    .common()
            );

            // Every transaction in the block must be retrievable by position and by hash, with
            // and without a proof.
            for (j, txn_from_block) in block.enumerate() {
                let txn: TransactionQueryData<MockTypes> = client
                    .get(&format!("transaction/{}/{}/noproof", i, j.position))
                    .send()
                    .await
                    .unwrap();
                assert_eq!(txn.block_height(), i);
                assert_eq!(txn.block_hash(), block.hash());
                assert_eq!(txn.index(), j.position as u64);
                assert_eq!(txn.hash(), txn_from_block.commit());
                assert_eq!(txn.transaction(), &txn_from_block);
                // For the lookup by hash, only the transaction hash itself is compared.
                assert_eq!(
                    txn.hash(),
                    client
                        .get::<TransactionQueryData<MockTypes>>(&format!(
                            "transaction/hash/{}/noproof",
                            txn.hash()
                        ))
                        .send()
                        .await
                        .unwrap()
                        .hash()
                );

                // Proof requested by (height, position).
                let tx_with_proof = client
                    .get::<TransactionWithProofQueryData<MockTypes>>(&format!(
                        "transaction/{}/{}/proof",
                        i, j.position
                    ))
                    .send()
                    .await
                    .unwrap();
                assert_eq!(txn.hash(), tx_with_proof.hash());
                assert!(tx_with_proof.proof().verify(
                    block.metadata(),
                    txn.transaction(),
                    &block.payload_hash(),
                    common.common()
                ));

                // Proof requested by transaction hash.
                let tx_with_proof = client
                    .get::<TransactionWithProofQueryData<MockTypes>>(&format!(
                        "transaction/hash/{}/proof",
                        txn.hash()
                    ))
                    .send()
                    .await
                    .unwrap();
                assert_eq!(txn.hash(), tx_with_proof.hash());
                assert!(tx_with_proof.proof().verify(
                    block.metadata(),
                    txn.transaction(),
                    &block.payload_hash(),
                    common.common()
                ));
            }

            // Each range endpoint queried for `0..i` must return exactly `i` objects.
            let block_range: Vec<BlockQueryData<MockTypes>> = client
                .get(&format!("block/{}/{}", 0, i))
                .send()
                .await
                .unwrap();

            assert_eq!(block_range.len() as u64, i);

            let leaf_range: Vec<LeafQueryData<MockTypes>> = client
                .get(&format!("leaf/{}/{}", 0, i))
                .send()
                .await
                .unwrap();

            assert_eq!(leaf_range.len() as u64, i);

            let payload_range: Vec<PayloadQueryData<MockTypes>> = client
                .get(&format!("payload/{}/{}", 0, i))
                .send()
                .await
                .unwrap();

            assert_eq!(payload_range.len() as u64, i);

            let vid_common_range: Vec<VidCommonQueryData<MockTypes>> = client
                .get(&format!("vid/common/{}/{}", 0, i))
                .send()
                .await
                .unwrap();

            assert_eq!(vid_common_range.len() as u64, i);

            let header_range: Vec<Header<MockTypes>> = client
                .get(&format!("header/{}/{}", 0, i))
                .send()
                .await
                .unwrap();

            assert_eq!(header_range.len() as u64, i);
        }
    }
1098
    /// End-to-end test of the availability API at version 1.0.0.
    ///
    /// Starts a mock network, serves the API over HTTP, subscribes to the leaf, header, block
    /// and VID common streams from height 0, then submits three transactions. For each one it
    /// waits for the first non-empty block, checks that the streamed objects match the ones
    /// returned by the corresponding GET endpoints, and runs `validate` over the chain so far.
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_api() {
        let mut network = MockNetwork::<MockDataSource>::init().await;
        network.start().await;

        let port = reserve_tcp_port().unwrap();
        let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
        // Both range limits are set to 500 for this test.
        let options = Options {
            small_object_range_limit: 500,
            large_object_range_limit: 500,
            ..Default::default()
        };

        app.register_module(
            "availability",
            define_api(&options, MockBase::instance(), "1.0.0".parse().unwrap()).unwrap(),
        )
        .unwrap();
        network.spawn(
            "server",
            app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
        );

        let client = Client::<Error, MockBase>::new(
            format!("http://localhost:{port}/availability")
                .parse()
                .unwrap(),
        );
        assert!(client.connect(Some(Duration::from_secs(60))).await);
        // No transactions have been submitted yet, so no non-empty blocks may exist.
        assert_eq!(get_non_empty_blocks(&client).await.1, vec![]);

        // Subscribe to all four streams from height 0 and consume them in lockstep.
        let leaves = client
            .socket("stream/leaves/0")
            .subscribe::<LeafQueryData<MockTypes>>()
            .await
            .unwrap();
        let headers = client
            .socket("stream/headers/0")
            .subscribe::<Header<MockTypes>>()
            .await
            .unwrap();
        let blocks = client
            .socket("stream/blocks/0")
            .subscribe::<BlockQueryData<MockTypes>>()
            .await
            .unwrap();
        let vid_common = client
            .socket("stream/vid/common/0")
            .subscribe::<VidCommonQueryData<MockTypes>>()
            .await
            .unwrap();
        let mut chain = leaves.zip(headers.zip(blocks.zip(vid_common))).enumerate();
        for nonce in 0..3 {
            let txn = mock_transaction(vec![nonce]);
            network.submit_transaction(txn).await;

            // Advance the streams until the first non-empty block shows up.
            let (i, leaf, block, common) = loop {
                tracing::info!("waiting for block with transaction {}", nonce);
                let (i, (leaf, (header, (block, common)))) = chain.next().await.unwrap();
                tracing::info!(i, ?leaf, ?header, ?block, ?common);
                let leaf = leaf.unwrap();
                let header = header.unwrap();
                let block = block.unwrap();
                let common = common.unwrap();
                // The position in the zipped stream must equal the height of each object.
                assert_eq!(leaf.height() as usize, i);
                assert_eq!(leaf.block_hash(), block.hash());
                assert_eq!(block.header(), &header);
                assert_eq!(common.height() as usize, i);
                if !block.is_empty() {
                    break (i, leaf, block, common);
                }
            };
            // Streamed objects must match what the GET endpoints return for the same height.
            assert_eq!(leaf, client.get(&format!("leaf/{i}")).send().await.unwrap());
            assert_eq!(
                block,
                client.get(&format!("block/{i}")).send().await.unwrap()
            );
            assert_eq!(
                common,
                client.get(&format!("vid/common/{i}")).send().await.unwrap()
            );

            validate(&client, (i + 1) as u64).await;
        }

        network.shut_down().await;
    }
1192
1193 async fn validate_old(client: &Client<Error, MockBase>, height: u64) {
1194 for i in 0..height {
1196 if ![0, 1, height / 2, height - 1].contains(&i) {
1199 continue;
1200 }
1201 tracing::info!("validate block {i}/{height}");
1202
1203 let leaf: Leaf1QueryData<MockTypes> =
1205 client.get(&format!("leaf/{i}")).send().await.unwrap();
1206 assert_eq!(leaf.leaf.height(), i);
1207 assert_eq!(
1208 leaf,
1209 client
1210 .get(&format!(
1211 "leaf/hash/{}",
1212 <Leaf<MockTypes> as Committable>::commit(&leaf.leaf)
1213 ))
1214 .send()
1215 .await
1216 .unwrap()
1217 );
1218
1219 let block: BlockQueryData<MockTypes> =
1221 client.get(&format!("block/{i}")).send().await.unwrap();
1222 let expected_payload = PayloadQueryData::from(block.clone());
1223 assert_eq!(leaf.leaf.block_header().commit(), block.hash());
1224 assert_eq!(block.height(), i);
1225 assert_eq!(
1226 block,
1227 client
1228 .get(&format!("block/hash/{}", block.hash()))
1229 .send()
1230 .await
1231 .unwrap()
1232 );
1233 assert_eq!(
1234 *block.header(),
1235 client.get(&format!("header/{i}")).send().await.unwrap()
1236 );
1237 assert_eq!(
1238 *block.header(),
1239 client
1240 .get(&format!("header/hash/{}", block.hash()))
1241 .send()
1242 .await
1243 .unwrap()
1244 );
1245 assert_eq!(
1246 expected_payload,
1247 client.get(&format!("payload/{i}")).send().await.unwrap(),
1248 );
1249 assert_eq!(
1250 expected_payload,
1251 client
1252 .get(&format!("payload/block-hash/{}", block.hash()))
1253 .send()
1254 .await
1255 .unwrap(),
1256 );
1257 let common: ADVZCommonQueryData<MockTypes> = client
1259 .get(&format!("vid/common/{}", block.height()))
1260 .send()
1261 .await
1262 .unwrap();
1263 assert_eq!(common.height(), block.height());
1264 assert_eq!(common.block_hash(), block.hash());
1265 assert_eq!(
1266 VidCommitment::V0(common.payload_hash()),
1267 block.payload_hash(),
1268 );
1269 assert_eq!(
1270 common,
1271 client
1272 .get(&format!("vid/common/hash/{}", block.hash()))
1273 .send()
1274 .await
1275 .unwrap()
1276 );
1277
1278 let block_summary = client
1279 .get(&format!("block/summary/{i}"))
1280 .send()
1281 .await
1282 .unwrap();
1283 assert_eq!(
1284 BlockSummaryQueryData::<MockTypes>::from(block.clone()),
1285 block_summary,
1286 );
1287 assert_eq!(block_summary.header(), block.header());
1288 assert_eq!(block_summary.hash(), block.hash());
1289 assert_eq!(block_summary.size(), block.size());
1290 assert_eq!(block_summary.num_transactions(), block.num_transactions());
1291
1292 let block_summaries: Vec<BlockSummaryQueryData<MockTypes>> = client
1293 .get(&format!("block/summaries/{}/{}", 0, i))
1294 .send()
1295 .await
1296 .unwrap();
1297 assert_eq!(block_summaries.len() as u64, i);
1298
1299 assert_eq!(
1304 block.payload(),
1305 client
1306 .get::<BlockQueryData<MockTypes>>(&format!(
1307 "block/payload-hash/{}",
1308 block.payload_hash()
1309 ))
1310 .send()
1311 .await
1312 .unwrap()
1313 .payload()
1314 );
1315 assert_eq!(
1316 block.payload_hash(),
1317 client
1318 .get::<Header<MockTypes>>(&format!(
1319 "header/payload-hash/{}",
1320 block.payload_hash()
1321 ))
1322 .send()
1323 .await
1324 .unwrap()
1325 .payload_commitment
1326 );
1327 assert_eq!(
1328 block.payload(),
1329 client
1330 .get::<PayloadQueryData<MockTypes>>(&format!(
1331 "payload/hash/{}",
1332 block.payload_hash()
1333 ))
1334 .send()
1335 .await
1336 .unwrap()
1337 .data(),
1338 );
1339 assert_eq!(
1340 common.common(),
1341 client
1342 .get::<ADVZCommonQueryData<MockTypes>>(&format!(
1343 "vid/common/payload-hash/{}",
1344 block.payload_hash()
1345 ))
1346 .send()
1347 .await
1348 .unwrap()
1349 .common()
1350 );
1351
1352 for (j, txn_from_block) in block.enumerate() {
1355 let txn: TransactionQueryData<MockTypes> = client
1356 .get(&format!("transaction/{}/{}/noproof", i, j.position))
1357 .send()
1358 .await
1359 .unwrap();
1360 assert_eq!(txn.block_height(), i);
1361 assert_eq!(txn.block_hash(), block.hash());
1362 assert_eq!(txn.index(), j.position as u64);
1363 assert_eq!(txn.hash(), txn_from_block.commit());
1364 assert_eq!(txn.transaction(), &txn_from_block);
1365 assert_eq!(
1370 txn.hash(),
1371 client
1372 .get::<TransactionQueryData<MockTypes>>(&format!(
1373 "transaction/hash/{}/noproof",
1374 txn.hash()
1375 ))
1376 .send()
1377 .await
1378 .unwrap()
1379 .hash()
1380 );
1381
1382 assert_eq!(
1383 txn.hash(),
1384 client
1385 .get::<TransactionWithProofQueryData<MockTypes>>(&format!(
1386 "transaction/{}/{}/proof",
1387 i, j.position
1388 ))
1389 .send()
1390 .await
1391 .unwrap()
1392 .hash()
1393 );
1394
1395 assert_eq!(
1396 txn.hash(),
1397 client
1398 .get::<TransactionWithProofQueryData<MockTypes>>(&format!(
1399 "transaction/hash/{}/proof",
1400 txn.hash()
1401 ))
1402 .send()
1403 .await
1404 .unwrap()
1405 .hash()
1406 );
1407 }
1408
1409 let block_range: Vec<BlockQueryData<MockTypes>> = client
1410 .get(&format!("block/{}/{}", 0, i))
1411 .send()
1412 .await
1413 .unwrap();
1414
1415 assert_eq!(block_range.len() as u64, i);
1416
1417 let leaf_range: Vec<Leaf1QueryData<MockTypes>> = client
1418 .get(&format!("leaf/{}/{}", 0, i))
1419 .send()
1420 .await
1421 .unwrap();
1422
1423 assert_eq!(leaf_range.len() as u64, i);
1424
1425 let payload_range: Vec<PayloadQueryData<MockTypes>> = client
1426 .get(&format!("payload/{}/{}", 0, i))
1427 .send()
1428 .await
1429 .unwrap();
1430
1431 assert_eq!(payload_range.len() as u64, i);
1432
1433 let vid_common_range: Vec<ADVZCommonQueryData<MockTypes>> = client
1434 .get(&format!("vid/common/{}/{}", 0, i))
1435 .send()
1436 .await
1437 .unwrap();
1438
1439 assert_eq!(vid_common_range.len() as u64, i);
1440
1441 let header_range: Vec<Header<MockTypes>> = client
1442 .get(&format!("header/{}/{}", 0, i))
1443 .send()
1444 .await
1445 .unwrap();
1446
1447 assert_eq!(header_range.len() as u64, i);
1448 }
1449 }
1450
1451 #[test_log::test(tokio::test(flavor = "multi_thread"))]
1452 async fn test_api_epochs() {
1453 let mut network = MockNetwork::<MockDataSource>::init().await;
1455 let epoch_height = network.epoch_height();
1456 network.start().await;
1457
1458 let port = reserve_tcp_port().unwrap();
1460 let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
1461 app.register_module(
1462 "availability",
1463 define_api(
1464 &Default::default(),
1465 MockBase::instance(),
1466 "1.0.0".parse().unwrap(),
1467 )
1468 .unwrap(),
1469 )
1470 .unwrap();
1471 network.spawn(
1472 "server",
1473 app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
1474 );
1475
1476 let client = Client::<Error, MockBase>::new(
1478 format!("http://localhost:{port}/availability")
1479 .parse()
1480 .unwrap(),
1481 );
1482 assert!(client.connect(Some(Duration::from_secs(60))).await);
1483
1484 let headers = client
1487 .socket("stream/headers/0")
1488 .subscribe::<Header<MockTypes>>()
1489 .await
1490 .unwrap();
1491 let mut chain = headers.enumerate();
1492
1493 loop {
1494 let (i, header) = chain.next().await.unwrap();
1495 let header = header.unwrap();
1496 assert_eq!(header.height(), i as u64);
1497 if header.height() >= 3 * epoch_height {
1498 break;
1499 }
1500 }
1501
1502 network.shut_down().await;
1503 }
1504
1505 #[test_log::test(tokio::test(flavor = "multi_thread"))]
1506 async fn test_old_api() {
1507 let mut network = MockNetwork::<MockDataSource>::init().await;
1509 network.start().await;
1510
1511 let port = reserve_tcp_port().unwrap();
1513
1514 let options = Options {
1515 small_object_range_limit: 500,
1516 large_object_range_limit: 500,
1517 ..Default::default()
1518 };
1519
1520 let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
1521 app.register_module(
1522 "availability",
1523 define_api(&options, MockBase::instance(), "0.1.0".parse().unwrap()).unwrap(),
1524 )
1525 .unwrap();
1526 network.spawn(
1527 "server",
1528 app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
1529 );
1530
1531 let client = Client::<Error, MockBase>::new(
1533 format!("http://localhost:{port}/availability")
1534 .parse()
1535 .unwrap(),
1536 );
1537 assert!(client.connect(Some(Duration::from_secs(60))).await);
1538 assert_eq!(get_non_empty_blocks(&client).await.1, vec![]);
1539
1540 let leaves = client
1543 .socket("stream/leaves/0")
1544 .subscribe::<Leaf1QueryData<MockTypes>>()
1545 .await
1546 .unwrap();
1547 let headers = client
1548 .socket("stream/headers/0")
1549 .subscribe::<Header<MockTypes>>()
1550 .await
1551 .unwrap();
1552 let blocks = client
1553 .socket("stream/blocks/0")
1554 .subscribe::<BlockQueryData<MockTypes>>()
1555 .await
1556 .unwrap();
1557 let vid_common = client
1558 .socket("stream/vid/common/0")
1559 .subscribe::<ADVZCommonQueryData<MockTypes>>()
1560 .await
1561 .unwrap();
1562 let mut chain = leaves.zip(headers.zip(blocks.zip(vid_common))).enumerate();
1563 for nonce in 0..3 {
1564 let txn = mock_transaction(vec![nonce]);
1565 network.submit_transaction(txn).await;
1566
1567 let (i, leaf, block, common) = loop {
1569 tracing::info!("waiting for block with transaction {}", nonce);
1570 let (i, (leaf, (header, (block, common)))) = chain.next().await.unwrap();
1571 tracing::info!(i, ?leaf, ?header, ?block, ?common);
1572 let leaf = leaf.unwrap();
1573 let header = header.unwrap();
1574 let block = block.unwrap();
1575 let common = common.unwrap();
1576 assert_eq!(leaf.leaf.height() as usize, i);
1577 assert_eq!(leaf.leaf.block_header().commit(), block.hash());
1578 assert_eq!(block.header(), &header);
1579 assert_eq!(common.height() as usize, i);
1580 if !block.is_empty() {
1581 break (i, leaf, block, common);
1582 }
1583 };
1584 assert_eq!(leaf, client.get(&format!("leaf/{i}")).send().await.unwrap());
1585 assert_eq!(
1586 block,
1587 client.get(&format!("block/{i}")).send().await.unwrap()
1588 );
1589 assert_eq!(
1590 common,
1591 client.get(&format!("vid/common/{i}")).send().await.unwrap()
1592 );
1593
1594 validate_old(&client, (i + 1) as u64).await;
1595 }
1596
1597 network.shut_down().await;
1598 }
1599
1600 #[test_log::test(tokio::test(flavor = "multi_thread"))]
1601 async fn test_extensions() {
1602 let dir = TempDir::with_prefix("test_availability_extensions").unwrap();
1603 let data_source = ExtensibleDataSource::new(
1604 MockDataSource::create(dir.path(), Default::default())
1605 .await
1606 .unwrap(),
1607 0,
1608 );
1609
1610 let leaf = Leaf2::<MockTypes>::genesis(
1612 &Default::default(),
1613 &Default::default(),
1614 MOCK_UPGRADE.base,
1615 )
1616 .await;
1617 let qc = QuorumCertificate2::genesis(
1618 &Default::default(),
1619 &Default::default(),
1620 TEST_VERSIONS.test,
1621 )
1622 .await;
1623 let leaf = LeafQueryData::new(leaf, qc).unwrap();
1624 let block = BlockQueryData::new(leaf.header().clone(), MockPayload::genesis());
1625 data_source
1626 .append(BlockInfo::new(leaf, Some(block.clone()), None, None))
1627 .await
1628 .unwrap();
1629
1630 assert_eq!(
1632 ExtensibleDataSource::<MockDataSource, u64>::block_height(&data_source)
1633 .await
1634 .unwrap(),
1635 1
1636 );
1637 assert_eq!(block, data_source.get_block(0).await.await);
1638
1639 let extensions = toml! {
1641 [route.post_ext]
1642 PATH = ["/ext/:val"]
1643 METHOD = "POST"
1644 ":val" = "Integer"
1645
1646 [route.get_ext]
1647 PATH = ["/ext"]
1648 METHOD = "GET"
1649 };
1650
1651 let mut api =
1652 define_api::<RwLock<ExtensibleDataSource<MockDataSource, u64>>, MockTypes, MockBase>(
1653 &Options {
1654 extensions: vec![extensions.into()],
1655 ..Default::default()
1656 },
1657 MockBase::instance(),
1658 "1.0.0".parse().unwrap(),
1659 )
1660 .unwrap();
1661 api.get("get_ext", |_, state| {
1662 async move { Ok(*state.as_ref()) }.boxed()
1663 })
1664 .unwrap()
1665 .post("post_ext", |req, state| {
1666 async move {
1667 *state.as_mut() = req.integer_param("val")?;
1668 Ok(())
1669 }
1670 .boxed()
1671 })
1672 .unwrap();
1673
1674 let mut app = App::<_, Error>::with_state(RwLock::new(data_source));
1675 app.register_module("availability", api).unwrap();
1676
1677 let port = reserve_tcp_port().unwrap();
1678 let _server = BackgroundTask::spawn(
1679 "server",
1680 app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
1681 );
1682
1683 let client = Client::<Error, MockBase>::new(
1684 format!("http://localhost:{port}/availability")
1685 .parse()
1686 .unwrap(),
1687 );
1688 assert!(client.connect(Some(Duration::from_secs(60))).await);
1689
1690 assert_eq!(client.get::<u64>("ext").send().await.unwrap(), 0);
1691 client.post::<()>("ext/42").send().await.unwrap();
1692 assert_eq!(client.get::<u64>("ext").send().await.unwrap(), 42);
1693
1694 assert_eq!(
1696 client
1697 .get::<MockHeader>("header/0")
1698 .send()
1699 .await
1700 .unwrap()
1701 .block_number,
1702 0
1703 );
1704 }
1705
1706 #[test_log::test(tokio::test(flavor = "multi_thread"))]
1707 async fn test_range_limit() {
1708 let large_object_range_limit = 2;
1709 let small_object_range_limit = 3;
1710
1711 let mut network = MockNetwork::<MockDataSource>::init().await;
1713 network.start().await;
1714
1715 let port = reserve_tcp_port().unwrap();
1717 let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
1718 app.register_module(
1719 "availability",
1720 define_api(
1721 &Options {
1722 large_object_range_limit,
1723 small_object_range_limit,
1724 ..Default::default()
1725 },
1726 MockBase::instance(),
1727 "1.0.0".parse().unwrap(),
1728 )
1729 .unwrap(),
1730 )
1731 .unwrap();
1732 network.spawn(
1733 "server",
1734 app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
1735 );
1736
1737 let client = Client::<Error, MockBase>::new(
1739 format!("http://localhost:{port}/availability")
1740 .parse()
1741 .unwrap(),
1742 );
1743 assert!(client.connect(Some(Duration::from_secs(60))).await);
1744
1745 assert_eq!(
1747 client.get::<Limits>("limits").send().await.unwrap(),
1748 Limits {
1749 small_object_range_limit,
1750 large_object_range_limit
1751 }
1752 );
1753
1754 client
1756 .socket("stream/blocks/0")
1757 .subscribe::<BlockQueryData<MockTypes>>()
1758 .await
1759 .unwrap()
1760 .take(small_object_range_limit + 1)
1761 .try_collect::<Vec<_>>()
1762 .await
1763 .unwrap();
1764
1765 async fn check_limit<T: DeserializeOwned + Debug>(
1766 client: &Client<Error, MockBase>,
1767 req: &str,
1768 limit: usize,
1769 ) {
1770 let range: Vec<T> = client
1771 .get(&format!("{req}/0/{limit}"))
1772 .send()
1773 .await
1774 .unwrap();
1775 assert_eq!(range.len(), limit);
1776 let err = client
1777 .get::<Vec<T>>(&format!("{req}/0/{}", limit + 1))
1778 .send()
1779 .await
1780 .unwrap_err();
1781 assert_eq!(err.status(), StatusCode::BAD_REQUEST);
1782 }
1783
1784 check_limit::<LeafQueryData<MockTypes>>(&client, "leaf", small_object_range_limit).await;
1785 check_limit::<Header<MockTypes>>(&client, "header", large_object_range_limit).await;
1786 check_limit::<BlockQueryData<MockTypes>>(&client, "block", large_object_range_limit).await;
1787 check_limit::<PayloadQueryData<MockTypes>>(&client, "payload", large_object_range_limit)
1788 .await;
1789 check_limit::<BlockSummaryQueryData<MockTypes>>(
1790 &client,
1791 "block/summaries",
1792 large_object_range_limit,
1793 )
1794 .await;
1795
1796 network.shut_down().await;
1797 }
1798
1799 #[test_log::test(tokio::test(flavor = "multi_thread"))]
1800 async fn test_header_endpoint() {
1801 let mut network = MockNetwork::<MockSqlDataSource>::init().await;
1803 network.start().await;
1804
1805 let port = reserve_tcp_port().unwrap();
1807 let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
1808 app.register_module(
1809 "availability",
1810 define_api(
1811 &Default::default(),
1812 MockBase::instance(),
1813 "1.0.0".parse().unwrap(),
1814 )
1815 .unwrap(),
1816 )
1817 .unwrap();
1818 network.spawn(
1819 "server",
1820 app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
1821 );
1822
1823 let ds = network.data_source();
1824
1825 let block_height = ds.block_height().await.unwrap();
1828 let fetch = ds
1829 .get_header(BlockId::<MockTypes>::Number(block_height + 25))
1830 .await;
1831
1832 assert!(fetch.is_pending());
1833 let header = fetch.await;
1834 assert_eq!(header.height() as usize, block_height + 25);
1835
1836 network.shut_down().await;
1837 }
1838
1839 #[test_log::test(tokio::test(flavor = "multi_thread"))]
1840 async fn test_leaf_only_ds() {
1841 let mut network = MockNetwork::<MockSqlDataSource>::init_with_leaf_ds().await;
1843 network.start().await;
1844
1845 let port = reserve_tcp_port().unwrap();
1847 let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
1848 app.register_module(
1849 "availability",
1850 define_api(
1851 &Default::default(),
1852 MockBase::instance(),
1853 "1.0.0".parse().unwrap(),
1854 )
1855 .unwrap(),
1856 )
1857 .unwrap();
1858 network.spawn(
1859 "server",
1860 app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
1861 );
1862
1863 let client = Client::<Error, MockBase>::new(
1865 format!("http://localhost:{port}/availability")
1866 .parse()
1867 .unwrap(),
1868 );
1869 assert!(client.connect(Some(Duration::from_secs(60))).await);
1870
1871 client
1873 .socket("stream/headers/0")
1874 .subscribe::<Header<MockTypes>>()
1875 .await
1876 .unwrap()
1877 .take(5)
1878 .try_collect::<Vec<_>>()
1879 .await
1880 .unwrap();
1881
1882 client
1884 .socket("stream/leaves/5")
1885 .subscribe::<LeafQueryData<MockTypes>>()
1886 .await
1887 .unwrap()
1888 .take(5)
1889 .try_collect::<Vec<_>>()
1890 .await
1891 .unwrap();
1892
1893 let ds = network.data_source();
1894
1895 let block_height = ds.block_height().await.unwrap();
1899 let target_block_height = block_height + 20;
1900 let fetch = ds
1901 .get_block(BlockId::<MockTypes>::Number(target_block_height))
1902 .await;
1903
1904 assert!(fetch.is_pending());
1905 let block = fetch.await;
1906 assert_eq!(block.height() as usize, target_block_height);
1907
1908 let mut tx = ds.read().await.unwrap();
1909 tx.get_block(BlockId::<MockTypes>::Number(target_block_height))
1910 .await
1911 .unwrap_err();
1912 drop(tx);
1913
1914 network.shut_down().await;
1915 }
1916}