1use std::{cmp::Ordering, future::IntoFuture, iter::once, ops::RangeBounds, sync::Arc};
16
17use async_trait::async_trait;
18use derivative::Derivative;
19use futures::future::{BoxFuture, FutureExt, join_all};
20use hotshot_types::traits::{block_contents::BlockHeader, node_implementation::NodeType};
21
22use super::{
23 AvailabilityProvider, FetchRequest, Fetchable, Fetcher, Heights, Notifiers, RangedFetchable,
24 Storable,
25 header::{HeaderCallback, fetch_header_and_then},
26};
27use crate::{
28 Header, Payload, QueryError, QueryResult,
29 availability::{
30 BlockId, BlockQueryData, PayloadMetadata, PayloadQueryData, QueryableHeader,
31 QueryablePayload,
32 },
33 data_source::{
34 VersionedDataSource,
35 fetching::{header::fetch_header_range_and_then, leaf::RangeRequest},
36 storage::{
37 AvailabilityStorage, NodeStorage, UpdateAvailabilityStorage,
38 pruning::PrunedHeightStorage,
39 },
40 },
41 fetching::{
42 self, Callback, NonEmptyRange,
43 request::{self, BlockRangeRequest, PayloadRequest},
44 },
45 types::HeightIndexed,
46};
47pub(super) type PayloadFetcher<Types, S, P> =
48 fetching::Fetcher<request::PayloadRequest, PayloadCallback<Types, S, P>>;
49
50pub(super) type PayloadRangeFetcher<Types, S, P> =
51 fetching::Fetcher<request::BlockRangeRequest, BlockRangeCallback<Types, S, P>>;
52
53impl<Types> FetchRequest for BlockId<Types>
54where
55 Types: NodeType,
56{
57 fn might_exist(self, heights: Heights) -> bool {
58 if let BlockId::Number(n) = self {
59 heights.might_exist(n as u64)
60 } else {
61 true
62 }
63 }
64}
65
66#[async_trait]
67impl<Types> Fetchable<Types> for BlockQueryData<Types>
68where
69 Types: NodeType,
70 Header<Types>: QueryableHeader<Types>,
71 Payload<Types>: QueryablePayload<Types>,
72{
73 type Request = BlockId<Types>;
74
75 fn satisfies(&self, req: Self::Request) -> bool {
76 match req {
77 BlockId::Number(n) => self.height() == n as u64,
78 BlockId::Hash(h) => self.hash() == h,
79 BlockId::PayloadHash(h) => self.payload_hash() == h,
80 }
81 }
82
83 async fn passive_fetch(
84 notifiers: &Notifiers<Types>,
85 req: Self::Request,
86 ) -> BoxFuture<'static, Option<Self>> {
87 notifiers
88 .block
89 .wait_for(move |block| block.satisfies(req))
90 .await
91 .into_future()
92 .boxed()
93 }
94
95 async fn active_fetch<S, P>(
96 tx: &mut impl AvailabilityStorage<Types>,
97 fetcher: Arc<Fetcher<Types, S, P>>,
98 req: Self::Request,
99 ) -> anyhow::Result<()>
100 where
101 S: VersionedDataSource + 'static,
102 for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>,
103 for<'a> S::ReadOnly<'a>:
104 AvailabilityStorage<Types> + NodeStorage<Types> + PrunedHeightStorage,
105 P: AvailabilityProvider<Types>,
106 {
107 fetch_header_and_then(
108 tx,
109 req,
110 HeaderCallback::Payload {
111 fetcher: fetcher.clone(),
112 },
113 )
114 .await
115 }
116
117 async fn load<S>(storage: &mut S, req: Self::Request) -> QueryResult<Self>
118 where
119 S: AvailabilityStorage<Types>,
120 {
121 storage.get_block(req).await
122 }
123}
124
125#[async_trait]
126impl<Types> RangedFetchable<Types> for BlockQueryData<Types>
127where
128 Types: NodeType,
129 Header<Types>: QueryableHeader<Types>,
130 Payload<Types>: QueryablePayload<Types>,
131{
132 type RangedRequest = BlockId<Types>;
133
134 async fn load_range<S, R>(storage: &mut S, range: R) -> QueryResult<Vec<QueryResult<Self>>>
135 where
136 S: AvailabilityStorage<Types>,
137 R: RangeBounds<usize> + Send + 'static,
138 {
139 storage.get_block_range(range).await
140 }
141}
142
143impl<Types> Storable<Types> for BlockQueryData<Types>
144where
145 Types: NodeType,
146{
147 fn debug_name(&self) -> String {
148 format!("block {}", self.height())
149 }
150
151 async fn notify(&self, notifiers: &Notifiers<Types>) {
152 notifiers.block.notify(self).await;
153 }
154
155 async fn store(
156 &self,
157 storage: &mut impl UpdateAvailabilityStorage<Types>,
158 leaf_only: bool,
159 ) -> anyhow::Result<()> {
160 if leaf_only {
161 return Ok(());
162 }
163
164 storage.insert_block(self).await
165 }
166}
167
168pub(super) fn fetch_block_with_header<Types, S, P>(
169 fetcher: Arc<Fetcher<Types, S, P>>,
170 header: Header<Types>,
171) where
172 Types: NodeType,
173 Header<Types>: QueryableHeader<Types>,
174 Payload<Types>: QueryablePayload<Types>,
175 S: VersionedDataSource + 'static,
176 for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>,
177 P: AvailabilityProvider<Types>,
178{
179 let Some(payload_fetcher) = fetcher.payload_fetcher.as_ref() else {
180 return;
182 };
183
184 tracing::info!(
186 "spawned active fetch for payload {:?} (height {})",
187 header.payload_commitment(),
188 header.block_number()
189 );
190 payload_fetcher.spawn_fetch(
191 PayloadRequest(header.payload_commitment()),
192 fetcher.provider.clone(),
193 once(PayloadCallback {
194 header,
195 fetcher: fetcher.clone(),
196 }),
197 );
198}
199
200#[async_trait]
201impl<Types> Fetchable<Types> for PayloadQueryData<Types>
202where
203 Types: NodeType,
204 Header<Types>: QueryableHeader<Types>,
205 Payload<Types>: QueryablePayload<Types>,
206{
207 type Request = BlockId<Types>;
208
209 fn satisfies(&self, req: Self::Request) -> bool {
210 match req {
211 BlockId::Number(n) => self.height() == n as u64,
212 BlockId::Hash(h) => self.block_hash() == h,
213 BlockId::PayloadHash(h) => self.hash() == h,
214 }
215 }
216
217 async fn passive_fetch(
218 notifiers: &Notifiers<Types>,
219 req: Self::Request,
220 ) -> BoxFuture<'static, Option<Self>> {
221 notifiers
222 .block
223 .wait_for(move |block| block.satisfies(req))
224 .await
225 .into_future()
226 .map(|block| block.map(PayloadQueryData::from))
227 .boxed()
228 }
229
230 async fn active_fetch<S, P>(
231 tx: &mut impl AvailabilityStorage<Types>,
232 fetcher: Arc<Fetcher<Types, S, P>>,
233 req: Self::Request,
234 ) -> anyhow::Result<()>
235 where
236 S: VersionedDataSource + 'static,
237 for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>,
238 for<'a> S::ReadOnly<'a>:
239 AvailabilityStorage<Types> + NodeStorage<Types> + PrunedHeightStorage,
240 P: AvailabilityProvider<Types>,
241 {
242 BlockQueryData::active_fetch(tx, fetcher, req).await
246 }
247
248 async fn load<S>(storage: &mut S, req: Self::Request) -> QueryResult<Self>
249 where
250 S: AvailabilityStorage<Types>,
251 {
252 storage.get_payload(req).await
253 }
254}
255
256#[async_trait]
257impl<Types> RangedFetchable<Types> for PayloadQueryData<Types>
258where
259 Types: NodeType,
260 Header<Types>: QueryableHeader<Types>,
261 Payload<Types>: QueryablePayload<Types>,
262{
263 type RangedRequest = BlockId<Types>;
264
265 async fn load_range<S, R>(storage: &mut S, range: R) -> QueryResult<Vec<QueryResult<Self>>>
266 where
267 S: AvailabilityStorage<Types>,
268 R: RangeBounds<usize> + Send + 'static,
269 {
270 storage.get_payload_range(range).await
271 }
272}
273
274#[derive(Derivative)]
275#[derivative(Debug(bound = ""))]
276pub(super) struct PayloadCallback<Types: NodeType, S, P> {
277 header: Header<Types>,
278 #[derivative(Debug = "ignore")]
279 fetcher: Arc<Fetcher<Types, S, P>>,
280}
281
282impl<Types: NodeType, S, P> PartialEq for PayloadCallback<Types, S, P> {
283 fn eq(&self, other: &Self) -> bool {
284 self.cmp(other).is_eq()
285 }
286}
287
288impl<Types: NodeType, S, P> Eq for PayloadCallback<Types, S, P> {}
289
290impl<Types: NodeType, S, P> Ord for PayloadCallback<Types, S, P> {
291 fn cmp(&self, other: &Self) -> Ordering {
292 self.header.block_number().cmp(&other.header.block_number())
293 }
294}
295
296impl<Types: NodeType, S, P> PartialOrd for PayloadCallback<Types, S, P> {
297 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
298 Some(self.cmp(other))
299 }
300}
301
302impl<Types: NodeType, S, P> Callback<Payload<Types>> for PayloadCallback<Types, S, P>
303where
304 Header<Types>: QueryableHeader<Types>,
305 Payload<Types>: QueryablePayload<Types>,
306 S: 'static + VersionedDataSource,
307 for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>,
308 P: AvailabilityProvider<Types>,
309{
310 async fn run(self, payload: Payload<Types>) {
311 tracing::info!("fetched payload {:?}", self.header.payload_commitment());
312 let block = BlockQueryData::new(self.header, payload);
313 self.fetcher.store_and_notify(&block).await;
314 }
315}
316
317#[async_trait]
318impl<Types> Fetchable<Types> for PayloadMetadata<Types>
319where
320 Types: NodeType,
321 Header<Types>: QueryableHeader<Types>,
322 Payload<Types>: QueryablePayload<Types>,
323{
324 type Request = BlockId<Types>;
325
326 fn satisfies(&self, req: Self::Request) -> bool {
327 match req {
328 BlockId::Number(n) => self.height == n as u64,
329 BlockId::Hash(h) => self.block_hash == h,
330 BlockId::PayloadHash(h) => self.hash == h,
331 }
332 }
333
334 async fn passive_fetch(
335 notifiers: &Notifiers<Types>,
336 req: Self::Request,
337 ) -> BoxFuture<'static, Option<Self>> {
338 notifiers
339 .block
340 .wait_for(move |block| block.satisfies(req))
341 .await
342 .into_future()
343 .map(|opt| opt.map(Self::from))
344 .boxed()
345 }
346
347 async fn active_fetch<S, P>(
348 tx: &mut impl AvailabilityStorage<Types>,
349 fetcher: Arc<Fetcher<Types, S, P>>,
350 req: Self::Request,
351 ) -> anyhow::Result<()>
352 where
353 S: VersionedDataSource + 'static,
354 for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>,
355 for<'a> S::ReadOnly<'a>:
356 AvailabilityStorage<Types> + NodeStorage<Types> + PrunedHeightStorage,
357 P: AvailabilityProvider<Types>,
358 {
359 BlockQueryData::active_fetch(tx, fetcher, req).await
362 }
363
364 async fn load<S>(storage: &mut S, req: Self::Request) -> QueryResult<Self>
365 where
366 S: AvailabilityStorage<Types>,
367 {
368 storage.get_payload_metadata(req).await
369 }
370}
371
372#[async_trait]
373impl<Types> RangedFetchable<Types> for PayloadMetadata<Types>
374where
375 Types: NodeType,
376 Header<Types>: QueryableHeader<Types>,
377 Payload<Types>: QueryablePayload<Types>,
378{
379 type RangedRequest = BlockId<Types>;
380
381 async fn load_range<S, R>(storage: &mut S, range: R) -> QueryResult<Vec<QueryResult<Self>>>
382 where
383 S: AvailabilityStorage<Types>,
384 R: RangeBounds<usize> + Send + 'static,
385 {
386 storage.get_payload_metadata_range(range).await
387 }
388}
389
390#[async_trait]
391impl<Types> Fetchable<Types> for NonEmptyRange<BlockQueryData<Types>>
392where
393 Types: NodeType,
394 Header<Types>: QueryableHeader<Types>,
395 Payload<Types>: QueryablePayload<Types>,
396{
397 type Request = RangeRequest;
398
399 fn satisfies(&self, req: Self::Request) -> bool {
400 req.is_satisfied(self)
401 }
402
403 async fn passive_fetch(
404 notifiers: &Notifiers<Types>,
405 req: Self::Request,
406 ) -> BoxFuture<'static, Option<Self>> {
407 let waits = join_all(req.into_iter().map(|i| {
408 notifiers
409 .block
410 .wait_for(move |block| block.satisfies(BlockId::Number(i as usize)))
411 }))
412 .await;
413
414 join_all(waits.into_iter().map(|wait| wait.into_future()))
415 .map(|options| NonEmptyRange::new(options.into_iter().flatten()).ok())
416 .boxed()
417 }
418
419 async fn active_fetch<S, P>(
420 tx: &mut impl AvailabilityStorage<Types>,
421 fetcher: Arc<Fetcher<Types, S, P>>,
422 req: Self::Request,
423 ) -> anyhow::Result<()>
424 where
425 S: VersionedDataSource + 'static,
426 for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>,
427 for<'a> S::ReadOnly<'a>:
428 AvailabilityStorage<Types> + NodeStorage<Types> + PrunedHeightStorage,
429 P: AvailabilityProvider<Types>,
430 {
431 fetch_header_range_and_then(tx, req, HeaderCallback::Payload { fetcher }).await
432 }
433
434 async fn load<S>(storage: &mut S, req: Self::Request) -> QueryResult<Self>
435 where
436 S: AvailabilityStorage<Types>,
437 {
438 let blocks = storage
439 .get_block_range((req.start as usize)..(req.end as usize))
440 .await?
441 .into_iter()
442 .collect::<QueryResult<Vec<_>>>()?;
443 if blocks.len() != req.len() {
444 tracing::debug!(
445 ?req,
446 len = blocks.len(),
447 "database returned partial result, unable to load full range"
448 );
449 return Err(QueryError::Missing);
450 }
451 NonEmptyRange::new(blocks).map_err(|err| QueryError::Error {
452 message: format!("expected contiguous range, but: {err:#}"),
453 })
454 }
455}
456
457impl<Types> Storable<Types> for NonEmptyRange<BlockQueryData<Types>>
458where
459 Types: NodeType,
460{
461 fn debug_name(&self) -> String {
462 format!("block range {}..{}", self.start(), self.end())
463 }
464
465 async fn notify(&self, notifiers: &Notifiers<Types>) {
466 for block in self {
467 notifiers.block.notify(block).await;
468 }
469 }
470
471 async fn store(
472 &self,
473 storage: &mut impl UpdateAvailabilityStorage<Types>,
474 leaf_only: bool,
475 ) -> anyhow::Result<()> {
476 if leaf_only {
477 return Ok(());
478 }
479
480 storage.insert_block_range(self).await
481 }
482}
483
484pub(super) fn fetch_block_range_with_headers<Types, S, P>(
485 fetcher: Arc<Fetcher<Types, S, P>>,
486 headers: NonEmptyRange<Header<Types>>,
487) where
488 Types: NodeType,
489 Header<Types>: QueryableHeader<Types>,
490 Payload<Types>: QueryablePayload<Types>,
491 S: VersionedDataSource + 'static,
492 for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>,
493 P: AvailabilityProvider<Types>,
494{
495 let Some(payload_range_fetcher) = fetcher.payload_range_fetcher.as_ref() else {
496 return;
498 };
499
500 tracing::info!(
502 "spawned active fetch for payload range {}..{}",
503 headers.start(),
504 headers.end()
505 );
506 payload_range_fetcher.spawn_fetch(
507 BlockRangeRequest::from_headers(&headers),
508 fetcher.provider.clone(),
509 once(BlockRangeCallback {
510 fetcher: fetcher.clone(),
511 }),
512 );
513}
514
515#[derive(Derivative)]
516#[derivative(Debug(bound = ""))]
517pub(super) struct BlockRangeCallback<Types: NodeType, S, P> {
518 #[derivative(Debug = "ignore")]
519 fetcher: Arc<Fetcher<Types, S, P>>,
520}
521
522impl<Types: NodeType, S, P> PartialEq for BlockRangeCallback<Types, S, P> {
523 fn eq(&self, other: &Self) -> bool {
524 self.cmp(other).is_eq()
525 }
526}
527
528impl<Types: NodeType, S, P> Eq for BlockRangeCallback<Types, S, P> {}
529
530impl<Types: NodeType, S, P> Ord for BlockRangeCallback<Types, S, P> {
531 fn cmp(&self, _: &Self) -> Ordering {
532 Ordering::Equal
534 }
535}
536
537impl<Types: NodeType, S, P> PartialOrd for BlockRangeCallback<Types, S, P> {
538 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
539 Some(self.cmp(other))
540 }
541}
542
543impl<Types: NodeType, S, P> Callback<NonEmptyRange<BlockQueryData<Types>>>
544 for BlockRangeCallback<Types, S, P>
545where
546 Header<Types>: QueryableHeader<Types>,
547 Payload<Types>: QueryablePayload<Types>,
548 S: 'static + VersionedDataSource,
549 for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>,
550 P: AvailabilityProvider<Types>,
551{
552 async fn run(self, blocks: NonEmptyRange<BlockQueryData<Types>>) {
553 tracing::info!("fetched blocks {}..{}", blocks.start(), blocks.end());
554 self.fetcher.store_and_notify(&blocks).await;
555 }
556}