Skip to main content

hotshot_query_service/data_source/
extension.rs

1// Copyright (c) 2022 Espresso Systems (espressosys.com)
2// This file is part of the HotShot Query Service library.
3//
4// This program is free software: you can redistribute it and/or modify it under the terms of the GNU
5// General Public License as published by the Free Software Foundation, either version 3 of the
6// License, or (at your option) any later version.
7// This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without
8// even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
9// General Public License for more details.
10// You should have received a copy of the GNU General Public License along with this program. If not,
11// see <https://www.gnu.org/licenses/>.
12
13use std::{
14    ops::{Bound, RangeBounds},
15    sync::Arc,
16};
17
18use async_trait::async_trait;
19use futures::stream::BoxStream;
20use hotshot::types::Event;
21use hotshot_events_service::events_source::{EventFilterSet, EventsSource, StartupInfo};
22use hotshot_types::{
23    data::VidShare, event::LegacyEvent, new_protocol::CoordinatorEvent,
24    traits::node_implementation::NodeType,
25};
26use jf_merkle_tree_compat::prelude::MerkleProof;
27use tagged_base64::TaggedBase64;
28
29use super::VersionedDataSource;
30use crate::{
31    Header, Payload, QueryResult, Transaction,
32    availability::{
33        AvailabilityDataSource, BlockId, BlockInfo, BlockQueryData, BlockWithTransaction,
34        Certificate2, Fetch, FetchStream, LeafId, LeafQueryData, NamespaceId, PayloadMetadata,
35        PayloadQueryData, QueryableHeader, QueryablePayload, TransactionHash,
36        UpdateAvailabilityData, VidCommonMetadata, VidCommonQueryData,
37    },
38    data_source::storage::pruning::PrunedHeightDataSource,
39    explorer::{self, ExplorerDataSource, ExplorerHeader, ExplorerTransaction},
40    merklized_state::{
41        MerklizedState, MerklizedStateDataSource, MerklizedStateHeightPersistence, Snapshot,
42        UpdateStateData,
43    },
44    metrics::PrometheusMetrics,
45    node::{NodeDataSource, SyncStatusQueryData, TimeWindowQueryData, WindowStart},
46    status::{HasMetrics, StatusDataSource},
47};
/// A data source bundled with application-defined state.
///
/// [`ExtensibleDataSource`] pairs an existing data source `D` with a value of a user-chosen type
/// `U`. Whenever the wrapped data source implements one of the data source traits defined in this
/// crate, the wrapper implements it too, so the wrapper can be used as the state for instantiating
/// the APIs defined in this crate. Because it also carries the application's own state, the same
/// value can back application-specific endpoints.
///
/// The user-defined state is reachable through `AsRef<U>` and `AsMut<U>`, so API extensions can
/// always get at it from an [`ExtensibleDataSource`]. As an illustration, the
/// [UTXO example](crate#extension) can be completed by extending a data source with an index that
/// looks up transactions by the UTXOs they contain:
///
/// ```
/// # use async_trait::async_trait;
/// # use hotshot_query_service::availability::{AvailabilityDataSource, TransactionIndex};
/// # use hotshot_query_service::data_source::ExtensibleDataSource;
/// # use hotshot_query_service::testing::mocks::MockTypes as AppTypes;
/// # use std::collections::HashMap;
/// # #[async_trait]
/// # trait UtxoDataSource: AvailabilityDataSource<AppTypes> {
/// #   async fn find_utxo(&self, utxo: u64) -> Option<(usize, TransactionIndex<AppTypes>, usize)>;
/// # }
/// type UtxoIndex = HashMap<u64, (usize, TransactionIndex<AppTypes>, usize)>;
///
/// #[async_trait]
/// impl<UnderlyingDataSource> UtxoDataSource for
///     ExtensibleDataSource<UnderlyingDataSource, UtxoIndex>
/// where
///     UnderlyingDataSource: AvailabilityDataSource<AppTypes> + Send + Sync,
/// {
///     async fn find_utxo(&self, utxo: u64) -> Option<(usize, TransactionIndex<AppTypes>, usize)> {
///         self.as_ref().get(&utxo).cloned()
///     }
/// }
/// ```
#[derive(Clone, Copy, Debug)]
pub struct ExtensibleDataSource<D, U> {
    /// The wrapped, pre-existing data source.
    data_source: D,
    /// The application-defined state attached to the data source.
    user_data: U,
}
89
90impl<D, U> ExtensibleDataSource<D, U> {
91    pub fn new(data_source: D, user_data: U) -> Self {
92        Self {
93            data_source,
94            user_data,
95        }
96    }
97
98    /// Access the underlying data source.
99    ///
100    /// This functionality is provided as an inherent method rather than an implementation of the
101    /// [`AsRef`] trait so that `self.as_ref()` unambiguously returns `&U`, helping with type
102    /// inference.
103    pub fn inner(&self) -> &D {
104        &self.data_source
105    }
106
107    /// Mutably access the underlying data source.
108    ///
109    /// This functionality is provided as an inherent method rather than an implementation of the
110    /// [`AsMut`] trait so that `self.as_mut()` unambiguously returns `&U`, helping with type
111    /// inference.
112    pub fn inner_mut(&mut self) -> &mut D {
113        &mut self.data_source
114    }
115}
116
117impl<D, U> AsRef<U> for ExtensibleDataSource<D, U> {
118    fn as_ref(&self) -> &U {
119        &self.user_data
120    }
121}
122
123impl<D, U> AsMut<U> for ExtensibleDataSource<D, U> {
124    fn as_mut(&mut self) -> &mut U {
125        &mut self.user_data
126    }
127}
128
129impl<D, U> VersionedDataSource for ExtensibleDataSource<D, U>
130where
131    D: VersionedDataSource + Send,
132    U: Send + Sync,
133{
134    type Transaction<'a>
135        = D::Transaction<'a>
136    where
137        Self: 'a;
138
139    type ReadOnly<'a>
140        = D::ReadOnly<'a>
141    where
142        Self: 'a;
143
144    async fn write(&self) -> anyhow::Result<Self::Transaction<'_>> {
145        self.data_source.write().await
146    }
147
148    async fn read(&self) -> anyhow::Result<Self::ReadOnly<'_>> {
149        self.data_source.read().await
150    }
151}
152
153#[async_trait]
154impl<D, U> PrunedHeightDataSource for ExtensibleDataSource<D, U>
155where
156    D: PrunedHeightDataSource + Send + Sync,
157    U: Send + Sync,
158{
159    async fn load_pruned_height(&self) -> anyhow::Result<Option<u64>> {
160        self.data_source.load_pruned_height().await
161    }
162}
163
164#[async_trait]
165impl<D, U, Types> AvailabilityDataSource<Types> for ExtensibleDataSource<D, U>
166where
167    D: AvailabilityDataSource<Types> + Send + Sync,
168    U: Send + Sync,
169    Types: NodeType,
170    Header<Types>: QueryableHeader<Types>,
171    Payload<Types>: QueryablePayload<Types>,
172{
173    async fn get_leaf<ID>(&self, id: ID) -> Fetch<LeafQueryData<Types>>
174    where
175        ID: Into<LeafId<Types>> + Send + Sync,
176    {
177        self.data_source.get_leaf(id).await
178    }
179
180    async fn get_header<ID>(&self, id: ID) -> Fetch<Header<Types>>
181    where
182        ID: Into<BlockId<Types>> + Send + Sync,
183    {
184        self.data_source.get_header(id).await
185    }
186
187    async fn get_block<ID>(&self, id: ID) -> Fetch<BlockQueryData<Types>>
188    where
189        ID: Into<BlockId<Types>> + Send + Sync,
190    {
191        self.data_source.get_block(id).await
192    }
193    async fn get_payload<ID>(&self, id: ID) -> Fetch<PayloadQueryData<Types>>
194    where
195        ID: Into<BlockId<Types>> + Send + Sync,
196    {
197        self.data_source.get_payload(id).await
198    }
199    async fn get_payload_metadata<ID>(&self, id: ID) -> Fetch<PayloadMetadata<Types>>
200    where
201        ID: Into<BlockId<Types>> + Send + Sync,
202    {
203        self.data_source.get_payload_metadata(id).await
204    }
205    async fn get_vid_common<ID>(&self, id: ID) -> Fetch<VidCommonQueryData<Types>>
206    where
207        ID: Into<BlockId<Types>> + Send + Sync,
208    {
209        self.data_source.get_vid_common(id).await
210    }
211    async fn get_vid_common_metadata<ID>(&self, id: ID) -> Fetch<VidCommonMetadata<Types>>
212    where
213        ID: Into<BlockId<Types>> + Send + Sync,
214    {
215        self.data_source.get_vid_common_metadata(id).await
216    }
217    async fn get_leaf_range<R>(&self, range: R) -> FetchStream<LeafQueryData<Types>>
218    where
219        R: RangeBounds<usize> + Send + 'static,
220    {
221        self.data_source.get_leaf_range(range).await
222    }
223    async fn get_block_range<R>(&self, range: R) -> FetchStream<BlockQueryData<Types>>
224    where
225        R: RangeBounds<usize> + Send + 'static,
226    {
227        self.data_source.get_block_range(range).await
228    }
229
230    async fn get_header_range<R>(&self, range: R) -> FetchStream<Header<Types>>
231    where
232        R: RangeBounds<usize> + Send + 'static,
233    {
234        self.data_source.get_header_range(range).await
235    }
236    async fn get_payload_range<R>(&self, range: R) -> FetchStream<PayloadQueryData<Types>>
237    where
238        R: RangeBounds<usize> + Send + 'static,
239    {
240        self.data_source.get_payload_range(range).await
241    }
242    async fn get_payload_metadata_range<R>(&self, range: R) -> FetchStream<PayloadMetadata<Types>>
243    where
244        R: RangeBounds<usize> + Send + 'static,
245    {
246        self.data_source.get_payload_metadata_range(range).await
247    }
248    async fn get_vid_common_range<R>(&self, range: R) -> FetchStream<VidCommonQueryData<Types>>
249    where
250        R: RangeBounds<usize> + Send + 'static,
251    {
252        self.data_source.get_vid_common_range(range).await
253    }
254    async fn get_vid_common_metadata_range<R>(
255        &self,
256        range: R,
257    ) -> FetchStream<VidCommonMetadata<Types>>
258    where
259        R: RangeBounds<usize> + Send + 'static,
260    {
261        self.data_source.get_vid_common_metadata_range(range).await
262    }
263
264    async fn get_leaf_range_rev(
265        &self,
266        start: Bound<usize>,
267        end: usize,
268    ) -> FetchStream<LeafQueryData<Types>> {
269        self.data_source.get_leaf_range_rev(start, end).await
270    }
271    async fn get_block_range_rev(
272        &self,
273        start: Bound<usize>,
274        end: usize,
275    ) -> FetchStream<BlockQueryData<Types>> {
276        self.data_source.get_block_range_rev(start, end).await
277    }
278    async fn get_payload_range_rev(
279        &self,
280        start: Bound<usize>,
281        end: usize,
282    ) -> FetchStream<PayloadQueryData<Types>> {
283        self.data_source.get_payload_range_rev(start, end).await
284    }
285    async fn get_payload_metadata_range_rev(
286        &self,
287        start: Bound<usize>,
288        end: usize,
289    ) -> FetchStream<PayloadMetadata<Types>> {
290        self.data_source
291            .get_payload_metadata_range_rev(start, end)
292            .await
293    }
294    async fn get_vid_common_range_rev(
295        &self,
296        start: Bound<usize>,
297        end: usize,
298    ) -> FetchStream<VidCommonQueryData<Types>> {
299        self.data_source.get_vid_common_range_rev(start, end).await
300    }
301    async fn get_vid_common_metadata_range_rev(
302        &self,
303        start: Bound<usize>,
304        end: usize,
305    ) -> FetchStream<VidCommonMetadata<Types>> {
306        self.data_source
307            .get_vid_common_metadata_range_rev(start, end)
308            .await
309    }
310    async fn get_block_containing_transaction(
311        &self,
312        h: TransactionHash<Types>,
313    ) -> Fetch<BlockWithTransaction<Types>> {
314        self.data_source.get_block_containing_transaction(h).await
315    }
316
317    async fn get_cert2(&self, height: u64) -> QueryResult<Option<Certificate2<Types>>> {
318        self.data_source.get_cert2(height).await
319    }
320}
321
322impl<D, U, Types> UpdateAvailabilityData<Types> for ExtensibleDataSource<D, U>
323where
324    D: UpdateAvailabilityData<Types> + Send + Sync,
325    U: Send + Sync,
326    Types: NodeType,
327{
328    async fn append(&self, info: BlockInfo<Types>) -> anyhow::Result<()> {
329        self.data_source.append(info).await
330    }
331
332    async fn append_payload(&self, block: BlockQueryData<Types>) -> anyhow::Result<()> {
333        self.data_source.append_payload(block).await
334    }
335}
336
337#[async_trait]
338impl<D, U, Types> NodeDataSource<Types> for ExtensibleDataSource<D, U>
339where
340    D: NodeDataSource<Types> + Send + Sync,
341    U: Send + Sync,
342    Types: NodeType,
343    Header<Types>: QueryableHeader<Types>,
344{
345    async fn block_height(&self) -> QueryResult<usize> {
346        self.data_source.block_height().await
347    }
348    async fn count_transactions_in_range(
349        &self,
350        range: impl RangeBounds<usize> + Send,
351        namespace: Option<NamespaceId<Types>>,
352    ) -> QueryResult<usize> {
353        self.data_source
354            .count_transactions_in_range(range, namespace)
355            .await
356    }
357    async fn payload_size_in_range(
358        &self,
359        range: impl RangeBounds<usize> + Send,
360        namespace: Option<NamespaceId<Types>>,
361    ) -> QueryResult<usize> {
362        self.data_source
363            .payload_size_in_range(range, namespace)
364            .await
365    }
366    async fn vid_share<ID>(&self, id: ID) -> QueryResult<VidShare>
367    where
368        ID: Into<BlockId<Types>> + Send + Sync,
369    {
370        self.data_source.vid_share(id).await
371    }
372    async fn sync_status(&self) -> QueryResult<SyncStatusQueryData> {
373        self.data_source.sync_status().await
374    }
375    async fn get_header_window(
376        &self,
377        start: impl Into<WindowStart<Types>> + Send + Sync,
378        end: u64,
379        limit: usize,
380    ) -> QueryResult<TimeWindowQueryData<Header<Types>>> {
381        self.data_source.get_header_window(start, end, limit).await
382    }
383}
384
385impl<D, U> HasMetrics for ExtensibleDataSource<D, U>
386where
387    D: HasMetrics,
388{
389    fn metrics(&self) -> &PrometheusMetrics {
390        self.data_source.metrics()
391    }
392}
393
394#[async_trait]
395impl<D, U> StatusDataSource for ExtensibleDataSource<D, U>
396where
397    D: StatusDataSource + Send + Sync,
398    U: Send + Sync,
399{
400    async fn block_height(&self) -> QueryResult<usize> {
401        self.data_source.block_height().await
402    }
403}
404
405#[async_trait]
406impl<D, U, Types, State, const ARITY: usize> MerklizedStateDataSource<Types, State, ARITY>
407    for ExtensibleDataSource<D, U>
408where
409    D: MerklizedStateDataSource<Types, State, ARITY> + Sync,
410    U: Send + Sync,
411    Types: NodeType,
412    State: MerklizedState<Types, ARITY>,
413{
414    async fn get_path(
415        &self,
416        snapshot: Snapshot<Types, State, ARITY>,
417        key: State::Key,
418    ) -> QueryResult<MerkleProof<State::Entry, State::Key, State::T, ARITY>> {
419        self.data_source.get_path(snapshot, key).await
420    }
421}
422
423#[async_trait]
424impl<D, U> MerklizedStateHeightPersistence for ExtensibleDataSource<D, U>
425where
426    D: MerklizedStateHeightPersistence + Sync,
427    U: Send + Sync,
428{
429    async fn get_last_state_height(&self) -> QueryResult<usize> {
430        self.data_source.get_last_state_height().await
431    }
432}
433
434#[async_trait]
435impl<D, U, Types, State, const ARITY: usize> UpdateStateData<Types, State, ARITY>
436    for ExtensibleDataSource<D, U>
437where
438    D: UpdateStateData<Types, State, ARITY> + Send + Sync,
439    U: Send + Sync,
440    State: MerklizedState<Types, ARITY>,
441    Types: NodeType,
442{
443    async fn set_last_state_height(&mut self, height: usize) -> anyhow::Result<()> {
444        self.data_source.set_last_state_height(height).await
445    }
446
447    async fn insert_merkle_nodes(
448        &mut self,
449        path: MerkleProof<State::Entry, State::Key, State::T, ARITY>,
450        traversal_path: Vec<usize>,
451        block_number: u64,
452    ) -> anyhow::Result<()> {
453        self.data_source
454            .insert_merkle_nodes(path, traversal_path, block_number)
455            .await
456    }
457
458    async fn insert_merkle_nodes_batch(
459        &mut self,
460        proofs: Vec<(
461            MerkleProof<State::Entry, State::Key, State::T, ARITY>,
462            Vec<usize>,
463        )>,
464        block_number: u64,
465    ) -> anyhow::Result<()> {
466        self.data_source
467            .insert_merkle_nodes_batch(proofs, block_number)
468            .await
469    }
470}
471
472#[async_trait]
473impl<D, U, Types> ExplorerDataSource<Types> for ExtensibleDataSource<D, U>
474where
475    D: ExplorerDataSource<Types> + Sync,
476    U: Send + Sync,
477    Types: NodeType,
478    Payload<Types>: QueryablePayload<Types>,
479    Header<Types>: ExplorerHeader<Types> + QueryableHeader<Types>,
480    Transaction<Types>: ExplorerTransaction<Types>,
481{
482    async fn get_block_detail(
483        &self,
484        request: explorer::query_data::BlockIdentifier<Types>,
485    ) -> Result<explorer::query_data::BlockDetail<Types>, explorer::query_data::GetBlockDetailError>
486    {
487        self.data_source.get_block_detail(request).await
488    }
489
490    async fn get_block_summaries(
491        &self,
492        request: explorer::query_data::GetBlockSummariesRequest<Types>,
493    ) -> Result<
494        Vec<explorer::query_data::BlockSummary<Types>>,
495        explorer::query_data::GetBlockSummariesError,
496    > {
497        self.data_source.get_block_summaries(request).await
498    }
499
500    async fn get_transaction_detail(
501        &self,
502        request: explorer::query_data::TransactionIdentifier<Types>,
503    ) -> Result<
504        explorer::query_data::TransactionDetailResponse<Types>,
505        explorer::query_data::GetTransactionDetailError,
506    > {
507        self.data_source.get_transaction_detail(request).await
508    }
509
510    async fn get_transaction_summaries(
511        &self,
512        request: explorer::query_data::GetTransactionSummariesRequest<Types>,
513    ) -> Result<
514        Vec<explorer::query_data::TransactionSummary<Types>>,
515        explorer::query_data::GetTransactionSummariesError,
516    > {
517        self.data_source.get_transaction_summaries(request).await
518    }
519
520    async fn get_explorer_summary(
521        &self,
522    ) -> Result<
523        explorer::query_data::ExplorerSummary<Types>,
524        explorer::query_data::GetExplorerSummaryError,
525    > {
526        self.data_source.get_explorer_summary().await
527    }
528
529    async fn get_search_results(
530        &self,
531        query: TaggedBase64,
532    ) -> Result<
533        explorer::query_data::SearchResult<Types>,
534        explorer::query_data::GetSearchResultsError,
535    > {
536        self.data_source.get_search_results(query).await
537    }
538}
539
540/// Where the user data type supports it, derive `EventsSource` for the extensible data
541/// source.
542#[async_trait]
543impl<D, U, Types> EventsSource<Types> for ExtensibleDataSource<D, U>
544where
545    U: EventsSource<Types> + Sync,
546    D: Send + Sync,
547    Types: NodeType,
548{
549    type EventStream = BoxStream<'static, Arc<Event<Types>>>;
550    type LegacyEventStream = BoxStream<'static, Arc<LegacyEvent<Types>>>;
551
552    async fn get_event_stream(&self, filter: Option<EventFilterSet<Types>>) -> Self::EventStream {
553        Box::pin(self.user_data.get_event_stream(filter).await)
554    }
555
556    async fn get_legacy_event_stream(
557        &self,
558        filter: Option<EventFilterSet<Types>>,
559    ) -> Self::LegacyEventStream {
560        Box::pin(self.user_data.get_legacy_event_stream(filter).await)
561    }
562
563    async fn get_startup_info(&self) -> StartupInfo<Types> {
564        self.user_data.get_startup_info().await
565    }
566}
567
// Test-only glue: lets an `ExtensibleDataSource` take part in the data source test life cycle by
// reusing the wrapped data source's storage and default-constructing the user data.
#[cfg(any(test, feature = "testing"))]
mod impl_testable_data_source {
    use hotshot::types::Event;

    use super::*;
    use crate::{
        data_source::{UpdateDataSource, fetching::Builder},
        testing::{
            consensus::{DataSourceLifeCycle, TestableDataSource},
            mocks::MockTypes,
        },
    };

    #[async_trait]
    impl<D, U> DataSourceLifeCycle for ExtensibleDataSource<D, U>
    where
        D: TestableDataSource + UpdateDataSource<MockTypes>,
        U: Clone + Default + Send + Sync + 'static,
    {
        type Storage = D::Storage;
        type S = D::S;
        type P = D::P;

        /// Storage is created by the wrapped data source type.
        async fn create(node_id: usize) -> Self::Storage {
            D::create(node_id).await
        }

        /// Build the wrapped data source, then pair it with default user data.
        async fn build(
            storage: &Self::Storage,
            opt: impl Send
            + FnOnce(
                Builder<MockTypes, Self::S, Self::P>,
            ) -> Builder<MockTypes, Self::S, Self::P>,
        ) -> Self {
            let data_source = D::build(storage, opt).await;
            Self::new(data_source, U::default())
        }

        /// Reset the wrapped data source, then pair it with default user data.
        async fn reset(storage: &Self::Storage) -> Self {
            let data_source = D::reset(storage).await;
            Self::new(data_source, U::default())
        }

        /// Wrap a clone of `event` as a legacy coordinator event and apply it via `update`,
        /// panicking if the update fails.
        async fn handle_event(&self, event: &Event<MockTypes>) {
            let legacy = CoordinatorEvent::LegacyEvent(event.clone());
            let outcome = self.update(&legacy).await;
            outcome.unwrap();
        }
    }
}
615
#[cfg(test)]
mod test {
    // The crate-wide glob import is kept deliberately: per the original authors, it is the only
    // way found to bring the test macro, which is defined in another module of this crate, into
    // scope here.
    use crate::*;

    use super::ExtensibleDataSource;
    use crate::testing::consensus::MockDataSource;

    // Run the shared data source test suite against a wrapper with unit user data.
    instantiate_data_source_tests!(ExtensibleDataSource<MockDataSource, ()>);
}