1use std::{
14 ops::{Bound, RangeBounds},
15 sync::Arc,
16};
17
18use async_trait::async_trait;
19use futures::stream::BoxStream;
20use hotshot::types::Event;
21use hotshot_events_service::events_source::{EventFilterSet, EventsSource, StartupInfo};
22use hotshot_types::{
23 data::VidShare, event::LegacyEvent, new_protocol::CoordinatorEvent,
24 traits::node_implementation::NodeType,
25};
26use jf_merkle_tree_compat::prelude::MerkleProof;
27use tagged_base64::TaggedBase64;
28
29use super::VersionedDataSource;
30use crate::{
31 Header, Payload, QueryResult, Transaction,
32 availability::{
33 AvailabilityDataSource, BlockId, BlockInfo, BlockQueryData, BlockWithTransaction,
34 Certificate2, Fetch, FetchStream, LeafId, LeafQueryData, NamespaceId, PayloadMetadata,
35 PayloadQueryData, QueryableHeader, QueryablePayload, TransactionHash,
36 UpdateAvailabilityData, VidCommonMetadata, VidCommonQueryData,
37 },
38 data_source::storage::pruning::PrunedHeightDataSource,
39 explorer::{self, ExplorerDataSource, ExplorerHeader, ExplorerTransaction},
40 merklized_state::{
41 MerklizedState, MerklizedStateDataSource, MerklizedStateHeightPersistence, Snapshot,
42 UpdateStateData,
43 },
44 metrics::PrometheusMetrics,
45 node::{NodeDataSource, SyncStatusQueryData, TimeWindowQueryData, WindowStart},
46 status::{HasMetrics, StatusDataSource},
47};
/// Pairs an underlying data source `D` with an extra piece of user-defined data `U`.
///
/// The wrapped data source is reachable through `inner` / `inner_mut`, while the user data is
/// exposed through the `AsRef<U>` and `AsMut<U>` implementations on this type.
#[derive(Debug, Clone, Copy)]
pub struct ExtensibleDataSource<D, U> {
    /// The wrapped data source.
    data_source: D,
    /// Caller-supplied data carried alongside the data source.
    user_data: U,
}
89
90impl<D, U> ExtensibleDataSource<D, U> {
91 pub fn new(data_source: D, user_data: U) -> Self {
92 Self {
93 data_source,
94 user_data,
95 }
96 }
97
98 pub fn inner(&self) -> &D {
104 &self.data_source
105 }
106
107 pub fn inner_mut(&mut self) -> &mut D {
113 &mut self.data_source
114 }
115}
116
117impl<D, U> AsRef<U> for ExtensibleDataSource<D, U> {
118 fn as_ref(&self) -> &U {
119 &self.user_data
120 }
121}
122
123impl<D, U> AsMut<U> for ExtensibleDataSource<D, U> {
124 fn as_mut(&mut self) -> &mut U {
125 &mut self.user_data
126 }
127}
128
129impl<D, U> VersionedDataSource for ExtensibleDataSource<D, U>
130where
131 D: VersionedDataSource + Send,
132 U: Send + Sync,
133{
134 type Transaction<'a>
135 = D::Transaction<'a>
136 where
137 Self: 'a;
138
139 type ReadOnly<'a>
140 = D::ReadOnly<'a>
141 where
142 Self: 'a;
143
144 async fn write(&self) -> anyhow::Result<Self::Transaction<'_>> {
145 self.data_source.write().await
146 }
147
148 async fn read(&self) -> anyhow::Result<Self::ReadOnly<'_>> {
149 self.data_source.read().await
150 }
151}
152
153#[async_trait]
154impl<D, U> PrunedHeightDataSource for ExtensibleDataSource<D, U>
155where
156 D: PrunedHeightDataSource + Send + Sync,
157 U: Send + Sync,
158{
159 async fn load_pruned_height(&self) -> anyhow::Result<Option<u64>> {
160 self.data_source.load_pruned_height().await
161 }
162}
163
164#[async_trait]
165impl<D, U, Types> AvailabilityDataSource<Types> for ExtensibleDataSource<D, U>
166where
167 D: AvailabilityDataSource<Types> + Send + Sync,
168 U: Send + Sync,
169 Types: NodeType,
170 Header<Types>: QueryableHeader<Types>,
171 Payload<Types>: QueryablePayload<Types>,
172{
173 async fn get_leaf<ID>(&self, id: ID) -> Fetch<LeafQueryData<Types>>
174 where
175 ID: Into<LeafId<Types>> + Send + Sync,
176 {
177 self.data_source.get_leaf(id).await
178 }
179
180 async fn get_header<ID>(&self, id: ID) -> Fetch<Header<Types>>
181 where
182 ID: Into<BlockId<Types>> + Send + Sync,
183 {
184 self.data_source.get_header(id).await
185 }
186
187 async fn get_block<ID>(&self, id: ID) -> Fetch<BlockQueryData<Types>>
188 where
189 ID: Into<BlockId<Types>> + Send + Sync,
190 {
191 self.data_source.get_block(id).await
192 }
193 async fn get_payload<ID>(&self, id: ID) -> Fetch<PayloadQueryData<Types>>
194 where
195 ID: Into<BlockId<Types>> + Send + Sync,
196 {
197 self.data_source.get_payload(id).await
198 }
199 async fn get_payload_metadata<ID>(&self, id: ID) -> Fetch<PayloadMetadata<Types>>
200 where
201 ID: Into<BlockId<Types>> + Send + Sync,
202 {
203 self.data_source.get_payload_metadata(id).await
204 }
205 async fn get_vid_common<ID>(&self, id: ID) -> Fetch<VidCommonQueryData<Types>>
206 where
207 ID: Into<BlockId<Types>> + Send + Sync,
208 {
209 self.data_source.get_vid_common(id).await
210 }
211 async fn get_vid_common_metadata<ID>(&self, id: ID) -> Fetch<VidCommonMetadata<Types>>
212 where
213 ID: Into<BlockId<Types>> + Send + Sync,
214 {
215 self.data_source.get_vid_common_metadata(id).await
216 }
217 async fn get_leaf_range<R>(&self, range: R) -> FetchStream<LeafQueryData<Types>>
218 where
219 R: RangeBounds<usize> + Send + 'static,
220 {
221 self.data_source.get_leaf_range(range).await
222 }
223 async fn get_block_range<R>(&self, range: R) -> FetchStream<BlockQueryData<Types>>
224 where
225 R: RangeBounds<usize> + Send + 'static,
226 {
227 self.data_source.get_block_range(range).await
228 }
229
230 async fn get_header_range<R>(&self, range: R) -> FetchStream<Header<Types>>
231 where
232 R: RangeBounds<usize> + Send + 'static,
233 {
234 self.data_source.get_header_range(range).await
235 }
236 async fn get_payload_range<R>(&self, range: R) -> FetchStream<PayloadQueryData<Types>>
237 where
238 R: RangeBounds<usize> + Send + 'static,
239 {
240 self.data_source.get_payload_range(range).await
241 }
242 async fn get_payload_metadata_range<R>(&self, range: R) -> FetchStream<PayloadMetadata<Types>>
243 where
244 R: RangeBounds<usize> + Send + 'static,
245 {
246 self.data_source.get_payload_metadata_range(range).await
247 }
248 async fn get_vid_common_range<R>(&self, range: R) -> FetchStream<VidCommonQueryData<Types>>
249 where
250 R: RangeBounds<usize> + Send + 'static,
251 {
252 self.data_source.get_vid_common_range(range).await
253 }
254 async fn get_vid_common_metadata_range<R>(
255 &self,
256 range: R,
257 ) -> FetchStream<VidCommonMetadata<Types>>
258 where
259 R: RangeBounds<usize> + Send + 'static,
260 {
261 self.data_source.get_vid_common_metadata_range(range).await
262 }
263
264 async fn get_leaf_range_rev(
265 &self,
266 start: Bound<usize>,
267 end: usize,
268 ) -> FetchStream<LeafQueryData<Types>> {
269 self.data_source.get_leaf_range_rev(start, end).await
270 }
271 async fn get_block_range_rev(
272 &self,
273 start: Bound<usize>,
274 end: usize,
275 ) -> FetchStream<BlockQueryData<Types>> {
276 self.data_source.get_block_range_rev(start, end).await
277 }
278 async fn get_payload_range_rev(
279 &self,
280 start: Bound<usize>,
281 end: usize,
282 ) -> FetchStream<PayloadQueryData<Types>> {
283 self.data_source.get_payload_range_rev(start, end).await
284 }
285 async fn get_payload_metadata_range_rev(
286 &self,
287 start: Bound<usize>,
288 end: usize,
289 ) -> FetchStream<PayloadMetadata<Types>> {
290 self.data_source
291 .get_payload_metadata_range_rev(start, end)
292 .await
293 }
294 async fn get_vid_common_range_rev(
295 &self,
296 start: Bound<usize>,
297 end: usize,
298 ) -> FetchStream<VidCommonQueryData<Types>> {
299 self.data_source.get_vid_common_range_rev(start, end).await
300 }
301 async fn get_vid_common_metadata_range_rev(
302 &self,
303 start: Bound<usize>,
304 end: usize,
305 ) -> FetchStream<VidCommonMetadata<Types>> {
306 self.data_source
307 .get_vid_common_metadata_range_rev(start, end)
308 .await
309 }
310 async fn get_block_containing_transaction(
311 &self,
312 h: TransactionHash<Types>,
313 ) -> Fetch<BlockWithTransaction<Types>> {
314 self.data_source.get_block_containing_transaction(h).await
315 }
316
317 async fn get_cert2(&self, height: u64) -> QueryResult<Option<Certificate2<Types>>> {
318 self.data_source.get_cert2(height).await
319 }
320}
321
322impl<D, U, Types> UpdateAvailabilityData<Types> for ExtensibleDataSource<D, U>
323where
324 D: UpdateAvailabilityData<Types> + Send + Sync,
325 U: Send + Sync,
326 Types: NodeType,
327{
328 async fn append(&self, info: BlockInfo<Types>) -> anyhow::Result<()> {
329 self.data_source.append(info).await
330 }
331
332 async fn append_payload(&self, block: BlockQueryData<Types>) -> anyhow::Result<()> {
333 self.data_source.append_payload(block).await
334 }
335}
336
337#[async_trait]
338impl<D, U, Types> NodeDataSource<Types> for ExtensibleDataSource<D, U>
339where
340 D: NodeDataSource<Types> + Send + Sync,
341 U: Send + Sync,
342 Types: NodeType,
343 Header<Types>: QueryableHeader<Types>,
344{
345 async fn block_height(&self) -> QueryResult<usize> {
346 self.data_source.block_height().await
347 }
348 async fn count_transactions_in_range(
349 &self,
350 range: impl RangeBounds<usize> + Send,
351 namespace: Option<NamespaceId<Types>>,
352 ) -> QueryResult<usize> {
353 self.data_source
354 .count_transactions_in_range(range, namespace)
355 .await
356 }
357 async fn payload_size_in_range(
358 &self,
359 range: impl RangeBounds<usize> + Send,
360 namespace: Option<NamespaceId<Types>>,
361 ) -> QueryResult<usize> {
362 self.data_source
363 .payload_size_in_range(range, namespace)
364 .await
365 }
366 async fn vid_share<ID>(&self, id: ID) -> QueryResult<VidShare>
367 where
368 ID: Into<BlockId<Types>> + Send + Sync,
369 {
370 self.data_source.vid_share(id).await
371 }
372 async fn sync_status(&self) -> QueryResult<SyncStatusQueryData> {
373 self.data_source.sync_status().await
374 }
375 async fn get_header_window(
376 &self,
377 start: impl Into<WindowStart<Types>> + Send + Sync,
378 end: u64,
379 limit: usize,
380 ) -> QueryResult<TimeWindowQueryData<Header<Types>>> {
381 self.data_source.get_header_window(start, end, limit).await
382 }
383}
384
385impl<D, U> HasMetrics for ExtensibleDataSource<D, U>
386where
387 D: HasMetrics,
388{
389 fn metrics(&self) -> &PrometheusMetrics {
390 self.data_source.metrics()
391 }
392}
393
394#[async_trait]
395impl<D, U> StatusDataSource for ExtensibleDataSource<D, U>
396where
397 D: StatusDataSource + Send + Sync,
398 U: Send + Sync,
399{
400 async fn block_height(&self) -> QueryResult<usize> {
401 self.data_source.block_height().await
402 }
403}
404
405#[async_trait]
406impl<D, U, Types, State, const ARITY: usize> MerklizedStateDataSource<Types, State, ARITY>
407 for ExtensibleDataSource<D, U>
408where
409 D: MerklizedStateDataSource<Types, State, ARITY> + Sync,
410 U: Send + Sync,
411 Types: NodeType,
412 State: MerklizedState<Types, ARITY>,
413{
414 async fn get_path(
415 &self,
416 snapshot: Snapshot<Types, State, ARITY>,
417 key: State::Key,
418 ) -> QueryResult<MerkleProof<State::Entry, State::Key, State::T, ARITY>> {
419 self.data_source.get_path(snapshot, key).await
420 }
421}
422
423#[async_trait]
424impl<D, U> MerklizedStateHeightPersistence for ExtensibleDataSource<D, U>
425where
426 D: MerklizedStateHeightPersistence + Sync,
427 U: Send + Sync,
428{
429 async fn get_last_state_height(&self) -> QueryResult<usize> {
430 self.data_source.get_last_state_height().await
431 }
432}
433
434#[async_trait]
435impl<D, U, Types, State, const ARITY: usize> UpdateStateData<Types, State, ARITY>
436 for ExtensibleDataSource<D, U>
437where
438 D: UpdateStateData<Types, State, ARITY> + Send + Sync,
439 U: Send + Sync,
440 State: MerklizedState<Types, ARITY>,
441 Types: NodeType,
442{
443 async fn set_last_state_height(&mut self, height: usize) -> anyhow::Result<()> {
444 self.data_source.set_last_state_height(height).await
445 }
446
447 async fn insert_merkle_nodes(
448 &mut self,
449 path: MerkleProof<State::Entry, State::Key, State::T, ARITY>,
450 traversal_path: Vec<usize>,
451 block_number: u64,
452 ) -> anyhow::Result<()> {
453 self.data_source
454 .insert_merkle_nodes(path, traversal_path, block_number)
455 .await
456 }
457
458 async fn insert_merkle_nodes_batch(
459 &mut self,
460 proofs: Vec<(
461 MerkleProof<State::Entry, State::Key, State::T, ARITY>,
462 Vec<usize>,
463 )>,
464 block_number: u64,
465 ) -> anyhow::Result<()> {
466 self.data_source
467 .insert_merkle_nodes_batch(proofs, block_number)
468 .await
469 }
470}
471
472#[async_trait]
473impl<D, U, Types> ExplorerDataSource<Types> for ExtensibleDataSource<D, U>
474where
475 D: ExplorerDataSource<Types> + Sync,
476 U: Send + Sync,
477 Types: NodeType,
478 Payload<Types>: QueryablePayload<Types>,
479 Header<Types>: ExplorerHeader<Types> + QueryableHeader<Types>,
480 Transaction<Types>: ExplorerTransaction<Types>,
481{
482 async fn get_block_detail(
483 &self,
484 request: explorer::query_data::BlockIdentifier<Types>,
485 ) -> Result<explorer::query_data::BlockDetail<Types>, explorer::query_data::GetBlockDetailError>
486 {
487 self.data_source.get_block_detail(request).await
488 }
489
490 async fn get_block_summaries(
491 &self,
492 request: explorer::query_data::GetBlockSummariesRequest<Types>,
493 ) -> Result<
494 Vec<explorer::query_data::BlockSummary<Types>>,
495 explorer::query_data::GetBlockSummariesError,
496 > {
497 self.data_source.get_block_summaries(request).await
498 }
499
500 async fn get_transaction_detail(
501 &self,
502 request: explorer::query_data::TransactionIdentifier<Types>,
503 ) -> Result<
504 explorer::query_data::TransactionDetailResponse<Types>,
505 explorer::query_data::GetTransactionDetailError,
506 > {
507 self.data_source.get_transaction_detail(request).await
508 }
509
510 async fn get_transaction_summaries(
511 &self,
512 request: explorer::query_data::GetTransactionSummariesRequest<Types>,
513 ) -> Result<
514 Vec<explorer::query_data::TransactionSummary<Types>>,
515 explorer::query_data::GetTransactionSummariesError,
516 > {
517 self.data_source.get_transaction_summaries(request).await
518 }
519
520 async fn get_explorer_summary(
521 &self,
522 ) -> Result<
523 explorer::query_data::ExplorerSummary<Types>,
524 explorer::query_data::GetExplorerSummaryError,
525 > {
526 self.data_source.get_explorer_summary().await
527 }
528
529 async fn get_search_results(
530 &self,
531 query: TaggedBase64,
532 ) -> Result<
533 explorer::query_data::SearchResult<Types>,
534 explorer::query_data::GetSearchResultsError,
535 > {
536 self.data_source.get_search_results(query).await
537 }
538}
539
540#[async_trait]
543impl<D, U, Types> EventsSource<Types> for ExtensibleDataSource<D, U>
544where
545 U: EventsSource<Types> + Sync,
546 D: Send + Sync,
547 Types: NodeType,
548{
549 type EventStream = BoxStream<'static, Arc<Event<Types>>>;
550 type LegacyEventStream = BoxStream<'static, Arc<LegacyEvent<Types>>>;
551
552 async fn get_event_stream(&self, filter: Option<EventFilterSet<Types>>) -> Self::EventStream {
553 Box::pin(self.user_data.get_event_stream(filter).await)
554 }
555
556 async fn get_legacy_event_stream(
557 &self,
558 filter: Option<EventFilterSet<Types>>,
559 ) -> Self::LegacyEventStream {
560 Box::pin(self.user_data.get_legacy_event_stream(filter).await)
561 }
562
563 async fn get_startup_info(&self) -> StartupInfo<Types> {
564 self.user_data.get_startup_info().await
565 }
566}
567
// Test-only glue: lets an `ExtensibleDataSource` take part in the shared data source test
// harness whenever the wrapped data source does.
#[cfg(any(test, feature = "testing"))]
mod impl_testable_data_source {
    use hotshot::types::Event;

    use super::*;
    use crate::{
        data_source::{UpdateDataSource, fetching::Builder},
        testing::{
            consensus::{DataSourceLifeCycle, TestableDataSource},
            mocks::MockTypes,
        },
    };

    /// Storage and builder parameters are those of `D`; the user data is always `U::default()`.
    #[async_trait]
    impl<D, U> DataSourceLifeCycle for ExtensibleDataSource<D, U>
    where
        D: TestableDataSource + UpdateDataSource<MockTypes>,
        U: Clone + Default + Send + Sync + 'static,
    {
        type Storage = D::Storage;
        type S = D::S;
        type P = D::P;

        /// Creates the storage through `D::create`.
        async fn create(node_id: usize) -> Self::Storage {
            D::create(node_id).await
        }

        /// Builds the wrapped data source with `D::build`, then pairs it with default user data.
        async fn build(
            storage: &Self::Storage,
            opt: impl Send
            + FnOnce(
                Builder<MockTypes, Self::S, Self::P>,
            ) -> Builder<MockTypes, Self::S, Self::P>,
        ) -> Self {
            let inner = D::build(storage, opt).await;
            Self::new(inner, U::default())
        }

        /// Resets the wrapped data source with `D::reset`, then pairs it with default user data.
        async fn reset(storage: &Self::Storage) -> Self {
            let inner = D::reset(storage).await;
            Self::new(inner, U::default())
        }

        /// Wraps a clone of `event` as a `CoordinatorEvent::LegacyEvent` and applies it via
        /// `update`, panicking if the update fails.
        async fn handle_event(&self, event: &Event<MockTypes>) {
            let coordinator_event = CoordinatorEvent::LegacyEvent(event.clone());
            self.update(&coordinator_event).await.unwrap();
        }
    }
}
615
#[cfg(test)]
mod test {
    use super::ExtensibleDataSource;
    use crate::{testing::consensus::MockDataSource, *};

    // Run the generic data source tests against a wrapper around the mock data source, with
    // `()` as the user data.
    instantiate_data_source_tests!(ExtensibleDataSource<MockDataSource, ()>);
}