light_client/
state.rs

1//! Client-side state used to implement light client fetching and verification.
2
3use std::{collections::BTreeMap, future::Future, sync::Arc};
4
5use anyhow::{Context, Result, ensure};
6use async_lock::RwLock;
7use committable::Committable;
8use espresso_types::{
9    Header, Leaf2, NamespaceId, PubKey, SeqTypes, StakeTableState, Transaction,
10    select_active_validator_set,
11};
12use hotshot_query_service::{
13    availability::{BlockQueryData, LeafId, LeafQueryData, PayloadQueryData, VidCommonQueryData},
14    node::BlockId,
15    types::HeightIndexed,
16};
17use hotshot_types::{data::EpochNumber, stake_table::StakeTableEntry, utils::root_block_in_epoch};
18use serde::{Deserialize, Serialize};
19
20use crate::{
21    client::Client,
22    consensus::{
23        leaf::LeafProofHint,
24        quorum::{Quorum, StakeTable, StakeTablePair, StakeTableQuorum},
25    },
26    storage::{LeafRequest, Storage},
27};
28
/// Initial state for a [`LightClient`].
///
/// This [`Genesis`] forms the root of trust for a light client. It defines the initial stake table
/// which is used to verify blocks before the first epoch, which in turn allows it to verify
/// transitions to subsequent stake tables. Thus, this genesis must be configured correctly (i.e.
/// matching the genesis state of honest HotShot nodes) or else the light client may not operate
/// correctly.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct Genesis {
    /// The number of blocks in an epoch.
    ///
    /// Also used by the light client to locate each epoch's root block when verifying
    /// reconstructed stake tables.
    pub epoch_height: u64,

    /// The first epoch where the stake table came from the contract, rather than the genesis stake
    /// table.
    ///
    /// Epochs strictly before this one are verified against `stake_table` below.
    pub first_epoch_with_dynamic_stake_table: EpochNumber,

    /// The fixed stake table used before epochs begin.
    pub stake_table: Vec<StakeTableEntry<PubKey>>,

    /// Enable special cases for Decaf testnet.
    ///
    /// On Decaf, `first_epoch_with_dynamic_stake_table` is not actually the first epoch of PoS, but
    /// the first Epoch after the upgrade to version 0.4 (version 0.3 is completely unsupported
    /// since it will never be deployed on Mainnet). Thus, when we perform stake table catchup on
    /// Decaf, we need to replay all events from epochs between the upgrade to proof-of-stake (this
    /// number) and the upgrade to version 0.4.
    #[cfg(feature = "decaf")]
    #[serde(default)]
    pub decaf_first_pos_epoch: Option<EpochNumber>,
}
59
/// Tunable options for a [`LightClient`].
///
/// Currently only controls the size of the in-memory stake table cache. Can be parsed from the
/// command line / environment when the `clap` feature is enabled; [`Default`] mirrors the clap
/// defaults.
#[derive(Clone, Debug)]
#[cfg_attr(feature = "clap", derive(clap::Parser))]
pub struct LightClientOptions {
    /// Maximum number of stake tables to cache in memory at any given time.
    #[cfg_attr(
        feature = "clap",
        clap(
            long = "light-client-num-stake-tables",
            env = "LIGHT_CLIENT_NUM_STAKE_TABLES",
            default_value = "100",
        )
    )]
    pub num_stake_tables_in_memory: usize,
}
74
75impl Default for LightClientOptions {
76    fn default() -> Self {
77        Self {
78            num_stake_tables_in_memory: 100,
79        }
80    }
81}
82
/// Client-side state required to implement the light client interface.
///
/// A [`LightClient`] can always be created [from scratch](Self::from_genesis), with no state, since
/// ultimately all data is fetched and verified from external query nodes. However, having some
/// persistent state can make it more efficient to use a [`LightClient`] over a long period of time,
/// as important artifacts can be cached locally, avoiding the need to frequently re-fetch and
/// verify them.
#[derive(Debug)]
pub struct LightClient<P, S> {
    // Local persistent storage used as a cache of verified leaves and stake tables.
    db: P,
    // Untrusted query service; everything it returns is verified before use.
    server: S,
    opt: LightClientOptions,

    // Copied out of [`Genesis`] at construction time.
    epoch_height: u64,
    first_epoch_with_dynamic_stake_table: EpochNumber,
    // Root of trust: quorum used for all epochs before `first_epoch_with_dynamic_stake_table`.
    genesis_stake_table: Arc<StakeTable>,

    #[cfg(feature = "decaf")]
    decaf_first_pos_epoch: Option<EpochNumber>,

    // We cache stake tables in memory since they are large and expensive to load from the database.
    stake_tables: RwLock<BTreeMap<EpochNumber, Arc<StakeTable>>>,
}
106
107impl<P, S> LightClient<P, S>
108where
109    P: Storage,
110    S: Client,
111{
112    /// Create a light client from scratch, with no state.
113    ///
114    /// State will automatically be populated as queries are made. The provided genesis becomes the
115    /// root of trust for verifying all state that is subsequently loaded by the light client. If
116    /// the genesis is not correct (i.e. matching the genesis used by honest HotShot nodes) the
117    /// light client may verify incorrect data, or fail to verify correct data.
118    pub fn from_genesis(db: P, server: S, genesis: Genesis) -> Self {
119        Self::from_genesis_with_options(db, server, genesis, Default::default())
120    }
121
122    /// Create a light client from scratch, with no state, using the given options.
123    ///
124    /// State will automatically be populated as queries are made. The provided genesis becomes the
125    /// root of trust for verifying all state that is subsequently loaded by the light client. If
126    /// the genesis is not correct (i.e. matching the genesis used by honest HotShot nodes) the
127    /// light client may verify incorrect data, or fail to verify correct data.
128    pub fn from_genesis_with_options(
129        db: P,
130        server: S,
131        genesis: Genesis,
132        opt: LightClientOptions,
133    ) -> Self {
134        Self {
135            db,
136            server,
137            opt,
138            epoch_height: genesis.epoch_height,
139            genesis_stake_table: Arc::new(genesis.stake_table.into()),
140            first_epoch_with_dynamic_stake_table: genesis.first_epoch_with_dynamic_stake_table,
141            stake_tables: Default::default(),
142
143            #[cfg(feature = "decaf")]
144            decaf_first_pos_epoch: genesis.decaf_first_pos_epoch,
145        }
146    }
147
148    /// Get the number of known blocks in the chain.
149    ///
150    /// This is equivalent to one more than the block number of the latest known block. The latest
151    /// known block may come either from the local light client database or from an untrusted query
152    /// service (in which case the corresponding leaf is fetched and verified to ensure there is in
153    /// fact such a block). Note however that it is always possible that neither this light client
154    /// nor the connected server is aware of the true latest block, and so in rare cases the result
155    /// of this method may be an underestimate.
156    pub async fn block_height(&self) -> Result<u64> {
157        let latest_known = self.db.block_height().await?;
158        let latest_from_server = self.server.block_height().await?;
159        if latest_from_server > latest_known {
160            // The server claims there is a newer block than we previously knew about. Verify that
161            // this is the case by requesting a finality proof for the corresponding leaf.
162            if let Err(err) = self
163                .fetch_leaf(LeafId::Number(latest_from_server as usize - 1))
164                .await
165            {
166                tracing::warn!(
167                    latest_known,
168                    latest_from_server,
169                    "failed to verify block height claimed by server: {err:#}"
170                );
171                return Ok(latest_known);
172            } else {
173                return Ok(latest_from_server);
174            }
175        }
176        Ok(latest_known)
177    }
178
179    /// Fetch and verify the requested leaf.
180    pub async fn fetch_leaf(&self, id: LeafId<SeqTypes>) -> Result<LeafQueryData<SeqTypes>> {
181        self.fetch_leaf_with_quorum(id, |epoch| {
182            StakeTableQuorum::new((epoch, self), self.epoch_height)
183        })
184        .await
185    }
186
187    async fn fetch_leaf_with_quorum<Q>(
188        &self,
189        id: LeafId<SeqTypes>,
190        quorum: impl Send + FnOnce(EpochNumber) -> Q,
191    ) -> Result<LeafQueryData<SeqTypes>>
192    where
193        Q: Send + Quorum,
194    {
195        let upper_bound = self.db.leaf_upper_bound(id).await?;
196        let known_finalized = if let Some(upper_bound) = upper_bound {
197            if leaf_matches_id(&upper_bound, id) {
198                return Ok(upper_bound);
199            }
200
201            Some(upper_bound)
202        } else {
203            None
204        };
205        let known_finalized = known_finalized.as_ref().map(LeafQueryData::leaf);
206        self.fetch_leaf_from_server(id, known_finalized, quorum)
207            .await
208    }
209
210    /// Fetches leaves in range [start_height, end_height)
211    pub async fn fetch_leaves_in_range(
212        &self,
213        start_height: usize,
214        end_height: usize,
215    ) -> Result<Vec<LeafQueryData<SeqTypes>>> {
216        ensure!(
217            start_height < end_height,
218            "invalid range: start must be < end"
219        );
220
221        // first try to fetch all the leaves from the local database
222        let leaves = self
223            .db
224            .get_leaves_in_range(start_height as u32, end_height as u32)
225            .await?;
226
227        if leaves.len() == end_height - start_height {
228            // we have all the leaves in the range
229            return Ok(leaves);
230        }
231
232        // Fetch the last leaf in the range as our known finalized anchor point
233        let known_end_leaf = self.fetch_leaf(LeafId::Number(end_height - 1)).await?;
234
235        // at this point, we know the end leaf is valid and is finalized
236        // now we need to fetch all leaves from start to end - 1 from the server
237        let leaves = self.fetch_leaves_in_range_from_server(start_height, end_height - 1, &known_end_leaf)
238            .await
239        // add the known end leaf to the result
240        .map(|mut leaves| {
241            leaves.push(known_end_leaf);
242            leaves
243        })?;
244        Ok(leaves)
245    }
246
247    /// Fetches headers in range [start_height, end_height)
248    pub async fn fetch_headers_in_range(
249        &self,
250        start_height: usize,
251        end_height: usize,
252    ) -> Result<Vec<Header>> {
253        ensure!(
254            start_height < end_height,
255            "invalid range: start must be < end"
256        );
257
258        // Reuse the verified leaf path to guarantee header correctness.
259        let leaves = self.fetch_leaves_in_range(start_height, end_height).await?;
260        Ok(leaves
261            .into_iter()
262            .map(|leaf| leaf.header().clone())
263            .collect())
264    }
265
266    /// Fetches leaves from the server in range [start_height, end_height) and verifies them by
267    /// walking backwards from the known finalized leaf, ensuring each leaf's hash matches the
268    /// parent commitment of the subsequent leaf.
269    async fn fetch_leaves_in_range_from_server(
270        &self,
271        start_height: usize,
272        end_height: usize,
273        known_finalized: &LeafQueryData<SeqTypes>,
274    ) -> Result<Vec<LeafQueryData<SeqTypes>>> {
275        // we will fetch all the leaves till the known finalized leaf
276        let leaves = self
277            .server
278            // `get_leaves_in_range` is exclusive of the end height
279            // which we dont need because we already know the end leaf
280            .get_leaves_in_range(start_height, end_height)
281            .await?;
282
283        ensure!(
284            leaves.len() == end_height.saturating_sub(start_height),
285            "server returned {} leaves for range [{}, {})",
286            leaves.len(),
287            start_height,
288            end_height
289        );
290
291        // Walk backwards from the known finalized leaf, ensuring each parent hash matches
292        let mut expected_parent = known_finalized.leaf().parent_commitment();
293        for leaf in leaves.iter().rev() {
294            let leaf_hash = leaf.hash();
295            ensure!(
296                leaf_hash == expected_parent,
297                "leaf hash mismatch: expected parent hash {:?}, got leaf hash {:?}",
298                expected_parent,
299                leaf_hash
300            );
301            expected_parent = leaf.leaf().parent_commitment();
302        }
303
304        // Cache the fetched leaves, but still return them even if caching fails
305        for leaf in &leaves {
306            if let Err(err) = self.db.insert_leaf(leaf.clone()).await {
307                tracing::warn!(
308                    "failed to cache leaf at height {}: {:#?}",
309                    leaf.height(),
310                    err
311                )
312            }
313        }
314
315        Ok(leaves)
316    }
317
318    /// Fetch and verify the requested header.
319    pub async fn fetch_header(&self, id: BlockId<SeqTypes>) -> Result<Header> {
320        self.fetch_header_with_quorum(id, |epoch| {
321            StakeTableQuorum::new((epoch, self), self.epoch_height)
322        })
323        .await
324    }
325
326    async fn fetch_header_with_quorum<Q>(
327        &self,
328        id: BlockId<SeqTypes>,
329        quorum: impl Send + FnOnce(EpochNumber) -> Q,
330    ) -> Result<Header>
331    where
332        Q: Send + Quorum,
333    {
334        if let Some(leaf) = self.db.leaf_upper_bound(id).await? {
335            if leaf_matches_id(&leaf, id) {
336                // If we have the leaf for the requested header in our database already, we can just
337                // extract the header.
338                return Ok(leaf.header().clone());
339            } else {
340                // Otherwise, if we have a leaf that is known to be greater than the requested
341                // header, we can ask the server for an inclusion proof for the requested header
342                // relative to the Merkle root in the upper bound leaf.
343                let proof = self.server.header_proof(leaf.height(), id).await?;
344                let header = proof
345                    .verify(leaf.header().block_merkle_tree_root())
346                    .context("invalid header proof")?;
347
348                // The server has given us a header and correctly proved it finalized, but we still
349                // need to verify that it actually gave us the header we requested.
350                ensure!(
351                    header_matches_id(&header, id),
352                    "server returned a valid header proof for the wrong header (requested header \
353                     {id}, got header {} with hash {})",
354                    header.height(),
355                    header.commit(),
356                );
357
358                return Ok(header);
359            }
360        }
361
362        // We have neither the requested header nor an upper bound for it. All we can do is fetch
363        // the corresponding leaf from the server (verifying a leaf proof) and then extract the
364        // header from there.
365        let leaf = self.fetch_leaf_from_server(id, None, quorum).await?;
366        Ok(leaf.header().clone())
367    }
368
369    /// Fetch and verify the requested payload.
370    pub async fn fetch_payload(&self, id: BlockId<SeqTypes>) -> Result<PayloadQueryData<SeqTypes>> {
371        Ok(self.fetch_block(id).await?.into())
372    }
373
374    /// Fetch and verify the full payload with the requested header.
375    pub async fn fetch_payload_for_header(
376        &self,
377        header: Header,
378    ) -> Result<PayloadQueryData<SeqTypes>> {
379        Ok(self.fetch_block_for_header(header).await?.into())
380    }
381
382    /// Fetch and verify the requested block.
383    pub async fn fetch_block(&self, id: BlockId<SeqTypes>) -> Result<BlockQueryData<SeqTypes>> {
384        Ok(self.fetch_block_and_vid_common(id).await?.0)
385    }
386
387    /// Fetch and verify the full block with the requested header.
388    pub async fn fetch_block_for_header(&self, header: Header) -> Result<BlockQueryData<SeqTypes>> {
389        Ok(self.fetch_block_and_vid_common_for_header(header).await?.0)
390    }
391
392    /// Fetch and verify the requested payload and the associated VID common data.
393    pub async fn fetch_block_and_vid_common(
394        &self,
395        id: BlockId<SeqTypes>,
396    ) -> Result<(BlockQueryData<SeqTypes>, VidCommonQueryData<SeqTypes>)> {
397        let header = self.fetch_header(id).await?;
398        self.fetch_block_and_vid_common_for_header(header).await
399    }
400
401    /// Fetch and verify the payload corresponding to `header`, and the associated VID common data.
402    pub async fn fetch_block_and_vid_common_for_header(
403        &self,
404        header: Header,
405    ) -> Result<(BlockQueryData<SeqTypes>, VidCommonQueryData<SeqTypes>)> {
406        let proof = self.server.payload_proof(header.height()).await?;
407        let (payload, vid_common) = proof.verify_with_vid_common(&header)?;
408        Ok((
409            BlockQueryData::new(header.clone(), payload),
410            VidCommonQueryData::new(header, vid_common),
411        ))
412    }
413
414    /// Fetch and verify the transactions in the given namespace of the requested block.
415    pub async fn fetch_namespace(
416        &self,
417        id: BlockId<SeqTypes>,
418        namespace: NamespaceId,
419    ) -> Result<Vec<Transaction>> {
420        let header = self.fetch_header(id).await?;
421        self.fetch_namespace_for_header(&header, namespace).await
422    }
423
424    /// Fetch and verify the transactions in the given namespace of the requested header.
425    pub async fn fetch_namespace_for_header(
426        &self,
427        header: &Header,
428        namespace: NamespaceId,
429    ) -> Result<Vec<Transaction>> {
430        let proof = self
431            .server
432            .namespace_proof(header.height(), namespace)
433            .await?;
434        proof.verify(header, namespace)
435    }
436
437    /// Fetch and verify the transactions in the given namespace of blocks in the range
438    /// `[start_height, end_height)`.
439    pub async fn fetch_namespaces_in_range(
440        &self,
441        start_height: usize,
442        end_height: usize,
443        namespace: NamespaceId,
444    ) -> Result<Vec<Vec<Transaction>>> {
445        let headers = self
446            .fetch_headers_in_range(start_height, end_height)
447            .await?;
448        let proofs = self
449            .server
450            .namespace_proofs_in_range(start_height as u64, end_height as u64, namespace)
451            .await?;
452        ensure!(
453            proofs.len() == headers.len(),
454            "server returned wrong number of namespace proofs (expected {}, got {})",
455            headers.len(),
456            proofs.len()
457        );
458        proofs
459            .into_iter()
460            .zip(&headers)
461            .map(|(proof, header)| proof.verify(header, namespace))
462            .collect()
463    }
464
465    /// Fetch and verify the stake table for the requested epoch.
466    pub async fn quorum_for_epoch(&self, epoch: EpochNumber) -> Result<Arc<StakeTable>> {
467        if epoch < self.first_epoch_with_dynamic_stake_table {
468            return Ok(self.genesis_stake_table.clone());
469        }
470
471        // Check cache for the desired stake table.
472        {
473            let cache = self.stake_tables.read().await;
474            if let Some(stake_table) = cache.get(&epoch) {
475                tracing::debug!(%epoch, "found stake table in cache");
476                return Ok(stake_table.clone());
477            }
478        }
479
480        // If we didn't find the exact stake table we are looking for in cache, look for it in our
481        // local database, or an earlier one we can catch up from.
482        let (lower_bound, mut stake_table, mut prev_quorum) =
483            if let Some((lower_bound, stake_table)) = self.db.stake_table_lower_bound(epoch).await?
484            {
485                if lower_bound == epoch {
486                    // We have the exact quorum we requested already in our database. Add it to cache
487                    // and return it.
488                    tracing::debug!(%epoch, "found stake table in database");
489                    let quorum = stake_table_state_to_quorum(stake_table)?;
490                    return Ok(self.cache_stake_table(epoch, Arc::new(quorum)).await);
491                }
492
493                (
494                    lower_bound,
495                    stake_table.clone(),
496                    Arc::new(stake_table_state_to_quorum(stake_table)?),
497                )
498            } else {
499                // We don't have any stake table earlier than `epoch` as a starting point, so we must
500                // start from the genesis state.
501                (
502                    self.first_epoch_with_dynamic_stake_table - 1,
503                    StakeTableState::default(),
504                    self.genesis_stake_table.clone(),
505                )
506            };
507        tracing::info!(from = %lower_bound, to = %epoch, "performing stake table catchup");
508
509        // On decaf, replay the events from epochs on version 0.3 without checking stake table
510        // hashes (since these were only added in version 0.4). We will effectively check all this
511        // work at once when we check the stake table hash after the first epoch of version 0.4
512        #[cfg(feature = "decaf")]
513        if lower_bound < self.first_epoch_with_dynamic_stake_table
514            && let Some(first_pos_epoch) = self.decaf_first_pos_epoch
515        {
516            tracing::info!(
517                %first_pos_epoch,
518                to = %lower_bound,
519                "performing Decaf catchup through version 0.3",
520            );
521            for epoch in *first_pos_epoch..=*lower_bound {
522                let events = self
523                    .server
524                    .stake_table_events(EpochNumber::new(epoch))
525                    .await?;
526                tracing::debug!(epoch, num_events = events.len(), "reconstruct stake table");
527                for event in events {
528                    tracing::debug!(epoch, ?event, "replay event");
529                    if let Err(err) = stake_table.apply_event(event).context("applying event")? {
530                        tracing::warn!("allowed error in event: {err:#}");
531                    }
532                }
533            }
534            prev_quorum = Arc::new(stake_table_state_to_quorum(stake_table.clone())?);
535        }
536
537        // Replay one epoch at a time from the lower bound stake table to the requested epoch.
538        for epoch in *lower_bound + 1..=*epoch {
539            let events = self
540                .server
541                .stake_table_events(EpochNumber::new(epoch))
542                .await?;
543            tracing::debug!(epoch, num_events = events.len(), "reconstruct stake table");
544            for event in events {
545                tracing::debug!(epoch, ?event, "replay event");
546                if let Err(err) = stake_table.apply_event(event).context("applying event")? {
547                    tracing::warn!("allowed error in event: {err:#}");
548                }
549            }
550            let next_quorum = Arc::new(stake_table_state_to_quorum(stake_table.clone())?);
551
552            // Since we are reconstructing based on events from an untrusted server, we need to
553            // compare the hash of the stake table after each epoch to the hash recorded in the
554            // epoch root header, which is certified by the previous stake table.
555            let root_height = root_block_in_epoch(epoch - 1, self.epoch_height);
556            let root = self
557                .fetch_header_with_quorum(BlockId::Number(root_height as usize), |_| {
558                    StakeTableQuorum::new((prev_quorum, next_quorum.clone()), self.epoch_height)
559                })
560                .await
561                .context("fetching epoch root for {epoch}")?;
562            let hash = root.next_stake_table_hash().context(format!(
563                "epoch {epoch} root {root_height} does not have next stake table hash"
564            ))?;
565            ensure!(
566                hash == stake_table.commit(),
567                "epoch {epoch} root {root_height} stake table hash {hash} does not match \
568                 reconstructed hash {}",
569                stake_table.commit(),
570            );
571
572            // Cache the reconstructed stake table in the database.
573            if let Err(err) = self
574                .db
575                .insert_stake_table(EpochNumber::new(epoch), &stake_table)
576                .await
577            {
578                // If this fails, we can continue with the stake table that we have in memory right
579                // now, so this is just a warning.
580                tracing::warn!(epoch, "failed to cache stake table: {err:#}");
581            }
582
583            prev_quorum = next_quorum;
584        }
585
586        Ok(self.cache_stake_table(epoch, prev_quorum).await)
587    }
588
    /// Fetch a leaf from the untrusted server and verify its finality proof.
    ///
    /// `known_finalized`, if provided, is a leaf we have already verified as finalized, whose
    /// height serves as an upper bound hint to the server and which can back an
    /// "assumption"-style proof (one that shows the requested leaf precedes a known finalized
    /// leaf). `make_quorum` lazily builds the quorum used to verify a quorum-certificate-based
    /// proof for whatever epoch the proof references.
    ///
    /// Returns an explicit future bounded by `'b` (with `'a: 'b`) so the borrow of
    /// `known_finalized` only needs to live as long as the future itself.
    fn fetch_leaf_from_server<'a, 'b, Q>(
        &'a self,
        id: impl Send + Into<LeafRequest> + 'a,
        known_finalized: Option<&'b Leaf2>,
        make_quorum: impl 'a + Send + FnOnce(EpochNumber) -> Q,
    ) -> impl 'b + Send + Future<Output = Result<LeafQueryData<SeqTypes>>>
    where
        'a: 'b,
        Q: Send + Quorum,
    {
        async move {
            let id = id.into();
            let proof = self
                .server
                .leaf_proof(id, known_finalized.map(Leaf2::height))
                .await?;
            // Declared outside the `match` so the quorum outlives the hint that borrows it.
            let quorum;
            let hint = match proof.proof().epoch() {
                // Quorum-based proof: verify against the stake table for the proof's epoch.
                Some(epoch) => {
                    quorum = make_quorum(epoch);
                    LeafProofHint::Quorum(&quorum)
                },
                // Assumption-based proof: only acceptable if we hold a finalized upper bound
                // the assumption can be checked against.
                None => LeafProofHint::Assumption(known_finalized.context(
                    "server returned proof with assumption, but we have no finalized upper bound \
                     to verify assumption",
                )?),
            };
            let leaf = proof.verify(hint).await?;

            // The server has given us a leaf and correctly proved it finalized, but we still need to
            // verify that it actually gave us the leaf we requested.
            ensure!(
                leaf_matches_id(&leaf, id),
                "server returned a valid leaf proof for the wrong leaf (requested leaf {id}, got \
                 leaf {} with hash {})",
                leaf.height(),
                leaf.hash(),
            );

            // Having fetched and verified the leaf from the server, we can now cache it locally to
            // improve future requests.
            if let Err(err) = self.db.insert_leaf(leaf.clone()).await {
                // If this fails, we can still successfully return the leaf that we have in memory right
                // now, so this is just a warning.
                tracing::warn!("failed to cache fetched leaf: {err:#}");
            }

            Ok(leaf)
        }
    }
639
640    async fn cache_stake_table(
641        &self,
642        epoch: EpochNumber,
643        stake_table: Arc<StakeTable>,
644    ) -> Arc<StakeTable> {
645        let mut cache = self.stake_tables.write().await;
646
647        // If inserting the new stake table would cause the cache to exceed its maximum size, first
648        // delete an old stake table.
649        if cache.len() >= self.opt.num_stake_tables_in_memory {
650            // Always delete the _second oldest_ stake table. We want to keep the oldest around
651            // because it is the hardest to catch up for if we need it again (we would have to go
652            // all the way back to genesis). The second oldest is the least likely to be used again
653            // after the oldest, while still being easy to replay if we do need it (because we can
654            // just replay from the cached oldest).
655            if let Some(&second_oldest_epoch) = cache.keys().nth(1) {
656                cache.remove(&second_oldest_epoch);
657            }
658        }
659
660        cache.entry(epoch).insert_entry(stake_table).get().clone()
661    }
662}
663
664fn stake_table_state_to_quorum(state: StakeTableState) -> Result<StakeTable> {
665    let validators = state.into_validators();
666    let active_validators = select_active_validator_set(&validators)?;
667    Ok(active_validators
668        .into_values()
669        .map(|validator| StakeTableEntry {
670            stake_key: validator.stake_table_key,
671            stake_amount: validator.stake,
672        })
673        .collect())
674}
675
// An `(epoch, light client)` pair acts as a stake table pair by resolving the stake tables for
// that epoch and the one after it through the light client.
impl<P, S> StakeTablePair for (EpochNumber, &LightClient<P, S>)
where
    P: Storage,
    S: Client,
{
    /// The (fetched and verified) stake table for epoch `self.0`.
    async fn stake_table(&self) -> Result<Arc<StakeTable>> {
        self.1.quorum_for_epoch(self.0).await
    }

    /// The (fetched and verified) stake table for the following epoch, `self.0 + 1`.
    async fn next_epoch_stake_table(&self) -> Result<Arc<StakeTable>> {
        self.1.quorum_for_epoch(self.0 + 1).await
    }
}
689
690fn leaf_matches_id(leaf: &LeafQueryData<SeqTypes>, id: impl Into<LeafRequest>) -> bool {
691    match id.into() {
692        LeafRequest::Leaf(LeafId::Number(h)) | LeafRequest::Header(BlockId::Number(h)) => {
693            (h as u64) == leaf.height()
694        },
695        LeafRequest::Leaf(LeafId::Hash(h)) => h == leaf.hash(),
696        LeafRequest::Header(BlockId::Hash(h)) => h == leaf.block_hash(),
697        LeafRequest::Header(BlockId::PayloadHash(h)) => h == leaf.payload_hash(),
698    }
699}
700
701fn header_matches_id(header: &Header, id: BlockId<SeqTypes>) -> bool {
702    match id {
703        BlockId::Number(n) => header.height() == (n as u64),
704        BlockId::Hash(h) => header.commit() == h,
705        BlockId::PayloadHash(h) => header.payload_commitment() == h,
706    }
707}
708
709#[cfg(test)]
710mod test {
711    use espresso_types::NsIndex;
712    use hotshot_query_service::availability::TransactionIndex;
713    use pretty_assertions::assert_eq;
714    use versions::DRB_AND_HEADER_UPGRADE_VERSION;
715
716    use super::*;
717    use crate::{
718        storage::SqliteStorage,
719        testing::{TestClient, leaf_chain},
720    };
721
    // `block_height` should report the max of the local and (verifiable) server heights, and
    // fall back to the local height when the server's claim cannot be verified.
    #[tokio::test]
    #[test_log::test]
    async fn test_block_height() {
        let client = TestClient::default();
        let db = SqliteStorage::default().await.unwrap();

        let lc = LightClient::from_genesis(db.clone(), client.clone(), client.genesis().await);

        // Test empty block height.
        assert_eq!(lc.block_height().await.unwrap(), 0);

        // Local block height greater than server.
        let leaf = leaf_chain(1..2, DRB_AND_HEADER_UPGRADE_VERSION)
            .await
            .remove(0);
        db.insert_leaf(leaf).await.unwrap();
        assert_eq!(lc.block_height().await.unwrap(), 2);

        // Server block height greater than local.
        client.leaf(2).await;
        assert_eq!(lc.block_height().await.unwrap(), 3);
        // In the process of verifying the block height, we should have fetched and stored the
        // latest leaf.
        assert_eq!(db.block_height().await.unwrap(), 3);

        // Server lies about the block height.
        // With the proof for leaf 9 unavailable, verification fails and the reported height
        // stays at the locally known value.
        client.mock_block_height(10).await;
        client.forget_leaf(9).await;
        assert_eq!(lc.block_height().await.unwrap(), 3);
    }
752
753    #[tokio::test]
754    #[test_log::test]
755    async fn test_fetch_leaf_twice() {
756        let client = TestClient::default();
757        let lc = LightClient::from_genesis(
758            SqliteStorage::default().await.unwrap(),
759            client.clone(),
760            client.genesis().await,
761        );
762
763        // Fetch the leaf for the first time. We will need to get it from the server.
764        let leaf = client.remember_leaf(1).await;
765        assert_eq!(lc.fetch_leaf(LeafId::Number(1)).await.unwrap(), leaf);
766
767        // Fetching the leaf again hits the cache.
768        client.forget_leaf(1).await;
769        assert_eq!(lc.fetch_leaf(LeafId::Number(1)).await.unwrap(), leaf);
770    }
771
772    #[tokio::test]
773    #[test_log::test]
774    async fn test_fetch_leaf_upper_bound() {
775        let client = TestClient::default();
776
777        let db = SqliteStorage::default().await.unwrap();
778        db.insert_leaf(client.leaf(2).await).await.unwrap();
779
780        let lc = LightClient::from_genesis(db, client.clone(), client.genesis().await);
781        assert_eq!(
782            lc.fetch_leaf(LeafId::Number(1)).await.unwrap(),
783            client.leaf(1).await,
784        );
785    }
786
787    #[tokio::test]
788    #[test_log::test]
789    async fn test_fetch_leaf_invalid_proof() {
790        let client = TestClient::default();
791        let lc = LightClient::from_genesis(
792            SqliteStorage::default().await.unwrap(),
793            client.clone(),
794            client.genesis().await,
795        );
796        client.return_invalid_proof(1).await;
797        lc.fetch_leaf(LeafId::Number(1)).await.unwrap_err();
798        lc.fetch_leaf(LeafId::Hash(client.leaf(1).await.hash()))
799            .await
800            .unwrap_err();
801    }
802
803    #[tokio::test]
804    #[test_log::test]
805    async fn test_fetch_leaf_wrong_leaf() {
806        let client = TestClient::default();
807        let lc = LightClient::from_genesis(
808            SqliteStorage::default().await.unwrap(),
809            client.clone(),
810            client.genesis().await,
811        );
812        client.return_wrong_leaf(1, 2).await;
813        lc.fetch_leaf(LeafId::Number(1)).await.unwrap_err();
814        lc.fetch_leaf(LeafId::Hash(client.leaf(1).await.hash()))
815            .await
816            .unwrap_err();
817    }
818
819    #[tokio::test]
820    #[test_log::test]
821    async fn test_fetch_header_twice() {
822        let client = TestClient::default();
823        let lc = LightClient::from_genesis(
824            SqliteStorage::default().await.unwrap(),
825            client.clone(),
826            client.genesis().await,
827        );
828
829        // Fetch the header for the first time. We will need to get it from the server.
830        let leaf = client.remember_leaf(1).await;
831        assert_eq!(
832            lc.fetch_header(BlockId::Number(1)).await.unwrap(),
833            *leaf.header()
834        );
835
836        // Fetching the header again hits the cache.
837        client.forget_leaf(1).await;
838        assert_eq!(
839            lc.fetch_header(BlockId::Number(1)).await.unwrap(),
840            *leaf.header()
841        );
842    }
843
844    #[tokio::test]
845    #[test_log::test]
846    async fn test_fetch_header_upper_bound() {
847        let client = TestClient::default();
848
849        let db = SqliteStorage::default().await.unwrap();
850        db.insert_leaf(client.leaf(2).await).await.unwrap();
851
852        let lc = LightClient::from_genesis(db, client.clone(), client.genesis().await);
853        assert_eq!(
854            lc.fetch_header(BlockId::Number(1)).await.unwrap(),
855            *client.leaf(1).await.header(),
856        );
857    }
858
859    #[tokio::test]
860    #[test_log::test]
861    async fn test_fetch_header_invalid_proof() {
862        let client = TestClient::default();
863        let db = SqliteStorage::default().await.unwrap();
864
865        // Start with an upper bound, so that `fetch_header` goes through the `client.header_proof`
866        // path.
867        db.insert_leaf(client.leaf(2).await).await.unwrap();
868
869        let lc = LightClient::from_genesis(db, client.clone(), client.genesis().await);
870        client.return_invalid_proof(1).await;
871
872        let err = lc.fetch_header(BlockId::Number(1)).await.unwrap_err();
873        assert!(err.to_string().contains("invalid header proof"), "{err:#}");
874    }
875
876    #[tokio::test]
877    #[test_log::test]
878    async fn test_fetch_header_wrong_header() {
879        let client = TestClient::default();
880        let db = SqliteStorage::default().await.unwrap();
881
882        // Start with an upper bound, so that `fetch_header` goes through the `client.header_proof`
883        // path.
884        db.insert_leaf(client.leaf(2).await).await.unwrap();
885
886        let lc = LightClient::from_genesis(db, client.clone(), client.genesis().await);
887        client.return_wrong_leaf(1, 0).await;
888
889        let err = lc.fetch_header(BlockId::Number(1)).await.unwrap_err();
890        assert!(
891            err.to_string()
892                .contains("server returned a valid header proof for the wrong header"),
893            "{err:#}"
894        );
895    }
896
897    #[tokio::test]
898    #[test_log::test]
899    async fn test_fetch_leaves_in_range() {
900        let client = TestClient::default();
901        let lc = LightClient::from_genesis(
902            SqliteStorage::default().await.unwrap(),
903            client.clone(),
904            client.genesis().await,
905        );
906
907        // Fetch leaves in range [1,3) for the first time. We will need to get them from the server.
908        let leaf1 = client.remember_leaf(1).await;
909        let leaf2 = client.remember_leaf(2).await;
910        client.remember_leaf(3).await;
911
912        let leaves = lc.fetch_leaves_in_range(1, 3).await.unwrap();
913
914        assert_eq!(leaves, vec![leaf1.clone(), leaf2.clone()]);
915
916        // now remove from server and this time it should be able to fetch from local db
917        client.forget_leaf(1).await;
918        client.forget_leaf(2).await;
919        let leaves = lc.fetch_leaves_in_range(1, 3).await.unwrap();
920        assert_eq!(leaves, vec![leaf1, leaf2]);
921    }
922
923    #[tokio::test]
924    #[test_log::test]
925    async fn test_fetch_headers_in_range() {
926        let client = TestClient::default();
927        let lc = LightClient::from_genesis(
928            SqliteStorage::default().await.unwrap(),
929            client.clone(),
930            client.genesis().await,
931        );
932
933        // Fetch headers in range [1,3) for the first time. We will need to get them from the server.
934        let leaf1 = client.remember_leaf(1).await;
935        let leaf2 = client.remember_leaf(2).await;
936        client.remember_leaf(3).await;
937
938        let headers = lc.fetch_headers_in_range(1, 3).await.unwrap();
939
940        assert_eq!(
941            headers,
942            vec![leaf1.header().clone(), leaf2.header().clone()]
943        );
944
945        // now remove from server and this time it should be able to fetch from local db
946        client.forget_leaf(1).await;
947        client.forget_leaf(2).await;
948        let headers = lc.fetch_headers_in_range(1, 3).await.unwrap();
949        assert_eq!(
950            headers,
951            vec![leaf1.header().clone(), leaf2.header().clone()]
952        );
953    }
954
955    #[tokio::test]
956    #[test_log::test]
957    async fn test_fetch_leaves_in_range_invalid_proof() {
958        let client = TestClient::default();
959        let lc = LightClient::from_genesis(
960            SqliteStorage::default().await.unwrap(),
961            client.clone(),
962            client.genesis().await,
963        );
964        client.return_invalid_proof(2).await;
965        let err = lc.fetch_leaves_in_range(1, 3).await.unwrap_err();
966        assert!(
967            err.to_string().contains(
968                "server returned proof with assumption, but we have no finalized upper bound to \
969                 verify assumption"
970            ),
971            "{err:#}"
972        );
973    }
974
975    #[tokio::test]
976    #[test_log::test]
977    async fn test_fetch_headers_in_range_invalid_proof() {
978        let client = TestClient::default();
979        let lc = LightClient::from_genesis(
980            SqliteStorage::default().await.unwrap(),
981            client.clone(),
982            client.genesis().await,
983        );
984        client.return_invalid_proof(2).await;
985        let err = lc.fetch_headers_in_range(1, 3).await.unwrap_err();
986        assert!(
987            err.to_string().contains(
988                "server returned proof with assumption, but we have no finalized upper bound to \
989                 verify assumption"
990            ),
991            "{err:#}"
992        );
993    }
994
995    #[tokio::test]
996    #[test_log::test]
997    async fn test_fetch_leaves_in_range_wrong_leaf() {
998        let client = TestClient::default();
999        let lc = LightClient::from_genesis(
1000            SqliteStorage::default().await.unwrap(),
1001            client.clone(),
1002            client.genesis().await,
1003        );
1004        client.return_wrong_leaf(1, 2).await;
1005        let err = lc.fetch_leaves_in_range(1, 4).await.unwrap_err();
1006        assert!(err.to_string().contains("leaf hash mismatch"), "{err:#}");
1007    }
1008
1009    #[tokio::test]
1010    #[test_log::test]
1011    async fn test_fetch_headers_in_range_wrong_leaf() {
1012        let client = TestClient::default();
1013        let lc = LightClient::from_genesis(
1014            SqliteStorage::default().await.unwrap(),
1015            client.clone(),
1016            client.genesis().await,
1017        );
1018        client.return_wrong_leaf(1, 2).await;
1019        let err = lc.fetch_headers_in_range(1, 4).await.unwrap_err();
1020        assert!(err.to_string().contains("leaf hash mismatch"), "{err:#}");
1021    }
1022
1023    #[tokio::test]
1024    #[test_log::test]
1025    async fn test_fetch_stake_table_twice_cache() {
1026        let client = TestClient::default();
1027        let genesis = client.genesis().await;
1028        let lc = LightClient::from_genesis(
1029            SqliteStorage::default().await.unwrap(),
1030            client.clone(),
1031            genesis.clone(),
1032        );
1033
1034        // Fetch a dynamic stake table for the first time. We will need to get it from the server.
1035        let epoch = genesis.first_epoch_with_dynamic_stake_table + 5;
1036        let expected = client.quorum_for_epoch(epoch).await.into();
1037        assert_eq!(*lc.quorum_for_epoch(epoch).await.unwrap(), expected);
1038
1039        // Fetching the stake table again hits the cache.
1040        client.forget_quorum(epoch).await;
1041        assert_eq!(*lc.quorum_for_epoch(epoch).await.unwrap(), expected);
1042    }
1043
1044    #[tokio::test]
1045    #[test_log::test]
1046    async fn test_fetch_stake_table_twice_storage() {
1047        let db = SqliteStorage::default().await.unwrap();
1048        let client = TestClient::default();
1049        let genesis = client.genesis().await;
1050        let lc = LightClient::from_genesis(db.clone(), client.clone(), genesis.clone());
1051
1052        // Fetch a dynamic stake table for the first time. We will need to get it from the server.
1053        let epoch = genesis.first_epoch_with_dynamic_stake_table + 5;
1054        let expected = client.quorum_for_epoch(epoch).await.into();
1055        assert_eq!(*lc.quorum_for_epoch(epoch).await.unwrap(), expected);
1056
1057        // Even if the in-memory cache is cleared, we can still get this stake table without hitting
1058        // the server, but fetching from the database.
1059        client.forget_quorum(epoch).await;
1060        let lc = LightClient::from_genesis(db, client.clone(), genesis);
1061        assert_eq!(*lc.quorum_for_epoch(epoch).await.unwrap(), expected);
1062    }
1063
    /// Catchup to a future epoch's stake table can start from a previously
    /// saved stake table instead of replaying from genesis.
    #[tokio::test]
    #[test_log::test]
    async fn test_fetch_stake_table_catchup_from_lower_bound() {
        let db = SqliteStorage::default().await.unwrap();
        let client = TestClient::default();
        let genesis = client.genesis().await;
        let lc = LightClient::from_genesis(db.clone(), client.clone(), genesis.clone());

        // Fetch a stake table, causing it to be saved locally, making it usable as a
        // lower bound for later catchup.
        let epoch = genesis.first_epoch_with_dynamic_stake_table + 5;
        lc.quorum_for_epoch(epoch).await.unwrap();

        // Cause the server to forget older epochs' stake table events, so that we can only catch up
        // successfully if we start from the saved lower bound.
        for epoch in 0..*epoch - 1 {
            client.forget_quorum(EpochNumber::new(epoch)).await;
        }

        // Fetch a future stake table, catching up from the saved lower bound.
        let expected = client.quorum_for_epoch(epoch + 5).await.into();
        assert_eq!(*lc.quorum_for_epoch(epoch + 5).await.unwrap(), expected);
    }
1087
    /// The in-memory stake table cache is bounded: filling it past
    /// `num_stake_tables_in_memory` evicts an entry, but the oldest entry is
    /// retained so it can still serve as a lower bound for catchup.
    #[tokio::test]
    #[test_log::test]
    async fn test_fetch_stake_table_cache_removal() {
        let db = SqliteStorage::default().await.unwrap();
        let client = TestClient::default();
        let genesis = client.genesis().await;
        // Cap the in-memory cache at two stake tables.
        let lc = LightClient::from_genesis_with_options(
            db.clone(),
            client.clone(),
            genesis.clone(),
            LightClientOptions {
                num_stake_tables_in_memory: 2,
            },
        );

        // Fetch two stake tables, causing the cache to be filled up.
        for i in 0..2 {
            let epoch = genesis.first_epoch_with_dynamic_stake_table + 5 + i;
            assert_eq!(
                *lc.quorum_for_epoch(epoch).await.unwrap(),
                client.quorum_for_epoch(epoch).await.into(),
            );
        }
        assert_eq!(lc.stake_tables.read().await.len(), 2);

        // Fetch a third stake table, causing the second-oldest stake table to be deleted
        // (the oldest is kept so it remains usable as a catchup lower bound).
        assert_eq!(
            *lc.quorum_for_epoch(genesis.first_epoch_with_dynamic_stake_table + 10)
                .await
                .unwrap(),
            client
                .quorum_for_epoch(genesis.first_epoch_with_dynamic_stake_table + 10)
                .await
                .into(),
        );
        assert_eq!(lc.stake_tables.read().await.len(), 2);

        // Even if the server forgets all earlier stake tables, we can still catch up because we
        // didn't remove the lower bound stake table.
        let epoch = genesis.first_epoch_with_dynamic_stake_table + 6;
        for i in 0..*epoch {
            client.forget_quorum(EpochNumber::new(i)).await;
        }
        assert_eq!(
            *lc.quorum_for_epoch(epoch).await.unwrap(),
            client.quorum_for_epoch(epoch).await.into(),
        );
    }
1136
1137    #[tokio::test]
1138    #[test_log::test]
1139    async fn test_fetch_stake_table_invalid() {
1140        let db = SqliteStorage::default().await.unwrap();
1141        let client = TestClient::default();
1142        let genesis = client.genesis().await;
1143        let lc = LightClient::from_genesis(db.clone(), client.clone(), genesis.clone());
1144
1145        let epoch = genesis.first_epoch_with_dynamic_stake_table + 5;
1146        client.return_invalid_quorum(epoch).await;
1147        let err = lc.quorum_for_epoch(epoch).await.unwrap_err();
1148        assert!(
1149            err.to_string()
1150                .contains("does not match reconstructed hash"),
1151            "{err:#}"
1152        );
1153    }
1154
1155    #[tokio::test]
1156    #[test_log::test]
1157    async fn test_fetch_payload() {
1158        let client = TestClient::default();
1159        let lc = LightClient::from_genesis(
1160            SqliteStorage::default().await.unwrap(),
1161            client.clone(),
1162            client.genesis().await,
1163        );
1164
1165        for i in 1..10 {
1166            let payload = client.payload(i).await;
1167            let res = lc.fetch_payload(BlockId::Number(i)).await.unwrap();
1168            assert_eq!(res.data(), &payload);
1169            assert_eq!(res.height(), i as u64);
1170            assert_eq!(res.block_hash(), client.leaf(i).await.block_hash());
1171            assert_eq!(res.hash(), client.leaf(i).await.payload_hash());
1172        }
1173    }
1174
1175    #[tokio::test]
1176    #[test_log::test]
1177    async fn test_fetch_payload_invalid() {
1178        let client = TestClient::default();
1179        let lc = LightClient::from_genesis(
1180            SqliteStorage::default().await.unwrap(),
1181            client.clone(),
1182            client.genesis().await,
1183        );
1184
1185        client.return_invalid_payload(1).await;
1186        let err = lc.fetch_payload(BlockId::Number(1)).await.unwrap_err();
1187        assert!(
1188            err.to_string()
1189                .contains("commitment of payload does not match commitment in header"),
1190            "{err:#}"
1191        );
1192    }
1193
    /// Namespace fetches are checked for every way of identifying a block
    /// (height, block hash, payload hash), for both a non-empty and an empty
    /// namespace, and finally across a block range.
    #[tokio::test]
    #[test_log::test]
    async fn test_fetch_namespace() {
        let client = TestClient::default();
        let lc = LightClient::from_genesis(
            SqliteStorage::default().await.unwrap(),
            client.clone(),
            client.genesis().await,
        );

        for i in 1..10 {
            let leaf = client.leaf(i).await;
            let payload = client.payload(i).await;

            // Exercise all three block-identifier forms for the same block.
            for id in [
                BlockId::Number(i),
                BlockId::Hash(leaf.block_hash()),
                BlockId::PayloadHash(leaf.payload_hash()),
            ] {
                // Request a non-empty namespace.
                let ns_index = NsIndex::from(0);
                let tx = payload
                    .transaction(&TransactionIndex {
                        ns_index,
                        position: 0,
                    })
                    .unwrap();
                let txs = lc.fetch_namespace(id, tx.namespace()).await.unwrap();
                assert_eq!(txs, std::slice::from_ref(&tx));

                // Request an empty namespace.
                // NOTE(review): assumes namespace `tx.namespace() + 1` contains no
                // transactions in the test chain — relies on how the test payloads
                // are generated.
                let txs = lc
                    .fetch_namespace(id, NamespaceId::from(u64::from(tx.namespace()) + 1))
                    .await
                    .unwrap();
                assert_eq!(txs, []);
            }
        }

        // Fetch by range.
        let ns = client
            .payload(1)
            .await
            .transaction(&TransactionIndex {
                ns_index: 0.into(),
                position: 0,
            })
            .unwrap()
            .namespace();
        let namespaces = lc.fetch_namespaces_in_range(1, 10, ns).await.unwrap();
        // One entry per block in [1, 10).
        assert_eq!(namespaces.len(), 9);
    }
1246
1247    #[tokio::test]
1248    #[test_log::test]
1249    async fn test_fetch_namespace_invalid() {
1250        let client = TestClient::default();
1251        let lc = LightClient::from_genesis(
1252            SqliteStorage::default().await.unwrap(),
1253            client.clone(),
1254            client.genesis().await,
1255        );
1256
1257        let payload = client.payload(1).await;
1258        let ns_index = NsIndex::from(0);
1259        let tx = payload
1260            .transaction(&TransactionIndex {
1261                ns_index,
1262                position: 0,
1263            })
1264            .unwrap();
1265
1266        client.return_invalid_payload(1).await;
1267        let err = lc
1268            .fetch_namespace(BlockId::Number(1), tx.namespace())
1269            .await
1270            .unwrap_err();
1271        assert!(
1272            err.to_string().contains("invalid namespace proof"),
1273            "{err:#}"
1274        );
1275    }
1276}