Skip to main content

hotshot_query_service/
availability.rs

1// Copyright (c) 2022 Espresso Systems (espressosys.com)
2// This file is part of the HotShot Query Service library.
3//
4// This program is free software: you can redistribute it and/or modify it under the terms of the GNU
5// General Public License as published by the Free Software Foundation, either version 3 of the
6// License, or (at your option) any later version.
7// This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without
8// even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
9// General Public License for more details.
10// You should have received a copy of the GNU General Public License along with this program. If not,
11// see <https://www.gnu.org/licenses/>.
12
13//! Queries for HotShot chain state.
14//!
15//! The availability API provides an objective view of the HotShot blockchain. It provides access
16//! only to normative data: that is, data which is agreed upon by all honest consensus nodes and
17//! which is immutable. This means access to core consensus data structures including leaves,
18//! blocks, and headers, where each query is pure and idempotent. This also means that it is
19//! possible for a client to verify all of the information provided by this API, by running a
20//! HotShot light client and downloading the appropriate evidence with each query.
21//!
22//! This API does not provide any queries which represent only the _current_ state of the chain or
23//! may change over time, and it does not provide information for which there is not (yet) agreement
24//! of a supermajority of consensus nodes. For information about the current dynamic state of
25//! consensus and uncommitted state, try the [status](crate::status) API. For information about the
26//! chain which is tabulated by this specific node and not subject to full consensus agreement, try
27//! the [node](crate::node) API.
28
29use std::{path::PathBuf, time::Duration};
30
31use futures::{FutureExt, StreamExt, TryFutureExt, TryStreamExt};
32use hotshot_types::{
33    data::{Leaf, Leaf2, QuorumProposal, VidCommitment, VidCommon},
34    simple_certificate::QuorumCertificate,
35    traits::node_implementation::NodeType,
36};
37use serde::{Deserialize, Serialize};
38use snafu::OptionExt;
39use tide_disco::{Api, RequestParams, StatusCode, api::ApiError, method::ReadState};
40use vbs::version::StaticVersionType;
41
42use crate::{Header, Payload, api::load_api, types::HeightIndexed};
43
44pub(crate) mod data_source;
45mod fetch;
46pub(crate) mod query_data;
47pub use data_source::*;
48pub use fetch::Fetch;
49pub use hotshot_query_service_types::availability::Error;
50pub use query_data::*;
51
/// Configuration for the availability API module.
///
/// An `Options` value is consumed by [`define_api`], which reads every field when registering
/// the route handlers. [`Options::default`] provides a baseline configuration.
#[derive(Debug)]
pub struct Options {
    /// Optional path to an API specification file.
    ///
    /// [`define_api`] forwards this path to `load_api` together with the `availability.toml`
    /// specification that is compiled into the binary. Presumably a specification found at this
    /// path takes the place of the built-in one -- confirm against `load_api`.
    pub api_path: Option<PathBuf>,

    /// Timeout for failing requests due to missing data.
    ///
    /// If data needed to respond to a request is missing, it can (in some cases) be fetched from an
    /// external provider. This parameter controls how long the request handler will wait for
    /// missing data to be fetched before giving up and failing the request.
    pub fetch_timeout: Duration,

    /// Additional API specification files to merge with `availability-api-path`.
    ///
    /// These optional files may contain route definitions for application-specific routes that have
    /// been added as extensions to the basic availability API.
    pub extensions: Vec<toml::Value>,

    /// The maximum number of small objects which can be loaded in a single range query.
    ///
    /// Currently small objects include leaves only. In the future this limit will also apply to
    /// headers, block summaries, and VID common, however
    /// * loading of headers and block summaries is currently implemented by loading the entire
    ///   block
    /// * imperfect VID parameter tuning means that VID common can be much larger than it should
    ///
    /// NOTE(review): [`define_api`] currently passes this limit to the VID common range handler
    /// as well, which does not match the "leaves only" statement above -- confirm which of the
    /// two is intended.
    pub small_object_range_limit: usize,

    /// The maximum number of large objects which can be loaded in a single range query.
    ///
    /// Large objects include anything that _might_ contain a full payload or an object proportional
    /// in size to a payload. Note that this limit applies to the entire class of objects: we do not
    /// check the size of objects while loading to determine which limit to apply. If an object
    /// belongs to a class which might contain a large payload, the large object limit always
    /// applies.
    pub large_object_range_limit: usize,
}
87
88impl Default for Options {
89    fn default() -> Self {
90        Self {
91            api_path: None,
92            fetch_timeout: Duration::from_millis(500),
93            extensions: vec![],
94            large_object_range_limit: 100,
95            small_object_range_limit: 500,
96        }
97    }
98}
99
/// Leaf query response in the legacy (pre-`Leaf2`) shape: a [`Leaf`] together with a
/// [`QuorumCertificate`].
///
/// [`define_api`] returns this type from the leaf routes when the API major version is 0, after
/// downgrading the current `LeafQueryData`.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
// The empty bound replaces the trait bounds serde would otherwise infer for `Types`.
#[serde(bound = "")]
pub struct Leaf1QueryData<Types: NodeType> {
    /// The legacy-format leaf.
    pub(crate) leaf: Leaf<Types>,
    /// The legacy-format quorum certificate served alongside `leaf`.
    pub(crate) qc: QuorumCertificate<Types>,
}
106
107impl<Types: NodeType> Leaf1QueryData<Types> {
108    pub fn new(leaf: Leaf<Types>, qc: QuorumCertificate<Types>) -> Self {
109        Self { leaf, qc }
110    }
111
112    pub fn leaf(&self) -> &Leaf<Types> {
113        &self.leaf
114    }
115
116    pub fn qc(&self) -> &QuorumCertificate<Types> {
117        &self.qc
118    }
119}
120
121fn downgrade_leaf<Types: NodeType>(leaf2: Leaf2<Types>) -> Leaf<Types> {
122    // TODO do we still need some check here?
123    // `drb_seed` no longer exists on `Leaf2`
124    // if leaf2.drb_seed != [0; 32] && leaf2.drb_result != [0; 32] {
125    //     panic!("Downgrade of Leaf2 to Leaf will lose DRB information!");
126    // }
127    let quorum_proposal = QuorumProposal {
128        block_header: leaf2.block_header().clone(),
129        view_number: leaf2.view_number(),
130        justify_qc: leaf2.justify_qc().to_qc(),
131        upgrade_certificate: leaf2.upgrade_certificate(),
132        proposal_certificate: None,
133    };
134    let mut leaf = Leaf::from_quorum_proposal(&quorum_proposal);
135    if let Some(payload) = leaf2.block_payload() {
136        leaf.fill_block_payload_unchecked(payload);
137    }
138    leaf
139}
140
141fn downgrade_leaf_query_data<Types: NodeType>(leaf: LeafQueryData<Types>) -> Leaf1QueryData<Types> {
142    Leaf1QueryData {
143        leaf: downgrade_leaf(leaf.leaf),
144        qc: leaf.qc.to_qc(),
145    }
146}
147
148async fn get_leaf_handler<Types, State>(
149    req: tide_disco::RequestParams,
150    state: &State,
151    timeout: Duration,
152) -> Result<LeafQueryData<Types>, Error>
153where
154    State: 'static + Send + Sync + ReadState,
155    <State as ReadState>::State: Send + Sync + AvailabilityDataSource<Types>,
156    Types: NodeType,
157    Header<Types>: QueryableHeader<Types>,
158    Payload<Types>: QueryablePayload<Types>,
159{
160    let id = match req.opt_integer_param("height")? {
161        Some(height) => LeafId::Number(height),
162        None => LeafId::Hash(req.blob_param("hash")?),
163    };
164    let fetch = state.read(|state| state.get_leaf(id).boxed()).await;
165    fetch.with_timeout(timeout).await.context(FetchLeafSnafu {
166        resource: id.to_string(),
167    })
168}
169
170async fn get_leaf_range_handler<Types, State>(
171    req: tide_disco::RequestParams,
172    state: &State,
173    timeout: Duration,
174    small_object_range_limit: usize,
175) -> Result<Vec<LeafQueryData<Types>>, Error>
176where
177    State: 'static + Send + Sync + ReadState,
178    <State as ReadState>::State: Send + Sync + AvailabilityDataSource<Types>,
179    Types: NodeType,
180    Header<Types>: QueryableHeader<Types>,
181    Payload<Types>: QueryablePayload<Types>,
182{
183    let from = req.integer_param::<_, usize>("from")?;
184    let until = req.integer_param("until")?;
185    enforce_range_limit(from, until, small_object_range_limit)?;
186
187    let leaves = state
188        .read(|state| state.get_leaf_range(from..until).boxed())
189        .await;
190    leaves
191        .enumerate()
192        .then(|(index, fetch)| async move {
193            fetch.with_timeout(timeout).await.context(FetchLeafSnafu {
194                resource: (index + from).to_string(),
195            })
196        })
197        .try_collect::<Vec<_>>()
198        .await
199}
200
201fn downgrade_vid_common_query_data<Types: NodeType>(
202    data: VidCommonQueryData<Types>,
203) -> Option<ADVZCommonQueryData<Types>> {
204    let VidCommonQueryData {
205        height,
206        block_hash,
207        payload_hash: VidCommitment::V0(payload_hash),
208        common: VidCommon::V0(common),
209    } = data
210    else {
211        return None;
212    };
213    Some(ADVZCommonQueryData {
214        height,
215        block_hash,
216        payload_hash,
217        common,
218    })
219}
220
221async fn get_vid_common_handler<Types, State>(
222    req: tide_disco::RequestParams,
223    state: &State,
224    timeout: Duration,
225) -> Result<VidCommonQueryData<Types>, Error>
226where
227    State: 'static + Send + Sync + ReadState,
228    <State as ReadState>::State: Send + Sync + AvailabilityDataSource<Types>,
229    Types: NodeType,
230    Header<Types>: QueryableHeader<Types>,
231    Payload<Types>: QueryablePayload<Types>,
232{
233    let id = if let Some(height) = req.opt_integer_param("height")? {
234        BlockId::Number(height)
235    } else if let Some(hash) = req.opt_blob_param("hash")? {
236        BlockId::Hash(hash)
237    } else {
238        BlockId::PayloadHash(req.blob_param("payload-hash")?)
239    };
240    let fetch = state.read(|state| state.get_vid_common(id).boxed()).await;
241    fetch.with_timeout(timeout).await.context(FetchBlockSnafu {
242        resource: id.to_string(),
243    })
244}
245
246async fn get_vid_common_range_handler<Types, State>(
247    req: tide_disco::RequestParams,
248    state: &State,
249    limit: usize,
250    timeout: Duration,
251) -> Result<Vec<VidCommonQueryData<Types>>, Error>
252where
253    State: 'static + Send + Sync + ReadState,
254    <State as ReadState>::State: Send + Sync + AvailabilityDataSource<Types>,
255    Types: NodeType,
256    Header<Types>: QueryableHeader<Types>,
257    Payload<Types>: QueryablePayload<Types>,
258{
259    let from = req.integer_param("from")?;
260    let until = req.integer_param("until")?;
261    enforce_range_limit(from, until, limit)?;
262
263    let vid = state
264        .read(|state| state.get_vid_common_range(from..until).boxed())
265        .await;
266    vid.enumerate()
267        .then(|(index, fetch)| async move {
268            fetch.with_timeout(timeout).await.context(FetchBlockSnafu {
269                resource: (index + from).to_string(),
270            })
271        })
272        .try_collect::<Vec<_>>()
273        .await
274}
275
276pub fn define_api<State, Types: NodeType, Ver: StaticVersionType + 'static>(
277    options: &Options,
278    _: Ver,
279    api_ver: semver::Version,
280) -> Result<Api<State, Error, Ver>, ApiError>
281where
282    State: 'static + Send + Sync + ReadState,
283    <State as ReadState>::State: Send + Sync + AvailabilityDataSource<Types>,
284    Header<Types>: QueryableHeader<Types>,
285    Payload<Types>: QueryablePayload<Types>,
286{
287    let mut api = load_api::<State, Error, Ver>(
288        options.api_path.as_ref(),
289        include_str!("../api/availability.toml"),
290        options.extensions.clone(),
291    )?;
292    let timeout = options.fetch_timeout;
293    let small_object_range_limit = options.small_object_range_limit;
294    let large_object_range_limit = options.large_object_range_limit;
295
296    api.with_version(api_ver.clone());
297
298    // `LeafQueryData` now contains `Leaf2` and `QC2``, which is a breaking change.
299    // On node startup, all leaves are migrated to `Leaf2`.
300    //
301    // To maintain compatibility with nodes running an older version
302    // (which expect `LeafQueryData` with `Leaf1` and `QC1`),
303    // we downgrade `Leaf2` to `Leaf1` and `QC2` to `QC1` if the API version is V0.
304    // Otherwise, we return the new types.
305    if api_ver.major == 0 {
306        api.at("get_leaf", move |req, state| {
307            get_leaf_handler(req, state, timeout)
308                .map(|res| res.map(downgrade_leaf_query_data))
309                .boxed()
310        })?;
311
312        api.at("get_leaf_range", move |req, state| {
313            get_leaf_range_handler(req, state, timeout, small_object_range_limit)
314                .map(|res| {
315                    res.map(|r| {
316                        r.into_iter()
317                            .map(downgrade_leaf_query_data)
318                            .collect::<Vec<Leaf1QueryData<_>>>()
319                    })
320                })
321                .boxed()
322        })?;
323
324        api.stream("stream_leaves", move |req, state| {
325            async move {
326                let height = req.integer_param("height")?;
327                state
328                    .read(|state| {
329                        async move {
330                            Ok(state
331                                .subscribe_leaves(height)
332                                .await
333                                .map(|leaf| Ok(downgrade_leaf_query_data(leaf))))
334                        }
335                        .boxed()
336                    })
337                    .await
338            }
339            .try_flatten_stream()
340            .boxed()
341        })?;
342    } else {
343        api.at("get_leaf", move |req, state| {
344            get_leaf_handler(req, state, timeout).boxed()
345        })?;
346
347        api.at("get_leaf_range", move |req, state| {
348            get_leaf_range_handler(req, state, timeout, small_object_range_limit).boxed()
349        })?;
350
351        api.stream("stream_leaves", move |req, state| {
352            async move {
353                let height = req.integer_param("height")?;
354                state
355                    .read(|state| {
356                        async move { Ok(state.subscribe_leaves(height).await.map(Ok)) }.boxed()
357                    })
358                    .await
359            }
360            .try_flatten_stream()
361            .boxed()
362        })?;
363    }
364
365    // VIDCommon data is version gated after the VID upgrade.
366    // We keep the old struct and data in the API version V0. Starting from V1 we are returning version gated structs.
367    if api_ver.major == 0 {
368        api.at("get_vid_common", move |req, state| {
369            get_vid_common_handler(req, state, timeout)
370                .map(|r| match r {
371                    Ok(data) => downgrade_vid_common_query_data(data).ok_or(Error::Custom {
372                        message: "Incompatible VID version.".to_string(),
373                        status: StatusCode::BAD_REQUEST,
374                    }),
375                    Err(e) => Err(e),
376                })
377                .boxed()
378        })?
379        .at("get_vid_common_range", move |req, state| {
380            get_vid_common_range_handler(req, state, small_object_range_limit, timeout)
381                .map(|r| match r {
382                    Ok(data) => data
383                        .into_iter()
384                        .map(downgrade_vid_common_query_data)
385                        .collect::<Option<Vec<_>>>()
386                        .ok_or(Error::Custom {
387                            message: "Incompatible VID version.".to_string(),
388                            status: StatusCode::BAD_REQUEST,
389                        }),
390                    Err(e) => Err(e),
391                })
392                .boxed()
393        })?
394        .stream("stream_vid_common", move |req, state| {
395            async move {
396                let height = req.integer_param("height")?;
397                state
398                    .read(|state| {
399                        async move {
400                            Ok(state.subscribe_vid_common(height).await.map(|data| {
401                                downgrade_vid_common_query_data(data).ok_or(Error::Custom {
402                                    message: "Incompatible VID version.".to_string(),
403                                    status: StatusCode::BAD_REQUEST,
404                                })
405                            }))
406                        }
407                        .boxed()
408                    })
409                    .await
410            }
411            .try_flatten_stream()
412            .boxed()
413        })?;
414    } else {
415        api.at("get_vid_common", move |req, state| {
416            get_vid_common_handler(req, state, timeout).boxed().boxed()
417        })?
418        .at("get_vid_common_range", move |req, state| {
419            get_vid_common_range_handler(req, state, small_object_range_limit, timeout)
420                .boxed()
421                .boxed()
422        })?
423        .stream("stream_vid_common", move |req, state| {
424            async move {
425                let height = req.integer_param("height")?;
426                state
427                    .read(|state| {
428                        async move { Ok(state.subscribe_vid_common(height).await.map(Ok)) }.boxed()
429                    })
430                    .await
431            }
432            .try_flatten_stream()
433            .boxed()
434        })?;
435    }
436
437    api.at("get_header", move |req, state| {
438        async move {
439            let id = if let Some(height) = req.opt_integer_param("height")? {
440                BlockId::Number(height)
441            } else if let Some(hash) = req.opt_blob_param("hash")? {
442                BlockId::Hash(hash)
443            } else {
444                BlockId::PayloadHash(req.blob_param("payload-hash")?)
445            };
446            let fetch = state.read(|state| state.get_header(id).boxed()).await;
447            fetch.with_timeout(timeout).await.context(FetchHeaderSnafu {
448                resource: id.to_string(),
449            })
450        }
451        .boxed()
452    })?
453    .at("get_header_range", move |req, state| {
454        async move {
455            let from = req.integer_param::<_, usize>("from")?;
456            let until = req.integer_param::<_, usize>("until")?;
457            enforce_range_limit(from, until, large_object_range_limit)?;
458
459            let headers = state
460                .read(|state| state.get_header_range(from..until).boxed())
461                .await;
462            headers
463                .enumerate()
464                .then(|(index, fetch)| async move {
465                    fetch.with_timeout(timeout).await.context(FetchHeaderSnafu {
466                        resource: (index + from).to_string(),
467                    })
468                })
469                .try_collect::<Vec<_>>()
470                .await
471        }
472        .boxed()
473    })?
474    .stream("stream_headers", move |req, state| {
475        async move {
476            let height = req.integer_param("height")?;
477            state
478                .read(|state| {
479                    async move { Ok(state.subscribe_headers(height).await.map(Ok)) }.boxed()
480                })
481                .await
482        }
483        .try_flatten_stream()
484        .boxed()
485    })?
486    .at("get_block", move |req, state| {
487        async move {
488            let id = if let Some(height) = req.opt_integer_param("height")? {
489                BlockId::Number(height)
490            } else if let Some(hash) = req.opt_blob_param("hash")? {
491                BlockId::Hash(hash)
492            } else {
493                BlockId::PayloadHash(req.blob_param("payload-hash")?)
494            };
495            let fetch = state.read(|state| state.get_block(id).boxed()).await;
496            fetch.with_timeout(timeout).await.context(FetchBlockSnafu {
497                resource: id.to_string(),
498            })
499        }
500        .boxed()
501    })?
502    .at("get_block_range", move |req, state| {
503        async move {
504            let from = req.integer_param::<_, usize>("from")?;
505            let until = req.integer_param("until")?;
506            enforce_range_limit(from, until, large_object_range_limit)?;
507
508            let blocks = state
509                .read(|state| state.get_block_range(from..until).boxed())
510                .await;
511            blocks
512                .enumerate()
513                .then(|(index, fetch)| async move {
514                    fetch.with_timeout(timeout).await.context(FetchBlockSnafu {
515                        resource: (index + from).to_string(),
516                    })
517                })
518                .try_collect::<Vec<_>>()
519                .await
520        }
521        .boxed()
522    })?
523    .stream("stream_blocks", move |req, state| {
524        async move {
525            let height = req.integer_param("height")?;
526            state
527                .read(|state| {
528                    async move { Ok(state.subscribe_blocks(height).await.map(Ok)) }.boxed()
529                })
530                .await
531        }
532        .try_flatten_stream()
533        .boxed()
534    })?
535    .at("get_payload", move |req, state| {
536        async move {
537            let id = if let Some(height) = req.opt_integer_param("height")? {
538                BlockId::Number(height)
539            } else if let Some(hash) = req.opt_blob_param("hash")? {
540                BlockId::PayloadHash(hash)
541            } else {
542                BlockId::Hash(req.blob_param("block-hash")?)
543            };
544            let fetch = state.read(|state| state.get_payload(id).boxed()).await;
545            fetch.with_timeout(timeout).await.context(FetchBlockSnafu {
546                resource: id.to_string(),
547            })
548        }
549        .boxed()
550    })?
551    .at("get_payload_range", move |req, state| {
552        async move {
553            let from = req.integer_param::<_, usize>("from")?;
554            let until = req.integer_param("until")?;
555            enforce_range_limit(from, until, large_object_range_limit)?;
556
557            let payloads = state
558                .read(|state| state.get_payload_range(from..until).boxed())
559                .await;
560            payloads
561                .enumerate()
562                .then(|(index, fetch)| async move {
563                    fetch.with_timeout(timeout).await.context(FetchBlockSnafu {
564                        resource: (index + from).to_string(),
565                    })
566                })
567                .try_collect::<Vec<_>>()
568                .await
569        }
570        .boxed()
571    })?
572    .stream("stream_payloads", move |req, state| {
573        async move {
574            let height = req.integer_param("height")?;
575            state
576                .read(|state| {
577                    async move { Ok(state.subscribe_payloads(height).await.map(Ok)) }.boxed()
578                })
579                .await
580        }
581        .try_flatten_stream()
582        .boxed()
583    })?
584    .at("get_transaction_proof", move |req, state| {
585        async move {
586            let tx = get_transaction(req, state, timeout).await?;
587            let height = tx.block.height();
588            let vid = state
589                .read(|state| state.get_vid_common(height as usize))
590                .await
591                .with_timeout(timeout)
592                .await
593                .context(FetchBlockSnafu {
594                    resource: height.to_string(),
595                })?;
596            let proof = tx.block.transaction_proof(&vid, &tx.index).context(
597                InvalidTransactionIndexSnafu {
598                    height,
599                    index: tx.transaction.index(),
600                },
601            )?;
602            Ok(TransactionWithProofQueryData::new(tx.transaction, proof))
603        }
604        .boxed()
605    })?
606    .at("get_transaction", move |req, state| {
607        async move { Ok(get_transaction(req, state, timeout).await?.transaction) }.boxed()
608    })?
609    .stream("stream_transactions", move |req, state| {
610        async move {
611            let height = req.integer_param::<_, usize>("height")?;
612
613            let namespace: Option<i64> = req
614                .opt_integer_param::<_, usize>("namespace")?
615                .map(|i| {
616                    i.try_into().map_err(|err| Error::Custom {
617                        message: format!(
618                            "Invalid 'namespace': could not convert usize to i64: {err}"
619                        ),
620                        status: StatusCode::BAD_REQUEST,
621                    })
622                })
623                .transpose()?;
624
625            state
626                .read(|state| {
627                    async move {
628                        Ok(state
629                            .subscribe_blocks(height)
630                            .await
631                            .map(move |block| {
632                                let transactions = block.enumerate().enumerate();
633                                let header = block.header();
634                                let filtered_txs = transactions
635                                    .filter_map(|(i, (index, _tx))| {
636                                        if let Some(requested_ns) = namespace {
637                                            let ns_id = QueryableHeader::<Types>::namespace_id(
638                                                header,
639                                                &index.ns_index,
640                                            )?;
641
642                                            if ns_id.into() != requested_ns {
643                                                return None;
644                                            }
645                                        }
646
647                                        let tx = block.transaction(&index)?;
648                                        TransactionQueryData::new(tx, &block, &index, i as u64)
649                                    })
650                                    .collect::<Vec<_>>();
651
652                                futures::stream::iter(filtered_txs.into_iter().map(Ok))
653                            })
654                            .flatten())
655                    }
656                    .boxed()
657                })
658                .await
659        }
660        .try_flatten_stream()
661        .boxed()
662    })?
663    .at("get_block_summary", move |req, state| {
664        async move {
665            let id: usize = req.integer_param("height")?;
666
667            let fetch = state.read(|state| state.get_block(id).boxed()).await;
668            fetch
669                .with_timeout(timeout)
670                .await
671                .context(FetchBlockSnafu {
672                    resource: id.to_string(),
673                })
674                .map(BlockSummaryQueryData::from)
675        }
676        .boxed()
677    })?
678    .at("get_block_summary_range", move |req, state| {
679        async move {
680            let from: usize = req.integer_param("from")?;
681            let until: usize = req.integer_param("until")?;
682            enforce_range_limit(from, until, large_object_range_limit)?;
683
684            let blocks = state
685                .read(|state| state.get_block_range(from..until).boxed())
686                .await;
687            let result: Vec<BlockSummaryQueryData<Types>> = blocks
688                .enumerate()
689                .then(|(index, fetch)| async move {
690                    fetch.with_timeout(timeout).await.context(FetchBlockSnafu {
691                        resource: (index + from).to_string(),
692                    })
693                })
694                .map(|result| result.map(BlockSummaryQueryData::from))
695                .try_collect()
696                .await?;
697
698            Ok(result)
699        }
700        .boxed()
701    })?
702    .at("get_limits", move |_req, _state| {
703        async move {
704            Ok(Limits {
705                small_object_range_limit,
706                large_object_range_limit,
707            })
708        }
709        .boxed()
710    })?
711    .get("get_cert2", |req, state| {
712        async move {
713            let height: u64 = req.integer_param("height")?;
714            state.get_cert2(height).await.map_err(|err| err.into())
715        }
716        .boxed()
717    })?;
718    Ok(api)
719}
720
721fn enforce_range_limit(from: usize, until: usize, limit: usize) -> Result<(), Error> {
722    if until.saturating_sub(from) > limit {
723        return Err(Error::RangeLimit { from, until, limit });
724    }
725    Ok(())
726}
727
728async fn get_transaction<Types, State>(
729    req: RequestParams,
730    state: &State,
731    timeout: Duration,
732) -> Result<BlockWithTransaction<Types>, Error>
733where
734    Types: NodeType,
735    Header<Types>: QueryableHeader<Types>,
736    Payload<Types>: QueryablePayload<Types>,
737    State: 'static + Send + Sync + ReadState,
738    <State as ReadState>::State: Send + Sync + AvailabilityDataSource<Types>,
739{
740    match req.opt_blob_param("hash")? {
741        Some(hash) => state
742            .read(|state| state.get_block_containing_transaction(hash).boxed())
743            .await
744            .with_timeout(timeout)
745            .await
746            .context(FetchTransactionSnafu {
747                resource: hash.to_string(),
748            }),
749        None => {
750            let height: u64 = req.integer_param("height")?;
751            let fetch = state
752                .read(|state| state.get_block(height as usize).boxed())
753                .await;
754            let block = fetch.with_timeout(timeout).await.context(FetchBlockSnafu {
755                resource: height.to_string(),
756            })?;
757            let i: u64 = req.integer_param("index")?;
758            let index = block
759                .payload()
760                .nth(block.metadata(), i as usize)
761                .context(InvalidTransactionIndexSnafu { height, index: i })?;
762            let transaction = block
763                .transaction(&index)
764                .context(InvalidTransactionIndexSnafu { height, index: i })?;
765            let transaction = TransactionQueryData::new(transaction, &block, &index, i)
766                .context(InvalidTransactionIndexSnafu { height, index: i })?;
767            Ok(BlockWithTransaction {
768                transaction,
769                block,
770                index,
771            })
772        },
773    }
774}
775
776#[cfg(test)]
777mod test {
778    use std::{fmt::Debug, time::Duration};
779
780    use async_lock::RwLock;
781    use committable::Committable;
782    use futures::future::FutureExt;
783    use hotshot_example_types::node_types::TEST_VERSIONS;
784    use hotshot_types::{data::Leaf2, simple_certificate::QuorumCertificate2};
785    use serde::de::DeserializeOwned;
786    use surf_disco::{Client, Error as _};
787    use tempfile::TempDir;
788    use test_utils::reserve_tcp_port;
789    use tide_disco::App;
790    use toml::toml;
791
792    use super::*;
793    use crate::{
794        ApiState, Error, Header,
795        data_source::{ExtensibleDataSource, VersionedDataSource, storage::AvailabilityStorage},
796        status::StatusDataSource,
797        task::BackgroundTask,
798        testing::{
799            consensus::{MockDataSource, MockNetwork, MockSqlDataSource},
800            mocks::{MOCK_UPGRADE, MockBase, MockHeader, MockPayload, MockTypes, mock_transaction},
801        },
802        types::HeightIndexed,
803    };
804
805    /// Get the current ledger height and a list of non-empty leaf/block pairs.
806    async fn get_non_empty_blocks(
807        client: &Client<Error, MockBase>,
808    ) -> (
809        u64,
810        Vec<(LeafQueryData<MockTypes>, BlockQueryData<MockTypes>)>,
811    ) {
812        let mut blocks = vec![];
813        // Ignore the genesis block (start from height 1).
814        for i in 1.. {
815            match client
816                .get::<BlockQueryData<MockTypes>>(&format!("block/{i}"))
817                .send()
818                .await
819            {
820                Ok(block) => {
821                    if !block.is_empty() {
822                        let leaf = client.get(&format!("leaf/{i}")).send().await.unwrap();
823                        blocks.push((leaf, block));
824                    }
825                },
826                Err(Error::Availability {
827                    source: super::Error::FetchBlock { .. },
828                }) => {
829                    tracing::info!(
830                        "found end of ledger at height {i}, non-empty blocks are {blocks:?}",
831                    );
832                    return (i, blocks);
833                },
834                Err(err) => panic!("unexpected error {err}"),
835            }
836        }
837        unreachable!()
838    }
839
840    async fn validate(client: &Client<Error, MockBase>, height: u64) {
841        // Check the consistency of every block/leaf pair.
842        for i in 0..height {
843            // Limit the number of blocks we validate in order to
844            // speed up the tests.
845            if ![0, 1, height / 2, height - 1].contains(&i) {
846                continue;
847            }
848            tracing::info!("validate block {i}/{height}");
849
850            // Check that looking up the leaf various ways returns the correct leaf.
851            let leaf: LeafQueryData<MockTypes> =
852                client.get(&format!("leaf/{i}")).send().await.unwrap();
853            assert_eq!(leaf.height(), i);
854            assert_eq!(
855                leaf,
856                client
857                    .get(&format!("leaf/hash/{}", leaf.hash()))
858                    .send()
859                    .await
860                    .unwrap()
861            );
862
863            // Check that looking up the block various ways returns the correct block.
864            let block: BlockQueryData<MockTypes> =
865                client.get(&format!("block/{i}")).send().await.unwrap();
866            let expected_payload = PayloadQueryData::from(block.clone());
867            assert_eq!(leaf.block_hash(), block.hash());
868            assert_eq!(block.height(), i);
869            assert_eq!(
870                block,
871                client
872                    .get(&format!("block/hash/{}", block.hash()))
873                    .send()
874                    .await
875                    .unwrap()
876            );
877            assert_eq!(
878                *block.header(),
879                client.get(&format!("header/{i}")).send().await.unwrap()
880            );
881            assert_eq!(
882                *block.header(),
883                client
884                    .get(&format!("header/hash/{}", block.hash()))
885                    .send()
886                    .await
887                    .unwrap()
888            );
889            assert_eq!(
890                expected_payload,
891                client.get(&format!("payload/{i}")).send().await.unwrap(),
892            );
893            assert_eq!(
894                expected_payload,
895                client
896                    .get(&format!("payload/block-hash/{}", block.hash()))
897                    .send()
898                    .await
899                    .unwrap(),
900            );
901            // Look up the common VID data.
902            let common: VidCommonQueryData<MockTypes> = client
903                .get(&format!("vid/common/{}", block.height()))
904                .send()
905                .await
906                .unwrap();
907            assert_eq!(common.height(), block.height());
908            assert_eq!(common.block_hash(), block.hash());
909            assert_eq!(common.payload_hash(), block.payload_hash());
910            assert_eq!(
911                common,
912                client
913                    .get(&format!("vid/common/hash/{}", block.hash()))
914                    .send()
915                    .await
916                    .unwrap()
917            );
918
919            let block_summary = client
920                .get(&format!("block/summary/{i}"))
921                .send()
922                .await
923                .unwrap();
924            assert_eq!(
925                BlockSummaryQueryData::<MockTypes>::from(block.clone()),
926                block_summary,
927            );
928            assert_eq!(block_summary.header(), block.header());
929            assert_eq!(block_summary.hash(), block.hash());
930            assert_eq!(block_summary.size(), block.size());
931            assert_eq!(block_summary.num_transactions(), block.num_transactions());
932
933            let block_summaries: Vec<BlockSummaryQueryData<MockTypes>> = client
934                .get(&format!("block/summaries/{}/{}", 0, i))
935                .send()
936                .await
937                .unwrap();
938            assert_eq!(block_summaries.len() as u64, i);
939
940            // We should be able to look up the block by payload hash. Note that for duplicate
941            // payloads, these endpoints may return a different block with the same payload, which
942            // is acceptable. Therefore, we don't check equivalence of the entire `BlockQueryData`
943            // response, only its payload.
944            assert_eq!(
945                block.payload(),
946                client
947                    .get::<BlockQueryData<MockTypes>>(&format!(
948                        "block/payload-hash/{}",
949                        block.payload_hash()
950                    ))
951                    .send()
952                    .await
953                    .unwrap()
954                    .payload()
955            );
956            assert_eq!(
957                block.payload_hash(),
958                client
959                    .get::<Header<MockTypes>>(&format!(
960                        "header/payload-hash/{}",
961                        block.payload_hash()
962                    ))
963                    .send()
964                    .await
965                    .unwrap()
966                    .payload_commitment
967            );
968            assert_eq!(
969                block.payload(),
970                client
971                    .get::<PayloadQueryData<MockTypes>>(&format!(
972                        "payload/hash/{}",
973                        block.payload_hash()
974                    ))
975                    .send()
976                    .await
977                    .unwrap()
978                    .data(),
979            );
980            assert_eq!(
981                common.common(),
982                client
983                    .get::<VidCommonQueryData<MockTypes>>(&format!(
984                        "vid/common/payload-hash/{}",
985                        block.payload_hash()
986                    ))
987                    .send()
988                    .await
989                    .unwrap()
990                    .common()
991            );
992
993            // Check that looking up each transaction in the block various ways returns the correct
994            // transaction.
995            for (j, txn_from_block) in block.enumerate() {
996                let txn: TransactionQueryData<MockTypes> = client
997                    .get(&format!("transaction/{}/{}/noproof", i, j.position))
998                    .send()
999                    .await
1000                    .unwrap();
1001                assert_eq!(txn.block_height(), i);
1002                assert_eq!(txn.block_hash(), block.hash());
1003                assert_eq!(txn.index(), j.position as u64);
1004                assert_eq!(txn.hash(), txn_from_block.commit());
1005                assert_eq!(txn.transaction(), &txn_from_block);
1006                // We should be able to look up the transaction by hash. Note that for duplicate
1007                // transactions, this endpoint may return a different transaction with the same
1008                // hash, which is acceptable. Therefore, we don't check equivalence of the entire
1009                // `TransactionWithProofQueryData` response, only its commitment.
1010                assert_eq!(
1011                    txn.hash(),
1012                    client
1013                        .get::<TransactionQueryData<MockTypes>>(&format!(
1014                            "transaction/hash/{}/noproof",
1015                            txn.hash()
1016                        ))
1017                        .send()
1018                        .await
1019                        .unwrap()
1020                        .hash()
1021                );
1022
1023                let tx_with_proof = client
1024                    .get::<TransactionWithProofQueryData<MockTypes>>(&format!(
1025                        "transaction/{}/{}/proof",
1026                        i, j.position
1027                    ))
1028                    .send()
1029                    .await
1030                    .unwrap();
1031                assert_eq!(txn.hash(), tx_with_proof.hash());
1032                assert!(tx_with_proof.proof().verify(
1033                    block.metadata(),
1034                    txn.transaction(),
1035                    &block.payload_hash(),
1036                    common.common()
1037                ));
1038
1039                // Similar to above, but by hash
1040                let tx_with_proof = client
1041                    .get::<TransactionWithProofQueryData<MockTypes>>(&format!(
1042                        "transaction/hash/{}/proof",
1043                        txn.hash()
1044                    ))
1045                    .send()
1046                    .await
1047                    .unwrap();
1048                assert_eq!(txn.hash(), tx_with_proof.hash());
1049                assert!(tx_with_proof.proof().verify(
1050                    block.metadata(),
1051                    txn.transaction(),
1052                    &block.payload_hash(),
1053                    common.common()
1054                ));
1055            }
1056
1057            let block_range: Vec<BlockQueryData<MockTypes>> = client
1058                .get(&format!("block/{}/{}", 0, i))
1059                .send()
1060                .await
1061                .unwrap();
1062
1063            assert_eq!(block_range.len() as u64, i);
1064
1065            let leaf_range: Vec<LeafQueryData<MockTypes>> = client
1066                .get(&format!("leaf/{}/{}", 0, i))
1067                .send()
1068                .await
1069                .unwrap();
1070
1071            assert_eq!(leaf_range.len() as u64, i);
1072
1073            let payload_range: Vec<PayloadQueryData<MockTypes>> = client
1074                .get(&format!("payload/{}/{}", 0, i))
1075                .send()
1076                .await
1077                .unwrap();
1078
1079            assert_eq!(payload_range.len() as u64, i);
1080
1081            let vid_common_range: Vec<VidCommonQueryData<MockTypes>> = client
1082                .get(&format!("vid/common/{}/{}", 0, i))
1083                .send()
1084                .await
1085                .unwrap();
1086
1087            assert_eq!(vid_common_range.len() as u64, i);
1088
1089            let header_range: Vec<Header<MockTypes>> = client
1090                .get(&format!("header/{}/{}", 0, i))
1091                .send()
1092                .await
1093                .unwrap();
1094
1095            assert_eq!(header_range.len() as u64, i);
1096        }
1097    }
1098
1099    #[test_log::test(tokio::test(flavor = "multi_thread"))]
1100    async fn test_api() {
1101        // Create the consensus network.
1102        let mut network = MockNetwork::<MockDataSource>::init().await;
1103        network.start().await;
1104
1105        // Start the web server.
1106        let port = reserve_tcp_port().unwrap();
1107        let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
1108        let options = Options {
1109            small_object_range_limit: 500,
1110            large_object_range_limit: 500,
1111            ..Default::default()
1112        };
1113
1114        app.register_module(
1115            "availability",
1116            define_api(&options, MockBase::instance(), "1.0.0".parse().unwrap()).unwrap(),
1117        )
1118        .unwrap();
1119        network.spawn(
1120            "server",
1121            app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
1122        );
1123
1124        // Start a client.
1125        let client = Client::<Error, MockBase>::new(
1126            format!("http://localhost:{port}/availability")
1127                .parse()
1128                .unwrap(),
1129        );
1130        assert!(client.connect(Some(Duration::from_secs(60))).await);
1131        assert_eq!(get_non_empty_blocks(&client).await.1, vec![]);
1132
1133        // Submit a few blocks and make sure each one gets reflected in the query service and
1134        // preserves the consistency of the data and indices.
1135        let leaves = client
1136            .socket("stream/leaves/0")
1137            .subscribe::<LeafQueryData<MockTypes>>()
1138            .await
1139            .unwrap();
1140        let headers = client
1141            .socket("stream/headers/0")
1142            .subscribe::<Header<MockTypes>>()
1143            .await
1144            .unwrap();
1145        let blocks = client
1146            .socket("stream/blocks/0")
1147            .subscribe::<BlockQueryData<MockTypes>>()
1148            .await
1149            .unwrap();
1150        let vid_common = client
1151            .socket("stream/vid/common/0")
1152            .subscribe::<VidCommonQueryData<MockTypes>>()
1153            .await
1154            .unwrap();
1155        let mut chain = leaves.zip(headers.zip(blocks.zip(vid_common))).enumerate();
1156        for nonce in 0..3 {
1157            let txn = mock_transaction(vec![nonce]);
1158            network.submit_transaction(txn).await;
1159
1160            // Wait for the transaction to be finalized.
1161            let (i, leaf, block, common) = loop {
1162                tracing::info!("waiting for block with transaction {}", nonce);
1163                let (i, (leaf, (header, (block, common)))) = chain.next().await.unwrap();
1164                tracing::info!(i, ?leaf, ?header, ?block, ?common);
1165                let leaf = leaf.unwrap();
1166                let header = header.unwrap();
1167                let block = block.unwrap();
1168                let common = common.unwrap();
1169                assert_eq!(leaf.height() as usize, i);
1170                assert_eq!(leaf.block_hash(), block.hash());
1171                assert_eq!(block.header(), &header);
1172                assert_eq!(common.height() as usize, i);
1173                if !block.is_empty() {
1174                    break (i, leaf, block, common);
1175                }
1176            };
1177            assert_eq!(leaf, client.get(&format!("leaf/{i}")).send().await.unwrap());
1178            assert_eq!(
1179                block,
1180                client.get(&format!("block/{i}")).send().await.unwrap()
1181            );
1182            assert_eq!(
1183                common,
1184                client.get(&format!("vid/common/{i}")).send().await.unwrap()
1185            );
1186
1187            validate(&client, (i + 1) as u64).await;
1188        }
1189
1190        network.shut_down().await;
1191    }
1192
1193    async fn validate_old(client: &Client<Error, MockBase>, height: u64) {
1194        // Check the consistency of every block/leaf pair.
1195        for i in 0..height {
1196            // Limit the number of blocks we validate in order to
1197            // speed up the tests.
1198            if ![0, 1, height / 2, height - 1].contains(&i) {
1199                continue;
1200            }
1201            tracing::info!("validate block {i}/{height}");
1202
1203            // Check that looking up the leaf various ways returns the correct leaf.
1204            let leaf: Leaf1QueryData<MockTypes> =
1205                client.get(&format!("leaf/{i}")).send().await.unwrap();
1206            assert_eq!(leaf.leaf.height(), i);
1207            assert_eq!(
1208                leaf,
1209                client
1210                    .get(&format!(
1211                        "leaf/hash/{}",
1212                        <Leaf<MockTypes> as Committable>::commit(&leaf.leaf)
1213                    ))
1214                    .send()
1215                    .await
1216                    .unwrap()
1217            );
1218
1219            // Check that looking up the block various ways returns the correct block.
1220            let block: BlockQueryData<MockTypes> =
1221                client.get(&format!("block/{i}")).send().await.unwrap();
1222            let expected_payload = PayloadQueryData::from(block.clone());
1223            assert_eq!(leaf.leaf.block_header().commit(), block.hash());
1224            assert_eq!(block.height(), i);
1225            assert_eq!(
1226                block,
1227                client
1228                    .get(&format!("block/hash/{}", block.hash()))
1229                    .send()
1230                    .await
1231                    .unwrap()
1232            );
1233            assert_eq!(
1234                *block.header(),
1235                client.get(&format!("header/{i}")).send().await.unwrap()
1236            );
1237            assert_eq!(
1238                *block.header(),
1239                client
1240                    .get(&format!("header/hash/{}", block.hash()))
1241                    .send()
1242                    .await
1243                    .unwrap()
1244            );
1245            assert_eq!(
1246                expected_payload,
1247                client.get(&format!("payload/{i}")).send().await.unwrap(),
1248            );
1249            assert_eq!(
1250                expected_payload,
1251                client
1252                    .get(&format!("payload/block-hash/{}", block.hash()))
1253                    .send()
1254                    .await
1255                    .unwrap(),
1256            );
1257            // Look up the common VID data.
1258            let common: ADVZCommonQueryData<MockTypes> = client
1259                .get(&format!("vid/common/{}", block.height()))
1260                .send()
1261                .await
1262                .unwrap();
1263            assert_eq!(common.height(), block.height());
1264            assert_eq!(common.block_hash(), block.hash());
1265            assert_eq!(
1266                VidCommitment::V0(common.payload_hash()),
1267                block.payload_hash(),
1268            );
1269            assert_eq!(
1270                common,
1271                client
1272                    .get(&format!("vid/common/hash/{}", block.hash()))
1273                    .send()
1274                    .await
1275                    .unwrap()
1276            );
1277
1278            let block_summary = client
1279                .get(&format!("block/summary/{i}"))
1280                .send()
1281                .await
1282                .unwrap();
1283            assert_eq!(
1284                BlockSummaryQueryData::<MockTypes>::from(block.clone()),
1285                block_summary,
1286            );
1287            assert_eq!(block_summary.header(), block.header());
1288            assert_eq!(block_summary.hash(), block.hash());
1289            assert_eq!(block_summary.size(), block.size());
1290            assert_eq!(block_summary.num_transactions(), block.num_transactions());
1291
1292            let block_summaries: Vec<BlockSummaryQueryData<MockTypes>> = client
1293                .get(&format!("block/summaries/{}/{}", 0, i))
1294                .send()
1295                .await
1296                .unwrap();
1297            assert_eq!(block_summaries.len() as u64, i);
1298
1299            // We should be able to look up the block by payload hash. Note that for duplicate
1300            // payloads, these endpoints may return a different block with the same payload, which
1301            // is acceptable. Therefore, we don't check equivalence of the entire `BlockQueryData`
1302            // response, only its payload.
1303            assert_eq!(
1304                block.payload(),
1305                client
1306                    .get::<BlockQueryData<MockTypes>>(&format!(
1307                        "block/payload-hash/{}",
1308                        block.payload_hash()
1309                    ))
1310                    .send()
1311                    .await
1312                    .unwrap()
1313                    .payload()
1314            );
1315            assert_eq!(
1316                block.payload_hash(),
1317                client
1318                    .get::<Header<MockTypes>>(&format!(
1319                        "header/payload-hash/{}",
1320                        block.payload_hash()
1321                    ))
1322                    .send()
1323                    .await
1324                    .unwrap()
1325                    .payload_commitment
1326            );
1327            assert_eq!(
1328                block.payload(),
1329                client
1330                    .get::<PayloadQueryData<MockTypes>>(&format!(
1331                        "payload/hash/{}",
1332                        block.payload_hash()
1333                    ))
1334                    .send()
1335                    .await
1336                    .unwrap()
1337                    .data(),
1338            );
1339            assert_eq!(
1340                common.common(),
1341                client
1342                    .get::<ADVZCommonQueryData<MockTypes>>(&format!(
1343                        "vid/common/payload-hash/{}",
1344                        block.payload_hash()
1345                    ))
1346                    .send()
1347                    .await
1348                    .unwrap()
1349                    .common()
1350            );
1351
1352            // Check that looking up each transaction in the block various ways returns the correct
1353            // transaction.
1354            for (j, txn_from_block) in block.enumerate() {
1355                let txn: TransactionQueryData<MockTypes> = client
1356                    .get(&format!("transaction/{}/{}/noproof", i, j.position))
1357                    .send()
1358                    .await
1359                    .unwrap();
1360                assert_eq!(txn.block_height(), i);
1361                assert_eq!(txn.block_hash(), block.hash());
1362                assert_eq!(txn.index(), j.position as u64);
1363                assert_eq!(txn.hash(), txn_from_block.commit());
1364                assert_eq!(txn.transaction(), &txn_from_block);
1365                // We should be able to look up the transaction by hash. Note that for duplicate
1366                // transactions, this endpoint may return a different transaction with the same
1367                // hash, which is acceptable. Therefore, we don't check equivalence of the entire
1368                // `TransactionQueryData` response, only its commitment.
1369                assert_eq!(
1370                    txn.hash(),
1371                    client
1372                        .get::<TransactionQueryData<MockTypes>>(&format!(
1373                            "transaction/hash/{}/noproof",
1374                            txn.hash()
1375                        ))
1376                        .send()
1377                        .await
1378                        .unwrap()
1379                        .hash()
1380                );
1381
1382                assert_eq!(
1383                    txn.hash(),
1384                    client
1385                        .get::<TransactionWithProofQueryData<MockTypes>>(&format!(
1386                            "transaction/{}/{}/proof",
1387                            i, j.position
1388                        ))
1389                        .send()
1390                        .await
1391                        .unwrap()
1392                        .hash()
1393                );
1394
1395                assert_eq!(
1396                    txn.hash(),
1397                    client
1398                        .get::<TransactionWithProofQueryData<MockTypes>>(&format!(
1399                            "transaction/hash/{}/proof",
1400                            txn.hash()
1401                        ))
1402                        .send()
1403                        .await
1404                        .unwrap()
1405                        .hash()
1406                );
1407            }
1408
1409            let block_range: Vec<BlockQueryData<MockTypes>> = client
1410                .get(&format!("block/{}/{}", 0, i))
1411                .send()
1412                .await
1413                .unwrap();
1414
1415            assert_eq!(block_range.len() as u64, i);
1416
1417            let leaf_range: Vec<Leaf1QueryData<MockTypes>> = client
1418                .get(&format!("leaf/{}/{}", 0, i))
1419                .send()
1420                .await
1421                .unwrap();
1422
1423            assert_eq!(leaf_range.len() as u64, i);
1424
1425            let payload_range: Vec<PayloadQueryData<MockTypes>> = client
1426                .get(&format!("payload/{}/{}", 0, i))
1427                .send()
1428                .await
1429                .unwrap();
1430
1431            assert_eq!(payload_range.len() as u64, i);
1432
1433            let vid_common_range: Vec<ADVZCommonQueryData<MockTypes>> = client
1434                .get(&format!("vid/common/{}/{}", 0, i))
1435                .send()
1436                .await
1437                .unwrap();
1438
1439            assert_eq!(vid_common_range.len() as u64, i);
1440
1441            let header_range: Vec<Header<MockTypes>> = client
1442                .get(&format!("header/{}/{}", 0, i))
1443                .send()
1444                .await
1445                .unwrap();
1446
1447            assert_eq!(header_range.len() as u64, i);
1448        }
1449    }
1450
1451    #[test_log::test(tokio::test(flavor = "multi_thread"))]
1452    async fn test_api_epochs() {
1453        // Create the consensus network.
1454        let mut network = MockNetwork::<MockDataSource>::init().await;
1455        let epoch_height = network.epoch_height();
1456        network.start().await;
1457
1458        // Start the web server.
1459        let port = reserve_tcp_port().unwrap();
1460        let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
1461        app.register_module(
1462            "availability",
1463            define_api(
1464                &Default::default(),
1465                MockBase::instance(),
1466                "1.0.0".parse().unwrap(),
1467            )
1468            .unwrap(),
1469        )
1470        .unwrap();
1471        network.spawn(
1472            "server",
1473            app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
1474        );
1475
1476        // Start a client.
1477        let client = Client::<Error, MockBase>::new(
1478            format!("http://localhost:{port}/availability")
1479                .parse()
1480                .unwrap(),
1481        );
1482        assert!(client.connect(Some(Duration::from_secs(60))).await);
1483
1484        // Submit a few blocks and make sure each one gets reflected in the query service and
1485        // preserves the consistency of the data and indices.
1486        let headers = client
1487            .socket("stream/headers/0")
1488            .subscribe::<Header<MockTypes>>()
1489            .await
1490            .unwrap();
1491        let mut chain = headers.enumerate();
1492
1493        loop {
1494            let (i, header) = chain.next().await.unwrap();
1495            let header = header.unwrap();
1496            assert_eq!(header.height(), i as u64);
1497            if header.height() >= 3 * epoch_height {
1498                break;
1499            }
1500        }
1501
1502        network.shut_down().await;
1503    }
1504
1505    #[test_log::test(tokio::test(flavor = "multi_thread"))]
1506    async fn test_old_api() {
1507        // Create the consensus network.
1508        let mut network = MockNetwork::<MockDataSource>::init().await;
1509        network.start().await;
1510
1511        // Start the web server.
1512        let port = reserve_tcp_port().unwrap();
1513
1514        let options = Options {
1515            small_object_range_limit: 500,
1516            large_object_range_limit: 500,
1517            ..Default::default()
1518        };
1519
1520        let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
1521        app.register_module(
1522            "availability",
1523            define_api(&options, MockBase::instance(), "0.1.0".parse().unwrap()).unwrap(),
1524        )
1525        .unwrap();
1526        network.spawn(
1527            "server",
1528            app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
1529        );
1530
1531        // Start a client.
1532        let client = Client::<Error, MockBase>::new(
1533            format!("http://localhost:{port}/availability")
1534                .parse()
1535                .unwrap(),
1536        );
1537        assert!(client.connect(Some(Duration::from_secs(60))).await);
1538        assert_eq!(get_non_empty_blocks(&client).await.1, vec![]);
1539
1540        // Submit a few blocks and make sure each one gets reflected in the query service and
1541        // preserves the consistency of the data and indices.
1542        let leaves = client
1543            .socket("stream/leaves/0")
1544            .subscribe::<Leaf1QueryData<MockTypes>>()
1545            .await
1546            .unwrap();
1547        let headers = client
1548            .socket("stream/headers/0")
1549            .subscribe::<Header<MockTypes>>()
1550            .await
1551            .unwrap();
1552        let blocks = client
1553            .socket("stream/blocks/0")
1554            .subscribe::<BlockQueryData<MockTypes>>()
1555            .await
1556            .unwrap();
1557        let vid_common = client
1558            .socket("stream/vid/common/0")
1559            .subscribe::<ADVZCommonQueryData<MockTypes>>()
1560            .await
1561            .unwrap();
1562        let mut chain = leaves.zip(headers.zip(blocks.zip(vid_common))).enumerate();
1563        for nonce in 0..3 {
1564            let txn = mock_transaction(vec![nonce]);
1565            network.submit_transaction(txn).await;
1566
1567            // Wait for the transaction to be finalized.
1568            let (i, leaf, block, common) = loop {
1569                tracing::info!("waiting for block with transaction {}", nonce);
1570                let (i, (leaf, (header, (block, common)))) = chain.next().await.unwrap();
1571                tracing::info!(i, ?leaf, ?header, ?block, ?common);
1572                let leaf = leaf.unwrap();
1573                let header = header.unwrap();
1574                let block = block.unwrap();
1575                let common = common.unwrap();
1576                assert_eq!(leaf.leaf.height() as usize, i);
1577                assert_eq!(leaf.leaf.block_header().commit(), block.hash());
1578                assert_eq!(block.header(), &header);
1579                assert_eq!(common.height() as usize, i);
1580                if !block.is_empty() {
1581                    break (i, leaf, block, common);
1582                }
1583            };
1584            assert_eq!(leaf, client.get(&format!("leaf/{i}")).send().await.unwrap());
1585            assert_eq!(
1586                block,
1587                client.get(&format!("block/{i}")).send().await.unwrap()
1588            );
1589            assert_eq!(
1590                common,
1591                client.get(&format!("vid/common/{i}")).send().await.unwrap()
1592            );
1593
1594            validate_old(&client, (i + 1) as u64).await;
1595        }
1596
1597        network.shut_down().await;
1598    }
1599
1600    #[test_log::test(tokio::test(flavor = "multi_thread"))]
1601    async fn test_extensions() {
1602        let dir = TempDir::with_prefix("test_availability_extensions").unwrap();
1603        let data_source = ExtensibleDataSource::new(
1604            MockDataSource::create(dir.path(), Default::default())
1605                .await
1606                .unwrap(),
1607            0,
1608        );
1609
1610        // mock up some consensus data.
1611        let leaf = Leaf2::<MockTypes>::genesis(
1612            &Default::default(),
1613            &Default::default(),
1614            MOCK_UPGRADE.base,
1615        )
1616        .await;
1617        let qc = QuorumCertificate2::genesis(
1618            &Default::default(),
1619            &Default::default(),
1620            TEST_VERSIONS.test,
1621        )
1622        .await;
1623        let leaf = LeafQueryData::new(leaf, qc).unwrap();
1624        let block = BlockQueryData::new(leaf.header().clone(), MockPayload::genesis());
1625        data_source
1626            .append(BlockInfo::new(leaf, Some(block.clone()), None, None))
1627            .await
1628            .unwrap();
1629
1630        // assert that the store has data before we move on to API requests
1631        assert_eq!(
1632            ExtensibleDataSource::<MockDataSource, u64>::block_height(&data_source)
1633                .await
1634                .unwrap(),
1635            1
1636        );
1637        assert_eq!(block, data_source.get_block(0).await.await);
1638
1639        // Create the API extensions specification.
1640        let extensions = toml! {
1641            [route.post_ext]
1642            PATH = ["/ext/:val"]
1643            METHOD = "POST"
1644            ":val" = "Integer"
1645
1646            [route.get_ext]
1647            PATH = ["/ext"]
1648            METHOD = "GET"
1649        };
1650
1651        let mut api =
1652            define_api::<RwLock<ExtensibleDataSource<MockDataSource, u64>>, MockTypes, MockBase>(
1653                &Options {
1654                    extensions: vec![extensions.into()],
1655                    ..Default::default()
1656                },
1657                MockBase::instance(),
1658                "1.0.0".parse().unwrap(),
1659            )
1660            .unwrap();
1661        api.get("get_ext", |_, state| {
1662            async move { Ok(*state.as_ref()) }.boxed()
1663        })
1664        .unwrap()
1665        .post("post_ext", |req, state| {
1666            async move {
1667                *state.as_mut() = req.integer_param("val")?;
1668                Ok(())
1669            }
1670            .boxed()
1671        })
1672        .unwrap();
1673
1674        let mut app = App::<_, Error>::with_state(RwLock::new(data_source));
1675        app.register_module("availability", api).unwrap();
1676
1677        let port = reserve_tcp_port().unwrap();
1678        let _server = BackgroundTask::spawn(
1679            "server",
1680            app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
1681        );
1682
1683        let client = Client::<Error, MockBase>::new(
1684            format!("http://localhost:{port}/availability")
1685                .parse()
1686                .unwrap(),
1687        );
1688        assert!(client.connect(Some(Duration::from_secs(60))).await);
1689
1690        assert_eq!(client.get::<u64>("ext").send().await.unwrap(), 0);
1691        client.post::<()>("ext/42").send().await.unwrap();
1692        assert_eq!(client.get::<u64>("ext").send().await.unwrap(), 42);
1693
1694        // Ensure we can still access the built-in functionality.
1695        assert_eq!(
1696            client
1697                .get::<MockHeader>("header/0")
1698                .send()
1699                .await
1700                .unwrap()
1701                .block_number,
1702            0
1703        );
1704    }
1705
1706    #[test_log::test(tokio::test(flavor = "multi_thread"))]
1707    async fn test_range_limit() {
1708        let large_object_range_limit = 2;
1709        let small_object_range_limit = 3;
1710
1711        // Create the consensus network.
1712        let mut network = MockNetwork::<MockDataSource>::init().await;
1713        network.start().await;
1714
1715        // Start the web server.
1716        let port = reserve_tcp_port().unwrap();
1717        let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
1718        app.register_module(
1719            "availability",
1720            define_api(
1721                &Options {
1722                    large_object_range_limit,
1723                    small_object_range_limit,
1724                    ..Default::default()
1725                },
1726                MockBase::instance(),
1727                "1.0.0".parse().unwrap(),
1728            )
1729            .unwrap(),
1730        )
1731        .unwrap();
1732        network.spawn(
1733            "server",
1734            app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
1735        );
1736
1737        // Start a client.
1738        let client = Client::<Error, MockBase>::new(
1739            format!("http://localhost:{port}/availability")
1740                .parse()
1741                .unwrap(),
1742        );
1743        assert!(client.connect(Some(Duration::from_secs(60))).await);
1744
1745        // Check reported limits.
1746        assert_eq!(
1747            client.get::<Limits>("limits").send().await.unwrap(),
1748            Limits {
1749                small_object_range_limit,
1750                large_object_range_limit
1751            }
1752        );
1753
1754        // Wait for enough blocks to be produced.
1755        client
1756            .socket("stream/blocks/0")
1757            .subscribe::<BlockQueryData<MockTypes>>()
1758            .await
1759            .unwrap()
1760            .take(small_object_range_limit + 1)
1761            .try_collect::<Vec<_>>()
1762            .await
1763            .unwrap();
1764
1765        async fn check_limit<T: DeserializeOwned + Debug>(
1766            client: &Client<Error, MockBase>,
1767            req: &str,
1768            limit: usize,
1769        ) {
1770            let range: Vec<T> = client
1771                .get(&format!("{req}/0/{limit}"))
1772                .send()
1773                .await
1774                .unwrap();
1775            assert_eq!(range.len(), limit);
1776            let err = client
1777                .get::<Vec<T>>(&format!("{req}/0/{}", limit + 1))
1778                .send()
1779                .await
1780                .unwrap_err();
1781            assert_eq!(err.status(), StatusCode::BAD_REQUEST);
1782        }
1783
1784        check_limit::<LeafQueryData<MockTypes>>(&client, "leaf", small_object_range_limit).await;
1785        check_limit::<Header<MockTypes>>(&client, "header", large_object_range_limit).await;
1786        check_limit::<BlockQueryData<MockTypes>>(&client, "block", large_object_range_limit).await;
1787        check_limit::<PayloadQueryData<MockTypes>>(&client, "payload", large_object_range_limit)
1788            .await;
1789        check_limit::<BlockSummaryQueryData<MockTypes>>(
1790            &client,
1791            "block/summaries",
1792            large_object_range_limit,
1793        )
1794        .await;
1795
1796        network.shut_down().await;
1797    }
1798
1799    #[test_log::test(tokio::test(flavor = "multi_thread"))]
1800    async fn test_header_endpoint() {
1801        // Create the consensus network.
1802        let mut network = MockNetwork::<MockSqlDataSource>::init().await;
1803        network.start().await;
1804
1805        // Start the web server.
1806        let port = reserve_tcp_port().unwrap();
1807        let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
1808        app.register_module(
1809            "availability",
1810            define_api(
1811                &Default::default(),
1812                MockBase::instance(),
1813                "1.0.0".parse().unwrap(),
1814            )
1815            .unwrap(),
1816        )
1817        .unwrap();
1818        network.spawn(
1819            "server",
1820            app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
1821        );
1822
1823        let ds = network.data_source();
1824
1825        // Get the current block height and fetch header for some later block height
1826        // This fetch will only resolve when we receive a leaf or block for that block height
1827        let block_height = ds.block_height().await.unwrap();
1828        let fetch = ds
1829            .get_header(BlockId::<MockTypes>::Number(block_height + 25))
1830            .await;
1831
1832        assert!(fetch.is_pending());
1833        let header = fetch.await;
1834        assert_eq!(header.height() as usize, block_height + 25);
1835
1836        network.shut_down().await;
1837    }
1838
1839    #[test_log::test(tokio::test(flavor = "multi_thread"))]
1840    async fn test_leaf_only_ds() {
1841        // Create the consensus network.
1842        let mut network = MockNetwork::<MockSqlDataSource>::init_with_leaf_ds().await;
1843        network.start().await;
1844
1845        // Start the web server.
1846        let port = reserve_tcp_port().unwrap();
1847        let mut app = App::<_, Error>::with_state(ApiState::from(network.data_source()));
1848        app.register_module(
1849            "availability",
1850            define_api(
1851                &Default::default(),
1852                MockBase::instance(),
1853                "1.0.0".parse().unwrap(),
1854            )
1855            .unwrap(),
1856        )
1857        .unwrap();
1858        network.spawn(
1859            "server",
1860            app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
1861        );
1862
1863        // Start a client.
1864        let client = Client::<Error, MockBase>::new(
1865            format!("http://localhost:{port}/availability")
1866                .parse()
1867                .unwrap(),
1868        );
1869        assert!(client.connect(Some(Duration::from_secs(60))).await);
1870
1871        // Wait for some headers to be produced.
1872        client
1873            .socket("stream/headers/0")
1874            .subscribe::<Header<MockTypes>>()
1875            .await
1876            .unwrap()
1877            .take(5)
1878            .try_collect::<Vec<_>>()
1879            .await
1880            .unwrap();
1881
1882        // Wait for some leaves to be produced.
1883        client
1884            .socket("stream/leaves/5")
1885            .subscribe::<LeafQueryData<MockTypes>>()
1886            .await
1887            .unwrap()
1888            .take(5)
1889            .try_collect::<Vec<_>>()
1890            .await
1891            .unwrap();
1892
1893        let ds = network.data_source();
1894
1895        // Get the current block height and fetch header for some later block height
1896        // This fetch will only resolve if we get a block notification
1897        // However, this block will never be stored
1898        let block_height = ds.block_height().await.unwrap();
1899        let target_block_height = block_height + 20;
1900        let fetch = ds
1901            .get_block(BlockId::<MockTypes>::Number(target_block_height))
1902            .await;
1903
1904        assert!(fetch.is_pending());
1905        let block = fetch.await;
1906        assert_eq!(block.height() as usize, target_block_height);
1907
1908        let mut tx = ds.read().await.unwrap();
1909        tx.get_block(BlockId::<MockTypes>::Number(target_block_height))
1910            .await
1911            .unwrap_err();
1912        drop(tx);
1913
1914        network.shut_down().await;
1915    }
1916}