hotshot_query_service/lib.rs
1// Copyright (c) 2022 Espresso Systems (espressosys.com)
2// This file is part of the HotShot Query Service library.
3//
4// This program is free software: you can redistribute it and/or modify it under the terms of the GNU
5// General Public License as published by the Free Software Foundation, either version 3 of the
6// License, or (at your option) any later version.
7// This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without
8// even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
9// General Public License for more details.
10// You should have received a copy of the GNU General Public License along with this program. If not,
11// see <https://www.gnu.org/licenses/>.
12
13//! The HotShot Query Service is a minimal, generic query service that can be integrated into any
14//! decentralized application running on the [hotshot] consensus layer. It provides all the features
15//! that HotShot itself expects of a query service (such as providing consensus-related data for
16//! catchup and synchronization) as well as some application-level features that deal only with
17//! consensus-related or application-agnostic data. In addition, the query service is provided as an
18//! extensible library, which makes it easy to add additional, application-specific features.
19//!
20//! # Basic usage
21//!
22//! ```
23//! # use hotshot::types::SystemContextHandle;
24//! # use hotshot_query_service::testing::mocks::{
25//! # MockNodeImpl as AppNodeImpl, MockTypes as AppTypes, MockVersions as AppVersions,
26//! # };
27//! # use hotshot_example_types::node_types::TestVersions;
28//! # use hotshot_types::consensus::ConsensusMetricsValue;
29//! # use std::path::Path;
30//! # async fn doc(storage_path: &std::path::Path) -> anyhow::Result<()> {
31//! use hotshot_query_service::{
32//! availability,
33//! data_source::{FileSystemDataSource, Transaction, UpdateDataSource, VersionedDataSource},
34//! fetching::provider::NoFetching,
35//! node,
36//! status::UpdateStatusData,
37//! status,
38//! testing::mocks::MockBase,
39//! ApiState, Error,
40//! };
41//!
42//! use futures::StreamExt;
43//! use vbs::version::StaticVersionType;
44//! use hotshot::SystemContext;
45//! use std::sync::Arc;
46//! use tide_disco::App;
47//! use tokio::spawn;
48//!
49//! // Create or open a data source.
50//! let data_source = FileSystemDataSource::<AppTypes, NoFetching>::create(storage_path, NoFetching)
51//! .await?;
52//!
53//! // Create hotshot, giving it a handle to the status metrics.
54//! let hotshot = SystemContext::<AppTypes, AppNodeImpl, AppVersions>::init(
55//! # panic!(), panic!(), panic!(), panic!(), panic!(), panic!(), panic!(),
56//! ConsensusMetricsValue::new(&*data_source.populate_metrics()), panic!(),
57//! panic!()
58//! // Other fields omitted
59//! ).await?.0;
60//!
61//! // Create API modules.
62//! let availability_api = availability::define_api(&Default::default(), MockBase::instance())?;
63//! let node_api = node::define_api(&Default::default(), MockBase::instance())?;
64//! let status_api = status::define_api(&Default::default(), MockBase::instance())?;
65//!
66//! // Create app.
67//! let data_source = ApiState::from(data_source);
68//! let mut app = App::<_, Error>::with_state(data_source.clone());
69//! app
70//! .register_module("availability", availability_api)?
71//! .register_module("node", node_api)?
72//! .register_module("status", status_api)?;
73//!
74//! // Serve app.
75//! spawn(app.serve("0.0.0.0:8080", MockBase::instance()));
76//!
77//! // Update query data using HotShot events.
78//! let mut events = hotshot.event_stream();
79//! while let Some(event) = events.next().await {
80//! // Update the query data based on this event.
81//! data_source.update(&event).await.ok();
82//! }
83//! # Ok(())
84//! # }
85//! ```
86//!
87//! Shortcut for starting an out-of-the-box service with no extensions (does exactly the above and
88//! nothing more):
89//!
90//! ```
91//! # use hotshot::types::SystemContextHandle;
92//! # use vbs::version::StaticVersionType;
93//! # use hotshot_query_service::{data_source::FileSystemDataSource, Error, Options};
94//! # use hotshot_query_service::fetching::provider::NoFetching;
95//! # use hotshot_query_service::testing::mocks::{MockBase, MockNodeImpl, MockTypes, MockVersions};
96//! # use std::path::Path;
97//! # use tokio::spawn;
98//! # async fn doc(storage_path: &Path, options: Options, hotshot: SystemContextHandle<MockTypes, MockNodeImpl, MockVersions>) -> Result<(), Error> {
99//! use hotshot_query_service::run_standalone_service;
100//!
101//! let data_source = FileSystemDataSource::create(storage_path, NoFetching).await.map_err(Error::internal)?;
102//! spawn(run_standalone_service(options, data_source, hotshot, MockBase::instance()));
103//! # Ok(())
104//! # }
105//! ```
106//!
107//! # Persistence
108//!
109//! Naturally, an archival query service such as this is heavily dependent on a persistent storage
110//! implementation. The APIs provided by this query service are generic over the specific type of
111//! the persistence layer, which we call a _data source_. This crate provides several data source
112//! implementations in the [`data_source`] module.
113//!
114//! # Interaction with other components
115//!
116//! While the HotShot Query Service [can be used as a standalone service](run_standalone_service),
117//! it is designed to be used as a single component of a larger service consisting of several other
118//! interacting components. This interaction has two dimensions:
119//! * _extension_, adding new functionality to the API modules provided by this crate
120//! * _composition_, combining the API modules from this crate with other, application-specific API
121//! modules to create a single [tide_disco] API
122//!
123//! ## Extension
124//!
//! It is possible to add new functionality directly to the modules provided by this crate. This
126//! allows you to keep semantically related functionality grouped together in a single API module,
127//! for interface purposes, even while some of the functionality of that module is provided by this
128//! crate and some of it is an application-specific extension.
129//!
130//! For example, consider an application which is a UTXO-based blockchain. Each transaction consists
131//! of a handful of new _output records_, and you want your query service to provide an API for
132//! looking up a specific output by its index. Semantically, this functionality belongs in the
133//! _data availability_ API, however it is application-specific -- HotShot itself makes no
134//! assumptions and provides no guarantees about the internal structure of a transaction. In order
135//! to expose this UTXO-specific functionality as well as the generic data availability
136//! functionality provided by this crate as part of the same public API, you can extend the
137//! [availability] module of this crate with additional data structures and endpoints that know
138//! about the internal structure of your transactions.
139//!
140//! There are two parts to adding additional functionality to a module in this crate: adding the
141//! required additional data structures to the data source, and creating a new API endpoint to
142//! expose the functionality. The mechanism for the former will depend on the specific data source
143//! you are using. Check the documentation for your data source implementation to see how it can be
144//! extended.
145//!
146//! For the latter, you can modify the default availability API with the addition of a new endpoint
147//! that accesses the custom state you have added to the data source. It is good practice to define
148//! a trait for accessing this custom state, so that if you want to switch data sources in the
149//! future, you can easily extend the new data source, implement the trait, and then transparently
150//! replace the data source that you use to set up your API. In the case of
151//! adding a UTXO index, this trait might look like this:
152//!
153//! ```
154//! # use hotshot_query_service::{
155//! # availability::{AvailabilityDataSource, TransactionIndex},
156//! # testing::mocks::MockTypes as AppTypes,
157//! # };
158//! use async_trait::async_trait;
159//!
160//! #[async_trait]
161//! trait UtxoDataSource: AvailabilityDataSource<AppTypes> {
162//! // Index mapping UTXO index to (block index, transaction index, output index)
163//! async fn find_utxo(&self, utxo: u64) -> Option<(usize, TransactionIndex<AppTypes>, usize)>;
164//! }
165//! ```
166//!
167//! Implement this trait for the extended data source you're using, and then add a new endpoint to
168//! the availability API like so:
169//!
170//! ```
171//! # use async_trait::async_trait;
172//! # use futures::FutureExt;
173//! # use hotshot_query_service::availability::{
174//! # self, AvailabilityDataSource, FetchBlockSnafu, TransactionIndex,
175//! # };
176//! # use hotshot_query_service::testing::mocks::MockTypes as AppTypes;
177//! # use hotshot_query_service::testing::mocks::MockBase;
178//! # use hotshot_query_service::{ApiState, Error};
179//! # use snafu::ResultExt;
180//! # use tide_disco::{api::ApiError, method::ReadState, Api, App, StatusCode};
181//! # use vbs::version::StaticVersionType;
182//! # #[async_trait]
183//! # trait UtxoDataSource: AvailabilityDataSource<AppTypes> {
184//! # async fn find_utxo(&self, utxo: u64) -> Option<(usize, TransactionIndex<AppTypes>, usize)>;
185//! # }
186//!
187//! fn define_app_specific_availability_api<State, Ver: StaticVersionType + 'static>(
188//! options: &availability::Options,
189//! bind_version: Ver,
190//! ) -> Result<Api<State, availability::Error, Ver>, ApiError>
191//! where
192//! State: 'static + Send + Sync + ReadState,
193//! <State as ReadState>::State: UtxoDataSource + Send + Sync,
194//! {
195//! let mut api = availability::define_api(options, bind_version)?;
196//! api.get("get_utxo", |req, state: &<State as ReadState>::State| async move {
197//! let utxo_index = req.integer_param("index")?;
198//! let (block_index, txn_index, output_index) = state
199//! .find_utxo(utxo_index)
200//! .await
201//! .ok_or_else(|| availability::Error::Custom {
202//! message: format!("no such UTXO {}", utxo_index),
203//! status: StatusCode::NOT_FOUND,
204//! })?;
205//! let block = state
206//! .get_block(block_index)
207//! .await
208//! .context(FetchBlockSnafu { resource: block_index.to_string() })?;
209//! let txn = block.transaction(&txn_index).unwrap();
210//! let utxo = // Application-specific logic to extract a UTXO from a transaction.
211//! # todo!();
212//! Ok(utxo)
213//! }.boxed())?;
214//! Ok(api)
215//! }
216//!
217//! fn init_server<D: UtxoDataSource + Send + Sync + 'static, Ver: StaticVersionType + 'static>(
218//! options: &availability::Options,
219//! data_source: D,
220//! bind_version: Ver,
221//! ) -> Result<App<ApiState<D>, Error>, availability::Error> {
222//! let api = define_app_specific_availability_api(options, bind_version)
223//! .map_err(availability::Error::internal)?;
224//! let mut app = App::<_, _>::with_state(ApiState::from(data_source));
225//! app.register_module("availability", api).map_err(availability::Error::internal)?;
226//! Ok(app)
227//! }
228//! ```
229//!
230//! Now you need to define the new route, `get_utxo`, in your API specification. Create a file
231//! `app_specific_availability.toml`:
232//!
233//! ```toml
234//! [route.get_utxo]
235//! PATH = ["utxo/:index"]
236//! ":index" = "Integer"
237//! DOC = "Get a UTXO by its index"
238//! ```
239//!
240//! and make sure `options.extensions` includes `"app_specific_availability.toml"`.
241//!
242//! ## Composition
243//!
244//! Composing the modules provided by this crate with other, unrelated modules to create a unified
245//! service is fairly simple, as most of the complexity is handled by [tide_disco], which already
246//! provides a mechanism for composing several modules into a single application. In principle, all
247//! you need to do is register the [availability], [node], and [status] APIs provided by this crate
248//! with a [tide_disco::App], and then register your own API modules with the same app.
249//!
250//! The one wrinkle is that all modules within a [tide_disco] app must share the same state type. It
251//! is for this reason that the modules provided by this crate are generic on the state type --
252//! [availability::define_api], [node::define_api], and [status::define_api] can all work with any
253//! state type, provided that type implements the corresponding data source traits. The data sources
254//! provided by this crate implement these traits, but if you want to use a custom state type that
255//! includes state for other modules, you will need to implement these traits for your custom type.
256//! The basic pattern looks like this:
257//!
258//! ```
259//! # use async_trait::async_trait;
260//! # use hotshot_query_service::{Header, QueryResult, VidShare};
261//! # use hotshot_query_service::availability::{
262//! # AvailabilityDataSource, BlockId, BlockQueryData, Fetch, FetchStream, LeafId, LeafQueryData,
263//! # PayloadMetadata, PayloadQueryData, TransactionFromBlock, TransactionHash,
264//! # VidCommonMetadata, VidCommonQueryData,
265//! # };
266//! # use hotshot_query_service::metrics::PrometheusMetrics;
267//! # use hotshot_query_service::node::{
268//! # NodeDataSource, SyncStatus, TimeWindowQueryData, WindowStart,
269//! # };
270//! # use hotshot_query_service::status::{HasMetrics, StatusDataSource};
271//! # use hotshot_query_service::testing::mocks::MockTypes as AppTypes;
272//! # use std::ops::{Bound, RangeBounds};
273//! # type AppQueryData = ();
274//! // Our AppState takes an underlying data source `D` which already implements the relevant
275//! // traits, and adds some state for use with other modules.
276//! struct AppState<D> {
277//! hotshot_qs: D,
278//! // additional state for other modules
279//! }
280//!
281//! // Implement data source trait for availability API by delegating to the underlying data source.
282//! #[async_trait]
283//! impl<D: AvailabilityDataSource<AppTypes> + Send + Sync>
284//! AvailabilityDataSource<AppTypes> for AppState<D>
285//! {
286//! async fn get_leaf<ID>(&self, id: ID) -> Fetch<LeafQueryData<AppTypes>>
287//! where
288//! ID: Into<LeafId<AppTypes>> + Send + Sync,
289//! {
290//! self.hotshot_qs.get_leaf(id).await
291//! }
292//!
293//! // etc
294//! # async fn get_block<ID>(&self, id: ID) -> Fetch<BlockQueryData<AppTypes>>
295//! # where
296//! # ID: Into<BlockId<AppTypes>> + Send + Sync { todo!() }
297//! # async fn get_payload<ID>(&self, id: ID) -> Fetch<PayloadQueryData<AppTypes>>
298//! # where
299//! # ID: Into<BlockId<AppTypes>> + Send + Sync { todo!() }
300//! # async fn get_payload_metadata<ID>(&self, id: ID) -> Fetch<PayloadMetadata<AppTypes>>
301//! # where
302//! # ID: Into<BlockId<AppTypes>> + Send + Sync { todo!() }
303//! # async fn get_vid_common<ID>(&self, id: ID) -> Fetch<VidCommonQueryData<AppTypes>>
304//! # where
305//! # ID: Into<BlockId<AppTypes>> + Send + Sync { todo!() }
306//! # async fn get_vid_common_metadata<ID>(&self, id: ID) -> Fetch<VidCommonMetadata<AppTypes>>
307//! # where
308//! # ID: Into<BlockId<AppTypes>> + Send + Sync { todo!() }
309//! # async fn get_transaction<T: TransactionFromBlock<AppTypes>>(&self, hash: TransactionHash<AppTypes>) -> Fetch<T> { todo!() }
310//! # async fn get_leaf_range<R>(&self, range: R) -> FetchStream<LeafQueryData<AppTypes>>
311//! # where
312//! # R: RangeBounds<usize> + Send { todo!() }
313//! # async fn get_block_range<R>(&self, range: R) -> FetchStream<BlockQueryData<AppTypes>>
314//! # where
315//! # R: RangeBounds<usize> + Send { todo!() }
316//! # async fn get_payload_range<R>(&self, range: R) -> FetchStream<PayloadQueryData<AppTypes>>
317//! # where
318//! # R: RangeBounds<usize> + Send { todo!() }
319//! # async fn get_payload_metadata_range<R>(&self, range: R) -> FetchStream<PayloadMetadata<AppTypes>>
320//! # where
321//! # R: RangeBounds<usize> + Send { todo!() }
322//! # async fn get_vid_common_range<R>(&self, range: R) -> FetchStream<VidCommonQueryData<AppTypes>>
323//! # where
324//! # R: RangeBounds<usize> + Send { todo!() }
325//! # async fn get_vid_common_metadata_range<R>(&self, range: R) -> FetchStream<VidCommonMetadata<AppTypes>>
326//! # where
327//! # R: RangeBounds<usize> + Send { todo!() }
328//! # async fn get_leaf_range_rev(&self, start: Bound<usize>, end: usize) -> FetchStream<LeafQueryData<AppTypes>> { todo!() }
329//! # async fn get_block_range_rev(&self, start: Bound<usize>, end: usize) -> FetchStream<BlockQueryData<AppTypes>> { todo!() }
330//! # async fn get_payload_range_rev(&self, start: Bound<usize>, end: usize) -> FetchStream<PayloadQueryData<AppTypes>> { todo!() }
331//! # async fn get_payload_metadata_range_rev(&self, start: Bound<usize>, end: usize) -> FetchStream<PayloadMetadata<AppTypes>> { todo!() }
332//! # async fn get_vid_common_range_rev(&self, start: Bound<usize>, end: usize) -> FetchStream<VidCommonQueryData<AppTypes>> { todo!() }
333//! # async fn get_vid_common_metadata_range_rev(&self, start: Bound<usize>, end: usize) -> FetchStream<VidCommonMetadata<AppTypes>> { todo!() }
334//! }
335//!
336//! // Implement data source trait for node API by delegating to the underlying data source.
337//! #[async_trait]
338//! impl<D: NodeDataSource<AppTypes> + Send + Sync> NodeDataSource<AppTypes> for AppState<D> {
339//! async fn block_height(&self) -> QueryResult<usize> {
340//! self.hotshot_qs.block_height().await
341//! }
342//!
343//! async fn count_transactions_in_range(
344//! &self,
345//! range: impl RangeBounds<usize> + Send,
346//! ) -> QueryResult<usize> {
347//! self.hotshot_qs.count_transactions_in_range(range).await
348//! }
349//!
350//! async fn payload_size_in_range(
351//! &self,
352//! range: impl RangeBounds<usize> + Send,
353//! ) -> QueryResult<usize> {
354//! self.hotshot_qs.payload_size_in_range(range).await
355//! }
356//!
357//! async fn vid_share<ID>(&self, id: ID) -> QueryResult<VidShare>
358//! where
359//! ID: Into<BlockId<AppTypes>> + Send + Sync,
360//! {
361//! self.hotshot_qs.vid_share(id).await
362//! }
363//!
364//! async fn sync_status(&self) -> QueryResult<SyncStatus> {
365//! self.hotshot_qs.sync_status().await
366//! }
367//!
368//! async fn get_header_window(
369//! &self,
370//! start: impl Into<WindowStart<AppTypes>> + Send + Sync,
371//! end: u64,
372//! limit: usize,
373//! ) -> QueryResult<TimeWindowQueryData<Header<AppTypes>>> {
374//! self.hotshot_qs.get_header_window(start, end, limit).await
375//! }
376//! }
377//!
378//! // Implement data source trait for status API by delegating to the underlying data source.
379//! impl<D: HasMetrics> HasMetrics for AppState<D> {
380//! fn metrics(&self) -> &PrometheusMetrics {
381//! self.hotshot_qs.metrics()
382//! }
383//! }
384//! #[async_trait]
385//! impl<D: StatusDataSource + Send + Sync> StatusDataSource for AppState<D> {
386//! async fn block_height(&self) -> QueryResult<usize> {
387//! self.hotshot_qs.block_height().await
388//! }
389//! }
390//!
391//! // Implement data source traits for other modules, using additional state from AppState.
392//! ```
393//!
394//! In the future, we may provide derive macros for
395//! [AvailabilityDataSource](availability::AvailabilityDataSource),
396//! [NodeDataSource](node::NodeDataSource), and [StatusDataSource](status::StatusDataSource) to
397//! eliminate the boilerplate of implementing them for a custom type that has an existing
398//! implementation as one of its fields.
399//!
400//! Once you have created your `AppState` type aggregating the state for each API module, you can
401//! initialize the state as normal, instantiating `D` with a concrete implementation of a data
402//! source and initializing `hotshot_qs` as you normally would that data source.
403//!
404//! _However_, this only works if you want the persistent storage for the availability and node
405//! modules (managed by `hotshot_qs`) to be independent of the persistent storage for other modules.
406//! You may well want to synchronize the storage for all modules together, so that updates to the
407//! entire application state can be done atomically. This is particularly relevant if one of your
408//! application-specific modules updates its storage based on a stream of HotShot leaves. Since the
409//! availability and node modules also update with each new leaf, you probably want all of these
410//! modules to stay in sync. The data source implementations provided by this crate provide means by
411//! which you can add additional data to the same persistent store and synchronize the entire store
//! together. Refer to the documentation for your specific data source for information on how to
413//! achieve this.
414//!
415
416mod api;
417pub mod availability;
418pub mod data_source;
419mod error;
420pub mod explorer;
421pub mod fetching;
422pub mod merklized_state;
423pub mod metrics;
424pub mod node;
425mod resolvable;
426pub mod status;
427pub mod task;
428pub mod testing;
429pub mod types;
430
431use std::sync::Arc;
432
433use async_trait::async_trait;
434use derive_more::{Deref, From, Into};
435pub use error::Error;
436use futures::{future::BoxFuture, stream::StreamExt};
437use hotshot::types::SystemContextHandle;
438pub use hotshot_query_service_types::{
439 ErrorSnafu, Header, Leaf2, Metadata, MissingSnafu, NotFoundSnafu, Payload, QueryError,
440 QueryResult, QuorumCertificate, SignatureKey, Transaction,
441};
442use hotshot_types::{
443 new_protocol::CoordinatorEvent,
444 traits::node_implementation::{NodeImplementation, NodeType},
445};
446pub use resolvable::Resolvable;
447use task::BackgroundTask;
448use tide_disco::{App, method::ReadState};
449use vbs::version::StaticVersionType;
450
/// Configuration for [`run_standalone_service`].
///
/// Bundles the options for each API module with the port the server listens on. The derived
/// [`Default`] uses each module's default options and port `0`.
#[derive(Default)]
pub struct Options {
    /// Options passed to [`availability::define_api`].
    pub availability: availability::Options,
    /// Options passed to [`node::define_api`].
    pub node: node::Options,
    /// Options passed to [`status::define_api`].
    pub status: status::Options,
    /// Port to serve on; [`run_standalone_service`] serves the app at `0.0.0.0:<port>`.
    pub port: u16,
}
458
/// Read-only wrapper for API state which does not require locking.
///
/// The state is stored behind an [`Arc`]; cloning an `ApiState` clones the `Arc` handle, so all
/// clones refer to the same underlying `D`. The `derive_more` derives expose the inner `Arc<D>`
/// through `Deref` and provide conversions between `ApiState<D>` and `Arc<D>`.
#[derive(Clone, Debug, Deref, From, Into)]
pub struct ApiState<D>(Arc<D>);
462
463#[async_trait]
464impl<D: 'static + Send + Sync> ReadState for ApiState<D> {
465 type State = D;
466 async fn read<T>(
467 &self,
468 op: impl Send + for<'a> FnOnce(&'a Self::State) -> BoxFuture<'a, T> + 'async_trait,
469 ) -> T {
470 op(&self.0).await
471 }
472}
473
474impl<D> From<D> for ApiState<D> {
475 fn from(d: D) -> Self {
476 Self::from(Arc::new(d))
477 }
478}
479
480/// Run an instance of the HotShot Query service with no customization.
481pub async fn run_standalone_service<Types: NodeType, I: NodeImplementation<Types>, D, ApiVer>(
482 options: Options,
483 data_source: D,
484 hotshot: SystemContextHandle<Types, I>,
485 bind_version: ApiVer,
486) -> Result<(), Error>
487where
488 Payload<Types>: availability::QueryablePayload<Types>,
489 Header<Types>: availability::QueryableHeader<Types>,
490 D: availability::AvailabilityDataSource<Types>
491 + data_source::UpdateDataSource<Types>
492 + node::NodeDataSource<Types>
493 + status::StatusDataSource
494 + data_source::VersionedDataSource
495 + Send
496 + Sync
497 + 'static,
498 ApiVer: StaticVersionType + 'static,
499{
500 // Create API modules.
501 let availability_api_v0 = availability::define_api(
502 &options.availability,
503 bind_version,
504 "0.0.1".parse().unwrap(),
505 )
506 .map_err(Error::internal)?;
507
508 let availability_api_v1 = availability::define_api(
509 &options.availability,
510 bind_version,
511 "1.0.0".parse().unwrap(),
512 )
513 .map_err(Error::internal)?;
514 let node_api = node::define_api(&options.node, bind_version, "0.0.1".parse().unwrap())
515 .map_err(Error::internal)?;
516 let status_api = status::define_api(&options.status, bind_version, "0.0.1".parse().unwrap())
517 .map_err(Error::internal)?;
518
519 // Create app.
520 let data_source = Arc::new(data_source);
521 let mut app = App::<_, Error>::with_state(ApiState(data_source.clone()));
522 app.register_module("availability", availability_api_v0)
523 .map_err(Error::internal)?
524 .register_module("availability", availability_api_v1)
525 .map_err(Error::internal)?
526 .register_module("node", node_api)
527 .map_err(Error::internal)?
528 .register_module("status", status_api)
529 .map_err(Error::internal)?;
530
531 // Serve app.
532 let url = format!("0.0.0.0:{}", options.port);
533 let _server =
534 BackgroundTask::spawn("server", async move { app.serve(&url, bind_version).await });
535
536 // Subscribe to events before starting consensus, so we don't miss any events.
537 let mut events = hotshot.event_stream();
538 hotshot.hotshot.start_consensus().await;
539
540 // Update query data using HotShot events.
541 while let Some(event) = events.next().await {
542 // Update the query data based on this event. It is safe to ignore errors here; the error
543 // just returns the failed block height for use in garbage collection, but this simple
544 // implementation isn't doing any kind of garbage collection.
545 let event = CoordinatorEvent::LegacyEvent(event);
546 data_source.update(&event).await.ok();
547 }
548
549 Ok(())
550}
551
552#[cfg(test)]
553mod test {
554 use std::{
555 ops::{Bound, RangeBounds},
556 time::Duration,
557 };
558
559 use async_lock::RwLock;
560 use async_trait::async_trait;
561 use atomic_store::{AtomicStore, AtomicStoreLoader, RollingLog, load_store::BincodeLoadStore};
562 use futures::future::FutureExt;
563 use hotshot_example_types::node_types::TEST_VERSIONS;
564 use hotshot_types::{data::VidShare, simple_certificate::QuorumCertificate2};
565 use surf_disco::Client;
566 use tempfile::TempDir;
567 use test_utils::reserve_tcp_port;
568 use testing::mocks::MockBase;
569 use tide_disco::App;
570 use toml::toml;
571
572 use super::*;
573 use crate::{
574 availability::{
575 AvailabilityDataSource, BlockId, BlockInfo, BlockQueryData, BlockWithTransaction,
576 Fetch, FetchStream, LeafId, LeafQueryData, NamespaceId, PayloadMetadata,
577 PayloadQueryData, TransactionHash, UpdateAvailabilityData, VidCommonMetadata,
578 VidCommonQueryData,
579 },
580 metrics::PrometheusMetrics,
581 node::{NodeDataSource, SyncStatusQueryData, TimeWindowQueryData, WindowStart},
582 status::{HasMetrics, StatusDataSource},
583 testing::{
584 consensus::MockDataSource,
585 mocks::{MockHeader, MockPayload, MockTypes},
586 },
587 };
588
    /// Test application state that combines the query service data source with state belonging
    /// to an additional module.
    struct CompositeState {
        // NOTE(review): presumably the atomic store backing the logs below -- its use is not
        // visible in this part of the file; confirm.
        store: AtomicStore,
        // Query service data source; the data source trait impls for `CompositeState` delegate
        // to it.
        hotshot_qs: MockDataSource,
        // Persistent log of `u64` values owned by the additional module.
        module_state: RollingLog<BincodeLoadStore<u64>>,
    }
594
    // Implement data source trait for availability API. Every method forwards its arguments
    // unchanged to the same-named method on the wrapped `hotshot_qs` data source.
    #[async_trait]
    impl AvailabilityDataSource<MockTypes> for CompositeState {
        // Lookups of a single object by ID.
        async fn get_leaf<ID>(&self, id: ID) -> Fetch<LeafQueryData<MockTypes>>
        where
            ID: Into<LeafId<MockTypes>> + Send + Sync,
        {
            self.hotshot_qs.get_leaf(id).await
        }
        async fn get_block<ID>(&self, id: ID) -> Fetch<BlockQueryData<MockTypes>>
        where
            ID: Into<BlockId<MockTypes>> + Send + Sync,
        {
            self.hotshot_qs.get_block(id).await
        }

        async fn get_header<ID>(&self, id: ID) -> Fetch<Header<MockTypes>>
        where
            ID: Into<BlockId<MockTypes>> + Send + Sync,
        {
            self.hotshot_qs.get_header(id).await
        }
        async fn get_payload<ID>(&self, id: ID) -> Fetch<PayloadQueryData<MockTypes>>
        where
            ID: Into<BlockId<MockTypes>> + Send + Sync,
        {
            self.hotshot_qs.get_payload(id).await
        }
        async fn get_payload_metadata<ID>(&self, id: ID) -> Fetch<PayloadMetadata<MockTypes>>
        where
            ID: Into<BlockId<MockTypes>> + Send + Sync,
        {
            self.hotshot_qs.get_payload_metadata(id).await
        }
        async fn get_vid_common<ID>(&self, id: ID) -> Fetch<VidCommonQueryData<MockTypes>>
        where
            ID: Into<BlockId<MockTypes>> + Send + Sync,
        {
            self.hotshot_qs.get_vid_common(id).await
        }
        async fn get_vid_common_metadata<ID>(&self, id: ID) -> Fetch<VidCommonMetadata<MockTypes>>
        where
            ID: Into<BlockId<MockTypes>> + Send + Sync,
        {
            self.hotshot_qs.get_vid_common_metadata(id).await
        }

        // `*_range` queries: streams of objects over a `RangeBounds<usize>`.
        async fn get_leaf_range<R>(&self, range: R) -> FetchStream<LeafQueryData<MockTypes>>
        where
            R: RangeBounds<usize> + Send + 'static,
        {
            self.hotshot_qs.get_leaf_range(range).await
        }
        async fn get_block_range<R>(&self, range: R) -> FetchStream<BlockQueryData<MockTypes>>
        where
            R: RangeBounds<usize> + Send + 'static,
        {
            self.hotshot_qs.get_block_range(range).await
        }

        async fn get_header_range<R>(&self, range: R) -> FetchStream<Header<MockTypes>>
        where
            R: RangeBounds<usize> + Send + 'static,
        {
            self.hotshot_qs.get_header_range(range).await
        }
        async fn get_payload_range<R>(&self, range: R) -> FetchStream<PayloadQueryData<MockTypes>>
        where
            R: RangeBounds<usize> + Send + 'static,
        {
            self.hotshot_qs.get_payload_range(range).await
        }
        async fn get_payload_metadata_range<R>(
            &self,
            range: R,
        ) -> FetchStream<PayloadMetadata<MockTypes>>
        where
            R: RangeBounds<usize> + Send + 'static,
        {
            self.hotshot_qs.get_payload_metadata_range(range).await
        }
        async fn get_vid_common_range<R>(
            &self,
            range: R,
        ) -> FetchStream<VidCommonQueryData<MockTypes>>
        where
            R: RangeBounds<usize> + Send + 'static,
        {
            self.hotshot_qs.get_vid_common_range(range).await
        }
        async fn get_vid_common_metadata_range<R>(
            &self,
            range: R,
        ) -> FetchStream<VidCommonMetadata<MockTypes>>
        where
            R: RangeBounds<usize> + Send + 'static,
        {
            self.hotshot_qs.get_vid_common_metadata_range(range).await
        }

        // `*_range_rev` queries: streams of objects given a `start` bound and an `end` index.
        async fn get_leaf_range_rev(
            &self,
            start: Bound<usize>,
            end: usize,
        ) -> FetchStream<LeafQueryData<MockTypes>> {
            self.hotshot_qs.get_leaf_range_rev(start, end).await
        }
        async fn get_block_range_rev(
            &self,
            start: Bound<usize>,
            end: usize,
        ) -> FetchStream<BlockQueryData<MockTypes>> {
            self.hotshot_qs.get_block_range_rev(start, end).await
        }
        async fn get_payload_range_rev(
            &self,
            start: Bound<usize>,
            end: usize,
        ) -> FetchStream<PayloadQueryData<MockTypes>> {
            self.hotshot_qs.get_payload_range_rev(start, end).await
        }
        async fn get_payload_metadata_range_rev(
            &self,
            start: Bound<usize>,
            end: usize,
        ) -> FetchStream<PayloadMetadata<MockTypes>> {
            self.hotshot_qs
                .get_payload_metadata_range_rev(start, end)
                .await
        }
        async fn get_vid_common_range_rev(
            &self,
            start: Bound<usize>,
            end: usize,
        ) -> FetchStream<VidCommonQueryData<MockTypes>> {
            self.hotshot_qs.get_vid_common_range_rev(start, end).await
        }
        async fn get_vid_common_metadata_range_rev(
            &self,
            start: Bound<usize>,
            end: usize,
        ) -> FetchStream<VidCommonMetadata<MockTypes>> {
            self.hotshot_qs
                .get_vid_common_metadata_range_rev(start, end)
                .await
        }

        // Lookup of the block containing a transaction, by transaction hash.
        async fn get_block_containing_transaction(
            &self,
            hash: TransactionHash<MockTypes>,
        ) -> Fetch<BlockWithTransaction<MockTypes>> {
            self.hotshot_qs.get_block_containing_transaction(hash).await
        }
    }
745
746 // Imiplement data source trait for node API.
747 #[async_trait]
748 impl NodeDataSource<MockTypes> for CompositeState {
749 async fn block_height(&self) -> QueryResult<usize> {
750 StatusDataSource::block_height(self).await
751 }
752 async fn count_transactions_in_range(
753 &self,
754 range: impl RangeBounds<usize> + Send,
755 namespace: Option<NamespaceId<MockTypes>>,
756 ) -> QueryResult<usize> {
757 self.hotshot_qs
758 .count_transactions_in_range(range, namespace)
759 .await
760 }
761 async fn payload_size_in_range(
762 &self,
763 range: impl RangeBounds<usize> + Send,
764 namespace: Option<NamespaceId<MockTypes>>,
765 ) -> QueryResult<usize> {
766 self.hotshot_qs
767 .payload_size_in_range(range, namespace)
768 .await
769 }
770 async fn vid_share<ID>(&self, id: ID) -> QueryResult<VidShare>
771 where
772 ID: Into<BlockId<MockTypes>> + Send + Sync,
773 {
774 self.hotshot_qs.vid_share(id).await
775 }
776 async fn sync_status(&self) -> QueryResult<SyncStatusQueryData> {
777 self.hotshot_qs.sync_status().await
778 }
779 async fn get_header_window(
780 &self,
781 start: impl Into<WindowStart<MockTypes>> + Send + Sync,
782 end: u64,
783 limit: usize,
784 ) -> QueryResult<TimeWindowQueryData<Header<MockTypes>>> {
785 self.hotshot_qs.get_header_window(start, end, limit).await
786 }
787 }
788
789 // Implement data source trait for status API.
790 impl HasMetrics for CompositeState {
791 fn metrics(&self) -> &PrometheusMetrics {
792 self.hotshot_qs.metrics()
793 }
794 }
795 #[async_trait]
796 impl StatusDataSource for CompositeState {
797 async fn block_height(&self) -> QueryResult<usize> {
798 StatusDataSource::block_height(&self.hotshot_qs).await
799 }
800 }
801
802 #[tokio::test(flavor = "multi_thread")]
803 async fn test_composition() {
804 let dir = TempDir::with_prefix("test_composition").unwrap();
805 let mut loader = AtomicStoreLoader::create(dir.path(), "test_composition").unwrap();
806 let hotshot_qs = MockDataSource::create_builder_with_store(&mut loader, Default::default())
807 .await
808 .unwrap()
809 .with_sync_status_ttl(Duration::ZERO)
810 .build()
811 .await
812 .unwrap();
813
814 // Mock up some data and add a block to the store.
815 let leaf = Leaf2::<MockTypes>::genesis(
816 &Default::default(),
817 &Default::default(),
818 TEST_VERSIONS.test.base,
819 )
820 .await;
821 let qc = QuorumCertificate2::genesis(
822 &Default::default(),
823 &Default::default(),
824 TEST_VERSIONS.test,
825 )
826 .await;
827 let leaf = LeafQueryData::new(leaf, qc).unwrap();
828 let block = BlockQueryData::new(leaf.header().clone(), MockPayload::genesis());
829 hotshot_qs
830 .append(BlockInfo::new(leaf, Some(block), None, None))
831 .await
832 .unwrap();
833
834 let module_state =
835 RollingLog::create(&mut loader, Default::default(), "module_state", 1024).unwrap();
836 let state = CompositeState {
837 hotshot_qs,
838 module_state,
839 store: AtomicStore::open(loader).unwrap(),
840 };
841
842 let module_spec = toml! {
843 [route.post_ext]
844 PATH = ["/ext/:val"]
845 METHOD = "POST"
846 ":val" = "Integer"
847
848 [route.get_ext]
849 PATH = ["/ext"]
850 METHOD = "GET"
851 };
852
853 let mut app = App::<_, Error>::with_state(RwLock::new(state));
854 app.register_module(
855 "availability",
856 availability::define_api(
857 &Default::default(),
858 MockBase::instance(),
859 "0.0.1".parse().unwrap(),
860 )
861 .unwrap(),
862 )
863 .unwrap()
864 .register_module(
865 "node",
866 node::define_api(
867 &Default::default(),
868 MockBase::instance(),
869 "0.0.1".parse().unwrap(),
870 )
871 .unwrap(),
872 )
873 .unwrap()
874 .register_module(
875 "status",
876 status::define_api(
877 &Default::default(),
878 MockBase::instance(),
879 "0.0.1".parse().unwrap(),
880 )
881 .unwrap(),
882 )
883 .unwrap()
884 .module::<Error, MockBase>("mod", module_spec)
885 .unwrap()
886 .get("get_ext", |_, state| {
887 async move { state.module_state.load_latest().map_err(Error::internal) }.boxed()
888 })
889 .unwrap()
890 .post("post_ext", |req, state| {
891 async move {
892 state
893 .module_state
894 .store_resource(&req.integer_param("val").map_err(Error::internal)?)
895 .map_err(Error::internal)?;
896 state
897 .module_state
898 .commit_version()
899 .map_err(Error::internal)?;
900 state
901 .hotshot_qs
902 .skip_version()
903 .await
904 .map_err(Error::internal)?;
905 state.store.commit_version().map_err(Error::internal)
906 }
907 .boxed()
908 })
909 .unwrap();
910
911 let port = reserve_tcp_port().unwrap();
912 let _server = BackgroundTask::spawn(
913 "server",
914 app.serve(format!("0.0.0.0:{port}"), MockBase::instance()),
915 );
916
917 let client =
918 Client::<Error, MockBase>::new(format!("http://localhost:{port}").parse().unwrap());
919 assert!(client.connect(Some(Duration::from_secs(60))).await);
920
921 client.post::<()>("mod/ext/42").send().await.unwrap();
922 assert_eq!(client.get::<u64>("mod/ext").send().await.unwrap(), 42);
923
924 // Check that we can still access the built-in modules.
925 assert_eq!(
926 client
927 .get::<u64>("status/block-height")
928 .send()
929 .await
930 .unwrap(),
931 1
932 );
933 let sync_status: SyncStatusQueryData = client.get("node/sync-status").send().await.unwrap();
934 assert_eq!(sync_status.blocks.missing, 0);
935 assert_eq!(sync_status.leaves.missing, 0);
936 assert_eq!(sync_status.vid_common.missing, 1);
937
938 assert_eq!(
939 client
940 .get::<MockHeader>("availability/header/0")
941 .send()
942 .await
943 .unwrap()
944 .block_number,
945 0
946 );
947 }
948}