hotshot_query_service/data_source/fetching/
block.rs

1// Copyright (c) 2022 Espresso Systems (espressosys.com)
2// This file is part of the HotShot Query Service library.
3//
4// This program is free software: you can redistribute it and/or modify it under the terms of the GNU
5// General Public License as published by the Free Software Foundation, either version 3 of the
6// License, or (at your option) any later version.
7// This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without
8// even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
9// General Public License for more details.
10// You should have received a copy of the GNU General Public License along with this program. If not,
11// see <https://www.gnu.org/licenses/>.
12
13//! [`Fetchable`] implementation for [`BlockQueryData`] and [`PayloadQueryData`].
14
15use std::{cmp::Ordering, future::IntoFuture, iter::once, ops::RangeBounds, sync::Arc};
16
17use async_trait::async_trait;
18use derivative::Derivative;
19use futures::future::{BoxFuture, FutureExt, join_all};
20use hotshot_types::traits::{block_contents::BlockHeader, node_implementation::NodeType};
21
22use super::{
23    AvailabilityProvider, FetchRequest, Fetchable, Fetcher, Heights, Notifiers, RangedFetchable,
24    Storable,
25    header::{HeaderCallback, fetch_header_and_then},
26};
27use crate::{
28    Header, Payload, QueryError, QueryResult,
29    availability::{
30        BlockId, BlockQueryData, PayloadMetadata, PayloadQueryData, QueryableHeader,
31        QueryablePayload,
32    },
33    data_source::{
34        VersionedDataSource,
35        fetching::{header::fetch_header_range_and_then, leaf::RangeRequest},
36        storage::{
37            AvailabilityStorage, NodeStorage, UpdateAvailabilityStorage,
38            pruning::PrunedHeightStorage,
39        },
40    },
41    fetching::{
42        self, Callback, NonEmptyRange,
43        request::{self, BlockRangeRequest, PayloadRequest},
44    },
45    types::HeightIndexed,
46};
47pub(super) type PayloadFetcher<Types, S, P> =
48    fetching::Fetcher<request::PayloadRequest, PayloadCallback<Types, S, P>>;
49
50pub(super) type PayloadRangeFetcher<Types, S, P> =
51    fetching::Fetcher<request::BlockRangeRequest, BlockRangeCallback<Types, S, P>>;
52
53impl<Types> FetchRequest for BlockId<Types>
54where
55    Types: NodeType,
56{
57    fn might_exist(self, heights: Heights) -> bool {
58        if let BlockId::Number(n) = self {
59            heights.might_exist(n as u64)
60        } else {
61            true
62        }
63    }
64}
65
66#[async_trait]
67impl<Types> Fetchable<Types> for BlockQueryData<Types>
68where
69    Types: NodeType,
70    Header<Types>: QueryableHeader<Types>,
71    Payload<Types>: QueryablePayload<Types>,
72{
73    type Request = BlockId<Types>;
74
75    fn satisfies(&self, req: Self::Request) -> bool {
76        match req {
77            BlockId::Number(n) => self.height() == n as u64,
78            BlockId::Hash(h) => self.hash() == h,
79            BlockId::PayloadHash(h) => self.payload_hash() == h,
80        }
81    }
82
83    async fn passive_fetch(
84        notifiers: &Notifiers<Types>,
85        req: Self::Request,
86    ) -> BoxFuture<'static, Option<Self>> {
87        notifiers
88            .block
89            .wait_for(move |block| block.satisfies(req))
90            .await
91            .into_future()
92            .boxed()
93    }
94
95    async fn active_fetch<S, P>(
96        tx: &mut impl AvailabilityStorage<Types>,
97        fetcher: Arc<Fetcher<Types, S, P>>,
98        req: Self::Request,
99    ) -> anyhow::Result<()>
100    where
101        S: VersionedDataSource + 'static,
102        for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>,
103        for<'a> S::ReadOnly<'a>:
104            AvailabilityStorage<Types> + NodeStorage<Types> + PrunedHeightStorage,
105        P: AvailabilityProvider<Types>,
106    {
107        fetch_header_and_then(
108            tx,
109            req,
110            HeaderCallback::Payload {
111                fetcher: fetcher.clone(),
112            },
113        )
114        .await
115    }
116
117    async fn load<S>(storage: &mut S, req: Self::Request) -> QueryResult<Self>
118    where
119        S: AvailabilityStorage<Types>,
120    {
121        storage.get_block(req).await
122    }
123}
124
125#[async_trait]
126impl<Types> RangedFetchable<Types> for BlockQueryData<Types>
127where
128    Types: NodeType,
129    Header<Types>: QueryableHeader<Types>,
130    Payload<Types>: QueryablePayload<Types>,
131{
132    type RangedRequest = BlockId<Types>;
133
134    async fn load_range<S, R>(storage: &mut S, range: R) -> QueryResult<Vec<QueryResult<Self>>>
135    where
136        S: AvailabilityStorage<Types>,
137        R: RangeBounds<usize> + Send + 'static,
138    {
139        storage.get_block_range(range).await
140    }
141}
142
143impl<Types> Storable<Types> for BlockQueryData<Types>
144where
145    Types: NodeType,
146{
147    fn debug_name(&self) -> String {
148        format!("block {}", self.height())
149    }
150
151    async fn notify(&self, notifiers: &Notifiers<Types>) {
152        notifiers.block.notify(self).await;
153    }
154
155    async fn store(
156        &self,
157        storage: &mut impl UpdateAvailabilityStorage<Types>,
158        leaf_only: bool,
159    ) -> anyhow::Result<()> {
160        if leaf_only {
161            return Ok(());
162        }
163
164        storage.insert_block(self).await
165    }
166}
167
168pub(super) fn fetch_block_with_header<Types, S, P>(
169    fetcher: Arc<Fetcher<Types, S, P>>,
170    header: Header<Types>,
171) where
172    Types: NodeType,
173    Header<Types>: QueryableHeader<Types>,
174    Payload<Types>: QueryablePayload<Types>,
175    S: VersionedDataSource + 'static,
176    for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>,
177    P: AvailabilityProvider<Types>,
178{
179    let Some(payload_fetcher) = fetcher.payload_fetcher.as_ref() else {
180        // If we're in light-weight mode, we don't need to fetch the block data.
181        return;
182    };
183
184    // Now that we have the header, we only need to retrieve the payload.
185    tracing::info!(
186        "spawned active fetch for payload {:?} (height {})",
187        header.payload_commitment(),
188        header.block_number()
189    );
190    payload_fetcher.spawn_fetch(
191        PayloadRequest(header.payload_commitment()),
192        fetcher.provider.clone(),
193        once(PayloadCallback {
194            header,
195            fetcher: fetcher.clone(),
196        }),
197    );
198}
199
200#[async_trait]
201impl<Types> Fetchable<Types> for PayloadQueryData<Types>
202where
203    Types: NodeType,
204    Header<Types>: QueryableHeader<Types>,
205    Payload<Types>: QueryablePayload<Types>,
206{
207    type Request = BlockId<Types>;
208
209    fn satisfies(&self, req: Self::Request) -> bool {
210        match req {
211            BlockId::Number(n) => self.height() == n as u64,
212            BlockId::Hash(h) => self.block_hash() == h,
213            BlockId::PayloadHash(h) => self.hash() == h,
214        }
215    }
216
217    async fn passive_fetch(
218        notifiers: &Notifiers<Types>,
219        req: Self::Request,
220    ) -> BoxFuture<'static, Option<Self>> {
221        notifiers
222            .block
223            .wait_for(move |block| block.satisfies(req))
224            .await
225            .into_future()
226            .map(|block| block.map(PayloadQueryData::from))
227            .boxed()
228    }
229
230    async fn active_fetch<S, P>(
231        tx: &mut impl AvailabilityStorage<Types>,
232        fetcher: Arc<Fetcher<Types, S, P>>,
233        req: Self::Request,
234    ) -> anyhow::Result<()>
235    where
236        S: VersionedDataSource + 'static,
237        for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>,
238        for<'a> S::ReadOnly<'a>:
239            AvailabilityStorage<Types> + NodeStorage<Types> + PrunedHeightStorage,
240        P: AvailabilityProvider<Types>,
241    {
242        // We don't have storage for the payload alone, only the whole block. So if we need to fetch
243        // the payload, we just fetch the whole block (which may end up fetching only the payload,
244        // if that's all that's needed to complete the block).
245        BlockQueryData::active_fetch(tx, fetcher, req).await
246    }
247
248    async fn load<S>(storage: &mut S, req: Self::Request) -> QueryResult<Self>
249    where
250        S: AvailabilityStorage<Types>,
251    {
252        storage.get_payload(req).await
253    }
254}
255
256#[async_trait]
257impl<Types> RangedFetchable<Types> for PayloadQueryData<Types>
258where
259    Types: NodeType,
260    Header<Types>: QueryableHeader<Types>,
261    Payload<Types>: QueryablePayload<Types>,
262{
263    type RangedRequest = BlockId<Types>;
264
265    async fn load_range<S, R>(storage: &mut S, range: R) -> QueryResult<Vec<QueryResult<Self>>>
266    where
267        S: AvailabilityStorage<Types>,
268        R: RangeBounds<usize> + Send + 'static,
269    {
270        storage.get_payload_range(range).await
271    }
272}
273
274#[derive(Derivative)]
275#[derivative(Debug(bound = ""))]
276pub(super) struct PayloadCallback<Types: NodeType, S, P> {
277    header: Header<Types>,
278    #[derivative(Debug = "ignore")]
279    fetcher: Arc<Fetcher<Types, S, P>>,
280}
281
282impl<Types: NodeType, S, P> PartialEq for PayloadCallback<Types, S, P> {
283    fn eq(&self, other: &Self) -> bool {
284        self.cmp(other).is_eq()
285    }
286}
287
288impl<Types: NodeType, S, P> Eq for PayloadCallback<Types, S, P> {}
289
290impl<Types: NodeType, S, P> Ord for PayloadCallback<Types, S, P> {
291    fn cmp(&self, other: &Self) -> Ordering {
292        self.header.block_number().cmp(&other.header.block_number())
293    }
294}
295
296impl<Types: NodeType, S, P> PartialOrd for PayloadCallback<Types, S, P> {
297    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
298        Some(self.cmp(other))
299    }
300}
301
302impl<Types: NodeType, S, P> Callback<Payload<Types>> for PayloadCallback<Types, S, P>
303where
304    Header<Types>: QueryableHeader<Types>,
305    Payload<Types>: QueryablePayload<Types>,
306    S: 'static + VersionedDataSource,
307    for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>,
308    P: AvailabilityProvider<Types>,
309{
310    async fn run(self, payload: Payload<Types>) {
311        tracing::info!("fetched payload {:?}", self.header.payload_commitment());
312        let block = BlockQueryData::new(self.header, payload);
313        self.fetcher.store_and_notify(&block).await;
314    }
315}
316
317#[async_trait]
318impl<Types> Fetchable<Types> for PayloadMetadata<Types>
319where
320    Types: NodeType,
321    Header<Types>: QueryableHeader<Types>,
322    Payload<Types>: QueryablePayload<Types>,
323{
324    type Request = BlockId<Types>;
325
326    fn satisfies(&self, req: Self::Request) -> bool {
327        match req {
328            BlockId::Number(n) => self.height == n as u64,
329            BlockId::Hash(h) => self.block_hash == h,
330            BlockId::PayloadHash(h) => self.hash == h,
331        }
332    }
333
334    async fn passive_fetch(
335        notifiers: &Notifiers<Types>,
336        req: Self::Request,
337    ) -> BoxFuture<'static, Option<Self>> {
338        notifiers
339            .block
340            .wait_for(move |block| block.satisfies(req))
341            .await
342            .into_future()
343            .map(|opt| opt.map(Self::from))
344            .boxed()
345    }
346
347    async fn active_fetch<S, P>(
348        tx: &mut impl AvailabilityStorage<Types>,
349        fetcher: Arc<Fetcher<Types, S, P>>,
350        req: Self::Request,
351    ) -> anyhow::Result<()>
352    where
353        S: VersionedDataSource + 'static,
354        for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>,
355        for<'a> S::ReadOnly<'a>:
356            AvailabilityStorage<Types> + NodeStorage<Types> + PrunedHeightStorage,
357        P: AvailabilityProvider<Types>,
358    {
359        // Trigger the full block to be fetched. This will be enough to satisfy this request for the
360        // payload summary.
361        BlockQueryData::active_fetch(tx, fetcher, req).await
362    }
363
364    async fn load<S>(storage: &mut S, req: Self::Request) -> QueryResult<Self>
365    where
366        S: AvailabilityStorage<Types>,
367    {
368        storage.get_payload_metadata(req).await
369    }
370}
371
372#[async_trait]
373impl<Types> RangedFetchable<Types> for PayloadMetadata<Types>
374where
375    Types: NodeType,
376    Header<Types>: QueryableHeader<Types>,
377    Payload<Types>: QueryablePayload<Types>,
378{
379    type RangedRequest = BlockId<Types>;
380
381    async fn load_range<S, R>(storage: &mut S, range: R) -> QueryResult<Vec<QueryResult<Self>>>
382    where
383        S: AvailabilityStorage<Types>,
384        R: RangeBounds<usize> + Send + 'static,
385    {
386        storage.get_payload_metadata_range(range).await
387    }
388}
389
390#[async_trait]
391impl<Types> Fetchable<Types> for NonEmptyRange<BlockQueryData<Types>>
392where
393    Types: NodeType,
394    Header<Types>: QueryableHeader<Types>,
395    Payload<Types>: QueryablePayload<Types>,
396{
397    type Request = RangeRequest;
398
399    fn satisfies(&self, req: Self::Request) -> bool {
400        req.is_satisfied(self)
401    }
402
403    async fn passive_fetch(
404        notifiers: &Notifiers<Types>,
405        req: Self::Request,
406    ) -> BoxFuture<'static, Option<Self>> {
407        let waits = join_all(req.into_iter().map(|i| {
408            notifiers
409                .block
410                .wait_for(move |block| block.satisfies(BlockId::Number(i as usize)))
411        }))
412        .await;
413
414        join_all(waits.into_iter().map(|wait| wait.into_future()))
415            .map(|options| NonEmptyRange::new(options.into_iter().flatten()).ok())
416            .boxed()
417    }
418
419    async fn active_fetch<S, P>(
420        tx: &mut impl AvailabilityStorage<Types>,
421        fetcher: Arc<Fetcher<Types, S, P>>,
422        req: Self::Request,
423    ) -> anyhow::Result<()>
424    where
425        S: VersionedDataSource + 'static,
426        for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>,
427        for<'a> S::ReadOnly<'a>:
428            AvailabilityStorage<Types> + NodeStorage<Types> + PrunedHeightStorage,
429        P: AvailabilityProvider<Types>,
430    {
431        fetch_header_range_and_then(tx, req, HeaderCallback::Payload { fetcher }).await
432    }
433
434    async fn load<S>(storage: &mut S, req: Self::Request) -> QueryResult<Self>
435    where
436        S: AvailabilityStorage<Types>,
437    {
438        let blocks = storage
439            .get_block_range((req.start as usize)..(req.end as usize))
440            .await?
441            .into_iter()
442            .collect::<QueryResult<Vec<_>>>()?;
443        if blocks.len() != req.len() {
444            tracing::debug!(
445                ?req,
446                len = blocks.len(),
447                "database returned partial result, unable to load full range"
448            );
449            return Err(QueryError::Missing);
450        }
451        NonEmptyRange::new(blocks).map_err(|err| QueryError::Error {
452            message: format!("expected contiguous range, but: {err:#}"),
453        })
454    }
455}
456
457impl<Types> Storable<Types> for NonEmptyRange<BlockQueryData<Types>>
458where
459    Types: NodeType,
460{
461    fn debug_name(&self) -> String {
462        format!("block range {}..{}", self.start(), self.end())
463    }
464
465    async fn notify(&self, notifiers: &Notifiers<Types>) {
466        for block in self {
467            notifiers.block.notify(block).await;
468        }
469    }
470
471    async fn store(
472        &self,
473        storage: &mut impl UpdateAvailabilityStorage<Types>,
474        leaf_only: bool,
475    ) -> anyhow::Result<()> {
476        if leaf_only {
477            return Ok(());
478        }
479
480        storage.insert_block_range(self).await
481    }
482}
483
484pub(super) fn fetch_block_range_with_headers<Types, S, P>(
485    fetcher: Arc<Fetcher<Types, S, P>>,
486    headers: NonEmptyRange<Header<Types>>,
487) where
488    Types: NodeType,
489    Header<Types>: QueryableHeader<Types>,
490    Payload<Types>: QueryablePayload<Types>,
491    S: VersionedDataSource + 'static,
492    for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>,
493    P: AvailabilityProvider<Types>,
494{
495    let Some(payload_range_fetcher) = fetcher.payload_range_fetcher.as_ref() else {
496        // If we're in light-weight mode, we don't need to fetch the block data.
497        return;
498    };
499
500    // Now that we have the header, we only need to retrieve the payload.
501    tracing::info!(
502        "spawned active fetch for payload range {}..{}",
503        headers.start(),
504        headers.end()
505    );
506    payload_range_fetcher.spawn_fetch(
507        BlockRangeRequest::from_headers(&headers),
508        fetcher.provider.clone(),
509        once(BlockRangeCallback {
510            fetcher: fetcher.clone(),
511        }),
512    );
513}
514
515#[derive(Derivative)]
516#[derivative(Debug(bound = ""))]
517pub(super) struct BlockRangeCallback<Types: NodeType, S, P> {
518    #[derivative(Debug = "ignore")]
519    fetcher: Arc<Fetcher<Types, S, P>>,
520}
521
522impl<Types: NodeType, S, P> PartialEq for BlockRangeCallback<Types, S, P> {
523    fn eq(&self, other: &Self) -> bool {
524        self.cmp(other).is_eq()
525    }
526}
527
528impl<Types: NodeType, S, P> Eq for BlockRangeCallback<Types, S, P> {}
529
530impl<Types: NodeType, S, P> Ord for BlockRangeCallback<Types, S, P> {
531    fn cmp(&self, _: &Self) -> Ordering {
532        // All callbacks for a given block range request do the same thing: just store the range.
533        Ordering::Equal
534    }
535}
536
537impl<Types: NodeType, S, P> PartialOrd for BlockRangeCallback<Types, S, P> {
538    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
539        Some(self.cmp(other))
540    }
541}
542
543impl<Types: NodeType, S, P> Callback<NonEmptyRange<BlockQueryData<Types>>>
544    for BlockRangeCallback<Types, S, P>
545where
546    Header<Types>: QueryableHeader<Types>,
547    Payload<Types>: QueryablePayload<Types>,
548    S: 'static + VersionedDataSource,
549    for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>,
550    P: AvailabilityProvider<Types>,
551{
552    async fn run(self, blocks: NonEmptyRange<BlockQueryData<Types>>) {
553        tracing::info!("fetched blocks {}..{}", blocks.start(), blocks.end());
554        self.fetcher.store_and_notify(&blocks).await;
555    }
556}