Skip to main content

hotshot_query_service/data_source/fetching/
header.rs

1// Copyright (c) 2022 Espresso Systems (espressosys.com)
2// This file is part of the HotShot Query Service library.
3//
4// This program is free software: you can redistribute it and/or modify it under the terms of the GNU
5// General Public License as published by the Free Software Foundation, either version 3 of the
6// License, or (at your option) any later version.
7// This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without
8// even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
9// General Public License for more details.
10// You should have received a copy of the GNU General Public License along with this program. If not,
11// see <https://www.gnu.org/licenses/>.
12
13//! Header fetching.
14
15use std::{cmp::Ordering, future::IntoFuture, sync::Arc};
16
17use anyhow::bail;
18use async_trait::async_trait;
19use committable::Committable;
20use derivative::Derivative;
21use futures::{FutureExt, future::BoxFuture};
22use hotshot_types::traits::{block_contents::BlockHeader, node_implementation::NodeType};
23
24use super::{
25    AvailabilityProvider, Fetcher, block::fetch_block_with_header, leaf::fetch_leaf_with_callbacks,
26    vid::fetch_vid_common_with_header,
27};
28use crate::{
29    Header, Payload, QueryError, QueryResult,
30    availability::{BlockId, QueryableHeader, QueryablePayload},
31    data_source::{
32        fetching::{
33            Fetchable, HeaderQueryData, LeafQueryData, Notifiers,
34            block::fetch_block_range,
35            leaf::{RangeRequest, fetch_leaf_range_with_callbacks},
36            vid::fetch_vid_common_range,
37        },
38        storage::{
39            AvailabilityStorage, NodeStorage, UpdateAvailabilityStorage,
40            pruning::PrunedHeightStorage,
41        },
42        update::VersionedDataSource,
43    },
44    fetching::NonEmptyRange,
45};
46
47fn satisfies_header_req_from_leaf<Types>(leaf: &LeafQueryData<Types>, req: BlockId<Types>) -> bool
48where
49    Types: NodeType,
50    Header<Types>: QueryableHeader<Types>,
51    Payload<Types>: QueryablePayload<Types>,
52{
53    HeaderQueryData::satisfies(&HeaderQueryData::new(leaf.header().clone()), req)
54}
55
56#[async_trait]
57impl<Types> Fetchable<Types> for HeaderQueryData<Types>
58where
59    Types: NodeType,
60    Header<Types>: QueryableHeader<Types>,
61    Payload<Types>: QueryablePayload<Types>,
62{
63    type Request = BlockId<Types>;
64
65    fn satisfies(&self, req: Self::Request) -> bool {
66        let header = self.header();
67        match req {
68            BlockId::Number(n) => header.block_number() as usize == n,
69            BlockId::Hash(h) => header.commit() == h,
70            BlockId::PayloadHash(h) => header.payload_commitment() == h,
71        }
72    }
73
74    async fn passive_fetch(
75        notifiers: &Notifiers<Types>,
76        req: Self::Request,
77    ) -> BoxFuture<'static, Option<Self>> {
78        notifiers
79            .leaf
80            .wait_for(move |leaf| satisfies_header_req_from_leaf(leaf, req))
81            .await
82            .into_future()
83            .map(|leaf| leaf.map(HeaderQueryData::from))
84            .boxed()
85    }
86
87    async fn active_fetch<S, P>(
88        tx: &mut impl AvailabilityStorage<Types>,
89        fetcher: Arc<Fetcher<Types, S, P>>,
90        req: Self::Request,
91    ) -> anyhow::Result<()>
92    where
93        S: VersionedDataSource + 'static,
94        for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>,
95        for<'a> S::ReadOnly<'a>:
96            AvailabilityStorage<Types> + NodeStorage<Types> + PrunedHeightStorage,
97        P: AvailabilityProvider<Types>,
98    {
99        // Note: if leaf only mode is enabled
100        // the payload callback will not do any active fetching and just return
101        // This is because we don't have payload fetcher for leaf only mode
102        fetch_header_and_then(
103            tx,
104            req,
105            HeaderCallback::Payload {
106                fetcher: fetcher.clone(),
107            },
108        )
109        .await
110    }
111
112    async fn load<S>(storage: &mut S, req: Self::Request) -> QueryResult<Self>
113    where
114        S: AvailabilityStorage<Types>,
115    {
116        storage.get_header(req).await.map(|header| Self { header })
117    }
118}
119
120#[derive(Derivative)]
121#[derivative(Debug(bound = ""))]
122pub(super) enum HeaderCallback<Types, S, P>
123where
124    Types: NodeType,
125{
126    /// Callback when fetching the leaf in order to then look up the corresponding block.
127    Payload {
128        #[derivative(Debug = "ignore")]
129        fetcher: Arc<Fetcher<Types, S, P>>,
130    },
131    /// Callback when fetching the leaf in order to then look up the corresponding VID common data.
132    VidCommon {
133        #[derivative(Debug = "ignore")]
134        fetcher: Arc<Fetcher<Types, S, P>>,
135    },
136}
137
138impl<Types: NodeType, S, P> PartialEq for HeaderCallback<Types, S, P> {
139    fn eq(&self, other: &Self) -> bool {
140        self.cmp(other).is_eq()
141    }
142}
143
144impl<Types: NodeType, S, P> Eq for HeaderCallback<Types, S, P> {}
145
146impl<Types: NodeType, S, P> PartialOrd for HeaderCallback<Types, S, P> {
147    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
148        Some(self.cmp(other))
149    }
150}
151
152impl<Types: NodeType, S, P> Ord for HeaderCallback<Types, S, P> {
153    fn cmp(&self, other: &Self) -> Ordering {
154        // Arbitrarily, we choose to run payload callbacks before VID callbacks.
155        match (self, other) {
156            (Self::Payload { .. }, Self::VidCommon { .. }) => Ordering::Less,
157            (Self::VidCommon { .. }, Self::Payload { .. }) => Ordering::Greater,
158            _ => Ordering::Equal,
159        }
160    }
161}
162
163impl<Types, S, P> HeaderCallback<Types, S, P>
164where
165    Types: NodeType,
166    Header<Types>: QueryableHeader<Types>,
167    Payload<Types>: QueryablePayload<Types>,
168    S: VersionedDataSource + 'static,
169    for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>,
170    P: AvailabilityProvider<Types>,
171{
172    fn fetcher(&self) -> Arc<Fetcher<Types, S, P>> {
173        match self {
174            Self::Payload { fetcher } => fetcher.clone(),
175            Self::VidCommon { fetcher } => fetcher.clone(),
176        }
177    }
178
179    pub(super) fn run(self, header: Header<Types>) {
180        match self {
181            Self::Payload { fetcher } => {
182                tracing::info!(
183                    "fetched leaf {}, will now fetch payload",
184                    header.block_number()
185                );
186                fetch_block_with_header(fetcher, header);
187            },
188            Self::VidCommon { fetcher } => {
189                tracing::info!(
190                    "fetched leaf {}, will now fetch VID common",
191                    header.block_number()
192                );
193                fetch_vid_common_with_header(fetcher, header);
194            },
195        }
196    }
197
198    pub(super) fn run_range(self, start: u64, end: u64) {
199        match self {
200            Self::Payload { fetcher } => {
201                tracing::info!("fetched leaves {start}..{end}, will now fetch payload",);
202                fetch_block_range(fetcher, start, end);
203            },
204            Self::VidCommon { fetcher } => {
205                tracing::info!("fetched leaves {start}..{end}, will now fetch VID common",);
206                fetch_vid_common_range(fetcher, start, end);
207            },
208        }
209    }
210}
211
212pub(super) async fn fetch_header_and_then<Types, S, P>(
213    tx: &mut impl AvailabilityStorage<Types>,
214    req: BlockId<Types>,
215    callback: HeaderCallback<Types, S, P>,
216) -> anyhow::Result<()>
217where
218    Types: NodeType,
219    Header<Types>: QueryableHeader<Types>,
220    Payload<Types>: QueryablePayload<Types>,
221    S: VersionedDataSource + 'static,
222    for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>,
223    for<'a> S::ReadOnly<'a>: AvailabilityStorage<Types> + NodeStorage<Types> + PrunedHeightStorage,
224    P: AvailabilityProvider<Types>,
225{
226    // Check if at least the header is available in local storage. If it is, we benefit two ways:
227    // 1. We know for sure the corresponding block exists, so we can unconditionally trigger an
228    //    active fetch without unnecessarily bothering our peers.
229    // 2. We only need to fetch the missing data (e.g. payload or VID common), not the full block.
230    //    Not only is this marginally less data to download, there are some providers that may only
231    //    be able to provide certain data. For example, the HotShot DA committee members may be able
232    //    to provide paylaods, but not full blocks. Or, in the case where VID recovery is needed,
233    //    the VID common data may be available but the full block may not exist anywhere.
234    match tx.get_header(req).await {
235        Ok(header) => {
236            callback.run(header);
237            return Ok(());
238        },
239        Err(QueryError::Missing | QueryError::NotFound) => {
240            // We successfully queried the database, but the header wasn't there. Fall through to
241            // fetching it.
242            tracing::debug!(?req, "header not available locally; trying fetch");
243        },
244        Err(QueryError::Error { message }) => {
245            // An error occurred while querying the database. We don't know if we need to fetch the
246            // header or not. Return an error so we can try again.
247            bail!("failed to fetch header for block {req:?}: {message}");
248        },
249    }
250
251    // If the header is _not_ present, we may still be able to fetch the request, but we need to
252    // fetch the header (in fact, the entire leaf) first. This is because we have an invariant that
253    // we should not store derived objects in the database unless we already have the corresponding
254    // header and leaf.
255    match req {
256        BlockId::Number(n) => {
257            fetch_leaf_with_callbacks(callback.fetcher(), n.into(), [callback.into()]).await?;
258        },
259        BlockId::Hash(h) => {
260            // Given only the hash, we cannot tell if the corresponding leaf actually exists, since
261            // we don't have a corresponding header. Therefore, we will not spawn an active fetch.
262            tracing::debug!("not fetching unknown block {h}");
263        },
264        BlockId::PayloadHash(h) => {
265            // Same as above, we don't fetch a block with a payload that is not known to exist.
266            tracing::debug!("not fetching block with unknown payload {h}");
267        },
268    }
269
270    Ok(())
271}
272
273pub(super) async fn fetch_header_range_and_then<Types, S, P>(
274    tx: &mut impl AvailabilityStorage<Types>,
275    req: RangeRequest,
276    callback: HeaderCallback<Types, S, P>,
277) -> anyhow::Result<()>
278where
279    Types: NodeType,
280    Header<Types>: QueryableHeader<Types>,
281    Payload<Types>: QueryablePayload<Types>,
282    S: VersionedDataSource + 'static,
283    for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>,
284    for<'a> S::ReadOnly<'a>: AvailabilityStorage<Types> + NodeStorage<Types> + PrunedHeightStorage,
285    P: AvailabilityProvider<Types>,
286{
287    // Check if the headers are already available in local storage; then we only need to fetch the
288    // payload data.
289    match <NonEmptyRange<LeafQueryData<Types>>>::load(tx, req).await {
290        Ok(_) => {
291            callback.run_range(req.start, req.end);
292            return Ok(());
293        },
294        Err(QueryError::Missing | QueryError::NotFound) => {
295            // We successfully queried the database, but at least one header wasn't there. Fall
296            // through to fetching it.
297            tracing::debug!(?req, "headers not available locally; trying fetch");
298        },
299        Err(QueryError::Error { message }) => {
300            // An error occurred while querying the database. We don't know if we need to fetch the
301            // headers or not. Return an error so we can try again.
302            bail!("failed to fetch headers for range {req:?}: {message}");
303        },
304    }
305
306    // Fetch the headers (in fact, the entire leaves) first, then fetch the remaining payload data.
307    fetch_leaf_range_with_callbacks(callback.fetcher(), req, [callback.into()]).await?;
308
309    Ok(())
310}