1use std::{fmt::Display, path::PathBuf, time::Duration};
30
31use derive_more::From;
32use futures::{FutureExt, StreamExt, TryFutureExt, TryStreamExt};
33use hotshot_types::{
34 data::{Leaf, Leaf2, QuorumProposal, VidCommitment, VidCommon},
35 simple_certificate::QuorumCertificate,
36 traits::node_implementation::NodeType,
37};
38use serde::{Deserialize, Serialize};
39use snafu::{OptionExt, Snafu};
40use tide_disco::{Api, RequestError, RequestParams, StatusCode, api::ApiError, method::ReadState};
41use vbs::version::StaticVersionType;
42
43use crate::{Header, Payload, QueryError, api::load_api, types::HeightIndexed};
44
45pub(crate) mod data_source;
46mod fetch;
47pub(crate) mod query_data;
48pub use data_source::*;
49pub use fetch::Fetch;
50pub use query_data::*;
51
52#[derive(Debug)]
53pub struct Options {
54 pub api_path: Option<PathBuf>,
55
56 pub fetch_timeout: Duration,
62
63 pub extensions: Vec<toml::Value>,
68
69 pub small_object_range_limit: usize,
77
78 pub large_object_range_limit: usize,
86}
87
88impl Default for Options {
89 fn default() -> Self {
90 Self {
91 api_path: None,
92 fetch_timeout: Duration::from_millis(500),
93 extensions: vec![],
94 large_object_range_limit: 100,
95 small_object_range_limit: 500,
96 }
97 }
98}
99
100#[derive(Clone, Debug, From, Snafu, Deserialize, Serialize)]
101#[snafu(visibility(pub))]
102pub enum Error {
103 Request {
104 source: RequestError,
105 },
106 #[snafu(display("leaf {resource} missing or not available"))]
107 #[from(ignore)]
108 FetchLeaf {
109 resource: String,
110 },
111 #[snafu(display("block {resource} missing or not available"))]
112 #[from(ignore)]
113 FetchBlock {
114 resource: String,
115 },
116 #[snafu(display("header {resource} missing or not available"))]
117 #[from(ignore)]
118 FetchHeader {
119 resource: String,
120 },
121 #[snafu(display("transaction {resource} missing or not available"))]
122 #[from(ignore)]
123 FetchTransaction {
124 resource: String,
125 },
126 #[snafu(display("transaction index {index} out of range for block {height}"))]
127 #[from(ignore)]
128 InvalidTransactionIndex {
129 height: u64,
130 index: u64,
131 },
132 #[snafu(display("request for range {from}..{until} exceeds limit {limit}"))]
133 #[from(ignore)]
134 RangeLimit {
135 from: usize,
136 until: usize,
137 limit: usize,
138 },
139 #[snafu(display("{source}"))]
140 Query {
141 source: QueryError,
142 },
143 #[snafu(display("State cert for epoch {epoch} not found"))]
144 #[from(ignore)]
145 FetchStateCert {
146 epoch: u64,
147 },
148 #[snafu(display("error {status}: {message}"))]
149 Custom {
150 message: String,
151 status: StatusCode,
152 },
153}
154
155impl Error {
156 pub fn internal<M: Display>(message: M) -> Self {
157 Self::Custom {
158 message: message.to_string(),
159 status: StatusCode::INTERNAL_SERVER_ERROR,
160 }
161 }
162
163 pub fn status(&self) -> StatusCode {
164 match self {
165 Self::Request { .. } | Self::RangeLimit { .. } => StatusCode::BAD_REQUEST,
166 Self::FetchLeaf { .. }
167 | Self::FetchBlock { .. }
168 | Self::FetchTransaction { .. }
169 | Self::FetchHeader { .. }
170 | Self::FetchStateCert { .. } => StatusCode::NOT_FOUND,
171 Self::InvalidTransactionIndex { .. } | Self::Query { .. } => StatusCode::NOT_FOUND,
172 Self::Custom { status, .. } => *status,
173 }
174 }
175}
176
177#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
178#[serde(bound = "")]
179pub struct Leaf1QueryData<Types: NodeType> {
180 pub(crate) leaf: Leaf<Types>,
181 pub(crate) qc: QuorumCertificate<Types>,
182}
183
184impl<Types: NodeType> Leaf1QueryData<Types> {
185 pub fn new(leaf: Leaf<Types>, qc: QuorumCertificate<Types>) -> Self {
186 Self { leaf, qc }
187 }
188
189 pub fn leaf(&self) -> &Leaf<Types> {
190 &self.leaf
191 }
192
193 pub fn qc(&self) -> &QuorumCertificate<Types> {
194 &self.qc
195 }
196}
197
198fn downgrade_leaf<Types: NodeType>(leaf2: Leaf2<Types>) -> Leaf<Types> {
199 let quorum_proposal = QuorumProposal {
205 block_header: leaf2.block_header().clone(),
206 view_number: leaf2.view_number(),
207 justify_qc: leaf2.justify_qc().to_qc(),
208 upgrade_certificate: leaf2.upgrade_certificate(),
209 proposal_certificate: None,
210 };
211 let mut leaf = Leaf::from_quorum_proposal(&quorum_proposal);
212 if let Some(payload) = leaf2.block_payload() {
213 leaf.fill_block_payload_unchecked(payload);
214 }
215 leaf
216}
217
218fn downgrade_leaf_query_data<Types: NodeType>(leaf: LeafQueryData<Types>) -> Leaf1QueryData<Types> {
219 Leaf1QueryData {
220 leaf: downgrade_leaf(leaf.leaf),
221 qc: leaf.qc.to_qc(),
222 }
223}
224
225async fn get_leaf_handler<Types, State>(
226 req: tide_disco::RequestParams,
227 state: &State,
228 timeout: Duration,
229) -> Result<LeafQueryData<Types>, Error>
230where
231 State: 'static + Send + Sync + ReadState,
232 <State as ReadState>::State: Send + Sync + AvailabilityDataSource<Types>,
233 Types: NodeType,
234 Header<Types>: QueryableHeader<Types>,
235 Payload<Types>: QueryablePayload<Types>,
236{
237 let id = match req.opt_integer_param("height")? {
238 Some(height) => LeafId::Number(height),
239 None => LeafId::Hash(req.blob_param("hash")?),
240 };
241 let fetch = state.read(|state| state.get_leaf(id).boxed()).await;
242 fetch.with_timeout(timeout).await.context(FetchLeafSnafu {
243 resource: id.to_string(),
244 })
245}
246
247async fn get_leaf_range_handler<Types, State>(
248 req: tide_disco::RequestParams,
249 state: &State,
250 timeout: Duration,
251 small_object_range_limit: usize,
252) -> Result<Vec<LeafQueryData<Types>>, Error>
253where
254 State: 'static + Send + Sync + ReadState,
255 <State as ReadState>::State: Send + Sync + AvailabilityDataSource<Types>,
256 Types: NodeType,
257 Header<Types>: QueryableHeader<Types>,
258 Payload<Types>: QueryablePayload<Types>,
259{
260 let from = req.integer_param::<_, usize>("from")?;
261 let until = req.integer_param("until")?;
262 enforce_range_limit(from, until, small_object_range_limit)?;
263
264 let leaves = state
265 .read(|state| state.get_leaf_range(from..until).boxed())
266 .await;
267 leaves
268 .enumerate()
269 .then(|(index, fetch)| async move {
270 fetch.with_timeout(timeout).await.context(FetchLeafSnafu {
271 resource: (index + from).to_string(),
272 })
273 })
274 .try_collect::<Vec<_>>()
275 .await
276}
277
278fn downgrade_vid_common_query_data<Types: NodeType>(
279 data: VidCommonQueryData<Types>,
280) -> Option<ADVZCommonQueryData<Types>> {
281 let VidCommonQueryData {
282 height,
283 block_hash,
284 payload_hash: VidCommitment::V0(payload_hash),
285 common: VidCommon::V0(common),
286 } = data
287 else {
288 return None;
289 };
290 Some(ADVZCommonQueryData {
291 height,
292 block_hash,
293 payload_hash,
294 common,
295 })
296}
297
298async fn get_vid_common_handler<Types, State>(
299 req: tide_disco::RequestParams,
300 state: &State,
301 timeout: Duration,
302) -> Result<VidCommonQueryData<Types>, Error>
303where
304 State: 'static + Send + Sync + ReadState,
305 <State as ReadState>::State: Send + Sync + AvailabilityDataSource<Types>,
306 Types: NodeType,
307 Header<Types>: QueryableHeader<Types>,
308 Payload<Types>: QueryablePayload<Types>,
309{
310 let id = if let Some(height) = req.opt_integer_param("height")? {
311 BlockId::Number(height)
312 } else if let Some(hash) = req.opt_blob_param("hash")? {
313 BlockId::Hash(hash)
314 } else {
315 BlockId::PayloadHash(req.blob_param("payload-hash")?)
316 };
317 let fetch = state.read(|state| state.get_vid_common(id).boxed()).await;
318 fetch.with_timeout(timeout).await.context(FetchBlockSnafu {
319 resource: id.to_string(),
320 })
321}
322
323async fn get_vid_common_range_handler<Types, State>(
324 req: tide_disco::RequestParams,
325 state: &State,
326 limit: usize,
327 timeout: Duration,
328) -> Result<Vec<VidCommonQueryData<Types>>, Error>
329where
330 State: 'static + Send + Sync + ReadState,
331 <State as ReadState>::State: Send + Sync + AvailabilityDataSource<Types>,
332 Types: NodeType,
333 Header<Types>: QueryableHeader<Types>,
334 Payload<Types>: QueryablePayload<Types>,
335{
336 let from = req.integer_param("from")?;
337 let until = req.integer_param("until")?;
338 enforce_range_limit(from, until, limit)?;
339
340 let vid = state
341 .read(|state| state.get_vid_common_range(from..until).boxed())
342 .await;
343 vid.enumerate()
344 .then(|(index, fetch)| async move {
345 fetch.with_timeout(timeout).await.context(FetchBlockSnafu {
346 resource: (index + from).to_string(),
347 })
348 })
349 .try_collect::<Vec<_>>()
350 .await
351}
352
353pub fn define_api<State, Types: NodeType, Ver: StaticVersionType + 'static>(
354 options: &Options,
355 _: Ver,
356 api_ver: semver::Version,
357) -> Result<Api<State, Error, Ver>, ApiError>
358where
359 State: 'static + Send + Sync + ReadState,
360 <State as ReadState>::State: Send + Sync + AvailabilityDataSource<Types>,
361 Header<Types>: QueryableHeader<Types>,
362 Payload<Types>: QueryablePayload<Types>,
363{
364 let mut api = load_api::<State, Error, Ver>(
365 options.api_path.as_ref(),
366 include_str!("../api/availability.toml"),
367 options.extensions.clone(),
368 )?;
369 let timeout = options.fetch_timeout;
370 let small_object_range_limit = options.small_object_range_limit;
371 let large_object_range_limit = options.large_object_range_limit;
372
373 api.with_version(api_ver.clone());
374
375 if api_ver.major == 0 {
383 api.at("get_leaf", move |req, state| {
384 get_leaf_handler(req, state, timeout)
385 .map(|res| res.map(downgrade_leaf_query_data))
386 .boxed()
387 })?;
388
389 api.at("get_leaf_range", move |req, state| {
390 get_leaf_range_handler(req, state, timeout, small_object_range_limit)
391 .map(|res| {
392 res.map(|r| {
393 r.into_iter()
394 .map(downgrade_leaf_query_data)
395 .collect::<Vec<Leaf1QueryData<_>>>()
396 })
397 })
398 .boxed()
399 })?;
400
401 api.stream("stream_leaves", move |req, state| {
402 async move {
403 let height = req.integer_param("height")?;
404 state
405 .read(|state| {
406 async move {
407 Ok(state
408 .subscribe_leaves(height)
409 .await
410 .map(|leaf| Ok(downgrade_leaf_query_data(leaf))))
411 }
412 .boxed()
413 })
414 .await
415 }
416 .try_flatten_stream()
417 .boxed()
418 })?;
419 } else {
420 api.at("get_leaf", move |req, state| {
421 get_leaf_handler(req, state, timeout).boxed()
422 })?;
423
424 api.at("get_leaf_range", move |req, state| {
425 get_leaf_range_handler(req, state, timeout, small_object_range_limit).boxed()
426 })?;
427
428 api.stream("stream_leaves", move |req, state| {
429 async move {
430 let height = req.integer_param("height")?;
431 state
432 .read(|state| {
433 async move { Ok(state.subscribe_leaves(height).await.map(Ok)) }.boxed()
434 })
435 .await
436 }
437 .try_flatten_stream()
438 .boxed()
439 })?;
440 }
441
442 if api_ver.major == 0 {
445 api.at("get_vid_common", move |req, state| {
446 get_vid_common_handler(req, state, timeout)
447 .map(|r| match r {
448 Ok(data) => downgrade_vid_common_query_data(data).ok_or(Error::Custom {
449 message: "Incompatible VID version.".to_string(),
450 status: StatusCode::BAD_REQUEST,
451 }),
452 Err(e) => Err(e),
453 })
454 .boxed()
455 })?
456 .at("get_vid_common_range", move |req, state| {
457 get_vid_common_range_handler(req, state, small_object_range_limit, timeout)
458 .map(|r| match r {
459 Ok(data) => data
460 .into_iter()
461 .map(downgrade_vid_common_query_data)
462 .collect::<Option<Vec<_>>>()
463 .ok_or(Error::Custom {
464 message: "Incompatible VID version.".to_string(),
465 status: StatusCode::BAD_REQUEST,
466 }),
467 Err(e) => Err(e),
468 })
469 .boxed()
470 })?
471 .stream("stream_vid_common", move |req, state| {
472 async move {
473 let height = req.integer_param("height")?;
474 state
475 .read(|state| {
476 async move {
477 Ok(state.subscribe_vid_common(height).await.map(|data| {
478 downgrade_vid_common_query_data(data).ok_or(Error::Custom {
479 message: "Incompatible VID version.".to_string(),
480 status: StatusCode::BAD_REQUEST,
481 })
482 }))
483 }
484 .boxed()
485 })
486 .await
487 }
488 .try_flatten_stream()
489 .boxed()
490 })?;
491 } else {
492 api.at("get_vid_common", move |req, state| {
493 get_vid_common_handler(req, state, timeout).boxed().boxed()
494 })?
495 .at("get_vid_common_range", move |req, state| {
496 get_vid_common_range_handler(req, state, small_object_range_limit, timeout)
497 .boxed()
498 .boxed()
499 })?
500 .stream("stream_vid_common", move |req, state| {
501 async move {
502 let height = req.integer_param("height")?;
503 state
504 .read(|state| {
505 async move { Ok(state.subscribe_vid_common(height).await.map(Ok)) }.boxed()
506 })
507 .await
508 }
509 .try_flatten_stream()
510 .boxed()
511 })?;
512 }
513
514 api.at("get_header", move |req, state| {
515 async move {
516 let id = if let Some(height) = req.opt_integer_param("height")? {
517 BlockId::Number(height)
518 } else if let Some(hash) = req.opt_blob_param("hash")? {
519 BlockId::Hash(hash)
520 } else {
521 BlockId::PayloadHash(req.blob_param("payload-hash")?)
522 };
523 let fetch = state.read(|state| state.get_header(id).boxed()).await;
524 fetch.with_timeout(timeout).await.context(FetchHeaderSnafu {
525 resource: id.to_string(),
526 })
527 }
528 .boxed()
529 })?
530 .at("get_header_range", move |req, state| {
531 async move {
532 let from = req.integer_param::<_, usize>("from")?;
533 let until = req.integer_param::<_, usize>("until")?;
534 enforce_range_limit(from, until, large_object_range_limit)?;
535
536 let headers = state
537 .read(|state| state.get_header_range(from..until).boxed())
538 .await;
539 headers
540 .enumerate()
541 .then(|(index, fetch)| async move {
542 fetch.with_timeout(timeout).await.context(FetchHeaderSnafu {
543 resource: (index + from).to_string(),
544 })
545 })
546 .try_collect::<Vec<_>>()
547 .await
548 }
549 .boxed()
550 })?
551 .stream("stream_headers", move |req, state| {
552 async move {
553 let height = req.integer_param("height")?;
554 state
555 .read(|state| {
556 async move { Ok(state.subscribe_headers(height).await.map(Ok)) }.boxed()
557 })
558 .await
559 }
560 .try_flatten_stream()
561 .boxed()
562 })?
563 .at("get_block", move |req, state| {
564 async move {
565 let id = if let Some(height) = req.opt_integer_param("height")? {
566 BlockId::Number(height)
567 } else if let Some(hash) = req.opt_blob_param("hash")? {
568 BlockId::Hash(hash)
569 } else {
570 BlockId::PayloadHash(req.blob_param("payload-hash")?)
571 };
572 let fetch = state.read(|state| state.get_block(id).boxed()).await;
573 fetch.with_timeout(timeout).await.context(FetchBlockSnafu {
574 resource: id.to_string(),
575 })
576 }
577 .boxed()
578 })?
579 .at("get_block_range", move |req, state| {
580 async move {
581 let from = req.integer_param::<_, usize>("from")?;
582 let until = req.integer_param("until")?;
583 enforce_range_limit(from, until, large_object_range_limit)?;
584
585 let blocks = state
586 .read(|state| state.get_block_range(from..until).boxed())
587 .await;
588 blocks
589 .enumerate()
590 .then(|(index, fetch)| async move {
591 fetch.with_timeout(timeout).await.context(FetchBlockSnafu {
592 resource: (index + from).to_string(),
593 })
594 })
595 .try_collect::<Vec<_>>()
596 .await
597 }
598 .boxed()
599 })?
600 .stream("stream_blocks", move |req, state| {
601 async move {
602 let height = req.integer_param("height")?;
603 state
604 .read(|state| {
605 async move { Ok(state.subscribe_blocks(height).await.map(Ok)) }.boxed()
606 })
607 .await
608 }
609 .try_flatten_stream()
610 .boxed()
611 })?
612 .at("get_payload", move |req, state| {
613 async move {
614 let id = if let Some(height) = req.opt_integer_param("height")? {
615 BlockId::Number(height)
616 } else if let Some(hash) = req.opt_blob_param("hash")? {
617 BlockId::PayloadHash(hash)
618 } else {
619 BlockId::Hash(req.blob_param("block-hash")?)
620 };
621 let fetch = state.read(|state| state.get_payload(id).boxed()).await;
622 fetch.with_timeout(timeout).await.context(FetchBlockSnafu {
623 resource: id.to_string(),
624 })
625 }
626 .boxed()
627 })?
628 .at("get_payload_range", move |req, state| {
629 async move {
630 let from = req.integer_param::<_, usize>("from")?;
631 let until = req.integer_param("until")?;
632 enforce_range_limit(from, until, large_object_range_limit)?;
633
634 let payloads = state
635 .read(|state| state.get_payload_range(from..until).boxed())
636 .await;
637 payloads
638 .enumerate()
639 .then(|(index, fetch)| async move {
640 fetch.with_timeout(timeout).await.context(FetchBlockSnafu {
641 resource: (index + from).to_string(),
642 })
643 })
644 .try_collect::<Vec<_>>()
645 .await
646 }
647 .boxed()
648 })?
649 .stream("stream_payloads", move |req, state| {
650 async move {
651 let height = req.integer_param("height")?;
652 state
653 .read(|state| {
654 async move { Ok(state.subscribe_payloads(height).await.map(Ok)) }.boxed()
655 })
656 .await
657 }
658 .try_flatten_stream()
659 .boxed()
660 })?
661 .at("get_transaction_proof", move |req, state| {
662 async move {
663 let tx = get_transaction(req, state, timeout).await?;
664 let height = tx.block.height();
665 let vid = state
666 .read(|state| state.get_vid_common(height as usize))
667 .await
668 .with_timeout(timeout)
669 .await
670 .context(FetchBlockSnafu {
671 resource: height.to_string(),
672 })?;
673 let proof = tx.block.transaction_proof(&vid, &tx.index).context(
674 InvalidTransactionIndexSnafu {
675 height,
676 index: tx.transaction.index(),
677 },
678 )?;
679 Ok(TransactionWithProofQueryData::new(tx.transaction, proof))
680 }
681 .boxed()
682 })?
683 .at("get_transaction", move |req, state| {
684 async move { Ok(get_transaction(req, state, timeout).await?.transaction) }.boxed()
685 })?
686 .stream("stream_transactions", move |req, state| {
687 async move {
688 let height = req.integer_param::<_, usize>("height")?;
689
690 let namespace: Option<i64> = req
691 .opt_integer_param::<_, usize>("namespace")?
692 .map(|i| {
693 i.try_into().map_err(|err| Error::Custom {
694 message: format!(
695 "Invalid 'namespace': could not convert usize to i64: {err}"
696 ),
697 status: StatusCode::BAD_REQUEST,
698 })
699 })
700 .transpose()?;
701
702 state
703 .read(|state| {
704 async move {
705 Ok(state
706 .subscribe_blocks(height)
707 .await
708 .map(move |block| {
709 let transactions = block.enumerate().enumerate();
710 let header = block.header();
711 let filtered_txs = transactions
712 .filter_map(|(i, (index, _tx))| {
713 if let Some(requested_ns) = namespace {
714 let ns_id = QueryableHeader::<Types>::namespace_id(
715 header,
716 &index.ns_index,
717 )?;
718
719 if ns_id.into() != requested_ns {
720 return None;
721 }
722 }
723
724 let tx = block.transaction(&index)?;
725 TransactionQueryData::new(tx, &block, &index, i as u64)
726 })
727 .collect::<Vec<_>>();
728
729 futures::stream::iter(filtered_txs.into_iter().map(Ok))
730 })
731 .flatten())
732 }
733 .boxed()
734 })
735 .await
736 }
737 .try_flatten_stream()
738 .boxed()
739 })?
740 .at("get_block_summary", move |req, state| {
741 async move {
742 let id: usize = req.integer_param("height")?;
743
744 let fetch = state.read(|state| state.get_block(id).boxed()).await;
745 fetch
746 .with_timeout(timeout)
747 .await
748 .context(FetchBlockSnafu {
749 resource: id.to_string(),
750 })
751 .map(BlockSummaryQueryData::from)
752 }
753 .boxed()
754 })?
755 .at("get_block_summary_range", move |req, state| {
756 async move {
757 let from: usize = req.integer_param("from")?;
758 let until: usize = req.integer_param("until")?;
759 enforce_range_limit(from, until, large_object_range_limit)?;
760
761 let blocks = state
762 .read(|state| state.get_block_range(from..until).boxed())
763 .await;
764 let result: Vec<BlockSummaryQueryData<Types>> = blocks
765 .enumerate()
766 .then(|(index, fetch)| async move {
767 fetch.with_timeout(timeout).await.context(FetchBlockSnafu {
768 resource: (index + from).to_string(),
769 })
770 })
771 .map(|result| result.map(BlockSummaryQueryData::from))
772 .try_collect()
773 .await?;
774
775 Ok(result)
776 }
777 .boxed()
778 })?
779 .at("get_limits", move |_req, _state| {
780 async move {
781 Ok(Limits {
782 small_object_range_limit,
783 large_object_range_limit,
784 })
785 }
786 .boxed()
787 })?;
788 Ok(api)
789}
790
791fn enforce_range_limit(from: usize, until: usize, limit: usize) -> Result<(), Error> {
792 if until.saturating_sub(from) > limit {
793 return Err(Error::RangeLimit { from, until, limit });
794 }
795 Ok(())
796}
797
798async fn get_transaction<Types, State>(
799 req: RequestParams,
800 state: &State,
801 timeout: Duration,
802) -> Result<BlockWithTransaction<Types>, Error>
803where
804 Types: NodeType,
805 Header<Types>: QueryableHeader<Types>,
806 Payload<Types>: QueryablePayload<Types>,
807 State: 'static + Send + Sync + ReadState,
808 <State as ReadState>::State: Send + Sync + AvailabilityDataSource<Types>,
809{
810 match req.opt_blob_param("hash")? {
811 Some(hash) => state
812 .read(|state| state.get_block_containing_transaction(hash).boxed())
813 .await
814 .with_timeout(timeout)
815 .await
816 .context(FetchTransactionSnafu {
817 resource: hash.to_string(),
818 }),
819 None => {
820 let height: u64 = req.integer_param("height")?;
821 let fetch = state
822 .read(|state| state.get_block(height as usize).boxed())
823 .await;
824 let block = fetch.with_timeout(timeout).await.context(FetchBlockSnafu {
825 resource: height.to_string(),
826 })?;
827 let i: u64 = req.integer_param("index")?;
828 let index = block
829 .payload()
830 .nth(block.metadata(), i as usize)
831 .context(InvalidTransactionIndexSnafu { height, index: i })?;
832 let transaction = block
833 .transaction(&index)
834 .context(InvalidTransactionIndexSnafu { height, index: i })?;
835 let transaction = TransactionQueryData::new(transaction, &block, &index, i)
836 .context(InvalidTransactionIndexSnafu { height, index: i })?;
837 Ok(BlockWithTransaction {
838 transaction,
839 block,
840 index,
841 })
842 },
843 }
844}
845
846#[cfg(test)]
847mod test {
848 use std::{fmt::Debug, time::Duration};
849
850 use async_lock::RwLock;
851 use committable::Committable;
852 use futures::future::FutureExt;
853 use hotshot_example_types::node_types::TEST_VERSIONS;
854 use hotshot_types::{data::Leaf2, simple_certificate::QuorumCertificate2};
855 use serde::de::DeserializeOwned;
856 use surf_disco::{Client, Error as _};
857 use tempfile::TempDir;
858 use test_utils::reserve_tcp_port;
859 use tide_disco::App;
860 use toml::toml;
861
862 use super::*;
863 use crate::{
864 ApiState, Error, Header,
865 data_source::{ExtensibleDataSource, VersionedDataSource, storage::AvailabilityStorage},
866 status::StatusDataSource,
867 task::BackgroundTask,
868 testing::{
869 consensus::{MockDataSource, MockNetwork, MockSqlDataSource},
870 mocks::{MOCK_UPGRADE, MockBase, MockHeader, MockPayload, MockTypes, mock_transaction},
871 },
872 types::HeightIndexed,
873 };
874
875 async fn get_non_empty_blocks(
877 client: &Client<Error, MockBase>,
878 ) -> (
879 u64,
880 Vec<(LeafQueryData<MockTypes>, BlockQueryData<MockTypes>)>,
881 ) {
882 let mut blocks = vec![];
883 for i in 1.. {
885 match client
886 .get::<BlockQueryData<MockTypes>>(&format!("block/{i}"))
887 .send()
888 .await
889 {
890 Ok(block) => {
891 if !block.is_empty() {
892 let leaf = client.get(&format!("leaf/{i}")).send().await.unwrap();
893 blocks.push((leaf, block));
894 }
895 },
896 Err(Error::Availability {
897 source: super::Error::FetchBlock { .. },
898 }) => {
899 tracing::info!(
900 "found end of ledger at height {i}, non-empty blocks are {blocks:?}",
901 );
902 return (i, blocks);
903 },
904 Err(err) => panic!("unexpected error {err}"),
905 }
906 }
907 unreachable!()
908 }
909
910 async fn validate(client: &Client<Error, MockBase>, height: u64) {
911 for i in 0..height {
913 if ![0, 1, height / 2, height - 1].contains(&i) {
916 continue;
917 }
918 tracing::info!("validate block {i}/{height}");
919
920 let leaf: LeafQueryData<MockTypes> =
922 client.get(&format!("leaf/{i}")).send().await.unwrap();
923 assert_eq!(leaf.height(), i);
924 assert_eq!(
925 leaf,
926 client
927 .get(&format!("leaf/hash/{}", leaf.hash()))
928 .send()
929 .await
930 .unwrap()
931 );
932
933 let block: BlockQueryData<MockTypes> =
935 client.get(&format!("block/{i}")).send().await.unwrap();
936 let expected_payload = PayloadQueryData::from(block.clone());
937 assert_eq!(leaf.block_hash(), block.hash());
938 assert_eq!(block.height(), i);
939 assert_eq!(
940 block,
941 client
942 .get(&format!("block/hash/{}", block.hash()))
943 .send()
944 .await
945 .unwrap()
946 );
947 assert_eq!(
948 *block.header(),
949 client.get(&format!("header/{i}")).send().await.unwrap()
950 );
951 assert_eq!(
952 *block.header(),
953 client
954 .get(&format!("header/hash/{}", block.hash()))
955 .send()
956 .await
957 .unwrap()
958 );
959 assert_eq!(
960 expected_payload,
961 client.get(&format!("payload/{i}")).send().await.unwrap(),
962 );
963 assert_eq!(
964 expected_payload,
965 client
966 .get(&format!("payload/block-hash/{}", block.hash()))
967 .send()
968 .await
969 .unwrap(),
970 );
971 let common: VidCommonQueryData<MockTypes> = client
973 .get(&format!("vid/common/{}", block.height()))
974 .send()
975 .await
976 .unwrap();
977 assert_eq!(common.height(), block.height());
978 assert_eq!(common.block_hash(), block.hash());
979 assert_eq!(common.payload_hash(), block.payload_hash());
980 assert_eq!(
981 common,
982 client
983 .get(&format!("vid/common/hash/{}", block.hash()))
984 .send()
985 .await
986 .unwrap()
987 );
988
989 let block_summary = client
990 .get(&format!("block/summary/{i}"))
991 .send()
992 .await
993 .unwrap();
994 assert_eq!(
995 BlockSummaryQueryData::<MockTypes>::from(block.clone()),
996 block_summary,
997 );
998 assert_eq!(block_summary.header(), block.header());
999 assert_eq!(block_summary.hash(), block.hash());
1000 assert_eq!(block_summary.size(), block.size());
1001 assert_eq!(block_summary.num_transactions(), block.num_transactions());
1002
1003 let block_summaries: Vec<BlockSummaryQueryData<MockTypes>> = client
1004 .get(&format!("block/summaries/{}/{}", 0, i))
1005 .send()
1006 .await
1007 .unwrap();
1008 assert_eq!(block_summaries.len() as u64, i);
1009
1010 assert_eq!(
1015 block.payload(),
1016 client
1017 .get::<BlockQueryData<MockTypes>>(&format!(
1018 "block/payload-hash/{}",
1019 block.payload_hash()
1020 ))
1021 .send()
1022 .await
1023 .unwrap()
1024 .payload()
1025 );
1026 assert_eq!(
1027 block.payload_hash(),
1028 client
1029 .get::<Header<MockTypes>>(&format!(
1030 "header/payload-hash/{}",
1031 block.payload_hash()
1032 ))
1033 .send()
1034 .await
1035 .unwrap()
1036 .payload_commitment
1037 );
1038 assert_eq!(
1039 block.payload(),
1040 client
1041 .get::<PayloadQueryData<MockTypes>>(&format!(
1042 "payload/hash/{}",
1043 block.payload_hash()
1044 ))
1045 .send()
1046 .await
1047 .unwrap()
1048 .data(),
1049 );
1050 assert_eq!(
1051 common.common(),
1052 client
1053 .get::<VidCommonQueryData<MockTypes>>(&format!(
1054 "vid/common/payload-hash/{}",
1055 block.payload_hash()
1056 ))
1057 .send()
1058 .await
1059 .unwrap()
1060 .common()
1061 );
1062
1063 for (j, txn_from_block) in block.enumerate() {
1066 let txn: TransactionQueryData<MockTypes> = client
1067 .get(&format!("transaction/{}/{}/noproof", i, j.position))
1068 .send()
1069 .await
1070 .unwrap();
1071 assert_eq!(txn.block_height(), i);
1072 assert_eq!(txn.block_hash(), block.hash());
1073 assert_eq!(txn.index(), j.position as u64);
1074 assert_eq!(txn.hash(), txn_from_block.commit());
1075 assert_eq!(txn.transaction(), &txn_from_block);
1076 assert_eq!(
1081 txn.hash(),
1082 client
1083 .get::<TransactionQueryData<MockTypes>>(&format!(
1084 "transaction/hash/{}/noproof",
1085 txn.hash()
1086 ))
1087 .send()
1088 .await
1089 .unwrap()
1090 .hash()
1091 );
1092
1093 let tx_with_proof = client
1094 .get::<TransactionWithProofQueryData<MockTypes>>(&format!(
1095 "transaction/{}/{}/proof",
1096 i, j.position
1097 ))
1098 .send()
1099 .await
1100 .unwrap();
1101 assert_eq!(txn.hash(), tx_with_proof.hash());
1102 assert!(tx_with_proof.proof().verify(
1103 block.metadata(),
1104 txn.transaction(),
1105 &block.payload_hash(),
1106 common.common()
1107 ));
1108
1109 let tx_with_proof = client
1111 .get::<TransactionWithProofQueryData<MockTypes>>(&format!(
1112 "transaction/hash/{}/proof",
1113 txn.hash()
1114 ))
1115 .send()
1116 .await
1117 .unwrap();
1118 assert_eq!(txn.hash(), tx_with_proof.hash());
1119 assert!(tx_with_proof.proof().verify(
1120 block.metadata(),
1121 txn.transaction(),
1122 &block.payload_hash(),
1123 common.common()
1124 ));
1125 }
1126
1127 let block_range: Vec<BlockQueryData<MockTypes>> = client
1128 .get(&format!("block/{}/{}", 0, i))
1129 .send()
1130 .await
1131 .unwrap();
1132
1133 assert_eq!(block_range.len() as u64, i);
1134
1135 let leaf_range: Vec<LeafQueryData<MockTypes>> = client
1136 .get(&format!("leaf/{}/{}", 0, i))
1137 .send()
1138 .await
1139 .unwrap();
1140
1141 assert_eq!(leaf_range.len() as u64, i);
1142
1143 let payload_range: Vec<PayloadQueryData<MockTypes>> = client
1144 .get(&format!("payload/{}/{}", 0, i))
1145 .send()
1146 .await
1147 .unwrap();
1148
1149 assert_eq!(payload_range.len() as u64, i);
1150
1151 let vid_common_range: Vec<VidCommonQueryData<MockTypes>> = client
1152 .get(&format!("vid/common/{}/{}", 0, i))
1153 .send()
1154 .await
1155 .unwrap();
1156
1157 assert_eq!(vid_common_range.len() as u64, i);
1158
1159 let header_range: Vec<Header<MockTypes>> = client
1160 .get(&format!("header/{}/{}", 0, i))
1161 .send()
1162 .await
1163 .unwrap();
1164
1165 assert_eq!(header_range.len() as u64, i);
1166 }
1167 }
1168
1169 #[test_log::test(tokio::test(flavor = "multi_thread"))]
1170 async fn test_api() {
1171 let mut network = MockNetwork::<MockDataSource>::init().await;
1173 network.start().await;
1174
1175 let port = reserve_tcp_port().unwrap();
1177 let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
1178 let options = Options {
1179 small_object_range_limit: 500,
1180 large_object_range_limit: 500,
1181 ..Default::default()
1182 };
1183
1184 app.register_module(
1185 "availability",
1186 define_api(&options, MockBase::instance(), "1.0.0".parse().unwrap()).unwrap(),
1187 )
1188 .unwrap();
1189 network.spawn(
1190 "server",
1191 app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
1192 );
1193
1194 let client = Client::<Error, MockBase>::new(
1196 format!("http://localhost:{port}/availability")
1197 .parse()
1198 .unwrap(),
1199 );
1200 assert!(client.connect(Some(Duration::from_secs(60))).await);
1201 assert_eq!(get_non_empty_blocks(&client).await.1, vec![]);
1202
1203 let leaves = client
1206 .socket("stream/leaves/0")
1207 .subscribe::<LeafQueryData<MockTypes>>()
1208 .await
1209 .unwrap();
1210 let headers = client
1211 .socket("stream/headers/0")
1212 .subscribe::<Header<MockTypes>>()
1213 .await
1214 .unwrap();
1215 let blocks = client
1216 .socket("stream/blocks/0")
1217 .subscribe::<BlockQueryData<MockTypes>>()
1218 .await
1219 .unwrap();
1220 let vid_common = client
1221 .socket("stream/vid/common/0")
1222 .subscribe::<VidCommonQueryData<MockTypes>>()
1223 .await
1224 .unwrap();
1225 let mut chain = leaves.zip(headers.zip(blocks.zip(vid_common))).enumerate();
1226 for nonce in 0..3 {
1227 let txn = mock_transaction(vec![nonce]);
1228 network.submit_transaction(txn).await;
1229
1230 let (i, leaf, block, common) = loop {
1232 tracing::info!("waiting for block with transaction {}", nonce);
1233 let (i, (leaf, (header, (block, common)))) = chain.next().await.unwrap();
1234 tracing::info!(i, ?leaf, ?header, ?block, ?common);
1235 let leaf = leaf.unwrap();
1236 let header = header.unwrap();
1237 let block = block.unwrap();
1238 let common = common.unwrap();
1239 assert_eq!(leaf.height() as usize, i);
1240 assert_eq!(leaf.block_hash(), block.hash());
1241 assert_eq!(block.header(), &header);
1242 assert_eq!(common.height() as usize, i);
1243 if !block.is_empty() {
1244 break (i, leaf, block, common);
1245 }
1246 };
1247 assert_eq!(leaf, client.get(&format!("leaf/{i}")).send().await.unwrap());
1248 assert_eq!(
1249 block,
1250 client.get(&format!("block/{i}")).send().await.unwrap()
1251 );
1252 assert_eq!(
1253 common,
1254 client.get(&format!("vid/common/{i}")).send().await.unwrap()
1255 );
1256
1257 validate(&client, (i + 1) as u64).await;
1258 }
1259
1260 network.shut_down().await;
1261 }
1262
1263 async fn validate_old(client: &Client<Error, MockBase>, height: u64) {
1264 for i in 0..height {
1266 if ![0, 1, height / 2, height - 1].contains(&i) {
1269 continue;
1270 }
1271 tracing::info!("validate block {i}/{height}");
1272
1273 let leaf: Leaf1QueryData<MockTypes> =
1275 client.get(&format!("leaf/{i}")).send().await.unwrap();
1276 assert_eq!(leaf.leaf.height(), i);
1277 assert_eq!(
1278 leaf,
1279 client
1280 .get(&format!(
1281 "leaf/hash/{}",
1282 <Leaf<MockTypes> as Committable>::commit(&leaf.leaf)
1283 ))
1284 .send()
1285 .await
1286 .unwrap()
1287 );
1288
1289 let block: BlockQueryData<MockTypes> =
1291 client.get(&format!("block/{i}")).send().await.unwrap();
1292 let expected_payload = PayloadQueryData::from(block.clone());
1293 assert_eq!(leaf.leaf.block_header().commit(), block.hash());
1294 assert_eq!(block.height(), i);
1295 assert_eq!(
1296 block,
1297 client
1298 .get(&format!("block/hash/{}", block.hash()))
1299 .send()
1300 .await
1301 .unwrap()
1302 );
1303 assert_eq!(
1304 *block.header(),
1305 client.get(&format!("header/{i}")).send().await.unwrap()
1306 );
1307 assert_eq!(
1308 *block.header(),
1309 client
1310 .get(&format!("header/hash/{}", block.hash()))
1311 .send()
1312 .await
1313 .unwrap()
1314 );
1315 assert_eq!(
1316 expected_payload,
1317 client.get(&format!("payload/{i}")).send().await.unwrap(),
1318 );
1319 assert_eq!(
1320 expected_payload,
1321 client
1322 .get(&format!("payload/block-hash/{}", block.hash()))
1323 .send()
1324 .await
1325 .unwrap(),
1326 );
1327 let common: ADVZCommonQueryData<MockTypes> = client
1329 .get(&format!("vid/common/{}", block.height()))
1330 .send()
1331 .await
1332 .unwrap();
1333 assert_eq!(common.height(), block.height());
1334 assert_eq!(common.block_hash(), block.hash());
1335 assert_eq!(
1336 VidCommitment::V0(common.payload_hash()),
1337 block.payload_hash(),
1338 );
1339 assert_eq!(
1340 common,
1341 client
1342 .get(&format!("vid/common/hash/{}", block.hash()))
1343 .send()
1344 .await
1345 .unwrap()
1346 );
1347
1348 let block_summary = client
1349 .get(&format!("block/summary/{i}"))
1350 .send()
1351 .await
1352 .unwrap();
1353 assert_eq!(
1354 BlockSummaryQueryData::<MockTypes>::from(block.clone()),
1355 block_summary,
1356 );
1357 assert_eq!(block_summary.header(), block.header());
1358 assert_eq!(block_summary.hash(), block.hash());
1359 assert_eq!(block_summary.size(), block.size());
1360 assert_eq!(block_summary.num_transactions(), block.num_transactions());
1361
1362 let block_summaries: Vec<BlockSummaryQueryData<MockTypes>> = client
1363 .get(&format!("block/summaries/{}/{}", 0, i))
1364 .send()
1365 .await
1366 .unwrap();
1367 assert_eq!(block_summaries.len() as u64, i);
1368
1369 assert_eq!(
1374 block.payload(),
1375 client
1376 .get::<BlockQueryData<MockTypes>>(&format!(
1377 "block/payload-hash/{}",
1378 block.payload_hash()
1379 ))
1380 .send()
1381 .await
1382 .unwrap()
1383 .payload()
1384 );
1385 assert_eq!(
1386 block.payload_hash(),
1387 client
1388 .get::<Header<MockTypes>>(&format!(
1389 "header/payload-hash/{}",
1390 block.payload_hash()
1391 ))
1392 .send()
1393 .await
1394 .unwrap()
1395 .payload_commitment
1396 );
1397 assert_eq!(
1398 block.payload(),
1399 client
1400 .get::<PayloadQueryData<MockTypes>>(&format!(
1401 "payload/hash/{}",
1402 block.payload_hash()
1403 ))
1404 .send()
1405 .await
1406 .unwrap()
1407 .data(),
1408 );
1409 assert_eq!(
1410 common.common(),
1411 client
1412 .get::<ADVZCommonQueryData<MockTypes>>(&format!(
1413 "vid/common/payload-hash/{}",
1414 block.payload_hash()
1415 ))
1416 .send()
1417 .await
1418 .unwrap()
1419 .common()
1420 );
1421
1422 for (j, txn_from_block) in block.enumerate() {
1425 let txn: TransactionQueryData<MockTypes> = client
1426 .get(&format!("transaction/{}/{}/noproof", i, j.position))
1427 .send()
1428 .await
1429 .unwrap();
1430 assert_eq!(txn.block_height(), i);
1431 assert_eq!(txn.block_hash(), block.hash());
1432 assert_eq!(txn.index(), j.position as u64);
1433 assert_eq!(txn.hash(), txn_from_block.commit());
1434 assert_eq!(txn.transaction(), &txn_from_block);
1435 assert_eq!(
1440 txn.hash(),
1441 client
1442 .get::<TransactionQueryData<MockTypes>>(&format!(
1443 "transaction/hash/{}/noproof",
1444 txn.hash()
1445 ))
1446 .send()
1447 .await
1448 .unwrap()
1449 .hash()
1450 );
1451
1452 assert_eq!(
1453 txn.hash(),
1454 client
1455 .get::<TransactionWithProofQueryData<MockTypes>>(&format!(
1456 "transaction/{}/{}/proof",
1457 i, j.position
1458 ))
1459 .send()
1460 .await
1461 .unwrap()
1462 .hash()
1463 );
1464
1465 assert_eq!(
1466 txn.hash(),
1467 client
1468 .get::<TransactionWithProofQueryData<MockTypes>>(&format!(
1469 "transaction/hash/{}/proof",
1470 txn.hash()
1471 ))
1472 .send()
1473 .await
1474 .unwrap()
1475 .hash()
1476 );
1477 }
1478
1479 let block_range: Vec<BlockQueryData<MockTypes>> = client
1480 .get(&format!("block/{}/{}", 0, i))
1481 .send()
1482 .await
1483 .unwrap();
1484
1485 assert_eq!(block_range.len() as u64, i);
1486
1487 let leaf_range: Vec<Leaf1QueryData<MockTypes>> = client
1488 .get(&format!("leaf/{}/{}", 0, i))
1489 .send()
1490 .await
1491 .unwrap();
1492
1493 assert_eq!(leaf_range.len() as u64, i);
1494
1495 let payload_range: Vec<PayloadQueryData<MockTypes>> = client
1496 .get(&format!("payload/{}/{}", 0, i))
1497 .send()
1498 .await
1499 .unwrap();
1500
1501 assert_eq!(payload_range.len() as u64, i);
1502
1503 let vid_common_range: Vec<ADVZCommonQueryData<MockTypes>> = client
1504 .get(&format!("vid/common/{}/{}", 0, i))
1505 .send()
1506 .await
1507 .unwrap();
1508
1509 assert_eq!(vid_common_range.len() as u64, i);
1510
1511 let header_range: Vec<Header<MockTypes>> = client
1512 .get(&format!("header/{}/{}", 0, i))
1513 .send()
1514 .await
1515 .unwrap();
1516
1517 assert_eq!(header_range.len() as u64, i);
1518 }
1519 }
1520
1521 #[test_log::test(tokio::test(flavor = "multi_thread"))]
1522 async fn test_api_epochs() {
1523 let mut network = MockNetwork::<MockDataSource>::init().await;
1525 let epoch_height = network.epoch_height();
1526 network.start().await;
1527
1528 let port = reserve_tcp_port().unwrap();
1530 let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
1531 app.register_module(
1532 "availability",
1533 define_api(
1534 &Default::default(),
1535 MockBase::instance(),
1536 "1.0.0".parse().unwrap(),
1537 )
1538 .unwrap(),
1539 )
1540 .unwrap();
1541 network.spawn(
1542 "server",
1543 app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
1544 );
1545
1546 let client = Client::<Error, MockBase>::new(
1548 format!("http://localhost:{port}/availability")
1549 .parse()
1550 .unwrap(),
1551 );
1552 assert!(client.connect(Some(Duration::from_secs(60))).await);
1553
1554 let headers = client
1557 .socket("stream/headers/0")
1558 .subscribe::<Header<MockTypes>>()
1559 .await
1560 .unwrap();
1561 let mut chain = headers.enumerate();
1562
1563 loop {
1564 let (i, header) = chain.next().await.unwrap();
1565 let header = header.unwrap();
1566 assert_eq!(header.height(), i as u64);
1567 if header.height() >= 3 * epoch_height {
1568 break;
1569 }
1570 }
1571
1572 network.shut_down().await;
1573 }
1574
1575 #[test_log::test(tokio::test(flavor = "multi_thread"))]
1576 async fn test_old_api() {
1577 let mut network = MockNetwork::<MockDataSource>::init().await;
1579 network.start().await;
1580
1581 let port = reserve_tcp_port().unwrap();
1583
1584 let options = Options {
1585 small_object_range_limit: 500,
1586 large_object_range_limit: 500,
1587 ..Default::default()
1588 };
1589
1590 let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
1591 app.register_module(
1592 "availability",
1593 define_api(&options, MockBase::instance(), "0.1.0".parse().unwrap()).unwrap(),
1594 )
1595 .unwrap();
1596 network.spawn(
1597 "server",
1598 app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
1599 );
1600
1601 let client = Client::<Error, MockBase>::new(
1603 format!("http://localhost:{port}/availability")
1604 .parse()
1605 .unwrap(),
1606 );
1607 assert!(client.connect(Some(Duration::from_secs(60))).await);
1608 assert_eq!(get_non_empty_blocks(&client).await.1, vec![]);
1609
1610 let leaves = client
1613 .socket("stream/leaves/0")
1614 .subscribe::<Leaf1QueryData<MockTypes>>()
1615 .await
1616 .unwrap();
1617 let headers = client
1618 .socket("stream/headers/0")
1619 .subscribe::<Header<MockTypes>>()
1620 .await
1621 .unwrap();
1622 let blocks = client
1623 .socket("stream/blocks/0")
1624 .subscribe::<BlockQueryData<MockTypes>>()
1625 .await
1626 .unwrap();
1627 let vid_common = client
1628 .socket("stream/vid/common/0")
1629 .subscribe::<ADVZCommonQueryData<MockTypes>>()
1630 .await
1631 .unwrap();
1632 let mut chain = leaves.zip(headers.zip(blocks.zip(vid_common))).enumerate();
1633 for nonce in 0..3 {
1634 let txn = mock_transaction(vec![nonce]);
1635 network.submit_transaction(txn).await;
1636
1637 let (i, leaf, block, common) = loop {
1639 tracing::info!("waiting for block with transaction {}", nonce);
1640 let (i, (leaf, (header, (block, common)))) = chain.next().await.unwrap();
1641 tracing::info!(i, ?leaf, ?header, ?block, ?common);
1642 let leaf = leaf.unwrap();
1643 let header = header.unwrap();
1644 let block = block.unwrap();
1645 let common = common.unwrap();
1646 assert_eq!(leaf.leaf.height() as usize, i);
1647 assert_eq!(leaf.leaf.block_header().commit(), block.hash());
1648 assert_eq!(block.header(), &header);
1649 assert_eq!(common.height() as usize, i);
1650 if !block.is_empty() {
1651 break (i, leaf, block, common);
1652 }
1653 };
1654 assert_eq!(leaf, client.get(&format!("leaf/{i}")).send().await.unwrap());
1655 assert_eq!(
1656 block,
1657 client.get(&format!("block/{i}")).send().await.unwrap()
1658 );
1659 assert_eq!(
1660 common,
1661 client.get(&format!("vid/common/{i}")).send().await.unwrap()
1662 );
1663
1664 validate_old(&client, (i + 1) as u64).await;
1665 }
1666
1667 network.shut_down().await;
1668 }
1669
1670 #[test_log::test(tokio::test(flavor = "multi_thread"))]
1671 async fn test_extensions() {
1672 let dir = TempDir::with_prefix("test_availability_extensions").unwrap();
1673 let data_source = ExtensibleDataSource::new(
1674 MockDataSource::create(dir.path(), Default::default())
1675 .await
1676 .unwrap(),
1677 0,
1678 );
1679
1680 let leaf = Leaf2::<MockTypes>::genesis(
1682 &Default::default(),
1683 &Default::default(),
1684 MOCK_UPGRADE.base,
1685 )
1686 .await;
1687 let qc = QuorumCertificate2::genesis(
1688 &Default::default(),
1689 &Default::default(),
1690 TEST_VERSIONS.test,
1691 )
1692 .await;
1693 let leaf = LeafQueryData::new(leaf, qc).unwrap();
1694 let block = BlockQueryData::new(leaf.header().clone(), MockPayload::genesis());
1695 data_source
1696 .append(BlockInfo::new(leaf, Some(block.clone()), None, None))
1697 .await
1698 .unwrap();
1699
1700 assert_eq!(
1702 ExtensibleDataSource::<MockDataSource, u64>::block_height(&data_source)
1703 .await
1704 .unwrap(),
1705 1
1706 );
1707 assert_eq!(block, data_source.get_block(0).await.await);
1708
1709 let extensions = toml! {
1711 [route.post_ext]
1712 PATH = ["/ext/:val"]
1713 METHOD = "POST"
1714 ":val" = "Integer"
1715
1716 [route.get_ext]
1717 PATH = ["/ext"]
1718 METHOD = "GET"
1719 };
1720
1721 let mut api =
1722 define_api::<RwLock<ExtensibleDataSource<MockDataSource, u64>>, MockTypes, MockBase>(
1723 &Options {
1724 extensions: vec![extensions.into()],
1725 ..Default::default()
1726 },
1727 MockBase::instance(),
1728 "1.0.0".parse().unwrap(),
1729 )
1730 .unwrap();
1731 api.get("get_ext", |_, state| {
1732 async move { Ok(*state.as_ref()) }.boxed()
1733 })
1734 .unwrap()
1735 .post("post_ext", |req, state| {
1736 async move {
1737 *state.as_mut() = req.integer_param("val")?;
1738 Ok(())
1739 }
1740 .boxed()
1741 })
1742 .unwrap();
1743
1744 let mut app = App::<_, Error>::with_state(RwLock::new(data_source));
1745 app.register_module("availability", api).unwrap();
1746
1747 let port = reserve_tcp_port().unwrap();
1748 let _server = BackgroundTask::spawn(
1749 "server",
1750 app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
1751 );
1752
1753 let client = Client::<Error, MockBase>::new(
1754 format!("http://localhost:{port}/availability")
1755 .parse()
1756 .unwrap(),
1757 );
1758 assert!(client.connect(Some(Duration::from_secs(60))).await);
1759
1760 assert_eq!(client.get::<u64>("ext").send().await.unwrap(), 0);
1761 client.post::<()>("ext/42").send().await.unwrap();
1762 assert_eq!(client.get::<u64>("ext").send().await.unwrap(), 42);
1763
1764 assert_eq!(
1766 client
1767 .get::<MockHeader>("header/0")
1768 .send()
1769 .await
1770 .unwrap()
1771 .block_number,
1772 0
1773 );
1774 }
1775
1776 #[test_log::test(tokio::test(flavor = "multi_thread"))]
1777 async fn test_range_limit() {
1778 let large_object_range_limit = 2;
1779 let small_object_range_limit = 3;
1780
1781 let mut network = MockNetwork::<MockDataSource>::init().await;
1783 network.start().await;
1784
1785 let port = reserve_tcp_port().unwrap();
1787 let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
1788 app.register_module(
1789 "availability",
1790 define_api(
1791 &Options {
1792 large_object_range_limit,
1793 small_object_range_limit,
1794 ..Default::default()
1795 },
1796 MockBase::instance(),
1797 "1.0.0".parse().unwrap(),
1798 )
1799 .unwrap(),
1800 )
1801 .unwrap();
1802 network.spawn(
1803 "server",
1804 app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
1805 );
1806
1807 let client = Client::<Error, MockBase>::new(
1809 format!("http://localhost:{port}/availability")
1810 .parse()
1811 .unwrap(),
1812 );
1813 assert!(client.connect(Some(Duration::from_secs(60))).await);
1814
1815 assert_eq!(
1817 client.get::<Limits>("limits").send().await.unwrap(),
1818 Limits {
1819 small_object_range_limit,
1820 large_object_range_limit
1821 }
1822 );
1823
1824 client
1826 .socket("stream/blocks/0")
1827 .subscribe::<BlockQueryData<MockTypes>>()
1828 .await
1829 .unwrap()
1830 .take(small_object_range_limit + 1)
1831 .try_collect::<Vec<_>>()
1832 .await
1833 .unwrap();
1834
1835 async fn check_limit<T: DeserializeOwned + Debug>(
1836 client: &Client<Error, MockBase>,
1837 req: &str,
1838 limit: usize,
1839 ) {
1840 let range: Vec<T> = client
1841 .get(&format!("{req}/0/{limit}"))
1842 .send()
1843 .await
1844 .unwrap();
1845 assert_eq!(range.len(), limit);
1846 let err = client
1847 .get::<Vec<T>>(&format!("{req}/0/{}", limit + 1))
1848 .send()
1849 .await
1850 .unwrap_err();
1851 assert_eq!(err.status(), StatusCode::BAD_REQUEST);
1852 }
1853
1854 check_limit::<LeafQueryData<MockTypes>>(&client, "leaf", small_object_range_limit).await;
1855 check_limit::<Header<MockTypes>>(&client, "header", large_object_range_limit).await;
1856 check_limit::<BlockQueryData<MockTypes>>(&client, "block", large_object_range_limit).await;
1857 check_limit::<PayloadQueryData<MockTypes>>(&client, "payload", large_object_range_limit)
1858 .await;
1859 check_limit::<BlockSummaryQueryData<MockTypes>>(
1860 &client,
1861 "block/summaries",
1862 large_object_range_limit,
1863 )
1864 .await;
1865
1866 network.shut_down().await;
1867 }
1868
1869 #[test_log::test(tokio::test(flavor = "multi_thread"))]
1870 async fn test_header_endpoint() {
1871 let mut network = MockNetwork::<MockSqlDataSource>::init().await;
1873 network.start().await;
1874
1875 let port = reserve_tcp_port().unwrap();
1877 let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
1878 app.register_module(
1879 "availability",
1880 define_api(
1881 &Default::default(),
1882 MockBase::instance(),
1883 "1.0.0".parse().unwrap(),
1884 )
1885 .unwrap(),
1886 )
1887 .unwrap();
1888 network.spawn(
1889 "server",
1890 app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
1891 );
1892
1893 let ds = network.data_source();
1894
1895 let block_height = ds.block_height().await.unwrap();
1898 let fetch = ds
1899 .get_header(BlockId::<MockTypes>::Number(block_height + 25))
1900 .await;
1901
1902 assert!(fetch.is_pending());
1903 let header = fetch.await;
1904 assert_eq!(header.height() as usize, block_height + 25);
1905
1906 network.shut_down().await;
1907 }
1908
1909 #[test_log::test(tokio::test(flavor = "multi_thread"))]
1910 async fn test_leaf_only_ds() {
1911 let mut network = MockNetwork::<MockSqlDataSource>::init_with_leaf_ds().await;
1913 network.start().await;
1914
1915 let port = reserve_tcp_port().unwrap();
1917 let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
1918 app.register_module(
1919 "availability",
1920 define_api(
1921 &Default::default(),
1922 MockBase::instance(),
1923 "1.0.0".parse().unwrap(),
1924 )
1925 .unwrap(),
1926 )
1927 .unwrap();
1928 network.spawn(
1929 "server",
1930 app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
1931 );
1932
1933 let client = Client::<Error, MockBase>::new(
1935 format!("http://localhost:{port}/availability")
1936 .parse()
1937 .unwrap(),
1938 );
1939 assert!(client.connect(Some(Duration::from_secs(60))).await);
1940
1941 client
1943 .socket("stream/headers/0")
1944 .subscribe::<Header<MockTypes>>()
1945 .await
1946 .unwrap()
1947 .take(5)
1948 .try_collect::<Vec<_>>()
1949 .await
1950 .unwrap();
1951
1952 client
1954 .socket("stream/leaves/5")
1955 .subscribe::<LeafQueryData<MockTypes>>()
1956 .await
1957 .unwrap()
1958 .take(5)
1959 .try_collect::<Vec<_>>()
1960 .await
1961 .unwrap();
1962
1963 let ds = network.data_source();
1964
1965 let block_height = ds.block_height().await.unwrap();
1969 let target_block_height = block_height + 20;
1970 let fetch = ds
1971 .get_block(BlockId::<MockTypes>::Number(target_block_height))
1972 .await;
1973
1974 assert!(fetch.is_pending());
1975 let block = fetch.await;
1976 assert_eq!(block.height() as usize, target_block_height);
1977
1978 let mut tx = ds.read().await.unwrap();
1979 tx.get_block(BlockId::<MockTypes>::Number(target_block_height))
1980 .await
1981 .unwrap_err();
1982 drop(tx);
1983
1984 network.shut_down().await;
1985 }
1986}