1use std::{cmp::Ordering, future::IntoFuture, iter::once, ops::RangeBounds, sync::Arc};
16
17use async_trait::async_trait;
18use derivative::Derivative;
19use derive_more::From;
20use futures::future::{BoxFuture, FutureExt, join_all};
21use hotshot_types::{
22 data::{VidCommon, VidShare},
23 traits::{block_contents::BlockHeader, node_implementation::NodeType},
24};
25
26use super::{
27 AvailabilityProvider, FetchRequest, Fetchable, Fetcher, Heights, Notifiers, RangedFetchable,
28 Storable,
29 header::{HeaderCallback, fetch_header_and_then},
30};
31use crate::{
32 Header, Payload, QueryError, QueryResult,
33 availability::{
34 BlockId, QueryableHeader, QueryablePayload, VidCommonMetadata, VidCommonQueryData,
35 },
36 data_source::{
37 VersionedDataSource,
38 fetching::{header::fetch_header_range_and_then, leaf::RangeRequest},
39 storage::{
40 AvailabilityStorage, NodeStorage, UpdateAvailabilityStorage,
41 pruning::PrunedHeightStorage,
42 },
43 },
44 fetching::{
45 self, Callback, NonEmptyRange,
46 request::{self, VidCommonRangeRequest},
47 },
48 types::HeightIndexed,
49};
50
51pub(super) type VidCommonFetcher<Types, S, P> =
52 fetching::Fetcher<request::VidCommonRequest, VidCommonCallback<Types, S, P>>;
53pub(super) type VidCommonRangeFetcher<Types, S, P> =
54 fetching::Fetcher<request::VidCommonRangeRequest, VidCommonRangeCallback<Types, S, P>>;
55
56#[derive(Clone, Copy, Debug, From)]
57pub(super) struct VidCommonRequest<Types: NodeType>(BlockId<Types>);
58
59impl<Types: NodeType> From<usize> for VidCommonRequest<Types> {
60 fn from(n: usize) -> Self {
61 Self(n.into())
62 }
63}
64
65impl<Types> FetchRequest for VidCommonRequest<Types>
66where
67 Types: NodeType,
68{
69 fn might_exist(self, heights: Heights) -> bool {
70 self.0.might_exist(heights)
71 }
72}
73
74#[async_trait]
75impl<Types> Fetchable<Types> for VidCommonQueryData<Types>
76where
77 Types: NodeType,
78 Header<Types>: QueryableHeader<Types>,
79 Payload<Types>: QueryablePayload<Types>,
80{
81 type Request = VidCommonRequest<Types>;
82
83 fn satisfies(&self, req: Self::Request) -> bool {
84 match req.0 {
85 BlockId::Number(n) => self.height() == n as u64,
86 BlockId::Hash(h) => self.block_hash() == h,
87 BlockId::PayloadHash(h) => self.payload_hash() == h,
88 }
89 }
90
91 async fn passive_fetch(
92 notifiers: &Notifiers<Types>,
93 req: Self::Request,
94 ) -> BoxFuture<'static, Option<Self>> {
95 notifiers
96 .vid_common
97 .wait_for(move |vid| vid.satisfies(req))
98 .await
99 .into_future()
100 .boxed()
101 }
102
103 async fn active_fetch<S, P>(
104 tx: &mut impl AvailabilityStorage<Types>,
105 fetcher: Arc<Fetcher<Types, S, P>>,
106 req: Self::Request,
107 ) -> anyhow::Result<()>
108 where
109 S: VersionedDataSource + 'static,
110 for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>,
111 for<'a> S::ReadOnly<'a>:
112 AvailabilityStorage<Types> + NodeStorage<Types> + PrunedHeightStorage,
113 P: AvailabilityProvider<Types>,
114 {
115 fetch_header_and_then(
116 tx,
117 req.0,
118 HeaderCallback::VidCommon {
119 fetcher: fetcher.clone(),
120 },
121 )
122 .await
123 }
124
125 async fn load<S>(storage: &mut S, req: Self::Request) -> QueryResult<Self>
126 where
127 S: AvailabilityStorage<Types>,
128 {
129 storage.get_vid_common(req.0).await
130 }
131}
132
133#[async_trait]
134impl<Types> RangedFetchable<Types> for VidCommonQueryData<Types>
135where
136 Types: NodeType,
137 Header<Types>: QueryableHeader<Types>,
138 Payload<Types>: QueryablePayload<Types>,
139{
140 type RangedRequest = VidCommonRequest<Types>;
141
142 async fn load_range<S, R>(storage: &mut S, range: R) -> QueryResult<Vec<QueryResult<Self>>>
143 where
144 S: AvailabilityStorage<Types>,
145 R: RangeBounds<usize> + Send + 'static,
146 {
147 storage.get_vid_common_range(range).await
148 }
149}
150
151impl<Types> Storable<Types> for VidCommonQueryData<Types>
152where
153 Types: NodeType,
154{
155 fn debug_name(&self) -> String {
156 format!("VID common {}", self.height())
157 }
158
159 async fn notify(&self, notifiers: &Notifiers<Types>) {
160 notifiers.vid_common.notify(self).await;
161 }
162
163 async fn store(
164 &self,
165 storage: &mut impl UpdateAvailabilityStorage<Types>,
166 _leaf_only: bool,
167 ) -> anyhow::Result<()> {
168 storage.insert_vid(self, None).await
169 }
170}
171
172impl<Types> Storable<Types> for (VidCommonQueryData<Types>, Option<VidShare>)
173where
174 Types: NodeType,
175{
176 fn debug_name(&self) -> String {
177 format!("VID data {}", self.0.height())
178 }
179
180 async fn notify(&self, notifiers: &Notifiers<Types>) {
181 notifiers.vid_common.notify(&self.0).await;
182 }
183
184 async fn store(
185 &self,
186 storage: &mut impl UpdateAvailabilityStorage<Types>,
187 _leaf_only: bool,
188 ) -> anyhow::Result<()> {
189 storage.insert_vid(&self.0, self.1.as_ref()).await
190 }
191}
192
193pub(super) fn fetch_vid_common_with_header<Types, S, P>(
194 fetcher: Arc<Fetcher<Types, S, P>>,
195 header: Header<Types>,
196) where
197 Types: NodeType,
198 Header<Types>: QueryableHeader<Types>,
199 Payload<Types>: QueryablePayload<Types>,
200 S: VersionedDataSource + 'static,
201 for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>,
202 P: AvailabilityProvider<Types>,
203{
204 let Some(vid_fetcher) = fetcher.vid_common_fetcher.as_ref() else {
205 tracing::info!("not fetching vid because of leaf only mode");
206 return;
207 };
208
209 tracing::info!(
211 "spawned active fetch for VID common {:?} (height {})",
212 header.payload_commitment(),
213 header.block_number()
214 );
215 vid_fetcher.spawn_fetch(
216 request::VidCommonRequest(header.payload_commitment()),
217 fetcher.provider.clone(),
218 once(VidCommonCallback {
219 header,
220 fetcher: fetcher.clone(),
221 }),
222 );
223}
224
225#[derive(Derivative)]
226#[derivative(Debug(bound = ""))]
227pub(super) struct VidCommonCallback<Types: NodeType, S, P> {
228 header: Header<Types>,
229 #[derivative(Debug = "ignore")]
230 fetcher: Arc<Fetcher<Types, S, P>>,
231}
232
233impl<Types: NodeType, S, P> PartialEq for VidCommonCallback<Types, S, P> {
234 fn eq(&self, other: &Self) -> bool {
235 self.cmp(other).is_eq()
236 }
237}
238
239impl<Types: NodeType, S, P> Eq for VidCommonCallback<Types, S, P> {}
240
241impl<Types: NodeType, S, P> Ord for VidCommonCallback<Types, S, P> {
242 fn cmp(&self, other: &Self) -> Ordering {
243 self.header.block_number().cmp(&other.header.block_number())
244 }
245}
246
247impl<Types: NodeType, S, P> PartialOrd for VidCommonCallback<Types, S, P> {
248 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
249 Some(self.cmp(other))
250 }
251}
252
253impl<Types: NodeType, S, P> Callback<VidCommon> for VidCommonCallback<Types, S, P>
254where
255 Header<Types>: QueryableHeader<Types>,
256 Payload<Types>: QueryablePayload<Types>,
257 S: VersionedDataSource + 'static,
258 for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>,
259 P: AvailabilityProvider<Types>,
260{
261 async fn run(self, common: VidCommon) {
262 let common = VidCommonQueryData::new(self.header, common);
263 self.fetcher.store_and_notify(&common).await;
264 }
265}
266
267#[async_trait]
268impl<Types> Fetchable<Types> for VidCommonMetadata<Types>
269where
270 Types: NodeType,
271 Header<Types>: QueryableHeader<Types>,
272 Payload<Types>: QueryablePayload<Types>,
273{
274 type Request = VidCommonRequest<Types>;
275
276 fn satisfies(&self, req: Self::Request) -> bool {
277 match req.0 {
278 BlockId::Number(n) => self.height == n as u64,
279 BlockId::Hash(h) => self.block_hash == h,
280 BlockId::PayloadHash(h) => self.payload_hash == h,
281 }
282 }
283
284 async fn passive_fetch(
285 notifiers: &Notifiers<Types>,
286 req: Self::Request,
287 ) -> BoxFuture<'static, Option<Self>> {
288 notifiers
289 .vid_common
290 .wait_for(move |vid| vid.satisfies(req))
291 .await
292 .into_future()
293 .map(|opt| opt.map(Self::from))
294 .boxed()
295 }
296
297 async fn active_fetch<S, P>(
298 tx: &mut impl AvailabilityStorage<Types>,
299 fetcher: Arc<Fetcher<Types, S, P>>,
300 req: Self::Request,
301 ) -> anyhow::Result<()>
302 where
303 S: VersionedDataSource + 'static,
304 for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>,
305 for<'a> S::ReadOnly<'a>:
306 AvailabilityStorage<Types> + NodeStorage<Types> + PrunedHeightStorage,
307 P: AvailabilityProvider<Types>,
308 {
309 if fetcher.leaf_only {
311 return Ok(());
312 }
313 VidCommonQueryData::active_fetch(tx, fetcher, req).await
316 }
317
318 async fn load<S>(storage: &mut S, req: Self::Request) -> QueryResult<Self>
319 where
320 S: AvailabilityStorage<Types>,
321 {
322 storage.get_vid_common_metadata(req.0).await
323 }
324}
325
326#[async_trait]
327impl<Types> RangedFetchable<Types> for VidCommonMetadata<Types>
328where
329 Types: NodeType,
330 Header<Types>: QueryableHeader<Types>,
331 Payload<Types>: QueryablePayload<Types>,
332{
333 type RangedRequest = VidCommonRequest<Types>;
334
335 async fn load_range<S, R>(storage: &mut S, range: R) -> QueryResult<Vec<QueryResult<Self>>>
336 where
337 S: AvailabilityStorage<Types>,
338 R: RangeBounds<usize> + Send + 'static,
339 {
340 storage.get_vid_common_metadata_range(range).await
341 }
342}
343
344#[async_trait]
345impl<Types> Fetchable<Types> for NonEmptyRange<VidCommonQueryData<Types>>
346where
347 Types: NodeType,
348 Header<Types>: QueryableHeader<Types>,
349 Payload<Types>: QueryablePayload<Types>,
350{
351 type Request = RangeRequest;
352
353 fn satisfies(&self, req: Self::Request) -> bool {
354 req.is_satisfied(self)
355 }
356
357 async fn passive_fetch(
358 notifiers: &Notifiers<Types>,
359 req: Self::Request,
360 ) -> BoxFuture<'static, Option<Self>> {
361 let waits = join_all(req.into_iter().map(|i| {
362 notifiers
363 .vid_common
364 .wait_for(move |vid| vid.satisfies(BlockId::Number(i as usize).into()))
365 }))
366 .await;
367
368 join_all(waits.into_iter().map(|wait| wait.into_future()))
369 .map(|options| NonEmptyRange::new(options.into_iter().flatten()).ok())
370 .boxed()
371 }
372
373 async fn active_fetch<S, P>(
374 tx: &mut impl AvailabilityStorage<Types>,
375 fetcher: Arc<Fetcher<Types, S, P>>,
376 req: Self::Request,
377 ) -> anyhow::Result<()>
378 where
379 S: VersionedDataSource + 'static,
380 for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>,
381 for<'a> S::ReadOnly<'a>:
382 AvailabilityStorage<Types> + NodeStorage<Types> + PrunedHeightStorage,
383 P: AvailabilityProvider<Types>,
384 {
385 fetch_header_range_and_then(tx, req, HeaderCallback::VidCommon { fetcher }).await
386 }
387
388 async fn load<S>(storage: &mut S, req: Self::Request) -> QueryResult<Self>
389 where
390 S: AvailabilityStorage<Types>,
391 {
392 let vid = storage
393 .get_vid_common_range((req.start as usize)..(req.end as usize))
394 .await?
395 .into_iter()
396 .collect::<QueryResult<Vec<_>>>()?;
397 if vid.len() != req.len() {
398 tracing::debug!(
399 ?req,
400 len = vid.len(),
401 "database returned partial result, unable to load full range"
402 );
403 return Err(QueryError::Missing);
404 }
405 NonEmptyRange::new(vid).map_err(|err| QueryError::Error {
406 message: format!("expected contiguous range, but: {err:#}"),
407 })
408 }
409}
410
411impl<Types> Storable<Types> for NonEmptyRange<VidCommonQueryData<Types>>
412where
413 Types: NodeType,
414{
415 fn debug_name(&self) -> String {
416 format!("VID common range {}..{}", self.start(), self.end())
417 }
418
419 async fn notify(&self, notifiers: &Notifiers<Types>) {
420 for common in self {
421 notifiers.vid_common.notify(common).await;
422 }
423 }
424
425 async fn store(
426 &self,
427 storage: &mut impl UpdateAvailabilityStorage<Types>,
428 leaf_only: bool,
429 ) -> anyhow::Result<()> {
430 if leaf_only {
431 return Ok(());
432 }
433
434 storage
435 .insert_vid_range(self.iter().map(|common| (common, None)))
436 .await
437 }
438}
439
440pub(super) fn fetch_vid_common_range_with_headers<Types, S, P>(
441 fetcher: Arc<Fetcher<Types, S, P>>,
442 headers: NonEmptyRange<Header<Types>>,
443) where
444 Types: NodeType,
445 Header<Types>: QueryableHeader<Types>,
446 Payload<Types>: QueryablePayload<Types>,
447 S: VersionedDataSource + 'static,
448 for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>,
449 P: AvailabilityProvider<Types>,
450{
451 let Some(vid_common_range_fetcher) = fetcher.vid_common_range_fetcher.as_ref() else {
452 return;
454 };
455
456 tracing::info!(
458 "spawned active fetch for VID common range {}..{}",
459 headers.start(),
460 headers.end()
461 );
462 vid_common_range_fetcher.spawn_fetch(
463 VidCommonRangeRequest::from_headers(&headers),
464 fetcher.provider.clone(),
465 once(VidCommonRangeCallback {
466 fetcher: fetcher.clone(),
467 }),
468 );
469}
470
471#[derive(Derivative)]
472#[derivative(Debug(bound = ""))]
473pub(super) struct VidCommonRangeCallback<Types: NodeType, S, P> {
474 #[derivative(Debug = "ignore")]
475 fetcher: Arc<Fetcher<Types, S, P>>,
476}
477
478impl<Types: NodeType, S, P> PartialEq for VidCommonRangeCallback<Types, S, P> {
479 fn eq(&self, other: &Self) -> bool {
480 self.cmp(other).is_eq()
481 }
482}
483
484impl<Types: NodeType, S, P> Eq for VidCommonRangeCallback<Types, S, P> {}
485
486impl<Types: NodeType, S, P> Ord for VidCommonRangeCallback<Types, S, P> {
487 fn cmp(&self, _: &Self) -> Ordering {
488 Ordering::Equal
491 }
492}
493
494impl<Types: NodeType, S, P> PartialOrd for VidCommonRangeCallback<Types, S, P> {
495 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
496 Some(self.cmp(other))
497 }
498}
499
500impl<Types: NodeType, S, P> Callback<NonEmptyRange<VidCommonQueryData<Types>>>
501 for VidCommonRangeCallback<Types, S, P>
502where
503 Header<Types>: QueryableHeader<Types>,
504 Payload<Types>: QueryablePayload<Types>,
505 S: 'static + VersionedDataSource,
506 for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>,
507 P: AvailabilityProvider<Types>,
508{
509 async fn run(self, commons: NonEmptyRange<VidCommonQueryData<Types>>) {
510 tracing::info!("fetched VID common {}..{}", commons.start(), commons.end());
511 self.fetcher.store_and_notify(&commons).await;
512 }
513}