Skip to main content

espresso_node/api/
light_client.rs

1use std::{fmt::Display, sync::Arc, time::Duration};
2
3use anyhow::Result;
4use committable::Committable;
5use espresso_types::{BlockMerkleTree, NsProof, SeqTypes};
6use futures::{
7    TryStreamExt,
8    future::{FutureExt, join, try_join},
9    stream::StreamExt,
10};
11use hotshot_query_service::{
12    Error,
13    availability::{self, AvailabilityDataSource, LeafId, LeafQueryData},
14    data_source::{VersionedDataSource, storage::NodeStorage},
15    merklized_state::{MerklizedStateDataSource, Snapshot},
16    node::BlockId,
17    types::HeightIndexed,
18};
19use hotshot_types::utils::{epoch_from_block_number, root_block_in_epoch};
20use itertools::izip;
21use jf_merkle_tree_compat::MerkleTreeScheme;
22use light_client::consensus::{
23    header::HeaderProof, leaf::LeafProof, namespace::NamespaceProof, payload::PayloadProof,
24};
25use tide_disco::{Api, RequestParams, StatusCode, method::ReadState};
26use vbs::version::StaticVersionType;
27use versions::NEW_PROTOCOL_VERSION;
28
29use crate::api::data_source::{NodeStateDataSource, StakeTableDataSource};
30
31async fn get_leaf_proof_with_qc_chain<State>(
32    state: &State,
33    requested_leaf: LeafQueryData<SeqTypes>,
34    fetch_timeout: Duration,
35) -> Result<LeafProof, Error>
36where
37    State: AvailabilityDataSource<SeqTypes> + VersionedDataSource,
38    for<'a> State::ReadOnly<'a>: NodeStorage<SeqTypes>,
39{
40    let requested = requested_leaf.height() as usize;
41    // Grab the endpoint and the final QC chain in the same transaction, to ensure that
42    // the QC chain actually corresponds to the endpoint block (and is not subject to
43    // concurrent updates).
44    let mut tx = state.read().await.map_err(internal)?;
45    let latest_height = NodeStorage::block_height(&mut tx).await.map_err(internal)?;
46    let qc_chain = tx.latest_qc_chain().await.map_err(internal)?;
47    drop(tx);
48
49    let mut leaves = state.get_leaf_range(requested + 1..latest_height).await;
50    let mut proof = LeafProof::default();
51    proof.push(requested_leaf);
52
53    while let Some(leaf) = leaves.next().await {
54        let leaf = leaf
55            .with_timeout(fetch_timeout)
56            .await
57            .ok_or_else(|| not_found("missing leaves"))?;
58
59        if proof.push(leaf) {
60            return Ok(proof);
61        }
62    }
63
64    // We reached the end of the range of interest without encountering a 3-chain. Thus, if the last
65    // leaf in the chain is not already assumed finalized by the client, we must prove it finalized
66    // by appending two more QCs.
67    let Some([committing_qc, deciding_qc]) = qc_chain else {
68        return Err(not_found("missing QC 2-chain to prove finality"));
69    };
70    proof.add_qc_chain(Arc::new(committing_qc), Arc::new(deciding_qc));
71
72    Ok(proof)
73}
74
75/// Build a leaf proof for the new protocol using certificate2 finality.
76///
77/// Certificate2 directly commits the leaf at `cert2.data.block_number`. The indirect commit rule then
78/// commits all of that leaf's uncommitted ancestors. If cert2 directly commits the requested leaf,
79/// the proof contains only that leaf plus cert2. Otherwise, the proof includes the chain from the
80/// requested leaf through the directly committed descendant; the verifier checks parent commitments
81/// to prove the requested leaf is on that finalized chain.
82async fn get_leaf_proof_with_cert2<State>(
83    state: &State,
84    requested_leaf: LeafQueryData<SeqTypes>,
85    fetch_timeout: Duration,
86) -> Result<LeafProof, Error>
87where
88    State: AvailabilityDataSource<SeqTypes> + VersionedDataSource,
89    for<'a> State::ReadOnly<'a>: NodeStorage<SeqTypes>,
90{
91    let requested_height = requested_leaf.height();
92    let cert2 = state
93        .read()
94        .await
95        .map_err(internal)?
96        .load_earliest_cert2(requested_height)
97        .await
98        .map_err(internal)?
99        .ok_or_else(|| {
100            not_found(format!(
101                "no cert2 finality proof available at or after height {requested_height}"
102            ))
103        })?;
104
105    let cert2_height = cert2.data.block_number;
106    if cert2_height < requested_height {
107        return Err(not_found(
108            "cert2 finality proof is older than requested leaf",
109        ));
110    }
111
112    let mut proof = LeafProof::default();
113    let requested_leaf_qc = requested_leaf.qc().clone();
114
115    if cert2_height == requested_height {
116        if requested_leaf.leaf().commit() != cert2.data.leaf_commit {
117            return Err(internal("stored cert2 does not finalize the expected leaf"));
118        }
119        proof.push(requested_leaf);
120        proof.add_certificate(Arc::new(cert2), requested_leaf_qc);
121        return Ok(proof);
122    }
123
124    proof.push(requested_leaf);
125    let mut leaves = state
126        .get_leaf_range(requested_height as usize + 1..cert2_height as usize + 1)
127        .await;
128    // Extend the proof chain until we reach the leaf directly committed by cert2. Once that leaf is
129    // present and matches cert2's commitment, the requested leaf is finalized by the indirect
130    // commit rule.
131    while let Some(leaf) = leaves.next().await {
132        let leaf = leaf
133            .with_timeout(fetch_timeout)
134            .await
135            .ok_or_else(|| not_found("missing leaves"))?;
136
137        if leaf.height() == cert2_height {
138            if leaf.leaf().commit() != cert2.data.leaf_commit {
139                return Err(internal("stored cert2 does not finalize the expected leaf"));
140            }
141            proof.push(leaf);
142            proof.add_certificate(Arc::new(cert2), requested_leaf_qc);
143            return Ok(proof);
144        }
145        proof.push(leaf);
146    }
147
148    Err(not_found("missing cert2 leaf"))
149}
150
151async fn get_leaf_proof_with_finalized_assumption<State>(
152    state: &State,
153    requested_leaf: LeafQueryData<SeqTypes>,
154    finalized: usize,
155    fetch_timeout: Duration,
156) -> Result<LeafProof, Error>
157where
158    State: AvailabilityDataSource<SeqTypes>,
159{
160    let requested = requested_leaf.height() as usize;
161    // If we have a known-finalized block, we will not need a final 2-chain of QCs to prove
162    // the last leaf in the result finalized, since we will either terminate with a 3-chain
163    // of leaves or at the `finalized` leaf.
164    if finalized <= requested {
165        return Err(Error::Custom {
166            message: format!(
167                "finalized leaf height ({finalized}) must be greater than requested ({requested})"
168            ),
169            status: StatusCode::BAD_REQUEST,
170        });
171    }
172
173    let mut leaves = state.get_leaf_range(requested + 1..finalized).await;
174    let mut proof = LeafProof::default();
175    proof.push(requested_leaf);
176
177    while let Some(leaf) = leaves.next().await {
178        let leaf = leaf
179            .with_timeout(fetch_timeout)
180            .await
181            .ok_or_else(|| not_found("missing leaves"))?;
182
183        if proof.push(leaf) {
184            return Ok(proof);
185        }
186    }
187
188    Ok(proof)
189}
190
191async fn get_header_proof<State>(
192    state: &State,
193    root: u64,
194    requested: BlockId<SeqTypes>,
195    fetch_timeout: Duration,
196) -> Result<HeaderProof, Error>
197where
198    State: AvailabilityDataSource<SeqTypes>
199        + MerklizedStateDataSource<SeqTypes, BlockMerkleTree, { BlockMerkleTree::ARITY }>
200        + VersionedDataSource,
201{
202    let header = state
203        .get_header(requested)
204        .await
205        .with_timeout(fetch_timeout)
206        .await
207        .ok_or_else(|| not_found(format!("unknown header {requested}")))?;
208    if header.height() >= root {
209        return Err(Error::Custom {
210            message: format!(
211                "height ({}) must be less than root ({root})",
212                header.height()
213            ),
214            status: StatusCode::BAD_REQUEST,
215        });
216    }
217    let path = MerklizedStateDataSource::<SeqTypes, BlockMerkleTree, _>::get_path(
218        state,
219        Snapshot::Index(root),
220        header.height(),
221    )
222    .await
223    .map_err(|source| Error::MerklizedState {
224        source: source.into(),
225    })?;
226
227    Ok(HeaderProof::new(header, path))
228}
229
230async fn get_namespace_proof_range<State>(
231    state: &State,
232    start: usize,
233    end: usize,
234    namespace: u64,
235    fetch_timeout: Duration,
236    large_object_range_limit: usize,
237) -> Result<Vec<NamespaceProof>, Error>
238where
239    State: AvailabilityDataSource<SeqTypes>,
240{
241    if end <= start {
242        return Err(Error::Custom {
243            message: format!("requested empty interval [{start}, {end})"),
244            status: StatusCode::BAD_REQUEST,
245        });
246    }
247    if end - start > large_object_range_limit {
248        return Err(Error::Custom {
249            message: format!(
250                "requested range [{start}, {end}) exceeds maximum size {large_object_range_limit}"
251            ),
252            status: StatusCode::BAD_REQUEST,
253        });
254    }
255
256    let fetch_headers = async move {
257        state
258            .get_header_range(start..end)
259            .await
260            .enumerate()
261            .then(|(i, fetch)| async move {
262                fetch
263                    .with_timeout(fetch_timeout)
264                    .await
265                    .ok_or_else(|| Error::Custom {
266                        message: format!("missing header {}", start + i),
267                        status: StatusCode::NOT_FOUND,
268                    })
269            })
270            .try_collect::<Vec<_>>()
271            .await
272    };
273    let fetch_payloads = async move {
274        state
275            .get_payload_range(start..end)
276            .await
277            .enumerate()
278            .then(|(i, fetch)| async move {
279                fetch
280                    .with_timeout(fetch_timeout)
281                    .await
282                    .ok_or_else(|| Error::Custom {
283                        message: format!("missing payload {}", start + i),
284                        status: StatusCode::NOT_FOUND,
285                    })
286            })
287            .try_collect::<Vec<_>>()
288            .await
289    };
290    let fetch_vid_commons = async move {
291        state
292            .get_vid_common_range(start..end)
293            .await
294            .enumerate()
295            .then(|(i, fetch)| async move {
296                fetch
297                    .with_timeout(fetch_timeout)
298                    .await
299                    .ok_or_else(|| Error::Custom {
300                        message: format!("missing VID common {}", start + i),
301                        status: StatusCode::NOT_FOUND,
302                    })
303            })
304            .try_collect::<Vec<_>>()
305            .await
306    };
307    let (headers, (payloads, vid_commons)) =
308        try_join(fetch_headers, try_join(fetch_payloads, fetch_vid_commons)).await?;
309
310    izip!(headers, payloads, vid_commons)
311        .map(|(header, payload, vid_common)| {
312            let Some(ns_index) = header.ns_table().find_ns_id(&namespace.into()) else {
313                return Ok(NamespaceProof::not_present());
314            };
315            let ns_proof = NsProof::new(payload.data(), &ns_index, vid_common.common())
316                .ok_or_else(|| Error::Custom {
317                    message: "failed to construct namespace proof".into(),
318                    status: StatusCode::INTERNAL_SERVER_ERROR,
319                })?;
320            Ok(NamespaceProof::new(ns_proof, vid_common.common().clone()))
321        })
322        .collect()
323}
324
325#[derive(Debug)]
326pub(super) struct Options {
327    /// Timeout for failing requests due to missing data.
328    ///
329    /// If data needed to respond to a request is missing, it can (in some cases) be fetched from an
330    /// external provider. This parameter controls how long the request handler will wait for
331    /// missing data to be fetched before giving up and failing the request.
332    pub fetch_timeout: Duration,
333
334    /// The maximum number of large objects which can be loaded in a single range query.
335    ///
336    /// Large objects include anything that _might_ contain a full payload or an object proportional
337    /// in size to a payload. Note that this limit applies to the entire class of objects: we do not
338    /// check the size of objects while loading to determine which limit to apply. If an object
339    /// belongs to a class which might contain a large payload, the large object limit always
340    /// applies.
341    pub large_object_range_limit: usize,
342}
343
344impl Default for Options {
345    fn default() -> Self {
346        Self {
347            fetch_timeout: Duration::from_millis(500),
348            large_object_range_limit: availability::Options::default().large_object_range_limit,
349        }
350    }
351}
352
353pub(super) fn define_api<S, ApiVer: StaticVersionType + 'static>(
354    opt: Options,
355    api_ver: semver::Version,
356) -> Result<Api<S, Error, ApiVer>>
357where
358    S: ReadState + Send + Sync + 'static,
359    S::State: AvailabilityDataSource<SeqTypes>
360        + MerklizedStateDataSource<SeqTypes, BlockMerkleTree, { BlockMerkleTree::ARITY }>
361        + NodeStateDataSource
362        + StakeTableDataSource<SeqTypes>
363        + VersionedDataSource,
364    for<'a> <S::State as VersionedDataSource>::ReadOnly<'a>: NodeStorage<SeqTypes>,
365{
366    let toml = toml::from_str::<toml::Value>(include_str!("../../api/light-client.toml"))?;
367    let mut api = Api::<S, Error, ApiVer>::new(toml)?;
368    api.with_version(api_ver);
369
370    let Options {
371        fetch_timeout,
372        large_object_range_limit,
373    } = opt;
374
375    api.get("leaf", move |req, state| {
376        async move {
377            let requested_leaf = leaf_from_req(&req, state, fetch_timeout).await?;
378            let finalized = req
379                .opt_integer_param("finalized")
380                .map_err(bad_param("finalized"))?;
381
382            if let Some(finalized) = finalized {
383                get_leaf_proof_with_finalized_assumption(
384                    state,
385                    requested_leaf,
386                    finalized,
387                    fetch_timeout,
388                )
389                .await
390            } else if requested_leaf.header().version() >= NEW_PROTOCOL_VERSION {
391                get_leaf_proof_with_cert2(state, requested_leaf, fetch_timeout).await
392            } else {
393                get_leaf_proof_with_qc_chain(state, requested_leaf, fetch_timeout).await
394            }
395        }
396        .boxed()
397    })?
398    .get("header", move |req, state| {
399        async move {
400            let root = req.integer_param("root").map_err(bad_param("root"))?;
401            let requested = block_id_from_req(&req)?;
402            get_header_proof(state, root, requested, fetch_timeout).await
403        }
404        .boxed()
405    })?
406    .get("stake_table", move |req, state| {
407        async move {
408            let epoch: u64 = req.integer_param("epoch").map_err(bad_param("epoch"))?;
409
410            let node_state = state.node_state().await;
411            let epoch_height = node_state.epoch_height.ok_or_else(|| Error::Custom {
412                message: "epoch state not set".into(),
413                status: StatusCode::INTERNAL_SERVER_ERROR,
414            })?;
415            let first_epoch = epoch_from_block_number(node_state.epoch_start_block, epoch_height);
416
417            if epoch < first_epoch + 2 {
418                return Err(Error::Custom {
419                    message: format!("epoch must be at least {}", first_epoch + 2),
420                    status: StatusCode::BAD_REQUEST,
421                });
422            }
423
424            // Find the range of L1 block containing events for this epoch. This is determined by
425            // the `l1_finalized` field of the epoch root (from two epochs prior) and the previous
426            // epoch's epoch root.
427            let epoch_root_height = root_block_in_epoch(epoch - 2, epoch_height) as usize;
428            let epoch_root = state
429                .get_header(epoch_root_height)
430                .await
431                .with_timeout(fetch_timeout)
432                .await
433                .ok_or_else(|| {
434                    not_found(format!("missing epoch root header {epoch_root_height}"))
435                })?;
436            let to_l1_block = epoch_root
437                .l1_finalized()
438                .ok_or_else(|| Error::Custom {
439                    message: "epoch root header is missing L1 finalized block".into(),
440                    status: StatusCode::INTERNAL_SERVER_ERROR,
441                })?
442                .number();
443
444            let from_l1_block = if epoch >= first_epoch + 3 {
445                let prev_epoch_root_height = root_block_in_epoch(epoch - 3, epoch_height) as usize;
446                let prev_epoch_root = state
447                    .get_header(prev_epoch_root_height)
448                    .await
449                    .with_timeout(fetch_timeout)
450                    .await
451                    .ok_or_else(|| {
452                        not_found(format!(
453                            "missing previous epoch root header {prev_epoch_root_height}"
454                        ))
455                    })?;
456                prev_epoch_root
457                    .l1_finalized()
458                    .ok_or_else(|| Error::Custom {
459                        message: "previous epoch root header is missing L1 finalized block".into(),
460                        status: StatusCode::INTERNAL_SERVER_ERROR,
461                    })?
462                    .number()
463                    + 1
464            } else {
465                0
466            };
467
468            state
469                .stake_table_events(from_l1_block, to_l1_block)
470                .await
471                .map_err(|err| Error::Custom {
472                    message: format!("failed to load stake table events: {err:#}"),
473                    status: StatusCode::INTERNAL_SERVER_ERROR,
474                })
475        }
476        .boxed()
477    })?
478    .get("payload", move |req, state| {
479        async move {
480            let height: usize = req.integer_param("height").map_err(bad_param("height"))?;
481            let fetch_payload = async move {
482                state
483                    .get_payload(height)
484                    .await
485                    .with_timeout(fetch_timeout)
486                    .await
487                    .ok_or_else(|| Error::Custom {
488                        message: format!("missing payload {height}"),
489                        status: StatusCode::NOT_FOUND,
490                    })
491            };
492            let fetch_vid_common = async move {
493                state
494                    .get_vid_common(height)
495                    .await
496                    .with_timeout(fetch_timeout)
497                    .await
498                    .ok_or_else(|| Error::Custom {
499                        message: format!("missing VID common {height}"),
500                        status: StatusCode::NOT_FOUND,
501                    })
502            };
503            let (payload, vid_common) = try_join(fetch_payload, fetch_vid_common).await?;
504            Ok(PayloadProof::new(
505                payload.data().clone(),
506                vid_common.common().clone(),
507            ))
508        }
509        .boxed()
510    })?
511    .get("payload_range", move |req, state| {
512        async move {
513            let start: usize = req.integer_param("start").map_err(bad_param("start"))?;
514            let end: usize = req.integer_param("end").map_err(bad_param("end"))?;
515            let fetch_payloads = async move {
516                state.get_payload_range(start..end).await.enumerate().then(
517                    move |(i, fetch)| async move {
518                        fetch
519                            .with_timeout(fetch_timeout)
520                            .await
521                            .ok_or_else(|| Error::Custom {
522                                message: format!("missing payload {}", start + i),
523                                status: StatusCode::NOT_FOUND,
524                            })
525                    },
526                )
527            };
528            let fetch_vid_commons = async move {
529                state
530                    .get_vid_common_range(start..end)
531                    .await
532                    .enumerate()
533                    .then(move |(i, fetch)| async move {
534                        fetch
535                            .with_timeout(fetch_timeout)
536                            .await
537                            .ok_or_else(|| Error::Custom {
538                                message: format!("missing VID common {}", start + i),
539                                status: StatusCode::NOT_FOUND,
540                            })
541                    })
542            };
543            let (payloads, vid_commons) = join(fetch_payloads, fetch_vid_commons).await;
544            payloads
545                .zip(vid_commons)
546                .map(|(payload, vid_common)| {
547                    Ok(PayloadProof::new(
548                        payload?.data().clone(),
549                        vid_common?.common().clone(),
550                    ))
551                })
552                .try_collect::<Vec<_>>()
553                .await
554        }
555        .boxed()
556    })?
557    .get("namespace", move |req, state| {
558        async move {
559            let height = req.integer_param("height").map_err(bad_param("height"))?;
560            let namespace = req
561                .integer_param("namespace")
562                .map_err(bad_param("namespace"))?;
563            let mut proofs = get_namespace_proof_range(
564                state,
565                height,
566                height + 1,
567                namespace,
568                fetch_timeout,
569                large_object_range_limit,
570            )
571            .await?;
572            if proofs.len() != 1 {
573                tracing::error!(
574                    height,
575                    namespace,
576                    ?proofs,
577                    "get_namespace_proof_range should have returned exactly one proof"
578                );
579                return Err(Error::Custom {
580                    message: "internal consistency error".into(),
581                    status: StatusCode::INTERNAL_SERVER_ERROR,
582                });
583            }
584            Ok(proofs.remove(0))
585        }
586        .boxed()
587    })?
588    .get("namespace_range", move |req, state| {
589        async move {
590            let start = req.integer_param("start").map_err(bad_param("start"))?;
591            let end = req.integer_param("end").map_err(bad_param("end"))?;
592            let namespace = req
593                .integer_param("namespace")
594                .map_err(bad_param("namespace"))?;
595            get_namespace_proof_range(
596                state,
597                start,
598                end,
599                namespace,
600                fetch_timeout,
601                large_object_range_limit,
602            )
603            .await
604        }
605        .boxed()
606    })?;
607
608    Ok(api)
609}
610
611async fn leaf_from_req<S>(
612    req: &RequestParams,
613    state: &S,
614    fetch_timeout: Duration,
615) -> Result<LeafQueryData<SeqTypes>, Error>
616where
617    S: AvailabilityDataSource<SeqTypes>,
618{
619    let requested = if let Some(height) = req
620        .opt_integer_param::<_, usize>("height")
621        .map_err(bad_param("height"))?
622    {
623        LeafId::Number(height)
624    } else if let Some(hash) = req.opt_blob_param("hash").map_err(bad_param("hash"))? {
625        LeafId::Hash(hash)
626    } else if let Some(hash) = req
627        .opt_blob_param("block-hash")
628        .map_err(bad_param("block-hash"))?
629    {
630        let header = state
631            .get_header(BlockId::Hash(hash))
632            .await
633            .with_timeout(fetch_timeout)
634            .await
635            .ok_or_else(|| not_found(format!("unknown block hash {hash}")))?;
636        LeafId::Number(header.height() as usize)
637    } else if let Some(hash) = req
638        .opt_blob_param("payload-hash")
639        .map_err(bad_param("payload-hash"))?
640    {
641        let header = state
642            .get_header(BlockId::PayloadHash(hash))
643            .await
644            .with_timeout(fetch_timeout)
645            .await
646            .ok_or_else(|| not_found(format!("unknown payload hash {hash}")))?;
647        LeafId::Number(header.height() as usize)
648    } else {
649        return Err(Error::Custom {
650            message: "missing parameter: requested leaf must be identified by height, hash, block \
651                      hash, or payload hash"
652                .into(),
653            status: StatusCode::BAD_REQUEST,
654        });
655    };
656
657    state
658        .get_leaf(requested)
659        .await
660        .with_timeout(fetch_timeout)
661        .await
662        .ok_or_else(|| not_found(format!("unknown leaf {requested}")))
663}
664
665fn block_id_from_req(req: &RequestParams) -> Result<BlockId<SeqTypes>, Error> {
666    if let Some(height) = req
667        .opt_integer_param("height")
668        .map_err(bad_param("height"))?
669    {
670        Ok(BlockId::Number(height))
671    } else if let Some(hash) = req.opt_blob_param("hash").map_err(bad_param("hash"))? {
672        Ok(BlockId::Hash(hash))
673    } else if let Some(hash) = req
674        .opt_blob_param("payload-hash")
675        .map_err(bad_param("payload-hash"))?
676    {
677        Ok(BlockId::PayloadHash(hash))
678    } else {
679        Err(Error::Custom {
680            message: "missing parameter: requested header must be identified by height, hash, or \
681                      payload hash"
682                .into(),
683            status: StatusCode::BAD_REQUEST,
684        })
685    }
686}
687
688fn bad_param<E>(name: &'static str) -> impl FnOnce(E) -> Error
689where
690    E: Display,
691{
692    move |err| Error::Custom {
693        message: format!("{name}: {err:#}"),
694        status: StatusCode::BAD_REQUEST,
695    }
696}
697
698fn internal(err: impl Display) -> Error {
699    Error::Custom {
700        message: err.to_string(),
701        status: StatusCode::INTERNAL_SERVER_ERROR,
702    }
703}
704
705fn not_found(msg: impl Into<String>) -> Error {
706    Error::Custom {
707        message: msg.into(),
708        status: StatusCode::NOT_FOUND,
709    }
710}
711
712#[cfg(test)]
713mod test {
714    use std::marker::PhantomData;
715
716    use committable::Committable;
717    use espresso_types::BLOCK_MERKLE_TREE_HEIGHT;
718    use futures::future::join_all;
719    use hotshot_query_service::{
720        availability::{BlockQueryData, TransactionIndex, VidCommonQueryData},
721        data_source::{Transaction, storage::UpdateAvailabilityStorage},
722        merklized_state::UpdateStateData,
723    };
724    use hotshot_types::{simple_certificate::CertificatePair, simple_vote::Vote2Data};
725    use jf_merkle_tree_compat::{AppendableMerkleTreeScheme, ToTraversalPath};
726    use light_client::{
727        consensus::leaf::{FinalityProof, LeafProofHint},
728        testing::{
729            AlwaysTrueQuorum, ENABLE_EPOCHS, LEGACY_VERSION, TestClient, VersionCheckQuorum,
730            leaf_chain, leaf_chain_with_upgrade,
731        },
732    };
733    use tide_disco::Error;
734    use versions::{DRB_AND_HEADER_UPGRADE_VERSION, EPOCH_VERSION, NEW_PROTOCOL_VERSION};
735
736    use super::*;
737    use crate::api::{
738        data_source::{SequencerDataSource, testing::TestableSequencerDataSource},
739        sql::DataSource,
740    };
741
742    #[test_log::test(tokio::test(flavor = "multi_thread"))]
743    async fn test_two_chain() {
744        let storage = <DataSource as TestableSequencerDataSource>::create_storage().await;
745        let ds = DataSource::create(
746            DataSource::persistence_options(&storage),
747            Default::default(),
748            false,
749        )
750        .await
751        .unwrap();
752
753        // Insert some leaves, forming a chain.
754        let leaves = leaf_chain(1..=3, EPOCH_VERSION).await;
755        {
756            let mut tx = ds.write().await.unwrap();
757            tx.insert_leaf(&leaves[0]).await.unwrap();
758            tx.insert_leaf(&leaves[1]).await.unwrap();
759            tx.insert_leaf(&leaves[2]).await.unwrap();
760            tx.commit().await.unwrap();
761        }
762
763        // Ask for the first leaf; it is proved finalized by the chain formed along with the second.
764        let proof = get_leaf_proof_with_qc_chain(&ds, leaves[0].clone(), Duration::MAX)
765            .await
766            .unwrap();
767        assert_eq!(
768            proof
769                .verify(LeafProofHint::Quorum(&AlwaysTrueQuorum))
770                .await
771                .unwrap(),
772            leaves[0]
773        );
774    }
775
776    #[test_log::test(tokio::test(flavor = "multi_thread"))]
777    async fn test_finalized() {
778        let storage = <DataSource as TestableSequencerDataSource>::create_storage().await;
779        let ds = DataSource::create(
780            DataSource::persistence_options(&storage),
781            Default::default(),
782            false,
783        )
784        .await
785        .unwrap();
786
787        // Insert a single leaf. We will not be able to provide proofs ending in a leaf chain, but
788        // we can return a leaf if the leaf after it is already known to be finalized.
789        let leaves = leaf_chain(1..=2, EPOCH_VERSION).await;
790        {
791            let mut tx = ds.write().await.unwrap();
792            tx.insert_leaf(&leaves[0]).await.unwrap();
793            tx.commit().await.unwrap();
794        }
795
796        let proof =
797            get_leaf_proof_with_finalized_assumption(&ds, leaves[0].clone(), 2, Duration::MAX)
798                .await
799                .unwrap();
800        assert_eq!(
801            proof
802                .verify(LeafProofHint::assumption(leaves[1].leaf()))
803                .await
804                .unwrap(),
805            leaves[0]
806        );
807    }
808
809    #[test_log::test(tokio::test(flavor = "multi_thread"))]
810    async fn test_new_protocol_finalized_assumption() {
811        let storage = <DataSource as TestableSequencerDataSource>::create_storage().await;
812        let ds = DataSource::create(
813            DataSource::persistence_options(&storage),
814            Default::default(),
815            false,
816        )
817        .await
818        .unwrap();
819
820        let leaves = leaf_chain(1..=2, NEW_PROTOCOL_VERSION).await;
821        {
822            let mut tx = ds.write().await.unwrap();
823            tx.insert_leaf(&leaves[0]).await.unwrap();
824            tx.commit().await.unwrap();
825        }
826
827        let proof =
828            get_leaf_proof_with_finalized_assumption(&ds, leaves[0].clone(), 2, Duration::MAX)
829                .await
830                .unwrap();
831        assert!(matches!(proof.proof(), FinalityProof::Assumption));
832        assert_eq!(
833            proof
834                .verify(LeafProofHint::assumption(leaves[1].leaf()))
835                .await
836                .unwrap(),
837            leaves[0]
838        );
839    }
840
841    #[test_log::test(tokio::test(flavor = "multi_thread"))]
842    async fn test_new_protocol_cert2() {
843        let storage = <DataSource as TestableSequencerDataSource>::create_storage().await;
844        let ds = DataSource::create(
845            DataSource::persistence_options(&storage),
846            Default::default(),
847            false,
848        )
849        .await
850        .unwrap();
851
852        let leaves = leaf_chain(1..=2, NEW_PROTOCOL_VERSION).await;
853        let cert2_leaf = &leaves[1];
854        let cert2_data = Vote2Data {
855            leaf_commit: cert2_leaf.leaf().commit(),
856            epoch: cert2_leaf.qc().data.epoch.unwrap(),
857            block_number: cert2_leaf.height(),
858        };
859        let cert2 = espresso_types::Certificate2::new(
860            cert2_data.clone(),
861            cert2_data.commit(),
862            cert2_leaf.leaf().view_number(),
863            None,
864            PhantomData,
865        );
866
867        {
868            let mut tx = ds.write().await.unwrap();
869            tx.insert_leaf(&leaves[0]).await.unwrap();
870            tx.insert_leaf(cert2_leaf).await.unwrap();
871            tx.insert_cert2(cert2_leaf.height(), cert2).await.unwrap();
872            tx.commit().await.unwrap();
873        }
874
875        let proof = get_leaf_proof_with_cert2(&ds, leaves[0].clone(), Duration::MAX)
876            .await
877            .unwrap();
878        assert!(matches!(proof.proof(), FinalityProof::NewProtocol { .. }));
879        assert_eq!(
880            proof
881                .verify(LeafProofHint::Quorum(&AlwaysTrueQuorum))
882                .await
883                .unwrap(),
884            leaves[0]
885        );
886    }
887
888    #[test_log::test(tokio::test(flavor = "multi_thread"))]
889    async fn test_bad_finalized() {
890        let storage = <DataSource as TestableSequencerDataSource>::create_storage().await;
891        let ds = DataSource::create(
892            DataSource::persistence_options(&storage),
893            Default::default(),
894            false,
895        )
896        .await
897        .unwrap();
898
899        // Insert a single leaf. If we request this leaf but provide a finalized leaf which is
900        // earlier, we should fail.
901        let leaves = leaf_chain(1..2, EPOCH_VERSION).await;
902        {
903            let mut tx = ds.write().await.unwrap();
904            tx.insert_leaf(&leaves[0]).await.unwrap();
905            tx.commit().await.unwrap();
906        }
907
908        let err =
909            get_leaf_proof_with_finalized_assumption(&ds, leaves[0].clone(), 0, Duration::MAX)
910                .await
911                .unwrap_err();
912        assert_eq!(err.status(), StatusCode::BAD_REQUEST);
913    }
914
915    #[test_log::test(tokio::test(flavor = "multi_thread"))]
916    async fn test_no_chain() {
917        let storage = <DataSource as TestableSequencerDataSource>::create_storage().await;
918        let ds = DataSource::create(
919            DataSource::persistence_options(&storage),
920            Default::default(),
921            false,
922        )
923        .await
924        .unwrap();
925
926        // Insert multiple leaves that don't chain. We will not be able to prove these are
927        // finalized.
928        let leaves = leaf_chain(1..=4, EPOCH_VERSION).await;
929        {
930            let mut tx = ds.write().await.unwrap();
931            tx.insert_leaf(&leaves[0]).await.unwrap();
932            tx.insert_leaf(&leaves[2]).await.unwrap();
933            tx.insert_leaf(&leaves[3]).await.unwrap();
934            tx.commit().await.unwrap();
935        }
936
937        let err = get_leaf_proof_with_qc_chain(&ds, leaves[0].clone(), Duration::from_secs(1))
938            .await
939            .unwrap_err();
940        assert_eq!(err.status(), StatusCode::NOT_FOUND);
941
942        // Even if we start from a finalized leave that extends one of the leaves we do have (4,
943        // extends 3) we fail to generate a proof because we can't generate a chain from the
944        // requested leaf (1) to the finalized leaf (4), since leaf 2 is missing.
945        let err = get_leaf_proof_with_finalized_assumption(
946            &ds,
947            leaves[0].clone(),
948            4,
949            Duration::from_secs(1),
950        )
951        .await
952        .unwrap_err();
953        assert_eq!(err.status(), StatusCode::NOT_FOUND);
954    }
955
956    #[test_log::test(tokio::test(flavor = "multi_thread"))]
957    async fn test_final_qcs() {
958        let storage = <DataSource as TestableSequencerDataSource>::create_storage().await;
959        let ds = DataSource::create(
960            DataSource::persistence_options(&storage),
961            Default::default(),
962            false,
963        )
964        .await
965        .unwrap();
966
967        // Insert a single leaf, plus an extra QC chain proving it finalized.
968        let leaves = leaf_chain(1..=3, EPOCH_VERSION).await;
969        let qcs = [
970            CertificatePair::for_parent(leaves[1].leaf()),
971            CertificatePair::for_parent(leaves[2].leaf()),
972        ];
973        {
974            let mut tx = ds.write().await.unwrap();
975            tx.insert_leaf_with_qc_chain(&leaves[0], Some(qcs.clone()))
976                .await
977                .unwrap();
978            tx.commit().await.unwrap();
979        }
980
981        let proof = get_leaf_proof_with_qc_chain(&ds, leaves[0].clone(), Duration::MAX)
982            .await
983            .unwrap();
984        assert_eq!(
985            proof
986                .verify(LeafProofHint::Quorum(&AlwaysTrueQuorum))
987                .await
988                .unwrap(),
989            leaves[0]
990        );
991    }
992
993    #[test_log::test(tokio::test(flavor = "multi_thread"))]
994    async fn test_upgrade_to_epochs() {
995        let storage = <DataSource as TestableSequencerDataSource>::create_storage().await;
996        let ds = DataSource::create(
997            DataSource::persistence_options(&storage),
998            Default::default(),
999            false,
1000        )
1001        .await
1002        .unwrap();
1003
1004        // Upgrade to epochs (and enabling HotStuff2) in the middle of a leaf chain, so that the
1005        // last leaf in the chain only requires 2 QCs to verify, even though at the start of the
1006        // chain we would have required 3.
1007        let leaves = leaf_chain_with_upgrade(1..=4, 2, ENABLE_EPOCHS).await;
1008        assert_eq!(leaves[0].header().version(), LEGACY_VERSION);
1009        assert_eq!(leaves[1].header().version(), DRB_AND_HEADER_UPGRADE_VERSION);
1010        let qcs = [
1011            CertificatePair::for_parent(leaves[2].leaf()),
1012            CertificatePair::for_parent(leaves[3].leaf()),
1013        ];
1014        {
1015            let mut tx = ds.write().await.unwrap();
1016            tx.insert_leaf(&leaves[0]).await.unwrap();
1017            tx.insert_leaf_with_qc_chain(&leaves[1], Some(qcs.clone()))
1018                .await
1019                .unwrap();
1020            tx.commit().await.unwrap();
1021        }
1022
1023        let proof = get_leaf_proof_with_qc_chain(&ds, leaves[0].clone(), Duration::MAX)
1024            .await
1025            .unwrap();
1026        assert_eq!(
1027            proof
1028                .verify(LeafProofHint::Quorum(&VersionCheckQuorum::new(
1029                    leaves.iter().map(|leaf| leaf.leaf().clone())
1030                )))
1031                .await
1032                .unwrap(),
1033            leaves[0]
1034        );
1035        assert!(matches!(proof.proof(), FinalityProof::HotStuff2 { .. }))
1036    }
1037
1038    #[tokio::test]
1039    #[test_log::test]
1040    async fn test_header_proof() {
1041        let storage = <DataSource as TestableSequencerDataSource>::create_storage().await;
1042        let ds = DataSource::create(
1043            DataSource::persistence_options(&storage),
1044            Default::default(),
1045            false,
1046        )
1047        .await
1048        .unwrap();
1049
1050        // Construct a chain of leaves, plus the corresponding block Merkle tree at each leaf.
1051        let leaves = leaf_chain(0..=2, EPOCH_VERSION).await;
1052        let mts = leaves
1053            .iter()
1054            .scan(
1055                BlockMerkleTree::new(BLOCK_MERKLE_TREE_HEIGHT),
1056                |mt, leaf| {
1057                    assert_eq!(mt.commitment(), leaf.header().block_merkle_tree_root());
1058                    let item = mt.clone();
1059                    mt.push(leaf.block_hash()).unwrap();
1060                    Some(item)
1061                },
1062            )
1063            .collect::<Vec<_>>();
1064
1065        // Save all those objects in the DB.
1066        {
1067            let mut tx = ds.write().await.unwrap();
1068            for (leaf, mt) in leaves.iter().zip(&mts) {
1069                tx.insert_leaf(leaf).await.unwrap();
1070
1071                if leaf.height() > 0 {
1072                    let merkle_path = mt.lookup(leaf.height() - 1).expect_ok().unwrap().1;
1073                    UpdateStateData::<SeqTypes, BlockMerkleTree, _>::insert_merkle_nodes(
1074                        &mut tx,
1075                        merkle_path,
1076                        ToTraversalPath::<{ BlockMerkleTree::ARITY }>::to_traversal_path(
1077                            &(leaf.height() - 1),
1078                            BLOCK_MERKLE_TREE_HEIGHT,
1079                        ),
1080                        leaf.height(),
1081                    )
1082                    .await
1083                    .unwrap();
1084                    UpdateStateData::<SeqTypes, BlockMerkleTree, _>::set_last_state_height(
1085                        &mut tx,
1086                        leaf.height() as usize,
1087                    )
1088                    .await
1089                    .unwrap();
1090                }
1091            }
1092            tx.commit().await.unwrap();
1093        }
1094
1095        // Test happy path.
1096        for (root, mt) in mts.iter().enumerate().skip(1) {
1097            for (height, leaf) in leaves.iter().enumerate().take(root) {
1098                tracing::info!(root, height, "test happy path");
1099                let proof =
1100                    get_header_proof(&ds, root as u64, BlockId::Number(height), Duration::MAX)
1101                        .await
1102                        .unwrap();
1103                assert_eq!(proof.verify_ref(mt.commitment()).unwrap(), leaf.header());
1104            }
1105        }
1106
1107        // Test unknown leaf.
1108        let err = get_header_proof(&ds, 5, BlockId::Number(4), Duration::from_secs(1))
1109            .await
1110            .unwrap_err();
1111        assert_eq!(err.status(), StatusCode::NOT_FOUND);
1112
1113        // Test height >= root.
1114        let err = get_header_proof(&ds, 1, BlockId::Number(1), Duration::MAX)
1115            .await
1116            .unwrap_err();
1117        assert_eq!(err.status(), StatusCode::BAD_REQUEST);
1118    }
1119
1120    #[tokio::test]
1121    #[test_log::test]
1122    async fn test_namespace_proof() {
1123        let storage = <DataSource as TestableSequencerDataSource>::create_storage().await;
1124        let ds = DataSource::create(
1125            DataSource::persistence_options(&storage),
1126            Default::default(),
1127            false,
1128        )
1129        .await
1130        .unwrap();
1131
1132        // Construct a chain of blocks.
1133        let client = TestClient::default();
1134        let leaves = join_all((0..=2).map(|i| client.leaf(i))).await;
1135        let payloads = join_all((0..=2).map(|i| client.payload(i))).await;
1136        let vid_commons = join_all((0..=2).map(|i| client.vid_common(i))).await;
1137
1138        // Save all those objects in the DB.
1139        {
1140            let mut tx = ds.write().await.unwrap();
1141            for (leaf, payload, vid_common) in izip!(&leaves, &payloads, &vid_commons) {
1142                tx.insert_leaf(leaf).await.unwrap();
1143                tx.insert_block(&BlockQueryData::<SeqTypes>::new(
1144                    leaf.header().clone(),
1145                    payload.clone(),
1146                ))
1147                .await
1148                .unwrap();
1149                tx.insert_vid(
1150                    &VidCommonQueryData::<SeqTypes>::new(leaf.header().clone(), vid_common.clone()),
1151                    None,
1152                )
1153                .await
1154                .unwrap();
1155            }
1156            tx.commit().await.unwrap();
1157        }
1158
1159        // Test happy path: all blocks.
1160        let ns = payloads[0]
1161            .transaction(&TransactionIndex {
1162                ns_index: 0.into(),
1163                position: 0,
1164            })
1165            .unwrap()
1166            .namespace();
1167        let proofs = get_namespace_proof_range(&ds, 0, 3, ns.into(), Duration::MAX, 100)
1168            .await
1169            .unwrap();
1170        assert_eq!(proofs.len(), 3);
1171        for (leaf, proof) in leaves.iter().zip(proofs) {
1172            proof.verify(leaf.header(), ns).unwrap();
1173        }
1174
1175        // Test happy path: subset.
1176        let tx = payloads[1]
1177            .transaction(&TransactionIndex {
1178                ns_index: 0.into(),
1179                position: 0,
1180            })
1181            .unwrap();
1182        let ns = tx.namespace();
1183        let proofs = get_namespace_proof_range(&ds, 1, 2, ns.into(), Duration::MAX, 100)
1184            .await
1185            .unwrap();
1186        assert_eq!(proofs.len(), 1);
1187        assert_eq!(proofs[0].verify(leaves[1].header(), ns).unwrap(), [tx]);
1188
1189        // Test missing data in range.
1190        let err = get_namespace_proof_range(&ds, 0, 4, ns.into(), Duration::from_secs(1), 100)
1191            .await
1192            .unwrap_err();
1193        assert_eq!(err.status(), StatusCode::NOT_FOUND);
1194
1195        // Test invalid range.
1196        let err = get_namespace_proof_range(&ds, 1, 0, ns.into(), Duration::from_secs(1), 100)
1197            .await
1198            .unwrap_err();
1199        assert_eq!(err.status(), StatusCode::BAD_REQUEST);
1200        assert!(
1201            err.to_string().contains("requested empty interval"),
1202            "{err:#}"
1203        );
1204
1205        // Test large range.
1206        let err = get_namespace_proof_range(&ds, 0, 10_000, ns.into(), Duration::from_secs(1), 100)
1207            .await
1208            .unwrap_err();
1209        assert_eq!(err.status(), StatusCode::BAD_REQUEST);
1210        assert!(err.to_string().contains("exceeds maximum size"), "{err:#}");
1211    }
1212}