hotshot_query_service/data_source/fetching/
vid.rs

1// Copyright (c) 2022 Espresso Systems (espressosys.com)
2// This file is part of the HotShot Query Service library.
3//
4// This program is free software: you can redistribute it and/or modify it under the terms of the GNU
5// General Public License as published by the Free Software Foundation, either version 3 of the
6// License, or (at your option) any later version.
7// This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without
8// even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
9// General Public License for more details.
10// You should have received a copy of the GNU General Public License along with this program. If not,
11// see <https://www.gnu.org/licenses/>.
12
13//! [`Fetchable`] implementation for [`VidCommonQueryData`].
14
15use std::{cmp::Ordering, future::IntoFuture, iter::once, ops::RangeBounds, sync::Arc};
16
17use async_trait::async_trait;
18use derivative::Derivative;
19use derive_more::From;
20use futures::future::{BoxFuture, FutureExt, join_all};
21use hotshot_types::{
22    data::{VidCommon, VidShare},
23    traits::{block_contents::BlockHeader, node_implementation::NodeType},
24};
25
26use super::{
27    AvailabilityProvider, FetchRequest, Fetchable, Fetcher, Heights, Notifiers, RangedFetchable,
28    Storable,
29    header::{HeaderCallback, fetch_header_and_then},
30};
31use crate::{
32    Header, Payload, QueryError, QueryResult,
33    availability::{
34        BlockId, QueryableHeader, QueryablePayload, VidCommonMetadata, VidCommonQueryData,
35    },
36    data_source::{
37        VersionedDataSource,
38        fetching::{header::fetch_header_range_and_then, leaf::RangeRequest},
39        storage::{
40            AvailabilityStorage, NodeStorage, UpdateAvailabilityStorage,
41            pruning::PrunedHeightStorage,
42        },
43    },
44    fetching::{
45        self, Callback, NonEmptyRange,
46        request::{self, VidCommonRangeRequest},
47    },
48    types::HeightIndexed,
49};
50
51pub(super) type VidCommonFetcher<Types, S, P> =
52    fetching::Fetcher<request::VidCommonRequest, VidCommonCallback<Types, S, P>>;
53pub(super) type VidCommonRangeFetcher<Types, S, P> =
54    fetching::Fetcher<request::VidCommonRangeRequest, VidCommonRangeCallback<Types, S, P>>;
55
56#[derive(Clone, Copy, Debug, From)]
57pub(super) struct VidCommonRequest<Types: NodeType>(BlockId<Types>);
58
59impl<Types: NodeType> From<usize> for VidCommonRequest<Types> {
60    fn from(n: usize) -> Self {
61        Self(n.into())
62    }
63}
64
65impl<Types> FetchRequest for VidCommonRequest<Types>
66where
67    Types: NodeType,
68{
69    fn might_exist(self, heights: Heights) -> bool {
70        self.0.might_exist(heights)
71    }
72}
73
74#[async_trait]
75impl<Types> Fetchable<Types> for VidCommonQueryData<Types>
76where
77    Types: NodeType,
78    Header<Types>: QueryableHeader<Types>,
79    Payload<Types>: QueryablePayload<Types>,
80{
81    type Request = VidCommonRequest<Types>;
82
83    fn satisfies(&self, req: Self::Request) -> bool {
84        match req.0 {
85            BlockId::Number(n) => self.height() == n as u64,
86            BlockId::Hash(h) => self.block_hash() == h,
87            BlockId::PayloadHash(h) => self.payload_hash() == h,
88        }
89    }
90
91    async fn passive_fetch(
92        notifiers: &Notifiers<Types>,
93        req: Self::Request,
94    ) -> BoxFuture<'static, Option<Self>> {
95        notifiers
96            .vid_common
97            .wait_for(move |vid| vid.satisfies(req))
98            .await
99            .into_future()
100            .boxed()
101    }
102
103    async fn active_fetch<S, P>(
104        tx: &mut impl AvailabilityStorage<Types>,
105        fetcher: Arc<Fetcher<Types, S, P>>,
106        req: Self::Request,
107    ) -> anyhow::Result<()>
108    where
109        S: VersionedDataSource + 'static,
110        for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>,
111        for<'a> S::ReadOnly<'a>:
112            AvailabilityStorage<Types> + NodeStorage<Types> + PrunedHeightStorage,
113        P: AvailabilityProvider<Types>,
114    {
115        fetch_header_and_then(
116            tx,
117            req.0,
118            HeaderCallback::VidCommon {
119                fetcher: fetcher.clone(),
120            },
121        )
122        .await
123    }
124
125    async fn load<S>(storage: &mut S, req: Self::Request) -> QueryResult<Self>
126    where
127        S: AvailabilityStorage<Types>,
128    {
129        storage.get_vid_common(req.0).await
130    }
131}
132
133#[async_trait]
134impl<Types> RangedFetchable<Types> for VidCommonQueryData<Types>
135where
136    Types: NodeType,
137    Header<Types>: QueryableHeader<Types>,
138    Payload<Types>: QueryablePayload<Types>,
139{
140    type RangedRequest = VidCommonRequest<Types>;
141
142    async fn load_range<S, R>(storage: &mut S, range: R) -> QueryResult<Vec<QueryResult<Self>>>
143    where
144        S: AvailabilityStorage<Types>,
145        R: RangeBounds<usize> + Send + 'static,
146    {
147        storage.get_vid_common_range(range).await
148    }
149}
150
151impl<Types> Storable<Types> for VidCommonQueryData<Types>
152where
153    Types: NodeType,
154{
155    fn debug_name(&self) -> String {
156        format!("VID common {}", self.height())
157    }
158
159    async fn notify(&self, notifiers: &Notifiers<Types>) {
160        notifiers.vid_common.notify(self).await;
161    }
162
163    async fn store(
164        &self,
165        storage: &mut impl UpdateAvailabilityStorage<Types>,
166        _leaf_only: bool,
167    ) -> anyhow::Result<()> {
168        storage.insert_vid(self, None).await
169    }
170}
171
172impl<Types> Storable<Types> for (VidCommonQueryData<Types>, Option<VidShare>)
173where
174    Types: NodeType,
175{
176    fn debug_name(&self) -> String {
177        format!("VID data {}", self.0.height())
178    }
179
180    async fn notify(&self, notifiers: &Notifiers<Types>) {
181        notifiers.vid_common.notify(&self.0).await;
182    }
183
184    async fn store(
185        &self,
186        storage: &mut impl UpdateAvailabilityStorage<Types>,
187        _leaf_only: bool,
188    ) -> anyhow::Result<()> {
189        storage.insert_vid(&self.0, self.1.as_ref()).await
190    }
191}
192
193pub(super) fn fetch_vid_common_with_header<Types, S, P>(
194    fetcher: Arc<Fetcher<Types, S, P>>,
195    header: Header<Types>,
196) where
197    Types: NodeType,
198    Header<Types>: QueryableHeader<Types>,
199    Payload<Types>: QueryablePayload<Types>,
200    S: VersionedDataSource + 'static,
201    for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>,
202    P: AvailabilityProvider<Types>,
203{
204    let Some(vid_fetcher) = fetcher.vid_common_fetcher.as_ref() else {
205        tracing::info!("not fetching vid because of leaf only mode");
206        return;
207    };
208
209    // Now that we have the header, we only need to retrieve the VID common data.
210    tracing::info!(
211        "spawned active fetch for VID common {:?} (height {})",
212        header.payload_commitment(),
213        header.block_number()
214    );
215    vid_fetcher.spawn_fetch(
216        request::VidCommonRequest(header.payload_commitment()),
217        fetcher.provider.clone(),
218        once(VidCommonCallback {
219            header,
220            fetcher: fetcher.clone(),
221        }),
222    );
223}
224
225#[derive(Derivative)]
226#[derivative(Debug(bound = ""))]
227pub(super) struct VidCommonCallback<Types: NodeType, S, P> {
228    header: Header<Types>,
229    #[derivative(Debug = "ignore")]
230    fetcher: Arc<Fetcher<Types, S, P>>,
231}
232
233impl<Types: NodeType, S, P> PartialEq for VidCommonCallback<Types, S, P> {
234    fn eq(&self, other: &Self) -> bool {
235        self.cmp(other).is_eq()
236    }
237}
238
239impl<Types: NodeType, S, P> Eq for VidCommonCallback<Types, S, P> {}
240
241impl<Types: NodeType, S, P> Ord for VidCommonCallback<Types, S, P> {
242    fn cmp(&self, other: &Self) -> Ordering {
243        self.header.block_number().cmp(&other.header.block_number())
244    }
245}
246
247impl<Types: NodeType, S, P> PartialOrd for VidCommonCallback<Types, S, P> {
248    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
249        Some(self.cmp(other))
250    }
251}
252
253impl<Types: NodeType, S, P> Callback<VidCommon> for VidCommonCallback<Types, S, P>
254where
255    Header<Types>: QueryableHeader<Types>,
256    Payload<Types>: QueryablePayload<Types>,
257    S: VersionedDataSource + 'static,
258    for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>,
259    P: AvailabilityProvider<Types>,
260{
261    async fn run(self, common: VidCommon) {
262        let common = VidCommonQueryData::new(self.header, common);
263        self.fetcher.store_and_notify(&common).await;
264    }
265}
266
267#[async_trait]
268impl<Types> Fetchable<Types> for VidCommonMetadata<Types>
269where
270    Types: NodeType,
271    Header<Types>: QueryableHeader<Types>,
272    Payload<Types>: QueryablePayload<Types>,
273{
274    type Request = VidCommonRequest<Types>;
275
276    fn satisfies(&self, req: Self::Request) -> bool {
277        match req.0 {
278            BlockId::Number(n) => self.height == n as u64,
279            BlockId::Hash(h) => self.block_hash == h,
280            BlockId::PayloadHash(h) => self.payload_hash == h,
281        }
282    }
283
284    async fn passive_fetch(
285        notifiers: &Notifiers<Types>,
286        req: Self::Request,
287    ) -> BoxFuture<'static, Option<Self>> {
288        notifiers
289            .vid_common
290            .wait_for(move |vid| vid.satisfies(req))
291            .await
292            .into_future()
293            .map(|opt| opt.map(Self::from))
294            .boxed()
295    }
296
297    async fn active_fetch<S, P>(
298        tx: &mut impl AvailabilityStorage<Types>,
299        fetcher: Arc<Fetcher<Types, S, P>>,
300        req: Self::Request,
301    ) -> anyhow::Result<()>
302    where
303        S: VersionedDataSource + 'static,
304        for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>,
305        for<'a> S::ReadOnly<'a>:
306            AvailabilityStorage<Types> + NodeStorage<Types> + PrunedHeightStorage,
307        P: AvailabilityProvider<Types>,
308    {
309        // Do not fetch if we are in leaf only mode
310        if fetcher.leaf_only {
311            return Ok(());
312        }
313        // Trigger the full VID object to be fetched. This will be enough to satisfy this request
314        // for the summary.
315        VidCommonQueryData::active_fetch(tx, fetcher, req).await
316    }
317
318    async fn load<S>(storage: &mut S, req: Self::Request) -> QueryResult<Self>
319    where
320        S: AvailabilityStorage<Types>,
321    {
322        storage.get_vid_common_metadata(req.0).await
323    }
324}
325
326#[async_trait]
327impl<Types> RangedFetchable<Types> for VidCommonMetadata<Types>
328where
329    Types: NodeType,
330    Header<Types>: QueryableHeader<Types>,
331    Payload<Types>: QueryablePayload<Types>,
332{
333    type RangedRequest = VidCommonRequest<Types>;
334
335    async fn load_range<S, R>(storage: &mut S, range: R) -> QueryResult<Vec<QueryResult<Self>>>
336    where
337        S: AvailabilityStorage<Types>,
338        R: RangeBounds<usize> + Send + 'static,
339    {
340        storage.get_vid_common_metadata_range(range).await
341    }
342}
343
344#[async_trait]
345impl<Types> Fetchable<Types> for NonEmptyRange<VidCommonQueryData<Types>>
346where
347    Types: NodeType,
348    Header<Types>: QueryableHeader<Types>,
349    Payload<Types>: QueryablePayload<Types>,
350{
351    type Request = RangeRequest;
352
353    fn satisfies(&self, req: Self::Request) -> bool {
354        req.is_satisfied(self)
355    }
356
357    async fn passive_fetch(
358        notifiers: &Notifiers<Types>,
359        req: Self::Request,
360    ) -> BoxFuture<'static, Option<Self>> {
361        let waits = join_all(req.into_iter().map(|i| {
362            notifiers
363                .vid_common
364                .wait_for(move |vid| vid.satisfies(BlockId::Number(i as usize).into()))
365        }))
366        .await;
367
368        join_all(waits.into_iter().map(|wait| wait.into_future()))
369            .map(|options| NonEmptyRange::new(options.into_iter().flatten()).ok())
370            .boxed()
371    }
372
373    async fn active_fetch<S, P>(
374        tx: &mut impl AvailabilityStorage<Types>,
375        fetcher: Arc<Fetcher<Types, S, P>>,
376        req: Self::Request,
377    ) -> anyhow::Result<()>
378    where
379        S: VersionedDataSource + 'static,
380        for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>,
381        for<'a> S::ReadOnly<'a>:
382            AvailabilityStorage<Types> + NodeStorage<Types> + PrunedHeightStorage,
383        P: AvailabilityProvider<Types>,
384    {
385        fetch_header_range_and_then(tx, req, HeaderCallback::VidCommon { fetcher }).await
386    }
387
388    async fn load<S>(storage: &mut S, req: Self::Request) -> QueryResult<Self>
389    where
390        S: AvailabilityStorage<Types>,
391    {
392        let vid = storage
393            .get_vid_common_range((req.start as usize)..(req.end as usize))
394            .await?
395            .into_iter()
396            .collect::<QueryResult<Vec<_>>>()?;
397        if vid.len() != req.len() {
398            tracing::debug!(
399                ?req,
400                len = vid.len(),
401                "database returned partial result, unable to load full range"
402            );
403            return Err(QueryError::Missing);
404        }
405        NonEmptyRange::new(vid).map_err(|err| QueryError::Error {
406            message: format!("expected contiguous range, but: {err:#}"),
407        })
408    }
409}
410
411impl<Types> Storable<Types> for NonEmptyRange<VidCommonQueryData<Types>>
412where
413    Types: NodeType,
414{
415    fn debug_name(&self) -> String {
416        format!("VID common range {}..{}", self.start(), self.end())
417    }
418
419    async fn notify(&self, notifiers: &Notifiers<Types>) {
420        for common in self {
421            notifiers.vid_common.notify(common).await;
422        }
423    }
424
425    async fn store(
426        &self,
427        storage: &mut impl UpdateAvailabilityStorage<Types>,
428        leaf_only: bool,
429    ) -> anyhow::Result<()> {
430        if leaf_only {
431            return Ok(());
432        }
433
434        storage
435            .insert_vid_range(self.iter().map(|common| (common, None)))
436            .await
437    }
438}
439
440pub(super) fn fetch_vid_common_range_with_headers<Types, S, P>(
441    fetcher: Arc<Fetcher<Types, S, P>>,
442    headers: NonEmptyRange<Header<Types>>,
443) where
444    Types: NodeType,
445    Header<Types>: QueryableHeader<Types>,
446    Payload<Types>: QueryablePayload<Types>,
447    S: VersionedDataSource + 'static,
448    for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>,
449    P: AvailabilityProvider<Types>,
450{
451    let Some(vid_common_range_fetcher) = fetcher.vid_common_range_fetcher.as_ref() else {
452        // If we're in light-weight mode, we don't need to fetch the VID common data.
453        return;
454    };
455
456    // Now that we have the header, we only need to retrieve the VID common.
457    tracing::info!(
458        "spawned active fetch for VID common range {}..{}",
459        headers.start(),
460        headers.end()
461    );
462    vid_common_range_fetcher.spawn_fetch(
463        VidCommonRangeRequest::from_headers(&headers),
464        fetcher.provider.clone(),
465        once(VidCommonRangeCallback {
466            fetcher: fetcher.clone(),
467        }),
468    );
469}
470
471#[derive(Derivative)]
472#[derivative(Debug(bound = ""))]
473pub(super) struct VidCommonRangeCallback<Types: NodeType, S, P> {
474    #[derivative(Debug = "ignore")]
475    fetcher: Arc<Fetcher<Types, S, P>>,
476}
477
478impl<Types: NodeType, S, P> PartialEq for VidCommonRangeCallback<Types, S, P> {
479    fn eq(&self, other: &Self) -> bool {
480        self.cmp(other).is_eq()
481    }
482}
483
484impl<Types: NodeType, S, P> Eq for VidCommonRangeCallback<Types, S, P> {}
485
486impl<Types: NodeType, S, P> Ord for VidCommonRangeCallback<Types, S, P> {
487    fn cmp(&self, _: &Self) -> Ordering {
488        // All callbacks for a given VID common range request do the same thing: just store the
489        // range.
490        Ordering::Equal
491    }
492}
493
494impl<Types: NodeType, S, P> PartialOrd for VidCommonRangeCallback<Types, S, P> {
495    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
496        Some(self.cmp(other))
497    }
498}
499
500impl<Types: NodeType, S, P> Callback<NonEmptyRange<VidCommonQueryData<Types>>>
501    for VidCommonRangeCallback<Types, S, P>
502where
503    Header<Types>: QueryableHeader<Types>,
504    Payload<Types>: QueryablePayload<Types>,
505    S: 'static + VersionedDataSource,
506    for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>,
507    P: AvailabilityProvider<Types>,
508{
509    async fn run(self, commons: NonEmptyRange<VidCommonQueryData<Types>>) {
510        tracing::info!("fetched VID common {}..{}", commons.start(), commons.end());
511        self.fetcher.store_and_notify(&commons).await;
512    }
513}