espresso_node/api/
light_client.rs

1use std::{fmt::Display, sync::Arc, time::Duration};
2
3use anyhow::Result;
4use espresso_types::{BlockMerkleTree, NsProof, SeqTypes};
5use futures::{
6    TryStreamExt,
7    future::{FutureExt, try_join},
8    stream::StreamExt,
9};
10use hotshot_query_service::{
11    Error,
12    availability::{self, AvailabilityDataSource, LeafId},
13    data_source::{VersionedDataSource, storage::NodeStorage},
14    merklized_state::{MerklizedStateDataSource, Snapshot},
15    node::BlockId,
16    types::HeightIndexed,
17};
18use hotshot_types::utils::{epoch_from_block_number, root_block_in_epoch};
19use itertools::izip;
20use jf_merkle_tree_compat::MerkleTreeScheme;
21use light_client::consensus::{
22    header::HeaderProof, leaf::LeafProof, namespace::NamespaceProof, payload::PayloadProof,
23};
24use tide_disco::{Api, RequestParams, StatusCode, method::ReadState};
25use vbs::version::StaticVersionType;
26
27use crate::api::data_source::{NodeStateDataSource, StakeTableDataSource};
28
29async fn get_leaf_proof<State>(
30    state: &State,
31    requested: usize,
32    finalized: Option<usize>,
33    fetch_timeout: Duration,
34) -> Result<LeafProof, Error>
35where
36    State: AvailabilityDataSource<SeqTypes> + VersionedDataSource,
37    for<'a> State::ReadOnly<'a>: NodeStorage<SeqTypes>,
38{
39    let (endpoint, qc_chain) = match finalized {
40        Some(finalized) => {
41            // If we have a known-finalized block, we will not need a final 2-chain of QCs to prove
42            // the last leaf in the result finalized, since we will either terminate with a 3-chain
43            // of leaves or at the `finalized` leaf. Thus, we can use None for the final QC chain.
44            if finalized <= requested {
45                return Err(Error::Custom {
46                    message: format!(
47                        "finalized leaf height ({finalized}) must be greater than requested \
48                         ({requested})"
49                    ),
50                    status: StatusCode::BAD_REQUEST,
51                });
52            }
53            (finalized, None)
54        },
55        None => {
56            async {
57                // Grab the endpoint and the final QC chain in the same transaction, to ensure that
58                // the QC chain actually corresponds to the endpoint block (and is not subject to
59                // concurrent updates).
60                let mut tx = state.read().await?;
61                let height = NodeStorage::block_height(&mut tx).await?;
62                let qc_chain = tx.latest_qc_chain().await?;
63                Ok((height, qc_chain))
64            }
65            .await
66            .map_err(|err: anyhow::Error| Error::Custom {
67                message: err.to_string(),
68                status: StatusCode::INTERNAL_SERVER_ERROR,
69            })?
70        },
71    };
72    let mut leaves = state.get_leaf_range(requested..endpoint).await;
73    let mut proof = LeafProof::default();
74
75    while let Some(leaf) = leaves.next().await {
76        let leaf = leaf
77            .with_timeout(fetch_timeout)
78            .await
79            .ok_or_else(|| not_found("missing leaves"))?;
80
81        if proof.push(leaf) {
82            return Ok(proof);
83        }
84    }
85
86    // We reached the end of the range of interest without encountering a 3-chain. Thus, if the last
87    // leaf in the chain is not already assumed finalized by the client, we must prove it finalized
88    // by appending two more QCs.
89    if finalized.is_none() {
90        let Some([committing_qc, deciding_qc]) = qc_chain else {
91            return Err(not_found("missing QC 2-chain to prove finality"));
92        };
93        proof.add_qc_chain(Arc::new(committing_qc), Arc::new(deciding_qc));
94    }
95
96    Ok(proof)
97}
98
99async fn get_header_proof<State>(
100    state: &State,
101    root: u64,
102    requested: BlockId<SeqTypes>,
103    fetch_timeout: Duration,
104) -> Result<HeaderProof, Error>
105where
106    State: AvailabilityDataSource<SeqTypes>
107        + MerklizedStateDataSource<SeqTypes, BlockMerkleTree, { BlockMerkleTree::ARITY }>
108        + VersionedDataSource,
109{
110    let header = state
111        .get_header(requested)
112        .await
113        .with_timeout(fetch_timeout)
114        .await
115        .ok_or_else(|| not_found(format!("unknown header {requested}")))?;
116    if header.height() >= root {
117        return Err(Error::Custom {
118            message: format!(
119                "height ({}) must be less than root ({root})",
120                header.height()
121            ),
122            status: StatusCode::BAD_REQUEST,
123        });
124    }
125    let path = MerklizedStateDataSource::<SeqTypes, BlockMerkleTree, _>::get_path(
126        state,
127        Snapshot::Index(root),
128        header.height(),
129    )
130    .await
131    .map_err(|source| Error::MerklizedState {
132        source: source.into(),
133    })?;
134
135    Ok(HeaderProof::new(header, path))
136}
137
138async fn get_namespace_proof_range<State>(
139    state: &State,
140    start: usize,
141    end: usize,
142    namespace: u64,
143    fetch_timeout: Duration,
144    large_object_range_limit: usize,
145) -> Result<Vec<NamespaceProof>, Error>
146where
147    State: AvailabilityDataSource<SeqTypes>,
148{
149    if end <= start {
150        return Err(Error::Custom {
151            message: format!("requested empty interval [{start}, {end})"),
152            status: StatusCode::BAD_REQUEST,
153        });
154    }
155    if end - start > large_object_range_limit {
156        return Err(Error::Custom {
157            message: format!(
158                "requested range [{start}, {end}) exceeds maximum size {large_object_range_limit}"
159            ),
160            status: StatusCode::BAD_REQUEST,
161        });
162    }
163
164    let fetch_headers = async move {
165        state
166            .get_header_range(start..end)
167            .await
168            .enumerate()
169            .then(|(i, fetch)| async move {
170                fetch
171                    .with_timeout(fetch_timeout)
172                    .await
173                    .ok_or_else(|| Error::Custom {
174                        message: format!("missing header {}", start + i),
175                        status: StatusCode::NOT_FOUND,
176                    })
177            })
178            .try_collect::<Vec<_>>()
179            .await
180    };
181    let fetch_payloads = async move {
182        state
183            .get_payload_range(start..end)
184            .await
185            .enumerate()
186            .then(|(i, fetch)| async move {
187                fetch
188                    .with_timeout(fetch_timeout)
189                    .await
190                    .ok_or_else(|| Error::Custom {
191                        message: format!("missing payload {}", start + i),
192                        status: StatusCode::NOT_FOUND,
193                    })
194            })
195            .try_collect::<Vec<_>>()
196            .await
197    };
198    let fetch_vid_commons = async move {
199        state
200            .get_vid_common_range(start..end)
201            .await
202            .enumerate()
203            .then(|(i, fetch)| async move {
204                fetch
205                    .with_timeout(fetch_timeout)
206                    .await
207                    .ok_or_else(|| Error::Custom {
208                        message: format!("missing VID common {}", start + i),
209                        status: StatusCode::NOT_FOUND,
210                    })
211            })
212            .try_collect::<Vec<_>>()
213            .await
214    };
215    let (headers, (payloads, vid_commons)) =
216        try_join(fetch_headers, try_join(fetch_payloads, fetch_vid_commons)).await?;
217
218    izip!(headers, payloads, vid_commons)
219        .map(|(header, payload, vid_common)| {
220            let Some(ns_index) = header.ns_table().find_ns_id(&namespace.into()) else {
221                return Ok(NamespaceProof::not_present());
222            };
223            let ns_proof = NsProof::new(payload.data(), &ns_index, vid_common.common())
224                .ok_or_else(|| Error::Custom {
225                    message: "failed to construct namespace proof".into(),
226                    status: StatusCode::INTERNAL_SERVER_ERROR,
227                })?;
228            Ok(NamespaceProof::new(ns_proof, vid_common.common().clone()))
229        })
230        .collect()
231}
232
233#[derive(Debug)]
234pub(super) struct Options {
235    /// Timeout for failing requests due to missing data.
236    ///
237    /// If data needed to respond to a request is missing, it can (in some cases) be fetched from an
238    /// external provider. This parameter controls how long the request handler will wait for
239    /// missing data to be fetched before giving up and failing the request.
240    pub fetch_timeout: Duration,
241
242    /// The maximum number of large objects which can be loaded in a single range query.
243    ///
244    /// Large objects include anything that _might_ contain a full payload or an object proportional
245    /// in size to a payload. Note that this limit applies to the entire class of objects: we do not
246    /// check the size of objects while loading to determine which limit to apply. If an object
247    /// belongs to a class which might contain a large payload, the large object limit always
248    /// applies.
249    pub large_object_range_limit: usize,
250}
251
252impl Default for Options {
253    fn default() -> Self {
254        Self {
255            fetch_timeout: Duration::from_millis(500),
256            large_object_range_limit: availability::Options::default().large_object_range_limit,
257        }
258    }
259}
260
261pub(super) fn define_api<S, ApiVer: StaticVersionType + 'static>(
262    opt: Options,
263    api_ver: semver::Version,
264) -> Result<Api<S, Error, ApiVer>>
265where
266    S: ReadState + Send + Sync + 'static,
267    S::State: AvailabilityDataSource<SeqTypes>
268        + MerklizedStateDataSource<SeqTypes, BlockMerkleTree, { BlockMerkleTree::ARITY }>
269        + NodeStateDataSource
270        + StakeTableDataSource<SeqTypes>
271        + VersionedDataSource,
272    for<'a> <S::State as VersionedDataSource>::ReadOnly<'a>: NodeStorage<SeqTypes>,
273{
274    let toml = toml::from_str::<toml::Value>(include_str!("../../api/light-client.toml"))?;
275    let mut api = Api::<S, Error, ApiVer>::new(toml)?;
276    api.with_version(api_ver);
277
278    let Options {
279        fetch_timeout,
280        large_object_range_limit,
281    } = opt;
282
283    api.get("leaf", move |req, state| {
284        async move {
285            let requested = leaf_height_from_req(&req, state, fetch_timeout).await?;
286            let finalized = req
287                .opt_integer_param("finalized")
288                .map_err(bad_param("finalized"))?;
289            get_leaf_proof(state, requested, finalized, fetch_timeout).await
290        }
291        .boxed()
292    })?
293    .get("header", move |req, state| {
294        async move {
295            let root = req.integer_param("root").map_err(bad_param("root"))?;
296            let requested = block_id_from_req(&req)?;
297            get_header_proof(state, root, requested, fetch_timeout).await
298        }
299        .boxed()
300    })?
301    .get("stake_table", move |req, state| {
302        async move {
303            let epoch: u64 = req.integer_param("epoch").map_err(bad_param("epoch"))?;
304
305            let node_state = state.node_state().await;
306            let epoch_height = node_state.epoch_height.ok_or_else(|| Error::Custom {
307                message: "epoch state not set".into(),
308                status: StatusCode::INTERNAL_SERVER_ERROR,
309            })?;
310            let first_epoch = epoch_from_block_number(node_state.epoch_start_block, epoch_height);
311
312            if epoch < first_epoch + 2 {
313                return Err(Error::Custom {
314                    message: format!("epoch must be at least {}", first_epoch + 2),
315                    status: StatusCode::BAD_REQUEST,
316                });
317            }
318
319            // Find the range of L1 block containing events for this epoch. This is determined by
320            // the `l1_finalized` field of the epoch root (from two epochs prior) and the previous
321            // epoch's epoch root.
322            let epoch_root_height = root_block_in_epoch(epoch - 2, epoch_height) as usize;
323            let epoch_root = state
324                .get_header(epoch_root_height)
325                .await
326                .with_timeout(fetch_timeout)
327                .await
328                .ok_or_else(|| {
329                    not_found(format!("missing epoch root header {epoch_root_height}"))
330                })?;
331            let to_l1_block = epoch_root
332                .l1_finalized()
333                .ok_or_else(|| Error::Custom {
334                    message: "epoch root header is missing L1 finalized block".into(),
335                    status: StatusCode::INTERNAL_SERVER_ERROR,
336                })?
337                .number();
338
339            let from_l1_block = if epoch >= first_epoch + 3 {
340                let prev_epoch_root_height = root_block_in_epoch(epoch - 3, epoch_height) as usize;
341                let prev_epoch_root = state
342                    .get_header(prev_epoch_root_height)
343                    .await
344                    .with_timeout(fetch_timeout)
345                    .await
346                    .ok_or_else(|| {
347                        not_found(format!(
348                            "missing previous epoch root header {prev_epoch_root_height}"
349                        ))
350                    })?;
351                prev_epoch_root
352                    .l1_finalized()
353                    .ok_or_else(|| Error::Custom {
354                        message: "previous epoch root header is missing L1 finalized block".into(),
355                        status: StatusCode::INTERNAL_SERVER_ERROR,
356                    })?
357                    .number()
358                    + 1
359            } else {
360                0
361            };
362
363            state
364                .stake_table_events(from_l1_block, to_l1_block)
365                .await
366                .map_err(|err| Error::Custom {
367                    message: format!("failed to load stake table events: {err:#}"),
368                    status: StatusCode::INTERNAL_SERVER_ERROR,
369                })
370        }
371        .boxed()
372    })?
373    .get("payload", move |req, state| {
374        async move {
375            let height: usize = req.integer_param("height").map_err(bad_param("height"))?;
376            let fetch_payload = async move {
377                state
378                    .get_payload(height)
379                    .await
380                    .with_timeout(fetch_timeout)
381                    .await
382                    .ok_or_else(|| Error::Custom {
383                        message: format!("missing payload {height}"),
384                        status: StatusCode::NOT_FOUND,
385                    })
386            };
387            let fetch_vid_common = async move {
388                state
389                    .get_vid_common(height)
390                    .await
391                    .with_timeout(fetch_timeout)
392                    .await
393                    .ok_or_else(|| Error::Custom {
394                        message: format!("missing VID common {height}"),
395                        status: StatusCode::NOT_FOUND,
396                    })
397            };
398            let (payload, vid_common) = try_join(fetch_payload, fetch_vid_common).await?;
399            Ok(PayloadProof::new(
400                payload.data().clone(),
401                vid_common.common().clone(),
402            ))
403        }
404        .boxed()
405    })?
406    .get("namespace", move |req, state| {
407        async move {
408            let height = req.integer_param("height").map_err(bad_param("height"))?;
409            let namespace = req
410                .integer_param("namespace")
411                .map_err(bad_param("namespace"))?;
412            let mut proofs = get_namespace_proof_range(
413                state,
414                height,
415                height + 1,
416                namespace,
417                fetch_timeout,
418                large_object_range_limit,
419            )
420            .await?;
421            if proofs.len() != 1 {
422                tracing::error!(
423                    height,
424                    namespace,
425                    ?proofs,
426                    "get_namespace_proof_range should have returned exactly one proof"
427                );
428                return Err(Error::Custom {
429                    message: "internal consistency error".into(),
430                    status: StatusCode::INTERNAL_SERVER_ERROR,
431                });
432            }
433            Ok(proofs.remove(0))
434        }
435        .boxed()
436    })?
437    .get("namespace_range", move |req, state| {
438        async move {
439            let start = req.integer_param("start").map_err(bad_param("start"))?;
440            let end = req.integer_param("end").map_err(bad_param("end"))?;
441            let namespace = req
442                .integer_param("namespace")
443                .map_err(bad_param("namespace"))?;
444            get_namespace_proof_range(
445                state,
446                start,
447                end,
448                namespace,
449                fetch_timeout,
450                large_object_range_limit,
451            )
452            .await
453        }
454        .boxed()
455    })?;
456
457    Ok(api)
458}
459
460async fn leaf_height_from_req<S>(
461    req: &RequestParams,
462    state: &S,
463    fetch_timeout: Duration,
464) -> Result<usize, Error>
465where
466    S: AvailabilityDataSource<SeqTypes>,
467{
468    if let Some(height) = req
469        .opt_integer_param("height")
470        .map_err(bad_param("height"))?
471    {
472        return Ok(height);
473    } else if let Some(hash) = req.opt_blob_param("hash").map_err(bad_param("hash"))? {
474        let leaf = state
475            .get_leaf(LeafId::Hash(hash))
476            .await
477            .with_timeout(fetch_timeout)
478            .await
479            .ok_or_else(|| not_found(format!("unknown leaf hash {hash}")))?;
480        return Ok(leaf.height() as usize);
481    } else if let Some(hash) = req
482        .opt_blob_param("block-hash")
483        .map_err(bad_param("block-hash"))?
484    {
485        let header = state
486            .get_header(BlockId::Hash(hash))
487            .await
488            .with_timeout(fetch_timeout)
489            .await
490            .ok_or_else(|| not_found(format!("unknown block hash {hash}")))?;
491        return Ok(header.height() as usize);
492    } else if let Some(hash) = req
493        .opt_blob_param("payload-hash")
494        .map_err(bad_param("payload-hash"))?
495    {
496        let header = state
497            .get_header(BlockId::PayloadHash(hash))
498            .await
499            .with_timeout(fetch_timeout)
500            .await
501            .ok_or_else(|| not_found(format!("unknown payload hash {hash}")))?;
502        return Ok(header.height() as usize);
503    }
504
505    Err(Error::Custom {
506        message: "missing parameter: requested leaf must be identified by height, hash, block \
507                  hash, or payload hash"
508            .into(),
509        status: StatusCode::BAD_REQUEST,
510    })
511}
512
513fn block_id_from_req(req: &RequestParams) -> Result<BlockId<SeqTypes>, Error> {
514    if let Some(height) = req
515        .opt_integer_param("height")
516        .map_err(bad_param("height"))?
517    {
518        Ok(BlockId::Number(height))
519    } else if let Some(hash) = req.opt_blob_param("hash").map_err(bad_param("hash"))? {
520        Ok(BlockId::Hash(hash))
521    } else if let Some(hash) = req
522        .opt_blob_param("payload-hash")
523        .map_err(bad_param("payload-hash"))?
524    {
525        Ok(BlockId::PayloadHash(hash))
526    } else {
527        Err(Error::Custom {
528            message: "missing parameter: requested header must be identified by height, hash, or \
529                      payload hash"
530                .into(),
531            status: StatusCode::BAD_REQUEST,
532        })
533    }
534}
535
536fn bad_param<E>(name: &'static str) -> impl FnOnce(E) -> Error
537where
538    E: Display,
539{
540    move |err| Error::Custom {
541        message: format!("{name}: {err:#}"),
542        status: StatusCode::BAD_REQUEST,
543    }
544}
545
546fn not_found(msg: impl Into<String>) -> Error {
547    Error::Custom {
548        message: msg.into(),
549        status: StatusCode::NOT_FOUND,
550    }
551}
552
553#[cfg(test)]
554mod test {
555    use espresso_types::BLOCK_MERKLE_TREE_HEIGHT;
556    use futures::future::join_all;
557    use hotshot_query_service::{
558        availability::{BlockQueryData, TransactionIndex, VidCommonQueryData},
559        data_source::{Transaction, storage::UpdateAvailabilityStorage},
560        merklized_state::UpdateStateData,
561    };
562    use hotshot_types::simple_certificate::CertificatePair;
563    use jf_merkle_tree_compat::{AppendableMerkleTreeScheme, ToTraversalPath};
564    use light_client::{
565        consensus::leaf::{FinalityProof, LeafProofHint},
566        testing::{
567            AlwaysTrueQuorum, ENABLE_EPOCHS, LEGACY_VERSION, TestClient, VersionCheckQuorum,
568            leaf_chain, leaf_chain_with_upgrade,
569        },
570    };
571    use tide_disco::Error;
572    use versions::{DRB_AND_HEADER_UPGRADE_VERSION, EPOCH_VERSION};
573
574    use super::*;
575    use crate::api::{
576        data_source::{SequencerDataSource, testing::TestableSequencerDataSource},
577        sql::DataSource,
578    };
579
580    #[test_log::test(tokio::test(flavor = "multi_thread"))]
581    async fn test_two_chain() {
582        let storage = <DataSource as TestableSequencerDataSource>::create_storage().await;
583        let ds = DataSource::create(
584            DataSource::persistence_options(&storage),
585            Default::default(),
586            false,
587        )
588        .await
589        .unwrap();
590
591        // Insert some leaves, forming a chain.
592        let leaves = leaf_chain(1..=3, EPOCH_VERSION).await;
593        {
594            let mut tx = ds.write().await.unwrap();
595            tx.insert_leaf(&leaves[0]).await.unwrap();
596            tx.insert_leaf(&leaves[1]).await.unwrap();
597            tx.insert_leaf(&leaves[2]).await.unwrap();
598            tx.commit().await.unwrap();
599        }
600
601        // Ask for the first leaf; it is proved finalized by the chain formed along with the second.
602        let proof = get_leaf_proof(&ds, 1, None, Duration::MAX).await.unwrap();
603        assert_eq!(
604            proof
605                .verify(LeafProofHint::Quorum(&AlwaysTrueQuorum))
606                .await
607                .unwrap(),
608            leaves[0]
609        );
610    }
611
612    #[test_log::test(tokio::test(flavor = "multi_thread"))]
613    async fn test_finalized() {
614        let storage = <DataSource as TestableSequencerDataSource>::create_storage().await;
615        let ds = DataSource::create(
616            DataSource::persistence_options(&storage),
617            Default::default(),
618            false,
619        )
620        .await
621        .unwrap();
622
623        // Insert a single leaf. We will not be able to provide proofs ending in a leaf chain, but
624        // we can return a leaf if the leaf after it is already known to be finalized.
625        let leaves = leaf_chain(1..=2, EPOCH_VERSION).await;
626        {
627            let mut tx = ds.write().await.unwrap();
628            tx.insert_leaf(&leaves[0]).await.unwrap();
629            tx.commit().await.unwrap();
630        }
631
632        let proof = get_leaf_proof(&ds, 1, Some(2), Duration::MAX)
633            .await
634            .unwrap();
635        assert_eq!(
636            proof
637                .verify(LeafProofHint::assumption(leaves[1].leaf()))
638                .await
639                .unwrap(),
640            leaves[0]
641        );
642    }
643
644    #[test_log::test(tokio::test(flavor = "multi_thread"))]
645    async fn test_bad_finalized() {
646        let storage = <DataSource as TestableSequencerDataSource>::create_storage().await;
647        let ds = DataSource::create(
648            DataSource::persistence_options(&storage),
649            Default::default(),
650            false,
651        )
652        .await
653        .unwrap();
654
655        // Insert a single leaf. If we request this leaf but provide a finalized leaf which is
656        // earlier, we should fail.
657        let leaves = leaf_chain(1..2, EPOCH_VERSION).await;
658        {
659            let mut tx = ds.write().await.unwrap();
660            tx.insert_leaf(&leaves[0]).await.unwrap();
661            tx.commit().await.unwrap();
662        }
663
664        let err = get_leaf_proof(&ds, 1, Some(0), Duration::MAX)
665            .await
666            .unwrap_err();
667        assert_eq!(err.status(), StatusCode::BAD_REQUEST);
668    }
669
670    #[test_log::test(tokio::test(flavor = "multi_thread"))]
671    async fn test_no_chain() {
672        let storage = <DataSource as TestableSequencerDataSource>::create_storage().await;
673        let ds = DataSource::create(
674            DataSource::persistence_options(&storage),
675            Default::default(),
676            false,
677        )
678        .await
679        .unwrap();
680
681        // Insert multiple leaves that don't chain. We will not be able to prove these are
682        // finalized.
683        let leaves = leaf_chain(1..=4, EPOCH_VERSION).await;
684        {
685            let mut tx = ds.write().await.unwrap();
686            tx.insert_leaf(&leaves[0]).await.unwrap();
687            tx.insert_leaf(&leaves[2]).await.unwrap();
688            tx.insert_leaf(&leaves[3]).await.unwrap();
689            tx.commit().await.unwrap();
690        }
691
692        let err = get_leaf_proof(&ds, 1, None, Duration::from_secs(1))
693            .await
694            .unwrap_err();
695        assert_eq!(err.status(), StatusCode::NOT_FOUND);
696
697        // Even if we start from a finalized leave that extends one of the leaves we do have (4,
698        // extends 3) we fail to generate a proof because we can't generate a chain from the
699        // requested leaf (1) to the finalized leaf (4), since leaf 2 is missing.
700        let err = get_leaf_proof(&ds, 1, Some(4), Duration::from_secs(1))
701            .await
702            .unwrap_err();
703        assert_eq!(err.status(), StatusCode::NOT_FOUND);
704    }
705
706    #[test_log::test(tokio::test(flavor = "multi_thread"))]
707    async fn test_final_qcs() {
708        let storage = <DataSource as TestableSequencerDataSource>::create_storage().await;
709        let ds = DataSource::create(
710            DataSource::persistence_options(&storage),
711            Default::default(),
712            false,
713        )
714        .await
715        .unwrap();
716
717        // Insert a single leaf, plus an extra QC chain proving it finalized.
718        let leaves = leaf_chain(1..=3, EPOCH_VERSION).await;
719        let qcs = [
720            CertificatePair::for_parent(leaves[1].leaf()),
721            CertificatePair::for_parent(leaves[2].leaf()),
722        ];
723        {
724            let mut tx = ds.write().await.unwrap();
725            tx.insert_leaf_with_qc_chain(&leaves[0], Some(qcs.clone()))
726                .await
727                .unwrap();
728            tx.commit().await.unwrap();
729        }
730
731        let proof = get_leaf_proof(&ds, 1, None, Duration::MAX).await.unwrap();
732        assert_eq!(
733            proof
734                .verify(LeafProofHint::Quorum(&AlwaysTrueQuorum))
735                .await
736                .unwrap(),
737            leaves[0]
738        );
739    }
740
741    #[test_log::test(tokio::test(flavor = "multi_thread"))]
742    async fn test_upgrade_to_epochs() {
743        let storage = <DataSource as TestableSequencerDataSource>::create_storage().await;
744        let ds = DataSource::create(
745            DataSource::persistence_options(&storage),
746            Default::default(),
747            false,
748        )
749        .await
750        .unwrap();
751
752        // Upgrade to epochs (and enabling HotStuff2) in the middle of a leaf chain, so that the
753        // last leaf in the chain only requires 2 QCs to verify, even though at the start of the
754        // chain we would have required 3.
755        let leaves = leaf_chain_with_upgrade(1..=4, 2, ENABLE_EPOCHS).await;
756        assert_eq!(leaves[0].header().version(), LEGACY_VERSION);
757        assert_eq!(leaves[1].header().version(), DRB_AND_HEADER_UPGRADE_VERSION);
758        let qcs = [
759            CertificatePair::for_parent(leaves[2].leaf()),
760            CertificatePair::for_parent(leaves[3].leaf()),
761        ];
762        {
763            let mut tx = ds.write().await.unwrap();
764            tx.insert_leaf(&leaves[0]).await.unwrap();
765            tx.insert_leaf_with_qc_chain(&leaves[1], Some(qcs.clone()))
766                .await
767                .unwrap();
768            tx.commit().await.unwrap();
769        }
770
771        let proof = get_leaf_proof(&ds, 1, None, Duration::MAX).await.unwrap();
772        assert_eq!(
773            proof
774                .verify(LeafProofHint::Quorum(&VersionCheckQuorum::new(
775                    leaves.iter().map(|leaf| leaf.leaf().clone())
776                )))
777                .await
778                .unwrap(),
779            leaves[0]
780        );
781        assert!(matches!(proof.proof(), FinalityProof::HotStuff2 { .. }))
782    }
783
784    #[tokio::test]
785    #[test_log::test]
786    async fn test_header_proof() {
787        let storage = <DataSource as TestableSequencerDataSource>::create_storage().await;
788        let ds = DataSource::create(
789            DataSource::persistence_options(&storage),
790            Default::default(),
791            false,
792        )
793        .await
794        .unwrap();
795
796        // Construct a chain of leaves, plus the corresponding block Merkle tree at each leaf.
797        let leaves = leaf_chain(0..=2, EPOCH_VERSION).await;
798        let mts = leaves
799            .iter()
800            .scan(
801                BlockMerkleTree::new(BLOCK_MERKLE_TREE_HEIGHT),
802                |mt, leaf| {
803                    assert_eq!(mt.commitment(), leaf.header().block_merkle_tree_root());
804                    let item = mt.clone();
805                    mt.push(leaf.block_hash()).unwrap();
806                    Some(item)
807                },
808            )
809            .collect::<Vec<_>>();
810
811        // Save all those objects in the DB.
812        {
813            let mut tx = ds.write().await.unwrap();
814            for (leaf, mt) in leaves.iter().zip(&mts) {
815                tx.insert_leaf(leaf).await.unwrap();
816
817                if leaf.height() > 0 {
818                    let merkle_path = mt.lookup(leaf.height() - 1).expect_ok().unwrap().1;
819                    UpdateStateData::<SeqTypes, BlockMerkleTree, _>::insert_merkle_nodes(
820                        &mut tx,
821                        merkle_path,
822                        ToTraversalPath::<{ BlockMerkleTree::ARITY }>::to_traversal_path(
823                            &(leaf.height() - 1),
824                            BLOCK_MERKLE_TREE_HEIGHT,
825                        ),
826                        leaf.height(),
827                    )
828                    .await
829                    .unwrap();
830                    UpdateStateData::<SeqTypes, BlockMerkleTree, _>::set_last_state_height(
831                        &mut tx,
832                        leaf.height() as usize,
833                    )
834                    .await
835                    .unwrap();
836                }
837            }
838            tx.commit().await.unwrap();
839        }
840
841        // Test happy path.
842        for (root, mt) in mts.iter().enumerate().skip(1) {
843            for (height, leaf) in leaves.iter().enumerate().take(root) {
844                tracing::info!(root, height, "test happy path");
845                let proof =
846                    get_header_proof(&ds, root as u64, BlockId::Number(height), Duration::MAX)
847                        .await
848                        .unwrap();
849                assert_eq!(proof.verify_ref(mt.commitment()).unwrap(), leaf.header());
850            }
851        }
852
853        // Test unknown leaf.
854        let err = get_header_proof(&ds, 5, BlockId::Number(4), Duration::from_secs(1))
855            .await
856            .unwrap_err();
857        assert_eq!(err.status(), StatusCode::NOT_FOUND);
858
859        // Test height >= root.
860        let err = get_header_proof(&ds, 1, BlockId::Number(1), Duration::MAX)
861            .await
862            .unwrap_err();
863        assert_eq!(err.status(), StatusCode::BAD_REQUEST);
864    }
865
866    #[tokio::test]
867    #[test_log::test]
868    async fn test_namespace_proof() {
869        let storage = <DataSource as TestableSequencerDataSource>::create_storage().await;
870        let ds = DataSource::create(
871            DataSource::persistence_options(&storage),
872            Default::default(),
873            false,
874        )
875        .await
876        .unwrap();
877
878        // Construct a chain of blocks.
879        let client = TestClient::default();
880        let leaves = join_all((0..=2).map(|i| client.leaf(i))).await;
881        let payloads = join_all((0..=2).map(|i| client.payload(i))).await;
882        let vid_commons = join_all((0..=2).map(|i| client.vid_common(i))).await;
883
884        // Save all those objects in the DB.
885        {
886            let mut tx = ds.write().await.unwrap();
887            for (leaf, payload, vid_common) in izip!(&leaves, &payloads, &vid_commons) {
888                tx.insert_leaf(leaf).await.unwrap();
889                tx.insert_block(&BlockQueryData::<SeqTypes>::new(
890                    leaf.header().clone(),
891                    payload.clone(),
892                ))
893                .await
894                .unwrap();
895                tx.insert_vid(
896                    &VidCommonQueryData::<SeqTypes>::new(leaf.header().clone(), vid_common.clone()),
897                    None,
898                )
899                .await
900                .unwrap();
901            }
902            tx.commit().await.unwrap();
903        }
904
905        // Test happy path: all blocks.
906        let ns = payloads[0]
907            .transaction(&TransactionIndex {
908                ns_index: 0.into(),
909                position: 0,
910            })
911            .unwrap()
912            .namespace();
913        let proofs = get_namespace_proof_range(&ds, 0, 3, ns.into(), Duration::MAX, 100)
914            .await
915            .unwrap();
916        assert_eq!(proofs.len(), 3);
917        for (leaf, proof) in leaves.iter().zip(proofs) {
918            proof.verify(leaf.header(), ns).unwrap();
919        }
920
921        // Test happy path: subset.
922        let tx = payloads[1]
923            .transaction(&TransactionIndex {
924                ns_index: 0.into(),
925                position: 0,
926            })
927            .unwrap();
928        let ns = tx.namespace();
929        let proofs = get_namespace_proof_range(&ds, 1, 2, ns.into(), Duration::MAX, 100)
930            .await
931            .unwrap();
932        assert_eq!(proofs.len(), 1);
933        assert_eq!(proofs[0].verify(leaves[1].header(), ns).unwrap(), [tx]);
934
935        // Test missing data in range.
936        let err = get_namespace_proof_range(&ds, 0, 4, ns.into(), Duration::from_secs(1), 100)
937            .await
938            .unwrap_err();
939        assert_eq!(err.status(), StatusCode::NOT_FOUND);
940
941        // Test invalid range.
942        let err = get_namespace_proof_range(&ds, 1, 0, ns.into(), Duration::from_secs(1), 100)
943            .await
944            .unwrap_err();
945        assert_eq!(err.status(), StatusCode::BAD_REQUEST);
946        assert!(
947            err.to_string().contains("requested empty interval"),
948            "{err:#}"
949        );
950
951        // Test large range.
952        let err = get_namespace_proof_range(&ds, 0, 10_000, ns.into(), Duration::from_secs(1), 100)
953            .await
954            .unwrap_err();
955        assert_eq!(err.status(), StatusCode::BAD_REQUEST);
956        assert!(err.to_string().contains("exceeds maximum size"), "{err:#}");
957    }
958}