hotshot_query_service/data_source/fetching/
leaf.rs

1// Copyright (c) 2022 Espresso Systems (espressosys.com)
2// This file is part of the HotShot Query Service library.
3//
4// This program is free software: you can redistribute it and/or modify it under the terms of the GNU
5// General Public License as published by the Free Software Foundation, either version 3 of the
6// License, or (at your option) any later version.
7// This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without
8// even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
9// General Public License for more details.
10// You should have received a copy of the GNU General Public License along with this program. If not,
11// see <https://www.gnu.org/licenses/>.
12
13//! [`Fetchable`] implementation for [`LeafQueryData`].
14
15use std::{
16    cmp::Ordering,
17    fmt::Debug,
18    future::IntoFuture,
19    iter::once,
20    ops::{Range, RangeBounds},
21    sync::Arc,
22};
23
24use anyhow::bail;
25use async_trait::async_trait;
26use committable::Committable;
27use derivative::Derivative;
28use derive_more::From;
29use futures::future::{BoxFuture, FutureExt, join_all};
30use hotshot_types::traits::node_implementation::NodeType;
31use tokio::spawn;
32use tracing::Instrument;
33
34use super::{
35    AvailabilityProvider, FetchRequest, Fetchable, Fetcher, Heights, Notifiers, RangedFetchable,
36    Storable, header::HeaderCallback,
37};
38use crate::{
39    Header, Payload, QueryError, QueryResult,
40    availability::{LeafId, LeafQueryData, QueryableHeader, QueryablePayload},
41    data_source::{
42        VersionedDataSource,
43        storage::{
44            AvailabilityStorage, NodeStorage, UpdateAvailabilityStorage,
45            pruning::PrunedHeightStorage,
46        },
47    },
48    fetching::{
49        self, Callback, NonEmptyRange,
50        request::{self, LeafRangeRequest},
51    },
52    types::HeightIndexed,
53};
54
55pub(super) type LeafFetcher<Types, S, P> =
56    fetching::Fetcher<request::LeafRequest<Types>, LeafCallback<Types, S, P>>;
57
58pub(super) type LeafRangeFetcher<Types, S, P> =
59    fetching::Fetcher<LeafRangeRequest<Types>, LeafCallback<Types, S, P>>;
60
61impl<Types> FetchRequest for LeafId<Types>
62where
63    Types: NodeType,
64{
65    fn might_exist(self, heights: Heights) -> bool {
66        if let LeafId::Number(n) = self {
67            heights.might_exist(n as u64)
68        } else {
69            true
70        }
71    }
72}
73
74#[async_trait]
75impl<Types> Fetchable<Types> for LeafQueryData<Types>
76where
77    Types: NodeType,
78    Header<Types>: QueryableHeader<Types>,
79    Payload<Types>: QueryablePayload<Types>,
80{
81    type Request = LeafId<Types>;
82
83    fn satisfies(&self, req: Self::Request) -> bool {
84        match req {
85            LeafId::Number(n) => self.height() == n as u64,
86            LeafId::Hash(h) => self.hash() == h,
87        }
88    }
89
90    async fn passive_fetch(
91        notifiers: &Notifiers<Types>,
92        req: Self::Request,
93    ) -> BoxFuture<'static, Option<Self>> {
94        notifiers
95            .leaf
96            .wait_for(move |leaf| leaf.satisfies(req))
97            .await
98            .into_future()
99            .boxed()
100    }
101
102    async fn active_fetch<S, P>(
103        tx: &mut impl AvailabilityStorage<Types>,
104        fetcher: Arc<Fetcher<Types, S, P>>,
105        req: Self::Request,
106    ) -> anyhow::Result<()>
107    where
108        S: VersionedDataSource + 'static,
109        for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>,
110        for<'a> S::ReadOnly<'a>:
111            AvailabilityStorage<Types> + NodeStorage<Types> + PrunedHeightStorage,
112        P: AvailabilityProvider<Types>,
113    {
114        fetch_leaf_with_callbacks(tx, fetcher, req, None).await
115    }
116
117    async fn load<S>(storage: &mut S, req: Self::Request) -> QueryResult<Self>
118    where
119        S: AvailabilityStorage<Types>,
120    {
121        storage.get_leaf(req).await
122    }
123}
124
125pub(super) async fn fetch_leaf_with_callbacks<Types, S, P, I>(
126    tx: &mut impl AvailabilityStorage<Types>,
127    fetcher: Arc<Fetcher<Types, S, P>>,
128    req: LeafId<Types>,
129    callbacks: I,
130) -> anyhow::Result<()>
131where
132    Types: NodeType,
133    Header<Types>: QueryableHeader<Types>,
134    Payload<Types>: QueryablePayload<Types>,
135    S: VersionedDataSource + 'static,
136    for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>,
137    for<'a> S::ReadOnly<'a>: AvailabilityStorage<Types> + NodeStorage<Types> + PrunedHeightStorage,
138    P: AvailabilityProvider<Types>,
139    I: IntoIterator<Item = LeafCallback<Types, S, P>> + Send + 'static,
140    I::IntoIter: Send,
141{
142    match req {
143        LeafId::Number(n) => {
144            // We need the next leaf in the chain so we can figure out what hash we expect for this
145            // leaf, so we can fetch it securely from an untrusted provider.
146            let next = (n + 1) as u64;
147            let next = match tx.first_available_leaf(next).await {
148                Ok(leaf) if leaf.height() == next => leaf,
149                Ok(leaf) => {
150                    // If we don't have the immediate successor leaf, but we have some later leaf,
151                    // then we can't trigger this exact fetch, but we can fetch the (apparently)
152                    // missing parent of the leaf we do have, which will trigger a chain of fetches
153                    // that eventually reaches all the way back to the desired leaf.
154                    tracing::debug!(
155                        n,
156                        fetching = leaf.height() - 1,
157                        "do not have necessary leaf; trigger fetch of a later leaf"
158                    );
159
160                    let mut callbacks = vec![LeafCallback::Leaf {
161                        fetcher: fetcher.clone(),
162                    }];
163
164                    if !fetcher.leaf_only {
165                        callbacks.push(
166                            HeaderCallback::Payload {
167                                fetcher: fetcher.clone(),
168                            }
169                            .into(),
170                        );
171                        callbacks.push(
172                            HeaderCallback::VidCommon {
173                                fetcher: fetcher.clone(),
174                            }
175                            .into(),
176                        );
177                    }
178
179                    fetcher.leaf_fetcher.clone().spawn_fetch(
180                        request::LeafRequest::new(
181                            leaf.height() - 1,
182                            leaf.leaf().parent_commitment(),
183                            leaf.leaf().justify_qc().commit(),
184                        ),
185                        fetcher.provider.clone(),
186                        // After getting the leaf, grab the other data as well; that will be missing
187                        // whenever the leaf was.
188                        callbacks,
189                    );
190                    return Ok(());
191                },
192                Err(QueryError::Missing | QueryError::NotFound) => {
193                    // We successfully queried the database, but the next leaf wasn't there. We
194                    // know for sure that based on the current state of the DB, we cannot fetch this
195                    // leaf.
196                    tracing::debug!(n, "not fetching leaf with unknown successor");
197                    return Ok(());
198                },
199                Err(QueryError::Error { message }) => {
200                    // An error occurred while querying the database. We don't know if we need to
201                    // fetch the leaf or not. Return an error so we can try again.
202                    bail!("failed to fetch successor for leaf {n}: {message}");
203                },
204            };
205
206            let fetcher = fetcher.clone();
207            fetcher.leaf_fetcher.clone().spawn_fetch(
208                request::LeafRequest::new(
209                    n as u64,
210                    next.leaf().parent_commitment(),
211                    next.leaf().justify_qc().commit(),
212                ),
213                fetcher.provider.clone(),
214                once(LeafCallback::Leaf { fetcher }).chain(callbacks),
215            );
216        },
217        LeafId::Hash(h) => {
218            // We don't actively fetch leaves when requested by hash, because we have no way of
219            // knowing whether a leaf with such a hash actually exists, and we don't want to bother
220            // peers with requests for non-existent leaves.
221            tracing::debug!("not fetching unknown leaf {h}");
222        },
223    }
224
225    Ok(())
226}
227
228/// Trigger a fetch of the parent of the given `leaf`, if it is missing.
229///
230/// Leaves have a unique constraint among fetchable objects: we cannot fetch a given leaf at height
231/// `h` unless we have its child at height `h + 1`. This is because the child, through its
232/// `parent_commitment`, tells us what the hash of the parent should be, which lets us authenticate
233/// it when fetching from an untrusted provider. Thus, requests for leaf `h` might block if `h + 1`
234/// is not available. To ensure all these requests are eventually unblocked, and all leaves are
235/// eventually fetched, we call this function whenever we receive leaf `h + 1` to check if we need
236/// to then fetch leaf `h`.
237pub(super) fn trigger_fetch_for_parent<Types, S, P>(
238    fetcher: &Arc<Fetcher<Types, S, P>>,
239    leaf: &LeafQueryData<Types>,
240) where
241    Types: NodeType,
242    Header<Types>: QueryableHeader<Types>,
243    Payload<Types>: QueryablePayload<Types>,
244    S: VersionedDataSource + 'static,
245    for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>,
246    for<'a> S::ReadOnly<'a>: AvailabilityStorage<Types> + NodeStorage<Types> + PrunedHeightStorage,
247    P: AvailabilityProvider<Types>,
248{
249    let height = leaf.height();
250    let parent = leaf.leaf().parent_commitment();
251    let parent_qc = leaf.leaf().justify_qc().commit();
252
253    // Check that there is a parent to fetch.
254    if height == 0 {
255        return;
256    }
257
258    // Spawn an async task; we're triggering a fire-and-forget fetch of a leaf that might now be
259    // available; we don't need to block the caller on this.
260    let fetcher = fetcher.clone();
261    let span = tracing::info_span!("fetch parent leaf", height, %parent, %parent_qc);
262    spawn(
263        async move {
264            // Check if we already have the parent.
265            match fetcher.storage.read().await {
266                Ok(mut tx) => {
267                    // Don't bother fetching a pruned leaf.
268                    if let Ok(pruned_height) = tx.load_pruned_height().await
269                        && pruned_height.is_some_and(|ph| height <= ph)
270                    {
271                        tracing::info!(height, ?pruned_height, "not fetching pruned parent leaf");
272                        return;
273                    }
274
275                    if tx.get_leaf(((height - 1) as usize).into()).await.is_ok() {
276                        return;
277                    }
278                },
279                Err(err) => {
280                    // If we can't open a transaction, we can't be sure that we already have the
281                    // parent, so we fall through to fetching it just to be safe.
282                    tracing::warn!(
283                        height,
284                        %parent,
285                        "error opening transaction to check for parent leaf: {err:#}",
286                    );
287                },
288            }
289
290            tracing::info!(height, %parent, "received new leaf; fetching missing parent");
291            fetcher.leaf_fetcher.clone().spawn_fetch(
292                request::LeafRequest::new(height - 1, parent, parent_qc),
293                fetcher.provider.clone(),
294                // After getting the leaf, grab the other data as well; that will be missing
295                // whenever the leaf was.
296                [
297                    LeafCallback::Leaf {
298                        fetcher: fetcher.clone(),
299                    },
300                    HeaderCallback::Payload {
301                        fetcher: fetcher.clone(),
302                    }
303                    .into(),
304                    HeaderCallback::VidCommon {
305                        fetcher: fetcher.clone(),
306                    }
307                    .into(),
308                ],
309            );
310        }
311        .instrument(span),
312    );
313}
314
315#[async_trait]
316impl<Types> RangedFetchable<Types> for LeafQueryData<Types>
317where
318    Types: NodeType,
319    Header<Types>: QueryableHeader<Types>,
320    Payload<Types>: QueryablePayload<Types>,
321{
322    type RangedRequest = LeafId<Types>;
323
324    async fn load_range<S, R>(storage: &mut S, range: R) -> QueryResult<Vec<QueryResult<Self>>>
325    where
326        S: AvailabilityStorage<Types>,
327        R: RangeBounds<usize> + Send + 'static,
328    {
329        storage.get_leaf_range(range).await
330    }
331}
332
333impl<Types> Storable<Types> for LeafQueryData<Types>
334where
335    Types: NodeType,
336{
337    fn debug_name(&self) -> String {
338        format!("leaf {}", self.height())
339    }
340
341    async fn notify(&self, notifiers: &Notifiers<Types>) {
342        notifiers.leaf.notify(self).await;
343    }
344
345    async fn store(
346        &self,
347        storage: &mut impl UpdateAvailabilityStorage<Types>,
348        _leaf_only: bool,
349    ) -> anyhow::Result<()> {
350        storage.insert_leaf(self).await
351    }
352}
353
354#[derive(Derivative, From)]
355#[derivative(Debug(bound = ""))]
356pub(super) enum LeafCallback<Types: NodeType, S, P> {
357    /// Callback when fetching the leaf for its own sake.
358    #[from(ignore)]
359    Leaf {
360        #[derivative(Debug = "ignore")]
361        fetcher: Arc<Fetcher<Types, S, P>>,
362    },
363    /// Callback when fetching the leaf in order to then look up something else.
364    Continuation {
365        callback: HeaderCallback<Types, S, P>,
366    },
367}
368
369impl<Types: NodeType, S, P> PartialEq for LeafCallback<Types, S, P> {
370    fn eq(&self, other: &Self) -> bool {
371        self.cmp(other).is_eq()
372    }
373}
374
375impl<Types: NodeType, S, P> Eq for LeafCallback<Types, S, P> {}
376
377impl<Types: NodeType, S, P> Ord for LeafCallback<Types, S, P> {
378    fn cmp(&self, other: &Self) -> Ordering {
379        match (self, other) {
380            // Store leaves in the database before storing derived objects.
381            (Self::Leaf { .. }, Self::Continuation { .. }) => Ordering::Less,
382            (Self::Continuation { .. }, Self::Leaf { .. }) => Ordering::Greater,
383
384            (Self::Continuation { callback: cb1 }, Self::Continuation { callback: cb2 }) => {
385                cb1.cmp(cb2)
386            },
387            _ => Ordering::Equal,
388        }
389    }
390}
391
392impl<Types: NodeType, S, P> PartialOrd for LeafCallback<Types, S, P> {
393    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
394        Some(self.cmp(other))
395    }
396}
397
398impl<Types: NodeType, S, P> Callback<LeafQueryData<Types>> for LeafCallback<Types, S, P>
399where
400    Header<Types>: QueryableHeader<Types>,
401    Payload<Types>: QueryablePayload<Types>,
402    S: VersionedDataSource + 'static,
403    for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>,
404    for<'a> S::ReadOnly<'a>: AvailabilityStorage<Types> + NodeStorage<Types> + PrunedHeightStorage,
405    P: AvailabilityProvider<Types>,
406{
407    async fn run(self, leaf: LeafQueryData<Types>) {
408        match self {
409            Self::Leaf { fetcher } => {
410                tracing::info!("fetched leaf {}", leaf.height());
411                // Trigger a fetch of the parent leaf, if we don't already have it.
412                trigger_fetch_for_parent(&fetcher, &leaf);
413                fetcher.store_and_notify(&leaf).await;
414            },
415            Self::Continuation { callback } => callback.run(leaf.leaf.block_header().clone()),
416        }
417    }
418}
419
420impl<Types: NodeType, S, P> Callback<NonEmptyRange<LeafQueryData<Types>>>
421    for LeafCallback<Types, S, P>
422where
423    Header<Types>: QueryableHeader<Types>,
424    Payload<Types>: QueryablePayload<Types>,
425    S: VersionedDataSource + 'static,
426    for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>,
427    for<'a> S::ReadOnly<'a>: AvailabilityStorage<Types> + NodeStorage<Types> + PrunedHeightStorage,
428    P: AvailabilityProvider<Types>,
429{
430    async fn run(self, leaves: NonEmptyRange<LeafQueryData<Types>>) {
431        match self {
432            Self::Leaf { fetcher } => {
433                tracing::info!("fetched leaf range {}..{}", leaves.start(), leaves.end());
434                fetcher.store_and_notify(&leaves).await;
435
436                // Unlike in the singular leaf version of this callback, we do not call
437                // `trigger_fetch_for_parent` to start a potential chain reaction for a
438                // contiguous range of leaves. This is because we have already just fetched a
439                // contiguous range, and if we are currently bulk fetching, it is more efficient to
440                // continue bulk fetching, rather than kick of a chain reaction of individual
441                // fetches, which will end up fetching the same data, slower.
442            },
443            Self::Continuation { callback } => callback.run_range(leaves.as_ref_cloned()),
444        }
445    }
446}
447
/// A request for the semi-open range `start..end` of height-indexed objects.
#[derive(Debug, Clone, Copy)]
pub struct RangeRequest {
    /// First height in the range (inclusive).
    pub start: u64,
    /// Height just past the range (exclusive).
    pub end: u64,
}

/// Iterating a request yields every requested height in increasing order.
impl IntoIterator for RangeRequest {
    type Item = u64;
    type IntoIter = Range<u64>;

    fn into_iter(self) -> Self::IntoIter {
        let Self { start, end } = self;
        Range { start, end }
    }
}
463
464impl FetchRequest for RangeRequest {
465    fn might_exist(self, heights: Heights) -> bool {
466        heights.pruned_height.is_none_or(|h| h < self.start) && self.end < heights.height
467    }
468}
469
470impl RangeRequest {
471    pub fn is_satisfied(&self, range: &NonEmptyRange<impl HeightIndexed>) -> bool {
472        range.start() == self.start && range.end() == self.end
473    }
474
475    pub fn len(&self) -> usize {
476        (self.end - self.start) as usize
477    }
478}
479
480#[async_trait]
481impl<Types> Fetchable<Types> for NonEmptyRange<LeafQueryData<Types>>
482where
483    Types: NodeType,
484    Header<Types>: QueryableHeader<Types>,
485    Payload<Types>: QueryablePayload<Types>,
486{
487    type Request = RangeRequest;
488
489    fn satisfies(&self, req: Self::Request) -> bool {
490        req.is_satisfied(self)
491    }
492
493    async fn passive_fetch(
494        notifiers: &Notifiers<Types>,
495        req: Self::Request,
496    ) -> BoxFuture<'static, Option<Self>> {
497        let waits = join_all(req.into_iter().map(|i| {
498            notifiers
499                .leaf
500                .wait_for(move |leaf| leaf.satisfies(LeafId::Number(i as usize)))
501        }))
502        .await;
503
504        join_all(waits.into_iter().map(|wait| wait.into_future()))
505            .map(|options| NonEmptyRange::new(options.into_iter().flatten()).ok())
506            .boxed()
507    }
508
509    async fn active_fetch<S, P>(
510        tx: &mut impl AvailabilityStorage<Types>,
511        fetcher: Arc<Fetcher<Types, S, P>>,
512        req: Self::Request,
513    ) -> anyhow::Result<()>
514    where
515        S: VersionedDataSource + 'static,
516        for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>,
517        for<'a> S::ReadOnly<'a>:
518            AvailabilityStorage<Types> + NodeStorage<Types> + PrunedHeightStorage,
519        P: AvailabilityProvider<Types>,
520    {
521        fetch_leaf_range_with_callbacks(tx, fetcher, req, None).await
522    }
523
524    async fn load<S>(storage: &mut S, req: Self::Request) -> QueryResult<Self>
525    where
526        S: AvailabilityStorage<Types>,
527    {
528        let leaves = storage
529            .get_leaf_range((req.start as usize)..(req.end as usize))
530            .await?
531            .into_iter()
532            .collect::<QueryResult<Vec<_>>>()?;
533        if leaves.len() != req.len() {
534            tracing::debug!(
535                ?req,
536                len = leaves.len(),
537                "database returned partial result, unable to load full range"
538            );
539            return Err(QueryError::Missing);
540        }
541        NonEmptyRange::new(leaves).map_err(|err| QueryError::Error {
542            message: format!("expected contiguous range, but: {err:#}"),
543        })
544    }
545}
546
547impl<Types> Storable<Types> for NonEmptyRange<LeafQueryData<Types>>
548where
549    Types: NodeType,
550{
551    fn debug_name(&self) -> String {
552        format!("leaf range {}..{}", self.start(), self.end())
553    }
554
555    async fn notify(&self, notifiers: &Notifiers<Types>) {
556        for leaf in self {
557            notifiers.leaf.notify(leaf).await;
558        }
559    }
560
561    async fn store(
562        &self,
563        storage: &mut impl UpdateAvailabilityStorage<Types>,
564        _leaf_only: bool,
565    ) -> anyhow::Result<()> {
566        storage.insert_leaf_range(self).await
567    }
568}
569
570pub(super) async fn fetch_leaf_range_with_callbacks<Types, S, P, I>(
571    tx: &mut impl AvailabilityStorage<Types>,
572    fetcher: Arc<Fetcher<Types, S, P>>,
573    req: RangeRequest,
574    callbacks: I,
575) -> anyhow::Result<()>
576where
577    Types: NodeType,
578    Header<Types>: QueryableHeader<Types>,
579    Payload<Types>: QueryablePayload<Types>,
580    S: VersionedDataSource + 'static,
581    for<'a> S::Transaction<'a>: UpdateAvailabilityStorage<Types>,
582    for<'a> S::ReadOnly<'a>: AvailabilityStorage<Types> + NodeStorage<Types> + PrunedHeightStorage,
583    P: AvailabilityProvider<Types>,
584    I: IntoIterator<Item = LeafCallback<Types, S, P>> + Send + 'static,
585    I::IntoIter: Send,
586{
587    // We need the next leaf after the chain so we can figure out what hash we expect for the last
588    // leaf in the chain, so we can fetch it securely from an untrusted provider.
589    let next = match tx.first_available_leaf(req.end).await {
590        Ok(leaf) if leaf.height() == req.end => leaf,
591        Ok(leaf) => {
592            // If we don't have the immediate successor leaf, but we have some later leaf,
593            // then we can't trigger this exact fetch, but we can fetch the (apparently)
594            // missing parent of the leaf we do have, which will trigger a chain of fetches
595            // that eventually reaches all the way back to the desired leaf.
596            tracing::debug!(
597                req.end,
598                fetching = leaf.height() - 1,
599                "do not have necessary leaf; trigger fetch of a later leaf"
600            );
601
602            let mut callbacks = vec![LeafCallback::Leaf {
603                fetcher: fetcher.clone(),
604            }];
605
606            if !fetcher.leaf_only {
607                callbacks.push(
608                    HeaderCallback::Payload {
609                        fetcher: fetcher.clone(),
610                    }
611                    .into(),
612                );
613                callbacks.push(
614                    HeaderCallback::VidCommon {
615                        fetcher: fetcher.clone(),
616                    }
617                    .into(),
618                );
619            }
620
621            fetcher.leaf_fetcher.clone().spawn_fetch(
622                request::LeafRequest::new(
623                    leaf.height() - 1,
624                    leaf.leaf().parent_commitment(),
625                    leaf.leaf().justify_qc().commit(),
626                ),
627                fetcher.provider.clone(),
628                // After getting the leaf, grab the other data as well; that will be missing
629                // whenever the leaf was.
630                callbacks,
631            );
632            return Ok(());
633        },
634        Err(QueryError::Missing | QueryError::NotFound) => {
635            // We successfully queried the database, but the next leaf wasn't there. We know for
636            // sure that based on the current state of the DB, we cannot fetch this leaf.
637            tracing::debug!(req.end, "not fetching leaf chain with unknown successor");
638            return Ok(());
639        },
640        Err(QueryError::Error { message }) => {
641            // An error occurred while querying the database. We don't know if we need to fetch the
642            // leaf or not. Return an error so we can try again.
643            bail!(
644                "failed to fetch successor for leaf chain {}: {message}",
645                req.end
646            );
647        },
648    };
649
650    let fetcher = fetcher.clone();
651    fetcher.leaf_range_fetcher.clone().spawn_fetch(
652        LeafRangeRequest {
653            start: req.start,
654            end: req.end,
655            last_leaf: next.leaf().parent_commitment(),
656            last_qc: next.leaf().justify_qc().commit(),
657        },
658        fetcher.provider.clone(),
659        once(LeafCallback::Leaf { fetcher }).chain(callbacks),
660    );
661
662    Ok(())
663}