hotshot_query_service/
availability.rs

1// Copyright (c) 2022 Espresso Systems (espressosys.com)
2// This file is part of the HotShot Query Service library.
3//
4// This program is free software: you can redistribute it and/or modify it under the terms of the GNU
5// General Public License as published by the Free Software Foundation, either version 3 of the
6// License, or (at your option) any later version.
7// This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without
8// even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
9// General Public License for more details.
10// You should have received a copy of the GNU General Public License along with this program. If not,
11// see <https://www.gnu.org/licenses/>.
12
13//! Queries for HotShot chain state.
14//!
15//! The availability API provides an objective view of the HotShot blockchain. It provides access
16//! only to normative data: that is, data which is agreed upon by all honest consensus nodes and
17//! which is immutable. This means access to core consensus data structures including leaves,
18//! blocks, and headers, where each query is pure and idempotent. This also means that it is
19//! possible for a client to verify all of the information provided by this API, by running a
20//! HotShot light client and downloading the appropriate evidence with each query.
21//!
22//! This API does not provide any queries which represent only the _current_ state of the chain or
23//! may change over time, and it does not provide information for which there is not (yet) agreement
24//! of a supermajority of consensus nodes. For information about the current dynamic state of
25//! consensus and uncommitted state, try the [status](crate::status) API. For information about the
26//! chain which is tabulated by this specific node and not subject to full consensus agreement, try
27//! the [node](crate::node) API.
28
29use std::{fmt::Display, path::PathBuf, time::Duration};
30
31use derive_more::From;
32use futures::{FutureExt, StreamExt, TryFutureExt, TryStreamExt};
33use hotshot_types::{
34    data::{Leaf, Leaf2, QuorumProposal, VidCommitment, VidCommon},
35    simple_certificate::QuorumCertificate,
36    traits::node_implementation::NodeType,
37};
38use serde::{Deserialize, Serialize};
39use snafu::{OptionExt, Snafu};
40use tide_disco::{Api, RequestError, RequestParams, StatusCode, api::ApiError, method::ReadState};
41use vbs::version::StaticVersionType;
42
43use crate::{Header, Payload, QueryError, api::load_api, types::HeightIndexed};
44
45pub(crate) mod data_source;
46mod fetch;
47pub(crate) mod query_data;
48pub use data_source::*;
49pub use fetch::Fetch;
50pub use query_data::*;
51
52#[derive(Debug)]
53pub struct Options {
54    pub api_path: Option<PathBuf>,
55
56    /// Timeout for failing requests due to missing data.
57    ///
58    /// If data needed to respond to a request is missing, it can (in some cases) be fetched from an
59    /// external provider. This parameter controls how long the request handler will wait for
60    /// missing data to be fetched before giving up and failing the request.
61    pub fetch_timeout: Duration,
62
63    /// Additional API specification files to merge with `availability-api-path`.
64    ///
65    /// These optional files may contain route definitions for application-specific routes that have
66    /// been added as extensions to the basic availability API.
67    pub extensions: Vec<toml::Value>,
68
69    /// The maximum number of small objects which can be loaded in a single range query.
70    ///
71    /// Currently small objects include leaves only. In the future this limit will also apply to
72    /// headers, block summaries, and VID common, however
73    /// * loading of headers and block summaries is currently implemented by loading the entire
74    ///   block
75    /// * imperfect VID parameter tuning means that VID common can be much larger than it should
76    pub small_object_range_limit: usize,
77
78    /// The maximum number of large objects which can be loaded in a single range query.
79    ///
80    /// Large objects include anything that _might_ contain a full payload or an object proportional
81    /// in size to a payload. Note that this limit applies to the entire class of objects: we do not
82    /// check the size of objects while loading to determine which limit to apply. If an object
83    /// belongs to a class which might contain a large payload, the large object limit always
84    /// applies.
85    pub large_object_range_limit: usize,
86}
87
88impl Default for Options {
89    fn default() -> Self {
90        Self {
91            api_path: None,
92            fetch_timeout: Duration::from_millis(500),
93            extensions: vec![],
94            large_object_range_limit: 100,
95            small_object_range_limit: 500,
96        }
97    }
98}
99
100#[derive(Clone, Debug, From, Snafu, Deserialize, Serialize)]
101#[snafu(visibility(pub))]
102pub enum Error {
103    Request {
104        source: RequestError,
105    },
106    #[snafu(display("leaf {resource} missing or not available"))]
107    #[from(ignore)]
108    FetchLeaf {
109        resource: String,
110    },
111    #[snafu(display("block {resource} missing or not available"))]
112    #[from(ignore)]
113    FetchBlock {
114        resource: String,
115    },
116    #[snafu(display("header {resource} missing or not available"))]
117    #[from(ignore)]
118    FetchHeader {
119        resource: String,
120    },
121    #[snafu(display("transaction {resource} missing or not available"))]
122    #[from(ignore)]
123    FetchTransaction {
124        resource: String,
125    },
126    #[snafu(display("transaction index {index} out of range for block {height}"))]
127    #[from(ignore)]
128    InvalidTransactionIndex {
129        height: u64,
130        index: u64,
131    },
132    #[snafu(display("request for range {from}..{until} exceeds limit {limit}"))]
133    #[from(ignore)]
134    RangeLimit {
135        from: usize,
136        until: usize,
137        limit: usize,
138    },
139    #[snafu(display("{source}"))]
140    Query {
141        source: QueryError,
142    },
143    #[snafu(display("State cert for epoch {epoch} not found"))]
144    #[from(ignore)]
145    FetchStateCert {
146        epoch: u64,
147    },
148    #[snafu(display("error {status}: {message}"))]
149    Custom {
150        message: String,
151        status: StatusCode,
152    },
153}
154
155impl Error {
156    pub fn internal<M: Display>(message: M) -> Self {
157        Self::Custom {
158            message: message.to_string(),
159            status: StatusCode::INTERNAL_SERVER_ERROR,
160        }
161    }
162
163    pub fn status(&self) -> StatusCode {
164        match self {
165            Self::Request { .. } | Self::RangeLimit { .. } => StatusCode::BAD_REQUEST,
166            Self::FetchLeaf { .. }
167            | Self::FetchBlock { .. }
168            | Self::FetchTransaction { .. }
169            | Self::FetchHeader { .. }
170            | Self::FetchStateCert { .. } => StatusCode::NOT_FOUND,
171            Self::InvalidTransactionIndex { .. } | Self::Query { .. } => StatusCode::NOT_FOUND,
172            Self::Custom { status, .. } => *status,
173        }
174    }
175}
176
177#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
178#[serde(bound = "")]
179pub struct Leaf1QueryData<Types: NodeType> {
180    pub(crate) leaf: Leaf<Types>,
181    pub(crate) qc: QuorumCertificate<Types>,
182}
183
184impl<Types: NodeType> Leaf1QueryData<Types> {
185    pub fn new(leaf: Leaf<Types>, qc: QuorumCertificate<Types>) -> Self {
186        Self { leaf, qc }
187    }
188
189    pub fn leaf(&self) -> &Leaf<Types> {
190        &self.leaf
191    }
192
193    pub fn qc(&self) -> &QuorumCertificate<Types> {
194        &self.qc
195    }
196}
197
198fn downgrade_leaf<Types: NodeType>(leaf2: Leaf2<Types>) -> Leaf<Types> {
199    // TODO do we still need some check here?
200    // `drb_seed` no longer exists on `Leaf2`
201    // if leaf2.drb_seed != [0; 32] && leaf2.drb_result != [0; 32] {
202    //     panic!("Downgrade of Leaf2 to Leaf will lose DRB information!");
203    // }
204    let quorum_proposal = QuorumProposal {
205        block_header: leaf2.block_header().clone(),
206        view_number: leaf2.view_number(),
207        justify_qc: leaf2.justify_qc().to_qc(),
208        upgrade_certificate: leaf2.upgrade_certificate(),
209        proposal_certificate: None,
210    };
211    let mut leaf = Leaf::from_quorum_proposal(&quorum_proposal);
212    if let Some(payload) = leaf2.block_payload() {
213        leaf.fill_block_payload_unchecked(payload);
214    }
215    leaf
216}
217
218fn downgrade_leaf_query_data<Types: NodeType>(leaf: LeafQueryData<Types>) -> Leaf1QueryData<Types> {
219    Leaf1QueryData {
220        leaf: downgrade_leaf(leaf.leaf),
221        qc: leaf.qc.to_qc(),
222    }
223}
224
225async fn get_leaf_handler<Types, State>(
226    req: tide_disco::RequestParams,
227    state: &State,
228    timeout: Duration,
229) -> Result<LeafQueryData<Types>, Error>
230where
231    State: 'static + Send + Sync + ReadState,
232    <State as ReadState>::State: Send + Sync + AvailabilityDataSource<Types>,
233    Types: NodeType,
234    Header<Types>: QueryableHeader<Types>,
235    Payload<Types>: QueryablePayload<Types>,
236{
237    let id = match req.opt_integer_param("height")? {
238        Some(height) => LeafId::Number(height),
239        None => LeafId::Hash(req.blob_param("hash")?),
240    };
241    let fetch = state.read(|state| state.get_leaf(id).boxed()).await;
242    fetch.with_timeout(timeout).await.context(FetchLeafSnafu {
243        resource: id.to_string(),
244    })
245}
246
247async fn get_leaf_range_handler<Types, State>(
248    req: tide_disco::RequestParams,
249    state: &State,
250    timeout: Duration,
251    small_object_range_limit: usize,
252) -> Result<Vec<LeafQueryData<Types>>, Error>
253where
254    State: 'static + Send + Sync + ReadState,
255    <State as ReadState>::State: Send + Sync + AvailabilityDataSource<Types>,
256    Types: NodeType,
257    Header<Types>: QueryableHeader<Types>,
258    Payload<Types>: QueryablePayload<Types>,
259{
260    let from = req.integer_param::<_, usize>("from")?;
261    let until = req.integer_param("until")?;
262    enforce_range_limit(from, until, small_object_range_limit)?;
263
264    let leaves = state
265        .read(|state| state.get_leaf_range(from..until).boxed())
266        .await;
267    leaves
268        .enumerate()
269        .then(|(index, fetch)| async move {
270            fetch.with_timeout(timeout).await.context(FetchLeafSnafu {
271                resource: (index + from).to_string(),
272            })
273        })
274        .try_collect::<Vec<_>>()
275        .await
276}
277
278fn downgrade_vid_common_query_data<Types: NodeType>(
279    data: VidCommonQueryData<Types>,
280) -> Option<ADVZCommonQueryData<Types>> {
281    let VidCommonQueryData {
282        height,
283        block_hash,
284        payload_hash: VidCommitment::V0(payload_hash),
285        common: VidCommon::V0(common),
286    } = data
287    else {
288        return None;
289    };
290    Some(ADVZCommonQueryData {
291        height,
292        block_hash,
293        payload_hash,
294        common,
295    })
296}
297
298async fn get_vid_common_handler<Types, State>(
299    req: tide_disco::RequestParams,
300    state: &State,
301    timeout: Duration,
302) -> Result<VidCommonQueryData<Types>, Error>
303where
304    State: 'static + Send + Sync + ReadState,
305    <State as ReadState>::State: Send + Sync + AvailabilityDataSource<Types>,
306    Types: NodeType,
307    Header<Types>: QueryableHeader<Types>,
308    Payload<Types>: QueryablePayload<Types>,
309{
310    let id = if let Some(height) = req.opt_integer_param("height")? {
311        BlockId::Number(height)
312    } else if let Some(hash) = req.opt_blob_param("hash")? {
313        BlockId::Hash(hash)
314    } else {
315        BlockId::PayloadHash(req.blob_param("payload-hash")?)
316    };
317    let fetch = state.read(|state| state.get_vid_common(id).boxed()).await;
318    fetch.with_timeout(timeout).await.context(FetchBlockSnafu {
319        resource: id.to_string(),
320    })
321}
322
323async fn get_vid_common_range_handler<Types, State>(
324    req: tide_disco::RequestParams,
325    state: &State,
326    limit: usize,
327    timeout: Duration,
328) -> Result<Vec<VidCommonQueryData<Types>>, Error>
329where
330    State: 'static + Send + Sync + ReadState,
331    <State as ReadState>::State: Send + Sync + AvailabilityDataSource<Types>,
332    Types: NodeType,
333    Header<Types>: QueryableHeader<Types>,
334    Payload<Types>: QueryablePayload<Types>,
335{
336    let from = req.integer_param("from")?;
337    let until = req.integer_param("until")?;
338    enforce_range_limit(from, until, limit)?;
339
340    let vid = state
341        .read(|state| state.get_vid_common_range(from..until).boxed())
342        .await;
343    vid.enumerate()
344        .then(|(index, fetch)| async move {
345            fetch.with_timeout(timeout).await.context(FetchBlockSnafu {
346                resource: (index + from).to_string(),
347            })
348        })
349        .try_collect::<Vec<_>>()
350        .await
351}
352
353pub fn define_api<State, Types: NodeType, Ver: StaticVersionType + 'static>(
354    options: &Options,
355    _: Ver,
356    api_ver: semver::Version,
357) -> Result<Api<State, Error, Ver>, ApiError>
358where
359    State: 'static + Send + Sync + ReadState,
360    <State as ReadState>::State: Send + Sync + AvailabilityDataSource<Types>,
361    Header<Types>: QueryableHeader<Types>,
362    Payload<Types>: QueryablePayload<Types>,
363{
364    let mut api = load_api::<State, Error, Ver>(
365        options.api_path.as_ref(),
366        include_str!("../api/availability.toml"),
367        options.extensions.clone(),
368    )?;
369    let timeout = options.fetch_timeout;
370    let small_object_range_limit = options.small_object_range_limit;
371    let large_object_range_limit = options.large_object_range_limit;
372
373    api.with_version(api_ver.clone());
374
375    // `LeafQueryData` now contains `Leaf2` and `QC2``, which is a breaking change.
376    // On node startup, all leaves are migrated to `Leaf2`.
377    //
378    // To maintain compatibility with nodes running an older version
379    // (which expect `LeafQueryData` with `Leaf1` and `QC1`),
380    // we downgrade `Leaf2` to `Leaf1` and `QC2` to `QC1` if the API version is V0.
381    // Otherwise, we return the new types.
382    if api_ver.major == 0 {
383        api.at("get_leaf", move |req, state| {
384            get_leaf_handler(req, state, timeout)
385                .map(|res| res.map(downgrade_leaf_query_data))
386                .boxed()
387        })?;
388
389        api.at("get_leaf_range", move |req, state| {
390            get_leaf_range_handler(req, state, timeout, small_object_range_limit)
391                .map(|res| {
392                    res.map(|r| {
393                        r.into_iter()
394                            .map(downgrade_leaf_query_data)
395                            .collect::<Vec<Leaf1QueryData<_>>>()
396                    })
397                })
398                .boxed()
399        })?;
400
401        api.stream("stream_leaves", move |req, state| {
402            async move {
403                let height = req.integer_param("height")?;
404                state
405                    .read(|state| {
406                        async move {
407                            Ok(state
408                                .subscribe_leaves(height)
409                                .await
410                                .map(|leaf| Ok(downgrade_leaf_query_data(leaf))))
411                        }
412                        .boxed()
413                    })
414                    .await
415            }
416            .try_flatten_stream()
417            .boxed()
418        })?;
419    } else {
420        api.at("get_leaf", move |req, state| {
421            get_leaf_handler(req, state, timeout).boxed()
422        })?;
423
424        api.at("get_leaf_range", move |req, state| {
425            get_leaf_range_handler(req, state, timeout, small_object_range_limit).boxed()
426        })?;
427
428        api.stream("stream_leaves", move |req, state| {
429            async move {
430                let height = req.integer_param("height")?;
431                state
432                    .read(|state| {
433                        async move { Ok(state.subscribe_leaves(height).await.map(Ok)) }.boxed()
434                    })
435                    .await
436            }
437            .try_flatten_stream()
438            .boxed()
439        })?;
440    }
441
442    // VIDCommon data is version gated after the VID upgrade.
443    // We keep the old struct and data in the API version V0. Starting from V1 we are returning version gated structs.
444    if api_ver.major == 0 {
445        api.at("get_vid_common", move |req, state| {
446            get_vid_common_handler(req, state, timeout)
447                .map(|r| match r {
448                    Ok(data) => downgrade_vid_common_query_data(data).ok_or(Error::Custom {
449                        message: "Incompatible VID version.".to_string(),
450                        status: StatusCode::BAD_REQUEST,
451                    }),
452                    Err(e) => Err(e),
453                })
454                .boxed()
455        })?
456        .at("get_vid_common_range", move |req, state| {
457            get_vid_common_range_handler(req, state, small_object_range_limit, timeout)
458                .map(|r| match r {
459                    Ok(data) => data
460                        .into_iter()
461                        .map(downgrade_vid_common_query_data)
462                        .collect::<Option<Vec<_>>>()
463                        .ok_or(Error::Custom {
464                            message: "Incompatible VID version.".to_string(),
465                            status: StatusCode::BAD_REQUEST,
466                        }),
467                    Err(e) => Err(e),
468                })
469                .boxed()
470        })?
471        .stream("stream_vid_common", move |req, state| {
472            async move {
473                let height = req.integer_param("height")?;
474                state
475                    .read(|state| {
476                        async move {
477                            Ok(state.subscribe_vid_common(height).await.map(|data| {
478                                downgrade_vid_common_query_data(data).ok_or(Error::Custom {
479                                    message: "Incompatible VID version.".to_string(),
480                                    status: StatusCode::BAD_REQUEST,
481                                })
482                            }))
483                        }
484                        .boxed()
485                    })
486                    .await
487            }
488            .try_flatten_stream()
489            .boxed()
490        })?;
491    } else {
492        api.at("get_vid_common", move |req, state| {
493            get_vid_common_handler(req, state, timeout).boxed().boxed()
494        })?
495        .at("get_vid_common_range", move |req, state| {
496            get_vid_common_range_handler(req, state, small_object_range_limit, timeout)
497                .boxed()
498                .boxed()
499        })?
500        .stream("stream_vid_common", move |req, state| {
501            async move {
502                let height = req.integer_param("height")?;
503                state
504                    .read(|state| {
505                        async move { Ok(state.subscribe_vid_common(height).await.map(Ok)) }.boxed()
506                    })
507                    .await
508            }
509            .try_flatten_stream()
510            .boxed()
511        })?;
512    }
513
514    api.at("get_header", move |req, state| {
515        async move {
516            let id = if let Some(height) = req.opt_integer_param("height")? {
517                BlockId::Number(height)
518            } else if let Some(hash) = req.opt_blob_param("hash")? {
519                BlockId::Hash(hash)
520            } else {
521                BlockId::PayloadHash(req.blob_param("payload-hash")?)
522            };
523            let fetch = state.read(|state| state.get_header(id).boxed()).await;
524            fetch.with_timeout(timeout).await.context(FetchHeaderSnafu {
525                resource: id.to_string(),
526            })
527        }
528        .boxed()
529    })?
530    .at("get_header_range", move |req, state| {
531        async move {
532            let from = req.integer_param::<_, usize>("from")?;
533            let until = req.integer_param::<_, usize>("until")?;
534            enforce_range_limit(from, until, large_object_range_limit)?;
535
536            let headers = state
537                .read(|state| state.get_header_range(from..until).boxed())
538                .await;
539            headers
540                .enumerate()
541                .then(|(index, fetch)| async move {
542                    fetch.with_timeout(timeout).await.context(FetchHeaderSnafu {
543                        resource: (index + from).to_string(),
544                    })
545                })
546                .try_collect::<Vec<_>>()
547                .await
548        }
549        .boxed()
550    })?
551    .stream("stream_headers", move |req, state| {
552        async move {
553            let height = req.integer_param("height")?;
554            state
555                .read(|state| {
556                    async move { Ok(state.subscribe_headers(height).await.map(Ok)) }.boxed()
557                })
558                .await
559        }
560        .try_flatten_stream()
561        .boxed()
562    })?
563    .at("get_block", move |req, state| {
564        async move {
565            let id = if let Some(height) = req.opt_integer_param("height")? {
566                BlockId::Number(height)
567            } else if let Some(hash) = req.opt_blob_param("hash")? {
568                BlockId::Hash(hash)
569            } else {
570                BlockId::PayloadHash(req.blob_param("payload-hash")?)
571            };
572            let fetch = state.read(|state| state.get_block(id).boxed()).await;
573            fetch.with_timeout(timeout).await.context(FetchBlockSnafu {
574                resource: id.to_string(),
575            })
576        }
577        .boxed()
578    })?
579    .at("get_block_range", move |req, state| {
580        async move {
581            let from = req.integer_param::<_, usize>("from")?;
582            let until = req.integer_param("until")?;
583            enforce_range_limit(from, until, large_object_range_limit)?;
584
585            let blocks = state
586                .read(|state| state.get_block_range(from..until).boxed())
587                .await;
588            blocks
589                .enumerate()
590                .then(|(index, fetch)| async move {
591                    fetch.with_timeout(timeout).await.context(FetchBlockSnafu {
592                        resource: (index + from).to_string(),
593                    })
594                })
595                .try_collect::<Vec<_>>()
596                .await
597        }
598        .boxed()
599    })?
600    .stream("stream_blocks", move |req, state| {
601        async move {
602            let height = req.integer_param("height")?;
603            state
604                .read(|state| {
605                    async move { Ok(state.subscribe_blocks(height).await.map(Ok)) }.boxed()
606                })
607                .await
608        }
609        .try_flatten_stream()
610        .boxed()
611    })?
612    .at("get_payload", move |req, state| {
613        async move {
614            let id = if let Some(height) = req.opt_integer_param("height")? {
615                BlockId::Number(height)
616            } else if let Some(hash) = req.opt_blob_param("hash")? {
617                BlockId::PayloadHash(hash)
618            } else {
619                BlockId::Hash(req.blob_param("block-hash")?)
620            };
621            let fetch = state.read(|state| state.get_payload(id).boxed()).await;
622            fetch.with_timeout(timeout).await.context(FetchBlockSnafu {
623                resource: id.to_string(),
624            })
625        }
626        .boxed()
627    })?
628    .at("get_payload_range", move |req, state| {
629        async move {
630            let from = req.integer_param::<_, usize>("from")?;
631            let until = req.integer_param("until")?;
632            enforce_range_limit(from, until, large_object_range_limit)?;
633
634            let payloads = state
635                .read(|state| state.get_payload_range(from..until).boxed())
636                .await;
637            payloads
638                .enumerate()
639                .then(|(index, fetch)| async move {
640                    fetch.with_timeout(timeout).await.context(FetchBlockSnafu {
641                        resource: (index + from).to_string(),
642                    })
643                })
644                .try_collect::<Vec<_>>()
645                .await
646        }
647        .boxed()
648    })?
649    .stream("stream_payloads", move |req, state| {
650        async move {
651            let height = req.integer_param("height")?;
652            state
653                .read(|state| {
654                    async move { Ok(state.subscribe_payloads(height).await.map(Ok)) }.boxed()
655                })
656                .await
657        }
658        .try_flatten_stream()
659        .boxed()
660    })?
661    .at("get_transaction_proof", move |req, state| {
662        async move {
663            let tx = get_transaction(req, state, timeout).await?;
664            let height = tx.block.height();
665            let vid = state
666                .read(|state| state.get_vid_common(height as usize))
667                .await
668                .with_timeout(timeout)
669                .await
670                .context(FetchBlockSnafu {
671                    resource: height.to_string(),
672                })?;
673            let proof = tx.block.transaction_proof(&vid, &tx.index).context(
674                InvalidTransactionIndexSnafu {
675                    height,
676                    index: tx.transaction.index(),
677                },
678            )?;
679            Ok(TransactionWithProofQueryData::new(tx.transaction, proof))
680        }
681        .boxed()
682    })?
683    .at("get_transaction", move |req, state| {
684        async move { Ok(get_transaction(req, state, timeout).await?.transaction) }.boxed()
685    })?
686    .stream("stream_transactions", move |req, state| {
687        async move {
688            let height = req.integer_param::<_, usize>("height")?;
689
690            let namespace: Option<i64> = req
691                .opt_integer_param::<_, usize>("namespace")?
692                .map(|i| {
693                    i.try_into().map_err(|err| Error::Custom {
694                        message: format!(
695                            "Invalid 'namespace': could not convert usize to i64: {err}"
696                        ),
697                        status: StatusCode::BAD_REQUEST,
698                    })
699                })
700                .transpose()?;
701
702            state
703                .read(|state| {
704                    async move {
705                        Ok(state
706                            .subscribe_blocks(height)
707                            .await
708                            .map(move |block| {
709                                let transactions = block.enumerate().enumerate();
710                                let header = block.header();
711                                let filtered_txs = transactions
712                                    .filter_map(|(i, (index, _tx))| {
713                                        if let Some(requested_ns) = namespace {
714                                            let ns_id = QueryableHeader::<Types>::namespace_id(
715                                                header,
716                                                &index.ns_index,
717                                            )?;
718
719                                            if ns_id.into() != requested_ns {
720                                                return None;
721                                            }
722                                        }
723
724                                        let tx = block.transaction(&index)?;
725                                        TransactionQueryData::new(tx, &block, &index, i as u64)
726                                    })
727                                    .collect::<Vec<_>>();
728
729                                futures::stream::iter(filtered_txs.into_iter().map(Ok))
730                            })
731                            .flatten())
732                    }
733                    .boxed()
734                })
735                .await
736        }
737        .try_flatten_stream()
738        .boxed()
739    })?
740    .at("get_block_summary", move |req, state| {
741        async move {
742            let id: usize = req.integer_param("height")?;
743
744            let fetch = state.read(|state| state.get_block(id).boxed()).await;
745            fetch
746                .with_timeout(timeout)
747                .await
748                .context(FetchBlockSnafu {
749                    resource: id.to_string(),
750                })
751                .map(BlockSummaryQueryData::from)
752        }
753        .boxed()
754    })?
755    .at("get_block_summary_range", move |req, state| {
756        async move {
757            let from: usize = req.integer_param("from")?;
758            let until: usize = req.integer_param("until")?;
759            enforce_range_limit(from, until, large_object_range_limit)?;
760
761            let blocks = state
762                .read(|state| state.get_block_range(from..until).boxed())
763                .await;
764            let result: Vec<BlockSummaryQueryData<Types>> = blocks
765                .enumerate()
766                .then(|(index, fetch)| async move {
767                    fetch.with_timeout(timeout).await.context(FetchBlockSnafu {
768                        resource: (index + from).to_string(),
769                    })
770                })
771                .map(|result| result.map(BlockSummaryQueryData::from))
772                .try_collect()
773                .await?;
774
775            Ok(result)
776        }
777        .boxed()
778    })?
779    .at("get_limits", move |_req, _state| {
780        async move {
781            Ok(Limits {
782                small_object_range_limit,
783                large_object_range_limit,
784            })
785        }
786        .boxed()
787    })?;
788    Ok(api)
789}
790
791fn enforce_range_limit(from: usize, until: usize, limit: usize) -> Result<(), Error> {
792    if until.saturating_sub(from) > limit {
793        return Err(Error::RangeLimit { from, until, limit });
794    }
795    Ok(())
796}
797
798async fn get_transaction<Types, State>(
799    req: RequestParams,
800    state: &State,
801    timeout: Duration,
802) -> Result<BlockWithTransaction<Types>, Error>
803where
804    Types: NodeType,
805    Header<Types>: QueryableHeader<Types>,
806    Payload<Types>: QueryablePayload<Types>,
807    State: 'static + Send + Sync + ReadState,
808    <State as ReadState>::State: Send + Sync + AvailabilityDataSource<Types>,
809{
810    match req.opt_blob_param("hash")? {
811        Some(hash) => state
812            .read(|state| state.get_block_containing_transaction(hash).boxed())
813            .await
814            .with_timeout(timeout)
815            .await
816            .context(FetchTransactionSnafu {
817                resource: hash.to_string(),
818            }),
819        None => {
820            let height: u64 = req.integer_param("height")?;
821            let fetch = state
822                .read(|state| state.get_block(height as usize).boxed())
823                .await;
824            let block = fetch.with_timeout(timeout).await.context(FetchBlockSnafu {
825                resource: height.to_string(),
826            })?;
827            let i: u64 = req.integer_param("index")?;
828            let index = block
829                .payload()
830                .nth(block.metadata(), i as usize)
831                .context(InvalidTransactionIndexSnafu { height, index: i })?;
832            let transaction = block
833                .transaction(&index)
834                .context(InvalidTransactionIndexSnafu { height, index: i })?;
835            let transaction = TransactionQueryData::new(transaction, &block, &index, i)
836                .context(InvalidTransactionIndexSnafu { height, index: i })?;
837            Ok(BlockWithTransaction {
838                transaction,
839                block,
840                index,
841            })
842        },
843    }
844}
845
846#[cfg(test)]
847mod test {
848    use std::{fmt::Debug, time::Duration};
849
850    use async_lock::RwLock;
851    use committable::Committable;
852    use futures::future::FutureExt;
853    use hotshot_example_types::node_types::TEST_VERSIONS;
854    use hotshot_types::{data::Leaf2, simple_certificate::QuorumCertificate2};
855    use serde::de::DeserializeOwned;
856    use surf_disco::{Client, Error as _};
857    use tempfile::TempDir;
858    use test_utils::reserve_tcp_port;
859    use tide_disco::App;
860    use toml::toml;
861
862    use super::*;
863    use crate::{
864        ApiState, Error, Header,
865        data_source::{ExtensibleDataSource, VersionedDataSource, storage::AvailabilityStorage},
866        status::StatusDataSource,
867        task::BackgroundTask,
868        testing::{
869            consensus::{MockDataSource, MockNetwork, MockSqlDataSource},
870            mocks::{MOCK_UPGRADE, MockBase, MockHeader, MockPayload, MockTypes, mock_transaction},
871        },
872        types::HeightIndexed,
873    };
874
875    /// Get the current ledger height and a list of non-empty leaf/block pairs.
876    async fn get_non_empty_blocks(
877        client: &Client<Error, MockBase>,
878    ) -> (
879        u64,
880        Vec<(LeafQueryData<MockTypes>, BlockQueryData<MockTypes>)>,
881    ) {
882        let mut blocks = vec![];
883        // Ignore the genesis block (start from height 1).
884        for i in 1.. {
885            match client
886                .get::<BlockQueryData<MockTypes>>(&format!("block/{i}"))
887                .send()
888                .await
889            {
890                Ok(block) => {
891                    if !block.is_empty() {
892                        let leaf = client.get(&format!("leaf/{i}")).send().await.unwrap();
893                        blocks.push((leaf, block));
894                    }
895                },
896                Err(Error::Availability {
897                    source: super::Error::FetchBlock { .. },
898                }) => {
899                    tracing::info!(
900                        "found end of ledger at height {i}, non-empty blocks are {blocks:?}",
901                    );
902                    return (i, blocks);
903                },
904                Err(err) => panic!("unexpected error {err}"),
905            }
906        }
907        unreachable!()
908    }
909
910    async fn validate(client: &Client<Error, MockBase>, height: u64) {
911        // Check the consistency of every block/leaf pair.
912        for i in 0..height {
913            // Limit the number of blocks we validate in order to
914            // speed up the tests.
915            if ![0, 1, height / 2, height - 1].contains(&i) {
916                continue;
917            }
918            tracing::info!("validate block {i}/{height}");
919
920            // Check that looking up the leaf various ways returns the correct leaf.
921            let leaf: LeafQueryData<MockTypes> =
922                client.get(&format!("leaf/{i}")).send().await.unwrap();
923            assert_eq!(leaf.height(), i);
924            assert_eq!(
925                leaf,
926                client
927                    .get(&format!("leaf/hash/{}", leaf.hash()))
928                    .send()
929                    .await
930                    .unwrap()
931            );
932
933            // Check that looking up the block various ways returns the correct block.
934            let block: BlockQueryData<MockTypes> =
935                client.get(&format!("block/{i}")).send().await.unwrap();
936            let expected_payload = PayloadQueryData::from(block.clone());
937            assert_eq!(leaf.block_hash(), block.hash());
938            assert_eq!(block.height(), i);
939            assert_eq!(
940                block,
941                client
942                    .get(&format!("block/hash/{}", block.hash()))
943                    .send()
944                    .await
945                    .unwrap()
946            );
947            assert_eq!(
948                *block.header(),
949                client.get(&format!("header/{i}")).send().await.unwrap()
950            );
951            assert_eq!(
952                *block.header(),
953                client
954                    .get(&format!("header/hash/{}", block.hash()))
955                    .send()
956                    .await
957                    .unwrap()
958            );
959            assert_eq!(
960                expected_payload,
961                client.get(&format!("payload/{i}")).send().await.unwrap(),
962            );
963            assert_eq!(
964                expected_payload,
965                client
966                    .get(&format!("payload/block-hash/{}", block.hash()))
967                    .send()
968                    .await
969                    .unwrap(),
970            );
971            // Look up the common VID data.
972            let common: VidCommonQueryData<MockTypes> = client
973                .get(&format!("vid/common/{}", block.height()))
974                .send()
975                .await
976                .unwrap();
977            assert_eq!(common.height(), block.height());
978            assert_eq!(common.block_hash(), block.hash());
979            assert_eq!(common.payload_hash(), block.payload_hash());
980            assert_eq!(
981                common,
982                client
983                    .get(&format!("vid/common/hash/{}", block.hash()))
984                    .send()
985                    .await
986                    .unwrap()
987            );
988
989            let block_summary = client
990                .get(&format!("block/summary/{i}"))
991                .send()
992                .await
993                .unwrap();
994            assert_eq!(
995                BlockSummaryQueryData::<MockTypes>::from(block.clone()),
996                block_summary,
997            );
998            assert_eq!(block_summary.header(), block.header());
999            assert_eq!(block_summary.hash(), block.hash());
1000            assert_eq!(block_summary.size(), block.size());
1001            assert_eq!(block_summary.num_transactions(), block.num_transactions());
1002
1003            let block_summaries: Vec<BlockSummaryQueryData<MockTypes>> = client
1004                .get(&format!("block/summaries/{}/{}", 0, i))
1005                .send()
1006                .await
1007                .unwrap();
1008            assert_eq!(block_summaries.len() as u64, i);
1009
1010            // We should be able to look up the block by payload hash. Note that for duplicate
1011            // payloads, these endpoints may return a different block with the same payload, which
1012            // is acceptable. Therefore, we don't check equivalence of the entire `BlockQueryData`
1013            // response, only its payload.
1014            assert_eq!(
1015                block.payload(),
1016                client
1017                    .get::<BlockQueryData<MockTypes>>(&format!(
1018                        "block/payload-hash/{}",
1019                        block.payload_hash()
1020                    ))
1021                    .send()
1022                    .await
1023                    .unwrap()
1024                    .payload()
1025            );
1026            assert_eq!(
1027                block.payload_hash(),
1028                client
1029                    .get::<Header<MockTypes>>(&format!(
1030                        "header/payload-hash/{}",
1031                        block.payload_hash()
1032                    ))
1033                    .send()
1034                    .await
1035                    .unwrap()
1036                    .payload_commitment
1037            );
1038            assert_eq!(
1039                block.payload(),
1040                client
1041                    .get::<PayloadQueryData<MockTypes>>(&format!(
1042                        "payload/hash/{}",
1043                        block.payload_hash()
1044                    ))
1045                    .send()
1046                    .await
1047                    .unwrap()
1048                    .data(),
1049            );
1050            assert_eq!(
1051                common.common(),
1052                client
1053                    .get::<VidCommonQueryData<MockTypes>>(&format!(
1054                        "vid/common/payload-hash/{}",
1055                        block.payload_hash()
1056                    ))
1057                    .send()
1058                    .await
1059                    .unwrap()
1060                    .common()
1061            );
1062
1063            // Check that looking up each transaction in the block various ways returns the correct
1064            // transaction.
1065            for (j, txn_from_block) in block.enumerate() {
1066                let txn: TransactionQueryData<MockTypes> = client
1067                    .get(&format!("transaction/{}/{}/noproof", i, j.position))
1068                    .send()
1069                    .await
1070                    .unwrap();
1071                assert_eq!(txn.block_height(), i);
1072                assert_eq!(txn.block_hash(), block.hash());
1073                assert_eq!(txn.index(), j.position as u64);
1074                assert_eq!(txn.hash(), txn_from_block.commit());
1075                assert_eq!(txn.transaction(), &txn_from_block);
1076                // We should be able to look up the transaction by hash. Note that for duplicate
1077                // transactions, this endpoint may return a different transaction with the same
1078                // hash, which is acceptable. Therefore, we don't check equivalence of the entire
1079                // `TransactionWithProofQueryData` response, only its commitment.
1080                assert_eq!(
1081                    txn.hash(),
1082                    client
1083                        .get::<TransactionQueryData<MockTypes>>(&format!(
1084                            "transaction/hash/{}/noproof",
1085                            txn.hash()
1086                        ))
1087                        .send()
1088                        .await
1089                        .unwrap()
1090                        .hash()
1091                );
1092
1093                let tx_with_proof = client
1094                    .get::<TransactionWithProofQueryData<MockTypes>>(&format!(
1095                        "transaction/{}/{}/proof",
1096                        i, j.position
1097                    ))
1098                    .send()
1099                    .await
1100                    .unwrap();
1101                assert_eq!(txn.hash(), tx_with_proof.hash());
1102                assert!(tx_with_proof.proof().verify(
1103                    block.metadata(),
1104                    txn.transaction(),
1105                    &block.payload_hash(),
1106                    common.common()
1107                ));
1108
1109                // Similar to above, but by hash
1110                let tx_with_proof = client
1111                    .get::<TransactionWithProofQueryData<MockTypes>>(&format!(
1112                        "transaction/hash/{}/proof",
1113                        txn.hash()
1114                    ))
1115                    .send()
1116                    .await
1117                    .unwrap();
1118                assert_eq!(txn.hash(), tx_with_proof.hash());
1119                assert!(tx_with_proof.proof().verify(
1120                    block.metadata(),
1121                    txn.transaction(),
1122                    &block.payload_hash(),
1123                    common.common()
1124                ));
1125            }
1126
1127            let block_range: Vec<BlockQueryData<MockTypes>> = client
1128                .get(&format!("block/{}/{}", 0, i))
1129                .send()
1130                .await
1131                .unwrap();
1132
1133            assert_eq!(block_range.len() as u64, i);
1134
1135            let leaf_range: Vec<LeafQueryData<MockTypes>> = client
1136                .get(&format!("leaf/{}/{}", 0, i))
1137                .send()
1138                .await
1139                .unwrap();
1140
1141            assert_eq!(leaf_range.len() as u64, i);
1142
1143            let payload_range: Vec<PayloadQueryData<MockTypes>> = client
1144                .get(&format!("payload/{}/{}", 0, i))
1145                .send()
1146                .await
1147                .unwrap();
1148
1149            assert_eq!(payload_range.len() as u64, i);
1150
1151            let vid_common_range: Vec<VidCommonQueryData<MockTypes>> = client
1152                .get(&format!("vid/common/{}/{}", 0, i))
1153                .send()
1154                .await
1155                .unwrap();
1156
1157            assert_eq!(vid_common_range.len() as u64, i);
1158
1159            let header_range: Vec<Header<MockTypes>> = client
1160                .get(&format!("header/{}/{}", 0, i))
1161                .send()
1162                .await
1163                .unwrap();
1164
1165            assert_eq!(header_range.len() as u64, i);
1166        }
1167    }
1168
1169    #[test_log::test(tokio::test(flavor = "multi_thread"))]
1170    async fn test_api() {
1171        // Create the consensus network.
1172        let mut network = MockNetwork::<MockDataSource>::init().await;
1173        network.start().await;
1174
1175        // Start the web server.
1176        let port = reserve_tcp_port().unwrap();
1177        let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
1178        let options = Options {
1179            small_object_range_limit: 500,
1180            large_object_range_limit: 500,
1181            ..Default::default()
1182        };
1183
1184        app.register_module(
1185            "availability",
1186            define_api(&options, MockBase::instance(), "1.0.0".parse().unwrap()).unwrap(),
1187        )
1188        .unwrap();
1189        network.spawn(
1190            "server",
1191            app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
1192        );
1193
1194        // Start a client.
1195        let client = Client::<Error, MockBase>::new(
1196            format!("http://localhost:{port}/availability")
1197                .parse()
1198                .unwrap(),
1199        );
1200        assert!(client.connect(Some(Duration::from_secs(60))).await);
1201        assert_eq!(get_non_empty_blocks(&client).await.1, vec![]);
1202
1203        // Submit a few blocks and make sure each one gets reflected in the query service and
1204        // preserves the consistency of the data and indices.
1205        let leaves = client
1206            .socket("stream/leaves/0")
1207            .subscribe::<LeafQueryData<MockTypes>>()
1208            .await
1209            .unwrap();
1210        let headers = client
1211            .socket("stream/headers/0")
1212            .subscribe::<Header<MockTypes>>()
1213            .await
1214            .unwrap();
1215        let blocks = client
1216            .socket("stream/blocks/0")
1217            .subscribe::<BlockQueryData<MockTypes>>()
1218            .await
1219            .unwrap();
1220        let vid_common = client
1221            .socket("stream/vid/common/0")
1222            .subscribe::<VidCommonQueryData<MockTypes>>()
1223            .await
1224            .unwrap();
1225        let mut chain = leaves.zip(headers.zip(blocks.zip(vid_common))).enumerate();
1226        for nonce in 0..3 {
1227            let txn = mock_transaction(vec![nonce]);
1228            network.submit_transaction(txn).await;
1229
1230            // Wait for the transaction to be finalized.
1231            let (i, leaf, block, common) = loop {
1232                tracing::info!("waiting for block with transaction {}", nonce);
1233                let (i, (leaf, (header, (block, common)))) = chain.next().await.unwrap();
1234                tracing::info!(i, ?leaf, ?header, ?block, ?common);
1235                let leaf = leaf.unwrap();
1236                let header = header.unwrap();
1237                let block = block.unwrap();
1238                let common = common.unwrap();
1239                assert_eq!(leaf.height() as usize, i);
1240                assert_eq!(leaf.block_hash(), block.hash());
1241                assert_eq!(block.header(), &header);
1242                assert_eq!(common.height() as usize, i);
1243                if !block.is_empty() {
1244                    break (i, leaf, block, common);
1245                }
1246            };
1247            assert_eq!(leaf, client.get(&format!("leaf/{i}")).send().await.unwrap());
1248            assert_eq!(
1249                block,
1250                client.get(&format!("block/{i}")).send().await.unwrap()
1251            );
1252            assert_eq!(
1253                common,
1254                client.get(&format!("vid/common/{i}")).send().await.unwrap()
1255            );
1256
1257            validate(&client, (i + 1) as u64).await;
1258        }
1259
1260        network.shut_down().await;
1261    }
1262
1263    async fn validate_old(client: &Client<Error, MockBase>, height: u64) {
1264        // Check the consistency of every block/leaf pair.
1265        for i in 0..height {
1266            // Limit the number of blocks we validate in order to
1267            // speed up the tests.
1268            if ![0, 1, height / 2, height - 1].contains(&i) {
1269                continue;
1270            }
1271            tracing::info!("validate block {i}/{height}");
1272
1273            // Check that looking up the leaf various ways returns the correct leaf.
1274            let leaf: Leaf1QueryData<MockTypes> =
1275                client.get(&format!("leaf/{i}")).send().await.unwrap();
1276            assert_eq!(leaf.leaf.height(), i);
1277            assert_eq!(
1278                leaf,
1279                client
1280                    .get(&format!(
1281                        "leaf/hash/{}",
1282                        <Leaf<MockTypes> as Committable>::commit(&leaf.leaf)
1283                    ))
1284                    .send()
1285                    .await
1286                    .unwrap()
1287            );
1288
1289            // Check that looking up the block various ways returns the correct block.
1290            let block: BlockQueryData<MockTypes> =
1291                client.get(&format!("block/{i}")).send().await.unwrap();
1292            let expected_payload = PayloadQueryData::from(block.clone());
1293            assert_eq!(leaf.leaf.block_header().commit(), block.hash());
1294            assert_eq!(block.height(), i);
1295            assert_eq!(
1296                block,
1297                client
1298                    .get(&format!("block/hash/{}", block.hash()))
1299                    .send()
1300                    .await
1301                    .unwrap()
1302            );
1303            assert_eq!(
1304                *block.header(),
1305                client.get(&format!("header/{i}")).send().await.unwrap()
1306            );
1307            assert_eq!(
1308                *block.header(),
1309                client
1310                    .get(&format!("header/hash/{}", block.hash()))
1311                    .send()
1312                    .await
1313                    .unwrap()
1314            );
1315            assert_eq!(
1316                expected_payload,
1317                client.get(&format!("payload/{i}")).send().await.unwrap(),
1318            );
1319            assert_eq!(
1320                expected_payload,
1321                client
1322                    .get(&format!("payload/block-hash/{}", block.hash()))
1323                    .send()
1324                    .await
1325                    .unwrap(),
1326            );
1327            // Look up the common VID data.
1328            let common: ADVZCommonQueryData<MockTypes> = client
1329                .get(&format!("vid/common/{}", block.height()))
1330                .send()
1331                .await
1332                .unwrap();
1333            assert_eq!(common.height(), block.height());
1334            assert_eq!(common.block_hash(), block.hash());
1335            assert_eq!(
1336                VidCommitment::V0(common.payload_hash()),
1337                block.payload_hash(),
1338            );
1339            assert_eq!(
1340                common,
1341                client
1342                    .get(&format!("vid/common/hash/{}", block.hash()))
1343                    .send()
1344                    .await
1345                    .unwrap()
1346            );
1347
1348            let block_summary = client
1349                .get(&format!("block/summary/{i}"))
1350                .send()
1351                .await
1352                .unwrap();
1353            assert_eq!(
1354                BlockSummaryQueryData::<MockTypes>::from(block.clone()),
1355                block_summary,
1356            );
1357            assert_eq!(block_summary.header(), block.header());
1358            assert_eq!(block_summary.hash(), block.hash());
1359            assert_eq!(block_summary.size(), block.size());
1360            assert_eq!(block_summary.num_transactions(), block.num_transactions());
1361
1362            let block_summaries: Vec<BlockSummaryQueryData<MockTypes>> = client
1363                .get(&format!("block/summaries/{}/{}", 0, i))
1364                .send()
1365                .await
1366                .unwrap();
1367            assert_eq!(block_summaries.len() as u64, i);
1368
1369            // We should be able to look up the block by payload hash. Note that for duplicate
1370            // payloads, these endpoints may return a different block with the same payload, which
1371            // is acceptable. Therefore, we don't check equivalence of the entire `BlockQueryData`
1372            // response, only its payload.
1373            assert_eq!(
1374                block.payload(),
1375                client
1376                    .get::<BlockQueryData<MockTypes>>(&format!(
1377                        "block/payload-hash/{}",
1378                        block.payload_hash()
1379                    ))
1380                    .send()
1381                    .await
1382                    .unwrap()
1383                    .payload()
1384            );
1385            assert_eq!(
1386                block.payload_hash(),
1387                client
1388                    .get::<Header<MockTypes>>(&format!(
1389                        "header/payload-hash/{}",
1390                        block.payload_hash()
1391                    ))
1392                    .send()
1393                    .await
1394                    .unwrap()
1395                    .payload_commitment
1396            );
1397            assert_eq!(
1398                block.payload(),
1399                client
1400                    .get::<PayloadQueryData<MockTypes>>(&format!(
1401                        "payload/hash/{}",
1402                        block.payload_hash()
1403                    ))
1404                    .send()
1405                    .await
1406                    .unwrap()
1407                    .data(),
1408            );
1409            assert_eq!(
1410                common.common(),
1411                client
1412                    .get::<ADVZCommonQueryData<MockTypes>>(&format!(
1413                        "vid/common/payload-hash/{}",
1414                        block.payload_hash()
1415                    ))
1416                    .send()
1417                    .await
1418                    .unwrap()
1419                    .common()
1420            );
1421
1422            // Check that looking up each transaction in the block various ways returns the correct
1423            // transaction.
1424            for (j, txn_from_block) in block.enumerate() {
1425                let txn: TransactionQueryData<MockTypes> = client
1426                    .get(&format!("transaction/{}/{}/noproof", i, j.position))
1427                    .send()
1428                    .await
1429                    .unwrap();
1430                assert_eq!(txn.block_height(), i);
1431                assert_eq!(txn.block_hash(), block.hash());
1432                assert_eq!(txn.index(), j.position as u64);
1433                assert_eq!(txn.hash(), txn_from_block.commit());
1434                assert_eq!(txn.transaction(), &txn_from_block);
1435                // We should be able to look up the transaction by hash. Note that for duplicate
1436                // transactions, this endpoint may return a different transaction with the same
1437                // hash, which is acceptable. Therefore, we don't check equivalence of the entire
1438                // `TransactionQueryData` response, only its commitment.
1439                assert_eq!(
1440                    txn.hash(),
1441                    client
1442                        .get::<TransactionQueryData<MockTypes>>(&format!(
1443                            "transaction/hash/{}/noproof",
1444                            txn.hash()
1445                        ))
1446                        .send()
1447                        .await
1448                        .unwrap()
1449                        .hash()
1450                );
1451
1452                assert_eq!(
1453                    txn.hash(),
1454                    client
1455                        .get::<TransactionWithProofQueryData<MockTypes>>(&format!(
1456                            "transaction/{}/{}/proof",
1457                            i, j.position
1458                        ))
1459                        .send()
1460                        .await
1461                        .unwrap()
1462                        .hash()
1463                );
1464
1465                assert_eq!(
1466                    txn.hash(),
1467                    client
1468                        .get::<TransactionWithProofQueryData<MockTypes>>(&format!(
1469                            "transaction/hash/{}/proof",
1470                            txn.hash()
1471                        ))
1472                        .send()
1473                        .await
1474                        .unwrap()
1475                        .hash()
1476                );
1477            }
1478
1479            let block_range: Vec<BlockQueryData<MockTypes>> = client
1480                .get(&format!("block/{}/{}", 0, i))
1481                .send()
1482                .await
1483                .unwrap();
1484
1485            assert_eq!(block_range.len() as u64, i);
1486
1487            let leaf_range: Vec<Leaf1QueryData<MockTypes>> = client
1488                .get(&format!("leaf/{}/{}", 0, i))
1489                .send()
1490                .await
1491                .unwrap();
1492
1493            assert_eq!(leaf_range.len() as u64, i);
1494
1495            let payload_range: Vec<PayloadQueryData<MockTypes>> = client
1496                .get(&format!("payload/{}/{}", 0, i))
1497                .send()
1498                .await
1499                .unwrap();
1500
1501            assert_eq!(payload_range.len() as u64, i);
1502
1503            let vid_common_range: Vec<ADVZCommonQueryData<MockTypes>> = client
1504                .get(&format!("vid/common/{}/{}", 0, i))
1505                .send()
1506                .await
1507                .unwrap();
1508
1509            assert_eq!(vid_common_range.len() as u64, i);
1510
1511            let header_range: Vec<Header<MockTypes>> = client
1512                .get(&format!("header/{}/{}", 0, i))
1513                .send()
1514                .await
1515                .unwrap();
1516
1517            assert_eq!(header_range.len() as u64, i);
1518        }
1519    }
1520
1521    #[test_log::test(tokio::test(flavor = "multi_thread"))]
1522    async fn test_api_epochs() {
1523        // Create the consensus network.
1524        let mut network = MockNetwork::<MockDataSource>::init().await;
1525        let epoch_height = network.epoch_height();
1526        network.start().await;
1527
1528        // Start the web server.
1529        let port = reserve_tcp_port().unwrap();
1530        let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
1531        app.register_module(
1532            "availability",
1533            define_api(
1534                &Default::default(),
1535                MockBase::instance(),
1536                "1.0.0".parse().unwrap(),
1537            )
1538            .unwrap(),
1539        )
1540        .unwrap();
1541        network.spawn(
1542            "server",
1543            app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
1544        );
1545
1546        // Start a client.
1547        let client = Client::<Error, MockBase>::new(
1548            format!("http://localhost:{port}/availability")
1549                .parse()
1550                .unwrap(),
1551        );
1552        assert!(client.connect(Some(Duration::from_secs(60))).await);
1553
1554        // Submit a few blocks and make sure each one gets reflected in the query service and
1555        // preserves the consistency of the data and indices.
1556        let headers = client
1557            .socket("stream/headers/0")
1558            .subscribe::<Header<MockTypes>>()
1559            .await
1560            .unwrap();
1561        let mut chain = headers.enumerate();
1562
1563        loop {
1564            let (i, header) = chain.next().await.unwrap();
1565            let header = header.unwrap();
1566            assert_eq!(header.height(), i as u64);
1567            if header.height() >= 3 * epoch_height {
1568                break;
1569            }
1570        }
1571
1572        network.shut_down().await;
1573    }
1574
1575    #[test_log::test(tokio::test(flavor = "multi_thread"))]
1576    async fn test_old_api() {
1577        // Create the consensus network.
1578        let mut network = MockNetwork::<MockDataSource>::init().await;
1579        network.start().await;
1580
1581        // Start the web server.
1582        let port = reserve_tcp_port().unwrap();
1583
1584        let options = Options {
1585            small_object_range_limit: 500,
1586            large_object_range_limit: 500,
1587            ..Default::default()
1588        };
1589
1590        let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
1591        app.register_module(
1592            "availability",
1593            define_api(&options, MockBase::instance(), "0.1.0".parse().unwrap()).unwrap(),
1594        )
1595        .unwrap();
1596        network.spawn(
1597            "server",
1598            app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
1599        );
1600
1601        // Start a client.
1602        let client = Client::<Error, MockBase>::new(
1603            format!("http://localhost:{port}/availability")
1604                .parse()
1605                .unwrap(),
1606        );
1607        assert!(client.connect(Some(Duration::from_secs(60))).await);
1608        assert_eq!(get_non_empty_blocks(&client).await.1, vec![]);
1609
1610        // Submit a few blocks and make sure each one gets reflected in the query service and
1611        // preserves the consistency of the data and indices.
1612        let leaves = client
1613            .socket("stream/leaves/0")
1614            .subscribe::<Leaf1QueryData<MockTypes>>()
1615            .await
1616            .unwrap();
1617        let headers = client
1618            .socket("stream/headers/0")
1619            .subscribe::<Header<MockTypes>>()
1620            .await
1621            .unwrap();
1622        let blocks = client
1623            .socket("stream/blocks/0")
1624            .subscribe::<BlockQueryData<MockTypes>>()
1625            .await
1626            .unwrap();
1627        let vid_common = client
1628            .socket("stream/vid/common/0")
1629            .subscribe::<ADVZCommonQueryData<MockTypes>>()
1630            .await
1631            .unwrap();
1632        let mut chain = leaves.zip(headers.zip(blocks.zip(vid_common))).enumerate();
1633        for nonce in 0..3 {
1634            let txn = mock_transaction(vec![nonce]);
1635            network.submit_transaction(txn).await;
1636
1637            // Wait for the transaction to be finalized.
1638            let (i, leaf, block, common) = loop {
1639                tracing::info!("waiting for block with transaction {}", nonce);
1640                let (i, (leaf, (header, (block, common)))) = chain.next().await.unwrap();
1641                tracing::info!(i, ?leaf, ?header, ?block, ?common);
1642                let leaf = leaf.unwrap();
1643                let header = header.unwrap();
1644                let block = block.unwrap();
1645                let common = common.unwrap();
1646                assert_eq!(leaf.leaf.height() as usize, i);
1647                assert_eq!(leaf.leaf.block_header().commit(), block.hash());
1648                assert_eq!(block.header(), &header);
1649                assert_eq!(common.height() as usize, i);
1650                if !block.is_empty() {
1651                    break (i, leaf, block, common);
1652                }
1653            };
1654            assert_eq!(leaf, client.get(&format!("leaf/{i}")).send().await.unwrap());
1655            assert_eq!(
1656                block,
1657                client.get(&format!("block/{i}")).send().await.unwrap()
1658            );
1659            assert_eq!(
1660                common,
1661                client.get(&format!("vid/common/{i}")).send().await.unwrap()
1662            );
1663
1664            validate_old(&client, (i + 1) as u64).await;
1665        }
1666
1667        network.shut_down().await;
1668    }
1669
1670    #[test_log::test(tokio::test(flavor = "multi_thread"))]
1671    async fn test_extensions() {
1672        let dir = TempDir::with_prefix("test_availability_extensions").unwrap();
1673        let data_source = ExtensibleDataSource::new(
1674            MockDataSource::create(dir.path(), Default::default())
1675                .await
1676                .unwrap(),
1677            0,
1678        );
1679
1680        // mock up some consensus data.
1681        let leaf = Leaf2::<MockTypes>::genesis(
1682            &Default::default(),
1683            &Default::default(),
1684            MOCK_UPGRADE.base,
1685        )
1686        .await;
1687        let qc = QuorumCertificate2::genesis(
1688            &Default::default(),
1689            &Default::default(),
1690            TEST_VERSIONS.test,
1691        )
1692        .await;
1693        let leaf = LeafQueryData::new(leaf, qc).unwrap();
1694        let block = BlockQueryData::new(leaf.header().clone(), MockPayload::genesis());
1695        data_source
1696            .append(BlockInfo::new(leaf, Some(block.clone()), None, None))
1697            .await
1698            .unwrap();
1699
1700        // assert that the store has data before we move on to API requests
1701        assert_eq!(
1702            ExtensibleDataSource::<MockDataSource, u64>::block_height(&data_source)
1703                .await
1704                .unwrap(),
1705            1
1706        );
1707        assert_eq!(block, data_source.get_block(0).await.await);
1708
1709        // Create the API extensions specification.
1710        let extensions = toml! {
1711            [route.post_ext]
1712            PATH = ["/ext/:val"]
1713            METHOD = "POST"
1714            ":val" = "Integer"
1715
1716            [route.get_ext]
1717            PATH = ["/ext"]
1718            METHOD = "GET"
1719        };
1720
1721        let mut api =
1722            define_api::<RwLock<ExtensibleDataSource<MockDataSource, u64>>, MockTypes, MockBase>(
1723                &Options {
1724                    extensions: vec![extensions.into()],
1725                    ..Default::default()
1726                },
1727                MockBase::instance(),
1728                "1.0.0".parse().unwrap(),
1729            )
1730            .unwrap();
1731        api.get("get_ext", |_, state| {
1732            async move { Ok(*state.as_ref()) }.boxed()
1733        })
1734        .unwrap()
1735        .post("post_ext", |req, state| {
1736            async move {
1737                *state.as_mut() = req.integer_param("val")?;
1738                Ok(())
1739            }
1740            .boxed()
1741        })
1742        .unwrap();
1743
1744        let mut app = App::<_, Error>::with_state(RwLock::new(data_source));
1745        app.register_module("availability", api).unwrap();
1746
1747        let port = reserve_tcp_port().unwrap();
1748        let _server = BackgroundTask::spawn(
1749            "server",
1750            app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
1751        );
1752
1753        let client = Client::<Error, MockBase>::new(
1754            format!("http://localhost:{port}/availability")
1755                .parse()
1756                .unwrap(),
1757        );
1758        assert!(client.connect(Some(Duration::from_secs(60))).await);
1759
1760        assert_eq!(client.get::<u64>("ext").send().await.unwrap(), 0);
1761        client.post::<()>("ext/42").send().await.unwrap();
1762        assert_eq!(client.get::<u64>("ext").send().await.unwrap(), 42);
1763
1764        // Ensure we can still access the built-in functionality.
1765        assert_eq!(
1766            client
1767                .get::<MockHeader>("header/0")
1768                .send()
1769                .await
1770                .unwrap()
1771                .block_number,
1772            0
1773        );
1774    }
1775
1776    #[test_log::test(tokio::test(flavor = "multi_thread"))]
1777    async fn test_range_limit() {
1778        let large_object_range_limit = 2;
1779        let small_object_range_limit = 3;
1780
1781        // Create the consensus network.
1782        let mut network = MockNetwork::<MockDataSource>::init().await;
1783        network.start().await;
1784
1785        // Start the web server.
1786        let port = reserve_tcp_port().unwrap();
1787        let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
1788        app.register_module(
1789            "availability",
1790            define_api(
1791                &Options {
1792                    large_object_range_limit,
1793                    small_object_range_limit,
1794                    ..Default::default()
1795                },
1796                MockBase::instance(),
1797                "1.0.0".parse().unwrap(),
1798            )
1799            .unwrap(),
1800        )
1801        .unwrap();
1802        network.spawn(
1803            "server",
1804            app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
1805        );
1806
1807        // Start a client.
1808        let client = Client::<Error, MockBase>::new(
1809            format!("http://localhost:{port}/availability")
1810                .parse()
1811                .unwrap(),
1812        );
1813        assert!(client.connect(Some(Duration::from_secs(60))).await);
1814
1815        // Check reported limits.
1816        assert_eq!(
1817            client.get::<Limits>("limits").send().await.unwrap(),
1818            Limits {
1819                small_object_range_limit,
1820                large_object_range_limit
1821            }
1822        );
1823
1824        // Wait for enough blocks to be produced.
1825        client
1826            .socket("stream/blocks/0")
1827            .subscribe::<BlockQueryData<MockTypes>>()
1828            .await
1829            .unwrap()
1830            .take(small_object_range_limit + 1)
1831            .try_collect::<Vec<_>>()
1832            .await
1833            .unwrap();
1834
1835        async fn check_limit<T: DeserializeOwned + Debug>(
1836            client: &Client<Error, MockBase>,
1837            req: &str,
1838            limit: usize,
1839        ) {
1840            let range: Vec<T> = client
1841                .get(&format!("{req}/0/{limit}"))
1842                .send()
1843                .await
1844                .unwrap();
1845            assert_eq!(range.len(), limit);
1846            let err = client
1847                .get::<Vec<T>>(&format!("{req}/0/{}", limit + 1))
1848                .send()
1849                .await
1850                .unwrap_err();
1851            assert_eq!(err.status(), StatusCode::BAD_REQUEST);
1852        }
1853
1854        check_limit::<LeafQueryData<MockTypes>>(&client, "leaf", small_object_range_limit).await;
1855        check_limit::<Header<MockTypes>>(&client, "header", large_object_range_limit).await;
1856        check_limit::<BlockQueryData<MockTypes>>(&client, "block", large_object_range_limit).await;
1857        check_limit::<PayloadQueryData<MockTypes>>(&client, "payload", large_object_range_limit)
1858            .await;
1859        check_limit::<BlockSummaryQueryData<MockTypes>>(
1860            &client,
1861            "block/summaries",
1862            large_object_range_limit,
1863        )
1864        .await;
1865
1866        network.shut_down().await;
1867    }
1868
1869    #[test_log::test(tokio::test(flavor = "multi_thread"))]
1870    async fn test_header_endpoint() {
1871        // Create the consensus network.
1872        let mut network = MockNetwork::<MockSqlDataSource>::init().await;
1873        network.start().await;
1874
1875        // Start the web server.
1876        let port = reserve_tcp_port().unwrap();
1877        let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
1878        app.register_module(
1879            "availability",
1880            define_api(
1881                &Default::default(),
1882                MockBase::instance(),
1883                "1.0.0".parse().unwrap(),
1884            )
1885            .unwrap(),
1886        )
1887        .unwrap();
1888        network.spawn(
1889            "server",
1890            app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
1891        );
1892
1893        let ds = network.data_source();
1894
1895        // Get the current block height and fetch header for some later block height
1896        // This fetch will only resolve when we receive a leaf or block for that block height
1897        let block_height = ds.block_height().await.unwrap();
1898        let fetch = ds
1899            .get_header(BlockId::<MockTypes>::Number(block_height + 25))
1900            .await;
1901
1902        assert!(fetch.is_pending());
1903        let header = fetch.await;
1904        assert_eq!(header.height() as usize, block_height + 25);
1905
1906        network.shut_down().await;
1907    }
1908
1909    #[test_log::test(tokio::test(flavor = "multi_thread"))]
1910    async fn test_leaf_only_ds() {
1911        // Create the consensus network.
1912        let mut network = MockNetwork::<MockSqlDataSource>::init_with_leaf_ds().await;
1913        network.start().await;
1914
1915        // Start the web server.
1916        let port = reserve_tcp_port().unwrap();
1917        let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
1918        app.register_module(
1919            "availability",
1920            define_api(
1921                &Default::default(),
1922                MockBase::instance(),
1923                "1.0.0".parse().unwrap(),
1924            )
1925            .unwrap(),
1926        )
1927        .unwrap();
1928        network.spawn(
1929            "server",
1930            app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
1931        );
1932
1933        // Start a client.
1934        let client = Client::<Error, MockBase>::new(
1935            format!("http://localhost:{port}/availability")
1936                .parse()
1937                .unwrap(),
1938        );
1939        assert!(client.connect(Some(Duration::from_secs(60))).await);
1940
1941        // Wait for some headers to be produced.
1942        client
1943            .socket("stream/headers/0")
1944            .subscribe::<Header<MockTypes>>()
1945            .await
1946            .unwrap()
1947            .take(5)
1948            .try_collect::<Vec<_>>()
1949            .await
1950            .unwrap();
1951
1952        // Wait for some leaves to be produced.
1953        client
1954            .socket("stream/leaves/5")
1955            .subscribe::<LeafQueryData<MockTypes>>()
1956            .await
1957            .unwrap()
1958            .take(5)
1959            .try_collect::<Vec<_>>()
1960            .await
1961            .unwrap();
1962
1963        let ds = network.data_source();
1964
1965        // Get the current block height and fetch header for some later block height
1966        // This fetch will only resolve if we get a block notification
1967        // However, this block will never be stored
1968        let block_height = ds.block_height().await.unwrap();
1969        let target_block_height = block_height + 20;
1970        let fetch = ds
1971            .get_block(BlockId::<MockTypes>::Number(target_block_height))
1972            .await;
1973
1974        assert!(fetch.is_pending());
1975        let block = fetch.await;
1976        assert_eq!(block.height() as usize, target_block_height);
1977
1978        let mut tx = ds.read().await.unwrap();
1979        tx.get_block(BlockId::<MockTypes>::Number(target_block_height))
1980            .await
1981            .unwrap_err();
1982        drop(tx);
1983
1984        network.shut_down().await;
1985    }
1986}