1use std::{
16 cmp::Ordering,
17 fmt::Debug,
18 future::IntoFuture,
19 iter::once,
20 ops::{Range, RangeBounds},
21 sync::Arc,
22};
23
24use anyhow::bail;
25use async_trait::async_trait;
26use committable::Committable;
27use derivative::Derivative;
28use derive_more::From;
29use futures::future::{BoxFuture, FutureExt, join_all};
30use hotshot_types::traits::node_implementation::NodeType;
31use tokio::spawn;
32use tracing::Instrument;
33
34use super::{
35 AvailabilityProvider, FetchRequest, Fetchable, Fetcher, Heights, Notifiers, RangedFetchable,
36 Storable, header::HeaderCallback,
37};
38use crate::{
39 Header, Payload, QueryError, QueryResult,
40 availability::{LeafId, LeafQueryData, QueryableHeader, QueryablePayload},
41 data_source::{
42 VersionedDataSource,
43 storage::{
44 AvailabilityStorage, NodeStorage, UpdateAvailabilityStorage,
45 pruning::PrunedHeightStorage,
46 },
47 },
48 fetching::{
49 self, Callback, NonEmptyRange,
50 request::{self, LeafRangeRequest},
51 },
52 types::HeightIndexed,
53};
54
55pub(super) type LeafFetcher<Types, S, P> =
56 fetching::Fetcher<request::LeafRequest<Types>, LeafCallback<Types, S, P>>;
57
58pub(super) type LeafRangeFetcher<Types, S, P> =
59 fetching::Fetcher<LeafRangeRequest<Types>, LeafCallback<Types, S, P>>;
60
61impl<Types> FetchRequest for LeafId<Types>
62where
63 Types: NodeType,
64{
65 fn might_exist(self, heights: Heights) -> bool {
66 if let LeafId::Number(n) = self {
67 heights.might_exist(n as u64)
68 } else {
69 true
70 }
71 }
72}
73
74#[async_trait]
75impl<Types> Fetchable<Types> for LeafQueryData<Types>
76where
77 Types: NodeType,
78 Header<Types>: QueryableHeader<Types>,
79 Payload<Types>: QueryablePayload<Types>,
80{
81 type Request = LeafId<Types>;
82
83 fn satisfies(&self, req: Self::Request) -> bool {
84 match req {
85 LeafId::Number(n) => self.height() == n as u64,
86 LeafId::Hash(h) => self.hash() == h,
87 }
88 }
89
90 async fn passive_fetch(
91 notifiers: &Notifiers<Types>,
92 req: Self::Request,
93 ) -> BoxFuture<'static, Option<Self>> {
94 notifiers
95 .leaf
96 .wait_for(move |leaf| leaf.satisfies(req))
97 .await
98 .into_future()
99 .boxed()
100 }
101
102 async fn active_fetch<S, P>(
103 tx: &mut impl AvailabilityStorage<Types>,
104 fetcher: Arc<Fetcher<Types, S, P>>,
105 req: Self::Request,
106 ) -> anyhow::Result<()>
107 where
108 S: VersionedDataSource + 'static,
109 for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>,
110 for<'a> S::ReadOnly<'a>:
111 AvailabilityStorage<Types> + NodeStorage<Types> + PrunedHeightStorage,
112 P: AvailabilityProvider<Types>,
113 {
114 fetch_leaf_with_callbacks(tx, fetcher, req, None).await
115 }
116
117 async fn load<S>(storage: &mut S, req: Self::Request) -> QueryResult<Self>
118 where
119 S: AvailabilityStorage<Types>,
120 {
121 storage.get_leaf(req).await
122 }
123}
124
125pub(super) async fn fetch_leaf_with_callbacks<Types, S, P, I>(
126 tx: &mut impl AvailabilityStorage<Types>,
127 fetcher: Arc<Fetcher<Types, S, P>>,
128 req: LeafId<Types>,
129 callbacks: I,
130) -> anyhow::Result<()>
131where
132 Types: NodeType,
133 Header<Types>: QueryableHeader<Types>,
134 Payload<Types>: QueryablePayload<Types>,
135 S: VersionedDataSource + 'static,
136 for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>,
137 for<'a> S::ReadOnly<'a>: AvailabilityStorage<Types> + NodeStorage<Types> + PrunedHeightStorage,
138 P: AvailabilityProvider<Types>,
139 I: IntoIterator<Item = LeafCallback<Types, S, P>> + Send + 'static,
140 I::IntoIter: Send,
141{
142 match req {
143 LeafId::Number(n) => {
144 let next = (n + 1) as u64;
147 let next = match tx.first_available_leaf(next).await {
148 Ok(leaf) if leaf.height() == next => leaf,
149 Ok(leaf) => {
150 tracing::debug!(
155 n,
156 fetching = leaf.height() - 1,
157 "do not have necessary leaf; trigger fetch of a later leaf"
158 );
159
160 let mut callbacks = vec![LeafCallback::Leaf {
161 fetcher: fetcher.clone(),
162 }];
163
164 if !fetcher.leaf_only {
165 callbacks.push(
166 HeaderCallback::Payload {
167 fetcher: fetcher.clone(),
168 }
169 .into(),
170 );
171 callbacks.push(
172 HeaderCallback::VidCommon {
173 fetcher: fetcher.clone(),
174 }
175 .into(),
176 );
177 }
178
179 fetcher.leaf_fetcher.clone().spawn_fetch(
180 request::LeafRequest::new(
181 leaf.height() - 1,
182 leaf.leaf().parent_commitment(),
183 leaf.leaf().justify_qc().commit(),
184 ),
185 fetcher.provider.clone(),
186 callbacks,
189 );
190 return Ok(());
191 },
192 Err(QueryError::Missing | QueryError::NotFound) => {
193 tracing::debug!(n, "not fetching leaf with unknown successor");
197 return Ok(());
198 },
199 Err(QueryError::Error { message }) => {
200 bail!("failed to fetch successor for leaf {n}: {message}");
203 },
204 };
205
206 let fetcher = fetcher.clone();
207 fetcher.leaf_fetcher.clone().spawn_fetch(
208 request::LeafRequest::new(
209 n as u64,
210 next.leaf().parent_commitment(),
211 next.leaf().justify_qc().commit(),
212 ),
213 fetcher.provider.clone(),
214 once(LeafCallback::Leaf { fetcher }).chain(callbacks),
215 );
216 },
217 LeafId::Hash(h) => {
218 tracing::debug!("not fetching unknown leaf {h}");
222 },
223 }
224
225 Ok(())
226}
227
228pub(super) fn trigger_fetch_for_parent<Types, S, P>(
238 fetcher: &Arc<Fetcher<Types, S, P>>,
239 leaf: &LeafQueryData<Types>,
240) where
241 Types: NodeType,
242 Header<Types>: QueryableHeader<Types>,
243 Payload<Types>: QueryablePayload<Types>,
244 S: VersionedDataSource + 'static,
245 for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>,
246 for<'a> S::ReadOnly<'a>: AvailabilityStorage<Types> + NodeStorage<Types> + PrunedHeightStorage,
247 P: AvailabilityProvider<Types>,
248{
249 let height = leaf.height();
250 let parent = leaf.leaf().parent_commitment();
251 let parent_qc = leaf.leaf().justify_qc().commit();
252
253 if height == 0 {
255 return;
256 }
257
258 let fetcher = fetcher.clone();
261 let span = tracing::info_span!("fetch parent leaf", height, %parent, %parent_qc);
262 spawn(
263 async move {
264 match fetcher.storage.read().await {
266 Ok(mut tx) => {
267 if let Ok(pruned_height) = tx.load_pruned_height().await
269 && pruned_height.is_some_and(|ph| height <= ph)
270 {
271 tracing::info!(height, ?pruned_height, "not fetching pruned parent leaf");
272 return;
273 }
274
275 if tx.get_leaf(((height - 1) as usize).into()).await.is_ok() {
276 return;
277 }
278 },
279 Err(err) => {
280 tracing::warn!(
283 height,
284 %parent,
285 "error opening transaction to check for parent leaf: {err:#}",
286 );
287 },
288 }
289
290 tracing::info!(height, %parent, "received new leaf; fetching missing parent");
291 fetcher.leaf_fetcher.clone().spawn_fetch(
292 request::LeafRequest::new(height - 1, parent, parent_qc),
293 fetcher.provider.clone(),
294 [
297 LeafCallback::Leaf {
298 fetcher: fetcher.clone(),
299 },
300 HeaderCallback::Payload {
301 fetcher: fetcher.clone(),
302 }
303 .into(),
304 HeaderCallback::VidCommon {
305 fetcher: fetcher.clone(),
306 }
307 .into(),
308 ],
309 );
310 }
311 .instrument(span),
312 );
313}
314
315#[async_trait]
316impl<Types> RangedFetchable<Types> for LeafQueryData<Types>
317where
318 Types: NodeType,
319 Header<Types>: QueryableHeader<Types>,
320 Payload<Types>: QueryablePayload<Types>,
321{
322 type RangedRequest = LeafId<Types>;
323
324 async fn load_range<S, R>(storage: &mut S, range: R) -> QueryResult<Vec<QueryResult<Self>>>
325 where
326 S: AvailabilityStorage<Types>,
327 R: RangeBounds<usize> + Send + 'static,
328 {
329 storage.get_leaf_range(range).await
330 }
331}
332
333impl<Types> Storable<Types> for LeafQueryData<Types>
334where
335 Types: NodeType,
336{
337 fn debug_name(&self) -> String {
338 format!("leaf {}", self.height())
339 }
340
341 async fn notify(&self, notifiers: &Notifiers<Types>) {
342 notifiers.leaf.notify(self).await;
343 }
344
345 async fn store(
346 &self,
347 storage: &mut impl UpdateAvailabilityStorage<Types>,
348 _leaf_only: bool,
349 ) -> anyhow::Result<()> {
350 storage.insert_leaf(self).await
351 }
352}
353
354#[derive(Derivative, From)]
355#[derivative(Debug(bound = ""))]
356pub(super) enum LeafCallback<Types: NodeType, S, P> {
357 #[from(ignore)]
359 Leaf {
360 #[derivative(Debug = "ignore")]
361 fetcher: Arc<Fetcher<Types, S, P>>,
362 },
363 Continuation {
365 callback: HeaderCallback<Types, S, P>,
366 },
367}
368
369impl<Types: NodeType, S, P> PartialEq for LeafCallback<Types, S, P> {
370 fn eq(&self, other: &Self) -> bool {
371 self.cmp(other).is_eq()
372 }
373}
374
375impl<Types: NodeType, S, P> Eq for LeafCallback<Types, S, P> {}
376
377impl<Types: NodeType, S, P> Ord for LeafCallback<Types, S, P> {
378 fn cmp(&self, other: &Self) -> Ordering {
379 match (self, other) {
380 (Self::Leaf { .. }, Self::Continuation { .. }) => Ordering::Less,
382 (Self::Continuation { .. }, Self::Leaf { .. }) => Ordering::Greater,
383
384 (Self::Continuation { callback: cb1 }, Self::Continuation { callback: cb2 }) => {
385 cb1.cmp(cb2)
386 },
387 _ => Ordering::Equal,
388 }
389 }
390}
391
392impl<Types: NodeType, S, P> PartialOrd for LeafCallback<Types, S, P> {
393 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
394 Some(self.cmp(other))
395 }
396}
397
398impl<Types: NodeType, S, P> Callback<LeafQueryData<Types>> for LeafCallback<Types, S, P>
399where
400 Header<Types>: QueryableHeader<Types>,
401 Payload<Types>: QueryablePayload<Types>,
402 S: VersionedDataSource + 'static,
403 for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>,
404 for<'a> S::ReadOnly<'a>: AvailabilityStorage<Types> + NodeStorage<Types> + PrunedHeightStorage,
405 P: AvailabilityProvider<Types>,
406{
407 async fn run(self, leaf: LeafQueryData<Types>) {
408 match self {
409 Self::Leaf { fetcher } => {
410 tracing::info!("fetched leaf {}", leaf.height());
411 trigger_fetch_for_parent(&fetcher, &leaf);
413 fetcher.store_and_notify(&leaf).await;
414 },
415 Self::Continuation { callback } => callback.run(leaf.leaf.block_header().clone()),
416 }
417 }
418}
419
420impl<Types: NodeType, S, P> Callback<NonEmptyRange<LeafQueryData<Types>>>
421 for LeafCallback<Types, S, P>
422where
423 Header<Types>: QueryableHeader<Types>,
424 Payload<Types>: QueryablePayload<Types>,
425 S: VersionedDataSource + 'static,
426 for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>,
427 for<'a> S::ReadOnly<'a>: AvailabilityStorage<Types> + NodeStorage<Types> + PrunedHeightStorage,
428 P: AvailabilityProvider<Types>,
429{
430 async fn run(self, leaves: NonEmptyRange<LeafQueryData<Types>>) {
431 match self {
432 Self::Leaf { fetcher } => {
433 tracing::info!("fetched leaf range {}..{}", leaves.start(), leaves.end());
434 fetcher.store_and_notify(&leaves).await;
435
436 },
443 Self::Continuation { callback } => callback.run_range(leaves.as_ref_cloned()),
444 }
445 }
446}
447
448#[derive(Clone, Copy, Debug)]
450pub struct RangeRequest {
451 pub start: u64,
452 pub end: u64,
453}
454
455impl IntoIterator for RangeRequest {
456 type Item = u64;
457 type IntoIter = Range<u64>;
458
459 fn into_iter(self) -> Self::IntoIter {
460 self.start..self.end
461 }
462}
463
464impl FetchRequest for RangeRequest {
465 fn might_exist(self, heights: Heights) -> bool {
466 heights.pruned_height.is_none_or(|h| h < self.start) && self.end < heights.height
467 }
468}
469
470impl RangeRequest {
471 pub fn is_satisfied(&self, range: &NonEmptyRange<impl HeightIndexed>) -> bool {
472 range.start() == self.start && range.end() == self.end
473 }
474
475 pub fn len(&self) -> usize {
476 (self.end - self.start) as usize
477 }
478}
479
480#[async_trait]
481impl<Types> Fetchable<Types> for NonEmptyRange<LeafQueryData<Types>>
482where
483 Types: NodeType,
484 Header<Types>: QueryableHeader<Types>,
485 Payload<Types>: QueryablePayload<Types>,
486{
487 type Request = RangeRequest;
488
489 fn satisfies(&self, req: Self::Request) -> bool {
490 req.is_satisfied(self)
491 }
492
493 async fn passive_fetch(
494 notifiers: &Notifiers<Types>,
495 req: Self::Request,
496 ) -> BoxFuture<'static, Option<Self>> {
497 let waits = join_all(req.into_iter().map(|i| {
498 notifiers
499 .leaf
500 .wait_for(move |leaf| leaf.satisfies(LeafId::Number(i as usize)))
501 }))
502 .await;
503
504 join_all(waits.into_iter().map(|wait| wait.into_future()))
505 .map(|options| NonEmptyRange::new(options.into_iter().flatten()).ok())
506 .boxed()
507 }
508
509 async fn active_fetch<S, P>(
510 tx: &mut impl AvailabilityStorage<Types>,
511 fetcher: Arc<Fetcher<Types, S, P>>,
512 req: Self::Request,
513 ) -> anyhow::Result<()>
514 where
515 S: VersionedDataSource + 'static,
516 for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>,
517 for<'a> S::ReadOnly<'a>:
518 AvailabilityStorage<Types> + NodeStorage<Types> + PrunedHeightStorage,
519 P: AvailabilityProvider<Types>,
520 {
521 fetch_leaf_range_with_callbacks(tx, fetcher, req, None).await
522 }
523
524 async fn load<S>(storage: &mut S, req: Self::Request) -> QueryResult<Self>
525 where
526 S: AvailabilityStorage<Types>,
527 {
528 let leaves = storage
529 .get_leaf_range((req.start as usize)..(req.end as usize))
530 .await?
531 .into_iter()
532 .collect::<QueryResult<Vec<_>>>()?;
533 if leaves.len() != req.len() {
534 tracing::debug!(
535 ?req,
536 len = leaves.len(),
537 "database returned partial result, unable to load full range"
538 );
539 return Err(QueryError::Missing);
540 }
541 NonEmptyRange::new(leaves).map_err(|err| QueryError::Error {
542 message: format!("expected contiguous range, but: {err:#}"),
543 })
544 }
545}
546
547impl<Types> Storable<Types> for NonEmptyRange<LeafQueryData<Types>>
548where
549 Types: NodeType,
550{
551 fn debug_name(&self) -> String {
552 format!("leaf range {}..{}", self.start(), self.end())
553 }
554
555 async fn notify(&self, notifiers: &Notifiers<Types>) {
556 for leaf in self {
557 notifiers.leaf.notify(leaf).await;
558 }
559 }
560
561 async fn store(
562 &self,
563 storage: &mut impl UpdateAvailabilityStorage<Types>,
564 _leaf_only: bool,
565 ) -> anyhow::Result<()> {
566 storage.insert_leaf_range(self).await
567 }
568}
569
570pub(super) async fn fetch_leaf_range_with_callbacks<Types, S, P, I>(
571 tx: &mut impl AvailabilityStorage<Types>,
572 fetcher: Arc<Fetcher<Types, S, P>>,
573 req: RangeRequest,
574 callbacks: I,
575) -> anyhow::Result<()>
576where
577 Types: NodeType,
578 Header<Types>: QueryableHeader<Types>,
579 Payload<Types>: QueryablePayload<Types>,
580 S: VersionedDataSource + 'static,
581 for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>,
582 for<'a> S::ReadOnly<'a>: AvailabilityStorage<Types> + NodeStorage<Types> + PrunedHeightStorage,
583 P: AvailabilityProvider<Types>,
584 I: IntoIterator<Item = LeafCallback<Types, S, P>> + Send + 'static,
585 I::IntoIter: Send,
586{
587 let next = match tx.first_available_leaf(req.end).await {
590 Ok(leaf) if leaf.height() == req.end => leaf,
591 Ok(leaf) => {
592 tracing::debug!(
597 req.end,
598 fetching = leaf.height() - 1,
599 "do not have necessary leaf; trigger fetch of a later leaf"
600 );
601
602 let mut callbacks = vec![LeafCallback::Leaf {
603 fetcher: fetcher.clone(),
604 }];
605
606 if !fetcher.leaf_only {
607 callbacks.push(
608 HeaderCallback::Payload {
609 fetcher: fetcher.clone(),
610 }
611 .into(),
612 );
613 callbacks.push(
614 HeaderCallback::VidCommon {
615 fetcher: fetcher.clone(),
616 }
617 .into(),
618 );
619 }
620
621 fetcher.leaf_fetcher.clone().spawn_fetch(
622 request::LeafRequest::new(
623 leaf.height() - 1,
624 leaf.leaf().parent_commitment(),
625 leaf.leaf().justify_qc().commit(),
626 ),
627 fetcher.provider.clone(),
628 callbacks,
631 );
632 return Ok(());
633 },
634 Err(QueryError::Missing | QueryError::NotFound) => {
635 tracing::debug!(req.end, "not fetching leaf chain with unknown successor");
638 return Ok(());
639 },
640 Err(QueryError::Error { message }) => {
641 bail!(
644 "failed to fetch successor for leaf chain {}: {message}",
645 req.end
646 );
647 },
648 };
649
650 let fetcher = fetcher.clone();
651 fetcher.leaf_range_fetcher.clone().spawn_fetch(
652 LeafRangeRequest {
653 start: req.start,
654 end: req.end,
655 last_leaf: next.leaf().parent_commitment(),
656 last_qc: next.leaf().justify_qc().commit(),
657 },
658 fetcher.provider.clone(),
659 once(LeafCallback::Leaf { fetcher }).chain(callbacks),
660 );
661
662 Ok(())
663}