1use std::{
16 cmp::Ordering,
17 fmt::Debug,
18 future::IntoFuture,
19 iter::once,
20 ops::{Range, RangeBounds},
21 sync::Arc,
22};
23
24use async_trait::async_trait;
25use derivative::Derivative;
26use derive_more::From;
27use futures::future::{BoxFuture, FutureExt, join_all};
28use hotshot_types::traits::node_implementation::NodeType;
29use tokio::spawn;
30use tracing::Instrument;
31
32use super::{
33 AvailabilityProvider, FetchRequest, Fetchable, Fetcher, Heights, Notifiers, RangedFetchable,
34 Storable, header::HeaderCallback,
35};
36use crate::{
37 Header, Payload, QueryError, QueryResult,
38 availability::{LeafId, LeafQueryData, QueryableHeader, QueryablePayload},
39 data_source::{
40 VersionedDataSource,
41 storage::{
42 AvailabilityStorage, NodeStorage, UpdateAvailabilityStorage,
43 pruning::PrunedHeightStorage,
44 },
45 },
46 fetching::{
47 self, Callback, NonEmptyRange,
48 request::{self, LeafRangeRequest},
49 },
50 types::HeightIndexed,
51};
52
53pub(super) type LeafFetcher<Types, S, P> =
54 fetching::Fetcher<request::LeafRequest, LeafCallback<Types, S, P>>;
55
56pub(super) type LeafRangeFetcher<Types, S, P> =
57 fetching::Fetcher<LeafRangeRequest, LeafCallback<Types, S, P>>;
58
59impl<Types> FetchRequest for LeafId<Types>
60where
61 Types: NodeType,
62{
63 fn might_exist(self, heights: Heights) -> bool {
64 if let LeafId::Number(n) = self {
65 heights.might_exist(n as u64)
66 } else {
67 true
68 }
69 }
70}
71
72#[async_trait]
73impl<Types> Fetchable<Types> for LeafQueryData<Types>
74where
75 Types: NodeType,
76 Header<Types>: QueryableHeader<Types>,
77 Payload<Types>: QueryablePayload<Types>,
78{
79 type Request = LeafId<Types>;
80
81 fn satisfies(&self, req: Self::Request) -> bool {
82 match req {
83 LeafId::Number(n) => self.height() == n as u64,
84 LeafId::Hash(h) => self.hash() == h,
85 }
86 }
87
88 async fn passive_fetch(
89 notifiers: &Notifiers<Types>,
90 req: Self::Request,
91 ) -> BoxFuture<'static, Option<Self>> {
92 notifiers
93 .leaf
94 .wait_for(move |leaf| leaf.satisfies(req))
95 .await
96 .into_future()
97 .boxed()
98 }
99
100 async fn active_fetch<S, P>(
101 _tx: &mut impl AvailabilityStorage<Types>,
102 fetcher: Arc<Fetcher<Types, S, P>>,
103 req: Self::Request,
104 ) -> anyhow::Result<()>
105 where
106 S: VersionedDataSource + 'static,
107 for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>,
108 for<'a> S::ReadOnly<'a>:
109 AvailabilityStorage<Types> + NodeStorage<Types> + PrunedHeightStorage,
110 P: AvailabilityProvider<Types>,
111 {
112 fetch_leaf_with_callbacks(fetcher, req, None).await
113 }
114
115 async fn load<S>(storage: &mut S, req: Self::Request) -> QueryResult<Self>
116 where
117 S: AvailabilityStorage<Types>,
118 {
119 storage.get_leaf(req).await
120 }
121}
122
123pub(super) async fn fetch_leaf_with_callbacks<Types, S, P, I>(
124 fetcher: Arc<Fetcher<Types, S, P>>,
125 req: LeafId<Types>,
126 callbacks: I,
127) -> anyhow::Result<()>
128where
129 Types: NodeType,
130 Header<Types>: QueryableHeader<Types>,
131 Payload<Types>: QueryablePayload<Types>,
132 S: VersionedDataSource + 'static,
133 for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>,
134 for<'a> S::ReadOnly<'a>: AvailabilityStorage<Types> + NodeStorage<Types> + PrunedHeightStorage,
135 P: AvailabilityProvider<Types>,
136 I: IntoIterator<Item = LeafCallback<Types, S, P>> + Send + 'static,
137 I::IntoIter: Send,
138{
139 match req {
140 LeafId::Number(n) => {
141 let fetcher = fetcher.clone();
142 fetcher.leaf_fetcher.clone().spawn_fetch(
143 request::LeafRequest::new(n as u64),
144 fetcher.provider.clone(),
145 once(LeafCallback::Leaf { fetcher }).chain(callbacks),
146 );
147 },
148 LeafId::Hash(h) => {
149 tracing::debug!("not fetching unknown leaf {h}");
153 },
154 }
155
156 Ok(())
157}
158
159pub(super) fn trigger_fetch_for_parent<Types, S, P>(
165 fetcher: &Arc<Fetcher<Types, S, P>>,
166 leaf: &LeafQueryData<Types>,
167) where
168 Types: NodeType,
169 Header<Types>: QueryableHeader<Types>,
170 Payload<Types>: QueryablePayload<Types>,
171 S: VersionedDataSource + 'static,
172 for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>,
173 for<'a> S::ReadOnly<'a>: AvailabilityStorage<Types> + NodeStorage<Types> + PrunedHeightStorage,
174 P: AvailabilityProvider<Types>,
175{
176 let height = leaf.height();
177
178 if height == 0 {
180 return;
181 }
182
183 let fetcher = fetcher.clone();
186 let span = tracing::info_span!("fetch parent leaf", height);
187 spawn(
188 async move {
189 match fetcher.storage.read().await {
191 Ok(mut tx) => {
192 if let Ok(pruned_height) = tx.load_pruned_height().await
194 && pruned_height.is_some_and(|ph| height <= ph)
195 {
196 tracing::info!(height, ?pruned_height, "not fetching pruned parent leaf");
197 return;
198 }
199
200 if tx.get_leaf(((height - 1) as usize).into()).await.is_ok() {
201 return;
202 }
203 },
204 Err(err) => {
205 tracing::warn!(
208 height,
209 "error opening transaction to check for parent leaf: {err:#}",
210 );
211 },
212 }
213
214 tracing::info!(height, "received new leaf; fetching missing parent");
215 fetcher.leaf_fetcher.clone().spawn_fetch(
216 request::LeafRequest::new(height - 1),
217 fetcher.provider.clone(),
218 [
221 LeafCallback::Leaf {
222 fetcher: fetcher.clone(),
223 },
224 HeaderCallback::Payload {
225 fetcher: fetcher.clone(),
226 }
227 .into(),
228 HeaderCallback::VidCommon {
229 fetcher: fetcher.clone(),
230 }
231 .into(),
232 ],
233 );
234 }
235 .instrument(span),
236 );
237}
238
239#[async_trait]
240impl<Types> RangedFetchable<Types> for LeafQueryData<Types>
241where
242 Types: NodeType,
243 Header<Types>: QueryableHeader<Types>,
244 Payload<Types>: QueryablePayload<Types>,
245{
246 type RangedRequest = LeafId<Types>;
247
248 async fn load_range<S, R>(storage: &mut S, range: R) -> QueryResult<Vec<QueryResult<Self>>>
249 where
250 S: AvailabilityStorage<Types>,
251 R: RangeBounds<usize> + Send + 'static,
252 {
253 storage.get_leaf_range(range).await
254 }
255}
256
257impl<Types> Storable<Types> for LeafQueryData<Types>
258where
259 Types: NodeType,
260{
261 fn debug_name(&self) -> String {
262 format!("leaf {}", self.height())
263 }
264
265 async fn notify(&self, notifiers: &Notifiers<Types>) {
266 notifiers.leaf.notify(self).await;
267 }
268
269 async fn store(
270 &self,
271 storage: &mut impl UpdateAvailabilityStorage<Types>,
272 _leaf_only: bool,
273 ) -> anyhow::Result<()> {
274 storage.insert_leaf(self).await
275 }
276}
277
278#[derive(Derivative, From)]
279#[derivative(Debug(bound = ""))]
280pub(super) enum LeafCallback<Types: NodeType, S, P> {
281 #[from(ignore)]
283 Leaf {
284 #[derivative(Debug = "ignore")]
285 fetcher: Arc<Fetcher<Types, S, P>>,
286 },
287 Continuation {
289 callback: HeaderCallback<Types, S, P>,
290 },
291}
292
293impl<Types: NodeType, S, P> PartialEq for LeafCallback<Types, S, P> {
294 fn eq(&self, other: &Self) -> bool {
295 self.cmp(other).is_eq()
296 }
297}
298
299impl<Types: NodeType, S, P> Eq for LeafCallback<Types, S, P> {}
300
301impl<Types: NodeType, S, P> Ord for LeafCallback<Types, S, P> {
302 fn cmp(&self, other: &Self) -> Ordering {
303 match (self, other) {
304 (Self::Leaf { .. }, Self::Continuation { .. }) => Ordering::Less,
306 (Self::Continuation { .. }, Self::Leaf { .. }) => Ordering::Greater,
307
308 (Self::Continuation { callback: cb1 }, Self::Continuation { callback: cb2 }) => {
309 cb1.cmp(cb2)
310 },
311 _ => Ordering::Equal,
312 }
313 }
314}
315
316impl<Types: NodeType, S, P> PartialOrd for LeafCallback<Types, S, P> {
317 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
318 Some(self.cmp(other))
319 }
320}
321
322impl<Types: NodeType, S, P> Callback<LeafQueryData<Types>> for LeafCallback<Types, S, P>
323where
324 Header<Types>: QueryableHeader<Types>,
325 Payload<Types>: QueryablePayload<Types>,
326 S: VersionedDataSource + 'static,
327 for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>,
328 for<'a> S::ReadOnly<'a>: AvailabilityStorage<Types> + NodeStorage<Types> + PrunedHeightStorage,
329 P: AvailabilityProvider<Types>,
330{
331 async fn run(self, leaf: LeafQueryData<Types>) {
332 match self {
333 Self::Leaf { fetcher } => {
334 tracing::info!("fetched leaf {}", leaf.height());
335 trigger_fetch_for_parent(&fetcher, &leaf);
337 fetcher.store_and_notify(&leaf).await;
338 },
339 Self::Continuation { callback } => callback.run(leaf.leaf.block_header().clone()),
340 }
341 }
342}
343
344impl<Types: NodeType, S, P> Callback<NonEmptyRange<LeafQueryData<Types>>>
345 for LeafCallback<Types, S, P>
346where
347 Header<Types>: QueryableHeader<Types>,
348 Payload<Types>: QueryablePayload<Types>,
349 S: VersionedDataSource + 'static,
350 for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>,
351 for<'a> S::ReadOnly<'a>: AvailabilityStorage<Types> + NodeStorage<Types> + PrunedHeightStorage,
352 P: AvailabilityProvider<Types>,
353{
354 async fn run(self, leaves: NonEmptyRange<LeafQueryData<Types>>) {
355 match self {
356 Self::Leaf { fetcher } => {
357 tracing::info!("fetched leaf range {}..{}", leaves.start(), leaves.end());
358 fetcher.store_and_notify(&leaves).await;
359
360 },
367 Self::Continuation { callback } => callback.run_range(leaves.start(), leaves.end()),
368 }
369 }
370}
371
372#[derive(Clone, Copy, Debug)]
374pub struct RangeRequest {
375 pub start: u64,
376 pub end: u64,
377}
378
379impl IntoIterator for RangeRequest {
380 type Item = u64;
381 type IntoIter = Range<u64>;
382
383 fn into_iter(self) -> Self::IntoIter {
384 self.start..self.end
385 }
386}
387
388impl FetchRequest for RangeRequest {
389 fn might_exist(self, heights: Heights) -> bool {
390 heights.pruned_height.is_none_or(|h| h < self.start) && self.end < heights.height
391 }
392}
393
394impl RangeRequest {
395 pub fn is_satisfied(&self, range: &NonEmptyRange<impl HeightIndexed>) -> bool {
396 range.start() == self.start && range.end() == self.end
397 }
398
399 pub fn len(&self) -> usize {
400 (self.end - self.start) as usize
401 }
402}
403
404#[async_trait]
405impl<Types> Fetchable<Types> for NonEmptyRange<LeafQueryData<Types>>
406where
407 Types: NodeType,
408 Header<Types>: QueryableHeader<Types>,
409 Payload<Types>: QueryablePayload<Types>,
410{
411 type Request = RangeRequest;
412
413 fn satisfies(&self, req: Self::Request) -> bool {
414 req.is_satisfied(self)
415 }
416
417 async fn passive_fetch(
418 notifiers: &Notifiers<Types>,
419 req: Self::Request,
420 ) -> BoxFuture<'static, Option<Self>> {
421 let waits = join_all(req.into_iter().map(|i| {
422 notifiers
423 .leaf
424 .wait_for(move |leaf| leaf.satisfies(LeafId::Number(i as usize)))
425 }))
426 .await;
427
428 join_all(waits.into_iter().map(|wait| wait.into_future()))
429 .map(|options| NonEmptyRange::new(options.into_iter().flatten()).ok())
430 .boxed()
431 }
432
433 async fn active_fetch<S, P>(
434 _tx: &mut impl AvailabilityStorage<Types>,
435 fetcher: Arc<Fetcher<Types, S, P>>,
436 req: Self::Request,
437 ) -> anyhow::Result<()>
438 where
439 S: VersionedDataSource + 'static,
440 for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>,
441 for<'a> S::ReadOnly<'a>:
442 AvailabilityStorage<Types> + NodeStorage<Types> + PrunedHeightStorage,
443 P: AvailabilityProvider<Types>,
444 {
445 fetch_leaf_range_with_callbacks(fetcher, req, None).await
446 }
447
448 async fn load<S>(storage: &mut S, req: Self::Request) -> QueryResult<Self>
449 where
450 S: AvailabilityStorage<Types>,
451 {
452 let leaves = storage
453 .get_leaf_range((req.start as usize)..(req.end as usize))
454 .await?
455 .into_iter()
456 .collect::<QueryResult<Vec<_>>>()?;
457 if leaves.len() != req.len() {
458 tracing::debug!(
459 ?req,
460 len = leaves.len(),
461 "database returned partial result, unable to load full range"
462 );
463 return Err(QueryError::Missing);
464 }
465 NonEmptyRange::new(leaves).map_err(|err| QueryError::Error {
466 message: format!("expected contiguous range, but: {err:#}"),
467 })
468 }
469}
470
471impl<Types> Storable<Types> for NonEmptyRange<LeafQueryData<Types>>
472where
473 Types: NodeType,
474{
475 fn debug_name(&self) -> String {
476 format!("leaf range {}..{}", self.start(), self.end())
477 }
478
479 async fn notify(&self, notifiers: &Notifiers<Types>) {
480 for leaf in self {
481 notifiers.leaf.notify(leaf).await;
482 }
483 }
484
485 async fn store(
486 &self,
487 storage: &mut impl UpdateAvailabilityStorage<Types>,
488 _leaf_only: bool,
489 ) -> anyhow::Result<()> {
490 storage.insert_leaf_range(self).await
491 }
492}
493
494pub(super) async fn fetch_leaf_range_with_callbacks<Types, S, P, I>(
495 fetcher: Arc<Fetcher<Types, S, P>>,
496 req: RangeRequest,
497 callbacks: I,
498) -> anyhow::Result<()>
499where
500 Types: NodeType,
501 Header<Types>: QueryableHeader<Types>,
502 Payload<Types>: QueryablePayload<Types>,
503 S: VersionedDataSource + 'static,
504 for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>,
505 for<'a> S::ReadOnly<'a>: AvailabilityStorage<Types> + NodeStorage<Types> + PrunedHeightStorage,
506 P: AvailabilityProvider<Types>,
507 I: IntoIterator<Item = LeafCallback<Types, S, P>> + Send + 'static,
508 I::IntoIter: Send,
509{
510 let fetcher = fetcher.clone();
511 fetcher.leaf_range_fetcher.clone().spawn_fetch(
512 LeafRangeRequest {
513 start: req.start,
514 end: req.end,
515 },
516 fetcher.provider.clone(),
517 once(LeafCallback::Leaf { fetcher }).chain(callbacks),
518 );
519
520 Ok(())
521}