Skip to main content

hotshot_query_service/data_source/fetching/
leaf.rs

// Copyright (c) 2022 Espresso Systems (espressosys.com)
// This file is part of the HotShot Query Service library.
//
// This program is free software: you can redistribute it and/or modify it under the terms of the GNU
// General Public License as published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
// This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without
// even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
// General Public License for more details.
// You should have received a copy of the GNU General Public License along with this program. If not,
// see <https://www.gnu.org/licenses/>.

//! [`Fetchable`] implementation for [`LeafQueryData`].

15use std::{
16    cmp::Ordering,
17    fmt::Debug,
18    future::IntoFuture,
19    iter::once,
20    ops::{Range, RangeBounds},
21    sync::Arc,
22};
23
24use async_trait::async_trait;
25use derivative::Derivative;
26use derive_more::From;
27use futures::future::{BoxFuture, FutureExt, join_all};
28use hotshot_types::traits::node_implementation::NodeType;
29use tokio::spawn;
30use tracing::Instrument;
31
32use super::{
33    AvailabilityProvider, FetchRequest, Fetchable, Fetcher, Heights, Notifiers, RangedFetchable,
34    Storable, header::HeaderCallback,
35};
36use crate::{
37    Header, Payload, QueryError, QueryResult,
38    availability::{LeafId, LeafQueryData, QueryableHeader, QueryablePayload},
39    data_source::{
40        VersionedDataSource,
41        storage::{
42            AvailabilityStorage, NodeStorage, UpdateAvailabilityStorage,
43            pruning::PrunedHeightStorage,
44        },
45    },
46    fetching::{
47        self, Callback, NonEmptyRange,
48        request::{self, LeafRangeRequest},
49    },
50    types::HeightIndexed,
51};
52
53pub(super) type LeafFetcher<Types, S, P> =
54    fetching::Fetcher<request::LeafRequest, LeafCallback<Types, S, P>>;
55
56pub(super) type LeafRangeFetcher<Types, S, P> =
57    fetching::Fetcher<LeafRangeRequest, LeafCallback<Types, S, P>>;
58
59impl<Types> FetchRequest for LeafId<Types>
60where
61    Types: NodeType,
62{
63    fn might_exist(self, heights: Heights) -> bool {
64        if let LeafId::Number(n) = self {
65            heights.might_exist(n as u64)
66        } else {
67            true
68        }
69    }
70}
71
72#[async_trait]
73impl<Types> Fetchable<Types> for LeafQueryData<Types>
74where
75    Types: NodeType,
76    Header<Types>: QueryableHeader<Types>,
77    Payload<Types>: QueryablePayload<Types>,
78{
79    type Request = LeafId<Types>;
80
81    fn satisfies(&self, req: Self::Request) -> bool {
82        match req {
83            LeafId::Number(n) => self.height() == n as u64,
84            LeafId::Hash(h) => self.hash() == h,
85        }
86    }
87
88    async fn passive_fetch(
89        notifiers: &Notifiers<Types>,
90        req: Self::Request,
91    ) -> BoxFuture<'static, Option<Self>> {
92        notifiers
93            .leaf
94            .wait_for(move |leaf| leaf.satisfies(req))
95            .await
96            .into_future()
97            .boxed()
98    }
99
100    async fn active_fetch<S, P>(
101        _tx: &mut impl AvailabilityStorage<Types>,
102        fetcher: Arc<Fetcher<Types, S, P>>,
103        req: Self::Request,
104    ) -> anyhow::Result<()>
105    where
106        S: VersionedDataSource + 'static,
107        for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>,
108        for<'a> S::ReadOnly<'a>:
109            AvailabilityStorage<Types> + NodeStorage<Types> + PrunedHeightStorage,
110        P: AvailabilityProvider<Types>,
111    {
112        fetch_leaf_with_callbacks(fetcher, req, None).await
113    }
114
115    async fn load<S>(storage: &mut S, req: Self::Request) -> QueryResult<Self>
116    where
117        S: AvailabilityStorage<Types>,
118    {
119        storage.get_leaf(req).await
120    }
121}
122
123pub(super) async fn fetch_leaf_with_callbacks<Types, S, P, I>(
124    fetcher: Arc<Fetcher<Types, S, P>>,
125    req: LeafId<Types>,
126    callbacks: I,
127) -> anyhow::Result<()>
128where
129    Types: NodeType,
130    Header<Types>: QueryableHeader<Types>,
131    Payload<Types>: QueryablePayload<Types>,
132    S: VersionedDataSource + 'static,
133    for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>,
134    for<'a> S::ReadOnly<'a>: AvailabilityStorage<Types> + NodeStorage<Types> + PrunedHeightStorage,
135    P: AvailabilityProvider<Types>,
136    I: IntoIterator<Item = LeafCallback<Types, S, P>> + Send + 'static,
137    I::IntoIter: Send,
138{
139    match req {
140        LeafId::Number(n) => {
141            let fetcher = fetcher.clone();
142            fetcher.leaf_fetcher.clone().spawn_fetch(
143                request::LeafRequest::new(n as u64),
144                fetcher.provider.clone(),
145                once(LeafCallback::Leaf { fetcher }).chain(callbacks),
146            );
147        },
148        LeafId::Hash(h) => {
149            // We don't actively fetch leaves when requested by hash, because we have no way of
150            // knowing whether a leaf with such a hash actually exists, and we don't want to bother
151            // peers with requests for non-existent leaves.
152            tracing::debug!("not fetching unknown leaf {h}");
153        },
154    }
155
156    Ok(())
157}
158
159/// Trigger a fetch of the parent of the given `leaf`, if it is missing.
160///
161/// This ensures that a passive fetch for a leaf will always resolve eventually, even if we miss the
162/// decide event for that leaf: we will eventually get a decide for a later leaf, and then fetch the
163/// chain backwards.
164pub(super) fn trigger_fetch_for_parent<Types, S, P>(
165    fetcher: &Arc<Fetcher<Types, S, P>>,
166    leaf: &LeafQueryData<Types>,
167) where
168    Types: NodeType,
169    Header<Types>: QueryableHeader<Types>,
170    Payload<Types>: QueryablePayload<Types>,
171    S: VersionedDataSource + 'static,
172    for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>,
173    for<'a> S::ReadOnly<'a>: AvailabilityStorage<Types> + NodeStorage<Types> + PrunedHeightStorage,
174    P: AvailabilityProvider<Types>,
175{
176    let height = leaf.height();
177
178    // Check that there is a parent to fetch.
179    if height == 0 {
180        return;
181    }
182
183    // Spawn an async task; we're triggering a fire-and-forget fetch of a leaf that might now be
184    // available; we don't need to block the caller on this.
185    let fetcher = fetcher.clone();
186    let span = tracing::info_span!("fetch parent leaf", height);
187    spawn(
188        async move {
189            // Check if we already have the parent.
190            match fetcher.storage.read().await {
191                Ok(mut tx) => {
192                    // Don't bother fetching a pruned leaf.
193                    if let Ok(pruned_height) = tx.load_pruned_height().await
194                        && pruned_height.is_some_and(|ph| height <= ph)
195                    {
196                        tracing::info!(height, ?pruned_height, "not fetching pruned parent leaf");
197                        return;
198                    }
199
200                    if tx.get_leaf(((height - 1) as usize).into()).await.is_ok() {
201                        return;
202                    }
203                },
204                Err(err) => {
205                    // If we can't open a transaction, we can't be sure that we already have the
206                    // parent, so we fall through to fetching it just to be safe.
207                    tracing::warn!(
208                        height,
209                        "error opening transaction to check for parent leaf: {err:#}",
210                    );
211                },
212            }
213
214            tracing::info!(height, "received new leaf; fetching missing parent");
215            fetcher.leaf_fetcher.clone().spawn_fetch(
216                request::LeafRequest::new(height - 1),
217                fetcher.provider.clone(),
218                // After getting the leaf, grab the other data as well; that will be missing
219                // whenever the leaf was.
220                [
221                    LeafCallback::Leaf {
222                        fetcher: fetcher.clone(),
223                    },
224                    HeaderCallback::Payload {
225                        fetcher: fetcher.clone(),
226                    }
227                    .into(),
228                    HeaderCallback::VidCommon {
229                        fetcher: fetcher.clone(),
230                    }
231                    .into(),
232                ],
233            );
234        }
235        .instrument(span),
236    );
237}
238
239#[async_trait]
240impl<Types> RangedFetchable<Types> for LeafQueryData<Types>
241where
242    Types: NodeType,
243    Header<Types>: QueryableHeader<Types>,
244    Payload<Types>: QueryablePayload<Types>,
245{
246    type RangedRequest = LeafId<Types>;
247
248    async fn load_range<S, R>(storage: &mut S, range: R) -> QueryResult<Vec<QueryResult<Self>>>
249    where
250        S: AvailabilityStorage<Types>,
251        R: RangeBounds<usize> + Send + 'static,
252    {
253        storage.get_leaf_range(range).await
254    }
255}
256
257impl<Types> Storable<Types> for LeafQueryData<Types>
258where
259    Types: NodeType,
260{
261    fn debug_name(&self) -> String {
262        format!("leaf {}", self.height())
263    }
264
265    async fn notify(&self, notifiers: &Notifiers<Types>) {
266        notifiers.leaf.notify(self).await;
267    }
268
269    async fn store(
270        &self,
271        storage: &mut impl UpdateAvailabilityStorage<Types>,
272        _leaf_only: bool,
273    ) -> anyhow::Result<()> {
274        storage.insert_leaf(self).await
275    }
276}
277
278#[derive(Derivative, From)]
279#[derivative(Debug(bound = ""))]
280pub(super) enum LeafCallback<Types: NodeType, S, P> {
281    /// Callback when fetching the leaf for its own sake.
282    #[from(ignore)]
283    Leaf {
284        #[derivative(Debug = "ignore")]
285        fetcher: Arc<Fetcher<Types, S, P>>,
286    },
287    /// Callback when fetching the leaf in order to then look up something else.
288    Continuation {
289        callback: HeaderCallback<Types, S, P>,
290    },
291}
292
293impl<Types: NodeType, S, P> PartialEq for LeafCallback<Types, S, P> {
294    fn eq(&self, other: &Self) -> bool {
295        self.cmp(other).is_eq()
296    }
297}
298
299impl<Types: NodeType, S, P> Eq for LeafCallback<Types, S, P> {}
300
301impl<Types: NodeType, S, P> Ord for LeafCallback<Types, S, P> {
302    fn cmp(&self, other: &Self) -> Ordering {
303        match (self, other) {
304            // Store leaves in the database before storing derived objects.
305            (Self::Leaf { .. }, Self::Continuation { .. }) => Ordering::Less,
306            (Self::Continuation { .. }, Self::Leaf { .. }) => Ordering::Greater,
307
308            (Self::Continuation { callback: cb1 }, Self::Continuation { callback: cb2 }) => {
309                cb1.cmp(cb2)
310            },
311            _ => Ordering::Equal,
312        }
313    }
314}
315
316impl<Types: NodeType, S, P> PartialOrd for LeafCallback<Types, S, P> {
317    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
318        Some(self.cmp(other))
319    }
320}
321
322impl<Types: NodeType, S, P> Callback<LeafQueryData<Types>> for LeafCallback<Types, S, P>
323where
324    Header<Types>: QueryableHeader<Types>,
325    Payload<Types>: QueryablePayload<Types>,
326    S: VersionedDataSource + 'static,
327    for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>,
328    for<'a> S::ReadOnly<'a>: AvailabilityStorage<Types> + NodeStorage<Types> + PrunedHeightStorage,
329    P: AvailabilityProvider<Types>,
330{
331    async fn run(self, leaf: LeafQueryData<Types>) {
332        match self {
333            Self::Leaf { fetcher } => {
334                tracing::info!("fetched leaf {}", leaf.height());
335                // Trigger a fetch of the parent leaf, if we don't already have it.
336                trigger_fetch_for_parent(&fetcher, &leaf);
337                fetcher.store_and_notify(&leaf).await;
338            },
339            Self::Continuation { callback } => callback.run(leaf.leaf.block_header().clone()),
340        }
341    }
342}
343
344impl<Types: NodeType, S, P> Callback<NonEmptyRange<LeafQueryData<Types>>>
345    for LeafCallback<Types, S, P>
346where
347    Header<Types>: QueryableHeader<Types>,
348    Payload<Types>: QueryablePayload<Types>,
349    S: VersionedDataSource + 'static,
350    for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>,
351    for<'a> S::ReadOnly<'a>: AvailabilityStorage<Types> + NodeStorage<Types> + PrunedHeightStorage,
352    P: AvailabilityProvider<Types>,
353{
354    async fn run(self, leaves: NonEmptyRange<LeafQueryData<Types>>) {
355        match self {
356            Self::Leaf { fetcher } => {
357                tracing::info!("fetched leaf range {}..{}", leaves.start(), leaves.end());
358                fetcher.store_and_notify(&leaves).await;
359
360                // Unlike in the singular leaf version of this callback, we do not call
361                // `trigger_fetch_for_parent` to start a potential chain reaction for a
362                // contiguous range of leaves. This is because we have already just fetched a
363                // contiguous range, and if we are currently bulk fetching, it is more efficient to
364                // continue bulk fetching, rather than kick of a chain reaction of individual
365                // fetches, which will end up fetching the same data, slower.
366            },
367            Self::Continuation { callback } => callback.run_range(leaves.start(), leaves.end()),
368        }
369    }
370}
371
/// A request for a semi-open range `start..end` of height-indexed objects.
#[derive(Debug, Clone, Copy)]
pub struct RangeRequest {
    /// First height in the range (inclusive).
    pub start: u64,
    /// Height one past the last object in the range (exclusive).
    pub end: u64,
}
378
379impl IntoIterator for RangeRequest {
380    type Item = u64;
381    type IntoIter = Range<u64>;
382
383    fn into_iter(self) -> Self::IntoIter {
384        self.start..self.end
385    }
386}
387
388impl FetchRequest for RangeRequest {
389    fn might_exist(self, heights: Heights) -> bool {
390        heights.pruned_height.is_none_or(|h| h < self.start) && self.end < heights.height
391    }
392}
393
394impl RangeRequest {
395    pub fn is_satisfied(&self, range: &NonEmptyRange<impl HeightIndexed>) -> bool {
396        range.start() == self.start && range.end() == self.end
397    }
398
399    pub fn len(&self) -> usize {
400        (self.end - self.start) as usize
401    }
402}
403
404#[async_trait]
405impl<Types> Fetchable<Types> for NonEmptyRange<LeafQueryData<Types>>
406where
407    Types: NodeType,
408    Header<Types>: QueryableHeader<Types>,
409    Payload<Types>: QueryablePayload<Types>,
410{
411    type Request = RangeRequest;
412
413    fn satisfies(&self, req: Self::Request) -> bool {
414        req.is_satisfied(self)
415    }
416
417    async fn passive_fetch(
418        notifiers: &Notifiers<Types>,
419        req: Self::Request,
420    ) -> BoxFuture<'static, Option<Self>> {
421        let waits = join_all(req.into_iter().map(|i| {
422            notifiers
423                .leaf
424                .wait_for(move |leaf| leaf.satisfies(LeafId::Number(i as usize)))
425        }))
426        .await;
427
428        join_all(waits.into_iter().map(|wait| wait.into_future()))
429            .map(|options| NonEmptyRange::new(options.into_iter().flatten()).ok())
430            .boxed()
431    }
432
433    async fn active_fetch<S, P>(
434        _tx: &mut impl AvailabilityStorage<Types>,
435        fetcher: Arc<Fetcher<Types, S, P>>,
436        req: Self::Request,
437    ) -> anyhow::Result<()>
438    where
439        S: VersionedDataSource + 'static,
440        for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>,
441        for<'a> S::ReadOnly<'a>:
442            AvailabilityStorage<Types> + NodeStorage<Types> + PrunedHeightStorage,
443        P: AvailabilityProvider<Types>,
444    {
445        fetch_leaf_range_with_callbacks(fetcher, req, None).await
446    }
447
448    async fn load<S>(storage: &mut S, req: Self::Request) -> QueryResult<Self>
449    where
450        S: AvailabilityStorage<Types>,
451    {
452        let leaves = storage
453            .get_leaf_range((req.start as usize)..(req.end as usize))
454            .await?
455            .into_iter()
456            .collect::<QueryResult<Vec<_>>>()?;
457        if leaves.len() != req.len() {
458            tracing::debug!(
459                ?req,
460                len = leaves.len(),
461                "database returned partial result, unable to load full range"
462            );
463            return Err(QueryError::Missing);
464        }
465        NonEmptyRange::new(leaves).map_err(|err| QueryError::Error {
466            message: format!("expected contiguous range, but: {err:#}"),
467        })
468    }
469}
470
471impl<Types> Storable<Types> for NonEmptyRange<LeafQueryData<Types>>
472where
473    Types: NodeType,
474{
475    fn debug_name(&self) -> String {
476        format!("leaf range {}..{}", self.start(), self.end())
477    }
478
479    async fn notify(&self, notifiers: &Notifiers<Types>) {
480        for leaf in self {
481            notifiers.leaf.notify(leaf).await;
482        }
483    }
484
485    async fn store(
486        &self,
487        storage: &mut impl UpdateAvailabilityStorage<Types>,
488        _leaf_only: bool,
489    ) -> anyhow::Result<()> {
490        storage.insert_leaf_range(self).await
491    }
492}
493
494pub(super) async fn fetch_leaf_range_with_callbacks<Types, S, P, I>(
495    fetcher: Arc<Fetcher<Types, S, P>>,
496    req: RangeRequest,
497    callbacks: I,
498) -> anyhow::Result<()>
499where
500    Types: NodeType,
501    Header<Types>: QueryableHeader<Types>,
502    Payload<Types>: QueryablePayload<Types>,
503    S: VersionedDataSource + 'static,
504    for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>,
505    for<'a> S::ReadOnly<'a>: AvailabilityStorage<Types> + NodeStorage<Types> + PrunedHeightStorage,
506    P: AvailabilityProvider<Types>,
507    I: IntoIterator<Item = LeafCallback<Types, S, P>> + Send + 'static,
508    I::IntoIter: Send,
509{
510    let fetcher = fetcher.clone();
511    fetcher.leaf_range_fetcher.clone().spawn_fetch(
512        LeafRangeRequest {
513            start: req.start,
514            end: req.end,
515        },
516        fetcher.provider.clone(),
517        once(LeafCallback::Leaf { fetcher }).chain(callbacks),
518    );
519
520    Ok(())
521}