hotshot_query_service/data_source/fetching/
header.rs

1// Copyright (c) 2022 Espresso Systems (espressosys.com)
2// This file is part of the HotShot Query Service library.
3//
4// This program is free software: you can redistribute it and/or modify it under the terms of the GNU
5// General Public License as published by the Free Software Foundation, either version 3 of the
6// License, or (at your option) any later version.
7// This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without
8// even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
9// General Public License for more details.
10// You should have received a copy of the GNU General Public License along with this program. If not,
11// see <https://www.gnu.org/licenses/>.
12
13//! Header fetching.
14
15use std::{cmp::Ordering, future::IntoFuture, sync::Arc};
16
17use anyhow::bail;
18use async_trait::async_trait;
19use committable::Committable;
20use derivative::Derivative;
21use futures::{FutureExt, future::BoxFuture};
22use hotshot_types::traits::{block_contents::BlockHeader, node_implementation::NodeType};
23
24use super::{
25    AvailabilityProvider, Fetcher, block::fetch_block_with_header, leaf::fetch_leaf_with_callbacks,
26    vid::fetch_vid_common_with_header,
27};
28use crate::{
29    Header, Payload, QueryError, QueryResult,
30    availability::{BlockId, QueryableHeader, QueryablePayload},
31    data_source::{
32        fetching::{
33            Fetchable, HeaderQueryData, LeafQueryData, Notifiers,
34            block::fetch_block_range_with_headers,
35            leaf::{RangeRequest, fetch_leaf_range_with_callbacks},
36            vid::fetch_vid_common_range_with_headers,
37        },
38        storage::{
39            AvailabilityStorage, NodeStorage, UpdateAvailabilityStorage,
40            pruning::PrunedHeightStorage,
41        },
42        update::VersionedDataSource,
43    },
44    fetching::NonEmptyRange,
45};
46
47impl<Types: NodeType> From<LeafQueryData<Types>> for HeaderQueryData<Types> {
48    fn from(leaf: LeafQueryData<Types>) -> Self {
49        let header = leaf.header().clone();
50
51        Self { header }
52    }
53}
54
55fn satisfies_header_req_from_leaf<Types>(leaf: &LeafQueryData<Types>, req: BlockId<Types>) -> bool
56where
57    Types: NodeType,
58    Header<Types>: QueryableHeader<Types>,
59    Payload<Types>: QueryablePayload<Types>,
60{
61    HeaderQueryData::satisfies(&HeaderQueryData::new(leaf.header().clone()), req)
62}
63
64#[async_trait]
65impl<Types> Fetchable<Types> for HeaderQueryData<Types>
66where
67    Types: NodeType,
68    Header<Types>: QueryableHeader<Types>,
69    Payload<Types>: QueryablePayload<Types>,
70{
71    type Request = BlockId<Types>;
72
73    fn satisfies(&self, req: Self::Request) -> bool {
74        let header = self.header();
75        match req {
76            BlockId::Number(n) => header.block_number() as usize == n,
77            BlockId::Hash(h) => header.commit() == h,
78            BlockId::PayloadHash(h) => header.payload_commitment() == h,
79        }
80    }
81
82    async fn passive_fetch(
83        notifiers: &Notifiers<Types>,
84        req: Self::Request,
85    ) -> BoxFuture<'static, Option<Self>> {
86        notifiers
87            .leaf
88            .wait_for(move |leaf| satisfies_header_req_from_leaf(leaf, req))
89            .await
90            .into_future()
91            .map(|leaf| leaf.map(HeaderQueryData::from))
92            .boxed()
93    }
94
95    async fn active_fetch<S, P>(
96        tx: &mut impl AvailabilityStorage<Types>,
97        fetcher: Arc<Fetcher<Types, S, P>>,
98        req: Self::Request,
99    ) -> anyhow::Result<()>
100    where
101        S: VersionedDataSource + 'static,
102        for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>,
103        for<'a> S::ReadOnly<'a>:
104            AvailabilityStorage<Types> + NodeStorage<Types> + PrunedHeightStorage,
105        P: AvailabilityProvider<Types>,
106    {
107        // Note: if leaf only mode is enabled
108        // the payload callback will not do any active fetching and just return
109        // This is because we don't have payload fetcher for leaf only mode
110        fetch_header_and_then(
111            tx,
112            req,
113            HeaderCallback::Payload {
114                fetcher: fetcher.clone(),
115            },
116        )
117        .await
118    }
119
120    async fn load<S>(storage: &mut S, req: Self::Request) -> QueryResult<Self>
121    where
122        S: AvailabilityStorage<Types>,
123    {
124        storage.get_header(req).await.map(|header| Self { header })
125    }
126}
127
128#[derive(Derivative)]
129#[derivative(Debug(bound = ""))]
130pub(super) enum HeaderCallback<Types, S, P>
131where
132    Types: NodeType,
133{
134    /// Callback when fetching the leaf in order to then look up the corresponding block.
135    Payload {
136        #[derivative(Debug = "ignore")]
137        fetcher: Arc<Fetcher<Types, S, P>>,
138    },
139    /// Callback when fetching the leaf in order to then look up the corresponding VID common data.
140    VidCommon {
141        #[derivative(Debug = "ignore")]
142        fetcher: Arc<Fetcher<Types, S, P>>,
143    },
144}
145
146impl<Types: NodeType, S, P> PartialEq for HeaderCallback<Types, S, P> {
147    fn eq(&self, other: &Self) -> bool {
148        self.cmp(other).is_eq()
149    }
150}
151
152impl<Types: NodeType, S, P> Eq for HeaderCallback<Types, S, P> {}
153
154impl<Types: NodeType, S, P> PartialOrd for HeaderCallback<Types, S, P> {
155    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
156        Some(self.cmp(other))
157    }
158}
159
160impl<Types: NodeType, S, P> Ord for HeaderCallback<Types, S, P> {
161    fn cmp(&self, other: &Self) -> Ordering {
162        // Arbitrarily, we choose to run payload callbacks before VID callbacks.
163        match (self, other) {
164            (Self::Payload { .. }, Self::VidCommon { .. }) => Ordering::Less,
165            (Self::VidCommon { .. }, Self::Payload { .. }) => Ordering::Greater,
166            _ => Ordering::Equal,
167        }
168    }
169}
170
171impl<Types, S, P> HeaderCallback<Types, S, P>
172where
173    Types: NodeType,
174    Header<Types>: QueryableHeader<Types>,
175    Payload<Types>: QueryablePayload<Types>,
176    S: VersionedDataSource + 'static,
177    for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>,
178    P: AvailabilityProvider<Types>,
179{
180    fn fetcher(&self) -> Arc<Fetcher<Types, S, P>> {
181        match self {
182            Self::Payload { fetcher } => fetcher.clone(),
183            Self::VidCommon { fetcher } => fetcher.clone(),
184        }
185    }
186
187    pub(super) fn run(self, header: Header<Types>) {
188        match self {
189            Self::Payload { fetcher } => {
190                tracing::info!(
191                    "fetched leaf {}, will now fetch payload",
192                    header.block_number()
193                );
194                fetch_block_with_header(fetcher, header);
195            },
196            Self::VidCommon { fetcher } => {
197                tracing::info!(
198                    "fetched leaf {}, will now fetch VID common",
199                    header.block_number()
200                );
201                fetch_vid_common_with_header(fetcher, header);
202            },
203        }
204    }
205
206    pub(super) fn run_range(self, headers: NonEmptyRange<Header<Types>>) {
207        match self {
208            Self::Payload { fetcher } => {
209                tracing::info!(
210                    "fetched leaves {}..{}, will now fetch payload",
211                    headers.start(),
212                    headers.end(),
213                );
214                fetch_block_range_with_headers(fetcher, headers);
215            },
216            Self::VidCommon { fetcher } => {
217                tracing::info!(
218                    "fetched leaves {}..{}, will now fetch VID common",
219                    headers.start(),
220                    headers.end(),
221                );
222                fetch_vid_common_range_with_headers(fetcher, headers);
223            },
224        }
225    }
226}
227
228pub(super) async fn fetch_header_and_then<Types, S, P>(
229    tx: &mut impl AvailabilityStorage<Types>,
230    req: BlockId<Types>,
231    callback: HeaderCallback<Types, S, P>,
232) -> anyhow::Result<()>
233where
234    Types: NodeType,
235    Header<Types>: QueryableHeader<Types>,
236    Payload<Types>: QueryablePayload<Types>,
237    S: VersionedDataSource + 'static,
238    for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>,
239    for<'a> S::ReadOnly<'a>: AvailabilityStorage<Types> + NodeStorage<Types> + PrunedHeightStorage,
240    P: AvailabilityProvider<Types>,
241{
242    // Check if at least the header is available in local storage. If it is, we benefit two ways:
243    // 1. We know for sure the corresponding block exists, so we can unconditionally trigger an
244    //    active fetch without unnecessarily bothering our peers.
245    // 2. We only need to fetch the missing data (e.g. payload or VID common), not the full block.
246    //    Not only is this marginally less data to download, there are some providers that may only
247    //    be able to provide certain data. For example, the HotShot DA committee members may be able
248    //    to provide paylaods, but not full blocks. Or, in the case where VID recovery is needed,
249    //    the VID common data may be available but the full block may not exist anywhere.
250    match tx.get_header(req).await {
251        Ok(header) => {
252            callback.run(header);
253            return Ok(());
254        },
255        Err(QueryError::Missing | QueryError::NotFound) => {
256            // We successfully queried the database, but the header wasn't there. Fall through to
257            // fetching it.
258            tracing::debug!(?req, "header not available locally; trying fetch");
259        },
260        Err(QueryError::Error { message }) => {
261            // An error occurred while querying the database. We don't know if we need to fetch the
262            // header or not. Return an error so we can try again.
263            bail!("failed to fetch header for block {req:?}: {message}");
264        },
265    }
266
267    // If the header is _not_ present, we may still be able to fetch the request, but we need to
268    // fetch the header (in fact, the entire leaf) first. This is because we have an invariant that
269    // we should not store derived objects in the database unless we already have the corresponding
270    // header and leaf.
271    match req {
272        BlockId::Number(n) => {
273            fetch_leaf_with_callbacks(tx, callback.fetcher(), n.into(), [callback.into()]).await?;
274        },
275        BlockId::Hash(h) => {
276            // Given only the hash, we cannot tell if the corresponding leaf actually exists, since
277            // we don't have a corresponding header. Therefore, we will not spawn an active fetch.
278            tracing::debug!("not fetching unknown block {h}");
279        },
280        BlockId::PayloadHash(h) => {
281            // Same as above, we don't fetch a block with a payload that is not known to exist.
282            tracing::debug!("not fetching block with unknown payload {h}");
283        },
284    }
285
286    Ok(())
287}
288
289pub(super) async fn fetch_header_range_and_then<Types, S, P>(
290    tx: &mut impl AvailabilityStorage<Types>,
291    req: RangeRequest,
292    callback: HeaderCallback<Types, S, P>,
293) -> anyhow::Result<()>
294where
295    Types: NodeType,
296    Header<Types>: QueryableHeader<Types>,
297    Payload<Types>: QueryablePayload<Types>,
298    S: VersionedDataSource + 'static,
299    for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>,
300    for<'a> S::ReadOnly<'a>: AvailabilityStorage<Types> + NodeStorage<Types> + PrunedHeightStorage,
301    P: AvailabilityProvider<Types>,
302{
303    // Check if at least the headers are available in local storage.
304    match <NonEmptyRange<LeafQueryData<Types>>>::load(tx, req).await {
305        Ok(leaves) => {
306            callback.run_range(leaves.as_ref_cloned());
307            return Ok(());
308        },
309        Err(QueryError::Missing | QueryError::NotFound) => {
310            // We successfully queried the database, but at least one header wasn't there. Fall
311            // through to fetching it.
312            tracing::debug!(?req, "headers not available locally; trying fetch");
313        },
314        Err(QueryError::Error { message }) => {
315            // An error occurred while querying the database. We don't know if we need to fetch the
316            // headers or not. Return an error so we can try again.
317            bail!("failed to fetch headers for range {req:?}: {message}");
318        },
319    }
320
321    // Fetch the headers (in fact, the entire leaves) first, then fetch the remaining payload data.
322    fetch_leaf_range_with_callbacks(tx, callback.fetcher(), req, [callback.into()]).await?;
323
324    Ok(())
325}