Skip to main content

espresso_node/
catchup.rs

1use std::{
2    cmp::Ordering,
3    collections::HashMap,
4    fmt::{Debug, Display},
5    sync::Arc,
6    time::Duration,
7};
8
9use alloy::primitives::U256;
10use anyhow::{Context, anyhow, bail, ensure};
11use async_lock::RwLock;
12use async_trait::async_trait;
13use committable::{Commitment, Committable};
14use espresso_types::{
15    BackoffParams, BlockMerkleTree, Certificate2, FeeAccount, FeeAccountProof, FeeMerkleCommitment,
16    FeeMerkleTree, Leaf2, NodeState, SeqTypes, ValidatedState,
17    config::PublicNetworkConfig,
18    v0::traits::StateCatchup,
19    v0_3::{
20        ChainConfig, RewardAccountProofV1, RewardAccountV1, RewardMerkleCommitmentV1,
21        RewardMerkleTreeV1,
22    },
23    v0_4::{
24        PermittedRewardMerkleTreeV2, RewardAccountProofV2, RewardAccountV2,
25        RewardMerkleCommitmentV2, RewardMerkleTreeV2, forgotten_accounts_include,
26    },
27};
28use futures::{
29    StreamExt,
30    future::{Future, FutureExt, TryFuture, TryFutureExt},
31    stream::FuturesUnordered,
32};
33use hotshot_new_protocol::utils::verify_leaf_chain_with_cert2;
34use hotshot_types::{
35    ValidatorConfig,
36    data::ViewNumber,
37    message::UpgradeLock,
38    network::NetworkConfig,
39    simple_certificate::LightClientStateUpdateCertificateV2,
40    stake_table::HSStakeTable,
41    traits::{
42        ValidatedState as ValidatedStateTrait,
43        metrics::{Counter, CounterFamily, Metrics},
44    },
45    utils::verify_leaf_chain,
46};
47use itertools::Itertools;
48use jf_merkle_tree_compat::{ForgetableMerkleTreeScheme, MerkleTreeScheme, prelude::MerkleNode};
49use parking_lot::Mutex;
50use priority_queue::PriorityQueue;
51use serde::de::DeserializeOwned;
52use surf_disco::Request;
53use tide_disco::error::ServerError;
54use tokio::time::timeout;
55use tokio_util::task::AbortOnDropHandle;
56use url::Url;
57use vbs::version::StaticVersionType;
58use versions::{EPOCH_VERSION, NEW_PROTOCOL_VERSION};
59
60use crate::{
61    api::{BlocksFrontier, RewardMerkleTreeDataSource, RewardMerkleTreeV2Data},
62    consensus_handle::ConsensusHandle,
63};
64
65// This newtype is probably not worth having. It's only used to be able to log
66// URLs before doing requests.
67#[derive(Debug, Clone)]
68struct Client<ServerError, ApiVer: StaticVersionType> {
69    inner: surf_disco::Client<ServerError, ApiVer>,
70    url: Url,
71    requests: Arc<Box<dyn Counter>>,
72    failures: Arc<Box<dyn Counter>>,
73}
74
75impl<ApiVer: StaticVersionType> Client<ServerError, ApiVer> {
76    pub fn new(
77        url: Url,
78        requests: &(impl CounterFamily + ?Sized),
79        failures: &(impl CounterFamily + ?Sized),
80    ) -> Self {
81        Self {
82            inner: surf_disco::Client::new(url.clone()),
83            requests: Arc::new(requests.create(vec![url.to_string()])),
84            failures: Arc::new(failures.create(vec![url.to_string()])),
85            url,
86        }
87    }
88
89    pub fn get<T: DeserializeOwned>(&self, route: &str) -> Request<T, ServerError, ApiVer> {
90        self.inner.get(route)
91    }
92}
93
/// A score of a catchup peer, based on our interactions with that peer.
///
/// The score accounts for malicious peers -- i.e. peers that gave us an invalid response to a
/// verifiable request -- and faulty/unreliable peers -- those that fail to respond to requests at
/// all. The score has a comparison function where higher is better, or in other words `p1 > p2`
/// means we believe we are more likely to successfully catch up using `p1` than `p2`. This makes it
/// convenient and efficient to collect peers in a priority queue which we can easily convert to a
/// list sorted by reliability.
///
/// Note that a score with zero requests and zero failures compares equal to every other score,
/// because both cross-products in the comparison are then zero.
#[derive(Clone, Copy, Debug, Default)]
struct PeerScore {
    /// Total number of requests recorded for the peer.
    requests: usize,
    /// Number of those requests that failed.
    failures: usize,
}

impl Ord for PeerScore {
    fn cmp(&self, other: &Self) -> Ordering {
        // Compare failure rates: `self` is better than `other` if
        //      self.failures / self.requests < other.failures / other.requests
        // or equivalently
        //      other.failures * self.requests > self.failures * other.requests
        //
        // The products are computed in `u128` so that they can never overflow: `usize` is at most
        // 64 bits wide, and the product of two 64-bit values always fits in 128 bits. The `as`
        // casts are lossless widenings.
        let lhs = (other.failures as u128) * (self.requests as u128);
        let rhs = (self.failures as u128) * (other.requests as u128);
        lhs.cmp(&rhs)
    }
}

impl PartialOrd for PeerScore {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl PartialEq for PeerScore {
    /// Two scores are equal when they compare as equal, i.e. when their failure rates match.
    fn eq(&self, other: &Self) -> bool {
        self.cmp(other).is_eq()
    }
}

impl Eq for PeerScore {}
131
/// A set of remote peers to catch up from, together with a reliability score for each peer.
#[derive(Debug, Clone, Default)]
pub struct StatePeers<ApiVer: StaticVersionType> {
    // Peer IDs, ordered by reliability score. Each ID is an index into `clients`.
    scores: Arc<RwLock<PriorityQueue<usize, PeerScore>>>,
    /// One client per peer; a peer's ID is its position in this vector.
    clients: Vec<Client<ServerError, ApiVer>>,
    /// Backoff parameters exposed through `backoff()` and used to drive retries.
    backoff: BackoffParams,
    /// Base timeout for per peer catchup request
    base_timeout: Duration,
}
141
142impl<ApiVer: StaticVersionType> StatePeers<ApiVer> {
143    async fn fetch<Fut>(
144        &self,
145        retry: usize,
146        f: impl Fn(Client<ServerError, ApiVer>) -> Fut,
147    ) -> anyhow::Result<Fut::Ok>
148    where
149        Fut: TryFuture<Error: Display>,
150    {
151        // Since we have generally have multiple peers we can catch up from, we want a fairly
152        // aggressive timeout for requests: if a peer is not responding quickly, we're better off
153        // just trying the next one rather than waiting, and this prevents a malicious peer from
154        // delaying catchup for a long time.
155        //
156        // However, if we set the timeout _too_ aggressively, we might fail to catch up even from an
157        // honest peer, and thus never make progress. Thus, we start with a base timeout (default
158        // 2s), which is reasonable for an HTTP request. If that fails with all of our peers, we
159        // increase the timeout by the base amount for each successive retry, until we eventually
160        // succeed. The base timeout is configurable via ESPRESSO_NODE_CATCHUP_BASE_TIMEOUT.
161        let timeout_dur = self.base_timeout * (retry as u32 + 1);
162
163        // Keep track of which peers we make requests to and which succeed (`true`) or fail (`false`),
164        // so we can update reliability scores at the end.
165        let mut requests = HashMap::new();
166        let mut res = Err(anyhow!("failed fetching from every peer"));
167
168        // Try each peer in order of reliability score, until we succeed. We clone out of
169        // `self.scores` because it is small (contains only numeric IDs and scores), so this clone
170        // is a lot cheaper than holding the read lock the entire time we are making requests (which
171        // could be a while).
172        let mut scores = { (*self.scores.read().await).clone() };
173        let mut logs = vec![format!("Fetching failed.\n")];
174        while let Some((id, score)) = scores.pop() {
175            let client = &self.clients[id];
176            tracing::info!("fetching from {}", client.url);
177            match timeout(timeout_dur, TryFutureExt::into_future(f(client.clone()))).await {
178                Ok(Ok(t)) => {
179                    requests.insert(id, true);
180                    res = Ok(t);
181                    logs = Vec::new();
182                    break;
183                },
184                Ok(Err(err)) => {
185                    tracing::debug!(id, ?score, peer = %client.url, "error from peer: {err:#}");
186                    logs.push(format!(
187                        "Error from peer {} with id {id} and score {score:?}: {err:#}",
188                        client.url
189                    ));
190                    requests.insert(id, false);
191                },
192                Err(_) => {
193                    tracing::debug!(id, ?score, peer = %client.url, ?timeout_dur, "request timed out");
194                    logs.push(format!(
195                        "Error from peer {} with id {id} and score {score:?}: request timed out",
196                        client.url
197                    ));
198                    requests.insert(id, false);
199                },
200            }
201        }
202
203        if !logs.is_empty() {
204            tracing::warn!("{}", logs.join("\n"));
205        }
206
207        // Update client scores.
208        let mut scores = self.scores.write().await;
209        for (id, success) in requests {
210            scores.change_priority_by(&id, |score| {
211                score.requests += 1;
212                self.clients[id].requests.add(1);
213                if !success {
214                    score.failures += 1;
215                    self.clients[id].failures.add(1);
216                }
217            });
218        }
219
220        res
221    }
222
223    pub fn from_urls(
224        urls: Vec<Url>,
225        backoff: BackoffParams,
226        base_timeout: Duration,
227        metrics: &(impl Metrics + ?Sized),
228    ) -> Self {
229        if urls.is_empty() {
230            panic!("Cannot create StatePeers with no peers");
231        }
232
233        let metrics = metrics.subgroup("catchup".into());
234        let requests = metrics.counter_family("requests".into(), vec!["peer".into()]);
235        let failures = metrics.counter_family("request_failures".into(), vec!["peer".into()]);
236
237        let scores = urls
238            .iter()
239            .enumerate()
240            .map(|(i, _)| (i, PeerScore::default()))
241            .collect();
242        let clients = urls
243            .into_iter()
244            .map(|url| Client::new(url, &*requests, &*failures))
245            .collect();
246
247        Self {
248            clients,
249            scores: Arc::new(RwLock::new(scores)),
250            backoff,
251            base_timeout,
252        }
253    }
254
255    #[tracing::instrument(skip(self, my_own_validator_config))]
256    pub async fn fetch_config(
257        &self,
258        my_own_validator_config: ValidatorConfig<SeqTypes>,
259    ) -> anyhow::Result<NetworkConfig<SeqTypes>> {
260        self.backoff()
261            .retry(self, move |provider, retry| {
262                let my_own_validator_config = my_own_validator_config.clone();
263                async move {
264                    let cfg: PublicNetworkConfig = provider
265                        .fetch(retry, |client| {
266                            let url = client.url.join("config/hotshot").unwrap();
267
268                            reqwest::get(url.clone())
269                        })
270                        .await?
271                        .json()
272                        .await?;
273                    cfg.into_network_config(my_own_validator_config)
274                        .context("fetched config, but failed to convert to private config")
275                }
276                .boxed()
277            })
278            .await
279    }
280}
281
282#[async_trait]
283impl<ApiVer: StaticVersionType> StateCatchup for StatePeers<ApiVer> {
284    #[tracing::instrument(skip(self, _instance))]
285    async fn try_fetch_accounts(
286        &self,
287        retry: usize,
288        _instance: &NodeState,
289        height: u64,
290        view: ViewNumber,
291        fee_merkle_tree_root: FeeMerkleCommitment,
292        accounts: &[FeeAccount],
293    ) -> anyhow::Result<Vec<FeeAccountProof>> {
294        self.fetch(retry, |client| async move {
295            let tree = client
296                .inner
297                .post::<FeeMerkleTree>(&format!("catchup/{height}/{}/accounts", view.u64()))
298                .body_binary(&accounts.to_vec())?
299                .send()
300                .await?;
301
302            // Verify proofs.
303            let mut proofs = Vec::new();
304            for account in accounts {
305                let (proof, _) = FeeAccountProof::prove(&tree, (*account).into())
306                    .context(format!("response missing fee account {account}"))?;
307                proof.verify(&fee_merkle_tree_root).context(format!(
308                    "invalid proof for fee account {account}, root: {fee_merkle_tree_root}"
309                ))?;
310                proofs.push(proof);
311            }
312
313            anyhow::Ok(proofs)
314        })
315        .await
316    }
317
318    #[tracing::instrument(skip(self, _instance, mt))]
319    async fn try_remember_blocks_merkle_tree(
320        &self,
321        retry: usize,
322        _instance: &NodeState,
323        height: u64,
324        view: ViewNumber,
325        mt: &mut BlockMerkleTree,
326    ) -> anyhow::Result<()> {
327        *mt = self
328            .fetch(retry, |client| {
329                let mut mt = mt.clone();
330                async move {
331                    let frontier = client
332                        .get::<BlocksFrontier>(&format!("catchup/{height}/{}/blocks", view.u64()))
333                        .send()
334                        .await?;
335                    let elem = frontier
336                        .elem()
337                        .context("provided frontier is missing leaf element")?;
338                    mt.remember(mt.num_leaves() - 1, *elem, &frontier)
339                        .context("verifying block proof")?;
340                    anyhow::Ok(mt)
341                }
342            })
343            .await?;
344        Ok(())
345    }
346
347    async fn try_fetch_chain_config(
348        &self,
349        retry: usize,
350        commitment: Commitment<ChainConfig>,
351    ) -> anyhow::Result<ChainConfig> {
352        self.fetch(retry, |client| async move {
353            let cf = client
354                .get::<ChainConfig>(&format!("catchup/chain-config/{commitment}"))
355                .send()
356                .await?;
357            ensure!(
358                cf.commit() == commitment,
359                "received chain config with mismatched commitment: expected {commitment}, got {}",
360                cf.commit()
361            );
362            Ok(cf)
363        })
364        .await
365    }
366
367    async fn try_fetch_leaf(
368        &self,
369        retry: usize,
370        height: u64,
371        stake_table: HSStakeTable<SeqTypes>,
372        success_threshold: U256,
373    ) -> anyhow::Result<Leaf2> {
374        // Fetch the leaf chain. For new protocol heights this is a leaf range
375        // `[height..=cert2_height]`
376        // for legacy-protocol heights it's a 3-chain.
377        let leaf_chain = self
378            .fetch(retry, |client| async move {
379                let chain = client
380                    .get::<Vec<Leaf2>>(&format!("catchup/{height}/leafchain"))
381                    .send()
382                    .await?;
383                anyhow::Ok(chain)
384            })
385            .await
386            .with_context(|| format!("failed to fetch leaf chain at height {height}"))?;
387
388        let first = leaf_chain
389            .first()
390            .ok_or_else(|| anyhow!("empty leaf chain returned for height {height}"))?;
391
392        if first.block_header().version() >= NEW_PROTOCOL_VERSION {
393            let upgrade_lock =
394                UpgradeLock::<SeqTypes>::new(versions::Upgrade::trivial(NEW_PROTOCOL_VERSION));
395            let cert2 = self
396                .fetch(retry, |client| async move {
397                    let cert2 = client
398                        .get::<Certificate2<SeqTypes>>(&format!("catchup/{height}/cert2"))
399                        .send()
400                        .await?;
401                    anyhow::Ok(cert2)
402                })
403                .await
404                .with_context(|| format!("failed to fetch cert2 for height {height}"))?;
405
406            verify_leaf_chain_with_cert2(
407                leaf_chain,
408                &stake_table,
409                success_threshold,
410                height,
411                &upgrade_lock,
412                cert2,
413            )
414            .await
415            .with_context(|| format!("failed to verify leaf chain with cert2 at height {height}"))
416        } else {
417            let upgrade_lock =
418                UpgradeLock::<SeqTypes>::new(versions::Upgrade::trivial(EPOCH_VERSION));
419            verify_leaf_chain(
420                leaf_chain,
421                &stake_table,
422                success_threshold,
423                height,
424                &upgrade_lock,
425            )
426            .await
427            .with_context(|| format!("failed to verify leaf chain at height {height}"))
428        }
429    }
430
431    async fn try_fetch_reward_merkle_tree_v2(
432        &self,
433        retry: usize,
434        height: u64,
435        view: ViewNumber,
436        reward_merkle_tree_root: RewardMerkleCommitmentV2,
437        accounts: Arc<Vec<RewardAccountV2>>,
438    ) -> anyhow::Result<PermittedRewardMerkleTreeV2> {
439        let result = self
440            .fetch(retry, |client| async move {
441                // Try the catchup endpoint first which returns tree from consensuss decided state
442                // if not present, then fall back to
443                // the reward-state-v2 endpoint which returns from storage decided state
444                let tree_bytes = match client
445                    .inner
446                    .get::<Vec<u8>>(&format!("catchup/reward-merkle-tree-v2/{height}/{}", *view))
447                    .send()
448                    .await
449                {
450                    Ok(bytes) => bytes,
451                    Err(err) => {
452                        tracing::info!(
453                            "catchup endpoint failed, falling back to reward-state-v2: {err:#}"
454                        );
455                        client
456                            .inner
457                            .get::<Vec<u8>>(&format!(
458                                "reward-state-v2/reward-merkle-tree-v2/{height}"
459                            ))
460                            .send()
461                            .await?
462                    },
463                };
464
465                Ok::<Vec<u8>, anyhow::Error>(tree_bytes)
466            })
467            .await
468            .context("Fetching from peer failed")?;
469
470        let tree_data = bincode::deserialize::<RewardMerkleTreeV2Data>(&result)
471            .context("Failed to deserialize merkle tree from catchup")?;
472
473        let tree: PermittedRewardMerkleTreeV2 =
474            PermittedRewardMerkleTreeV2::try_from_kv_set(tree_data.balances).await?;
475
476        ensure!(
477            tree.tree.commitment() == reward_merkle_tree_root,
478            "RewardMerkleTreeV2 from peer failed commitment check."
479        );
480        ensure!(!forgotten_accounts_include(&tree, &accounts));
481
482        Ok(tree)
483    }
484
485    #[tracing::instrument(skip(self, _instance))]
486    async fn try_fetch_reward_accounts_v1(
487        &self,
488        retry: usize,
489        _instance: &NodeState,
490        height: u64,
491        view: ViewNumber,
492        reward_merkle_tree_root: RewardMerkleCommitmentV1,
493        accounts: &[RewardAccountV1],
494    ) -> anyhow::Result<Vec<RewardAccountProofV1>> {
495        self.fetch(retry, |client| async move {
496            let tree = client
497                .inner
498                .post::<RewardMerkleTreeV1>(&format!(
499                    "catchup/{height}/{}/reward-accounts",
500                    view.u64()
501                ))
502                .body_binary(&accounts.to_vec())?
503                .send()
504                .await?;
505
506            // Verify proofs.
507            let mut proofs = Vec::new();
508            for account in accounts {
509                let (proof, _) = RewardAccountProofV1::prove(&tree, (*account).into())
510                    .context(format!("response missing reward account {account}"))?;
511                proof.verify(&reward_merkle_tree_root).context(format!(
512                    "invalid proof for v1 reward account {account}, root: \
513                     {reward_merkle_tree_root} height {height} view {view}"
514                ))?;
515                proofs.push(proof);
516            }
517
518            anyhow::Ok(proofs)
519        })
520        .await
521    }
522
523    async fn try_fetch_state_cert(
524        &self,
525        retry: usize,
526        epoch: u64,
527    ) -> anyhow::Result<LightClientStateUpdateCertificateV2<SeqTypes>> {
528        self.fetch(retry, |client| async move {
529            client
530                .get::<LightClientStateUpdateCertificateV2<SeqTypes>>(&format!(
531                    "catchup/{epoch}/state-cert"
532                ))
533                .send()
534                .await
535        })
536        .await
537    }
538
    /// Backoff parameters this provider was configured with.
    fn backoff(&self) -> &BackoffParams {
        &self.backoff
    }
542
543    fn name(&self) -> String {
544        format!(
545            "StatePeers({})",
546            self.clients
547                .iter()
548                .map(|client| client.url.to_string())
549                .join(",")
550        )
551    }
552
    /// Always `false`: this provider fetches from remote peers over HTTP.
    fn is_local(&self) -> bool {
        false
    }
556}
557
558pub(crate) trait CatchupStorage: Sync {
559    /// Get the state of the requested `accounts`.
560    ///
561    /// The state is fetched from a snapshot at the given height and view, which _must_ correspond!
562    /// `height` is provided to simplify lookups for backends where data is not indexed by view.
563    /// This function is intended to be used for catchup, so `view` should be no older than the last
564    /// decided view.
565    ///
566    /// If successful, this function also returns the leaf from `view`, if it is available. This can
567    /// be used to add the recovered state to HotShot's state map, so that future requests can get
568    /// the state from memory rather than storage.
569    fn get_accounts(
570        &self,
571        _instance: &NodeState,
572        _height: u64,
573        _view: ViewNumber,
574        _accounts: &[FeeAccount],
575    ) -> impl Send + Future<Output = anyhow::Result<(FeeMerkleTree, Leaf2)>> {
576        // Merklized state catchup is only supported by persistence backends that provide merklized
577        // state storage. This default implementation is overridden for those that do. Otherwise,
578        // catchup can still be provided by fetching undecided merklized state from consensus
579        // memory.
580        async {
581            bail!("merklized state catchup is not supported for this data source");
582        }
583    }
584
585    fn get_reward_accounts_v1(
586        &self,
587        _instance: &NodeState,
588        _height: u64,
589        _view: ViewNumber,
590        _accounts: &[RewardAccountV1],
591    ) -> impl Send + Future<Output = anyhow::Result<(RewardMerkleTreeV1, Leaf2)>> {
592        async {
593            bail!("merklized state catchup is not supported for this data source");
594        }
595    }
596
597    fn get_reward_accounts_v2(
598        &self,
599        _instance: &NodeState,
600        _height: u64,
601        _view: ViewNumber,
602        _accounts: &[RewardAccountV2],
603    ) -> impl Send + Future<Output = anyhow::Result<(RewardMerkleTreeV2, Leaf2)>> {
604        async {
605            bail!("merklized state catchup is not supported for this data source");
606        }
607    }
608
609    /// Get the blocks Merkle tree frontier.
610    ///
611    /// The state is fetched from a snapshot at the given height and view, which _must_ correspond!
612    /// `height` is provided to simplify lookups for backends where data is not indexed by view.
613    /// This function is intended to be used for catchup, so `view` should be no older than the last
614    /// decided view.
615    fn get_frontier(
616        &self,
617        _instance: &NodeState,
618        _height: u64,
619        _view: ViewNumber,
620    ) -> impl Send + Future<Output = anyhow::Result<BlocksFrontier>> {
621        // Merklized state catchup is only supported by persistence backends that provide merklized
622        // state storage. This default implementation is overridden for those that do. Otherwise,
623        // catchup can still be provided by fetching undecided merklized state from consensus
624        // memory.
625        async {
626            bail!("merklized state catchup is not supported for this data source");
627        }
628    }
629
630    fn get_chain_config(
631        &self,
632        _commitment: Commitment<ChainConfig>,
633    ) -> impl Send + Future<Output = anyhow::Result<ChainConfig>> {
634        async {
635            bail!("chain config catchup is not supported for this data source");
636        }
637    }
638
639    fn get_leaf_chain(
640        &self,
641        _height: u64,
642    ) -> impl Send + Future<Output = anyhow::Result<Vec<Leaf2>>> {
643        async {
644            bail!("leaf chain catchup is not supported for this data source");
645        }
646    }
647
648    /// Load the earliest cert2 whose finalized block height is at or above `height`.
649    ///
650    /// "Earliest" means the cert2 with the smallest finalized block height that is still greater
651    /// than or equal to the requested `height`.
652    fn load_earliest_cert2(
653        &self,
654        _height: u64,
655    ) -> impl Send + Future<Output = anyhow::Result<Option<Certificate2<SeqTypes>>>> {
656        async { Ok(None) }
657    }
658
659    /// Load a decided leaf at the given height.
660    fn get_leaf(&self, _height: u64) -> impl Send + Future<Output = anyhow::Result<Leaf2>> {
661        async {
662            bail!("leaf fetch is not supported for this data source");
663        }
664    }
665}
666
// `MetricsDataSource` overrides nothing, so it gets every default `CatchupStorage` method.
impl CatchupStorage for hotshot_query_service::data_source::MetricsDataSource {}
668
669impl<T, S> CatchupStorage for hotshot_query_service::data_source::ExtensibleDataSource<T, S>
670where
671    T: CatchupStorage,
672    S: Sync,
673{
674    async fn get_accounts(
675        &self,
676        instance: &NodeState,
677        height: u64,
678        view: ViewNumber,
679        accounts: &[FeeAccount],
680    ) -> anyhow::Result<(FeeMerkleTree, Leaf2)> {
681        self.inner()
682            .get_accounts(instance, height, view, accounts)
683            .await
684    }
685
686    async fn get_reward_accounts_v2(
687        &self,
688        instance: &NodeState,
689        height: u64,
690        view: ViewNumber,
691        accounts: &[RewardAccountV2],
692    ) -> anyhow::Result<(RewardMerkleTreeV2, Leaf2)> {
693        self.inner()
694            .get_reward_accounts_v2(instance, height, view, accounts)
695            .await
696    }
697
698    async fn get_reward_accounts_v1(
699        &self,
700        instance: &NodeState,
701        height: u64,
702        view: ViewNumber,
703        accounts: &[RewardAccountV1],
704    ) -> anyhow::Result<(RewardMerkleTreeV1, Leaf2)> {
705        self.inner()
706            .get_reward_accounts_v1(instance, height, view, accounts)
707            .await
708    }
709
710    async fn get_frontier(
711        &self,
712        instance: &NodeState,
713        height: u64,
714        view: ViewNumber,
715    ) -> anyhow::Result<BlocksFrontier> {
716        self.inner().get_frontier(instance, height, view).await
717    }
718
719    async fn get_chain_config(
720        &self,
721        commitment: Commitment<ChainConfig>,
722    ) -> anyhow::Result<ChainConfig> {
723        self.inner().get_chain_config(commitment).await
724    }
725    async fn get_leaf_chain(&self, height: u64) -> anyhow::Result<Vec<Leaf2>> {
726        self.inner().get_leaf_chain(height).await
727    }
728
729    async fn load_earliest_cert2(
730        &self,
731        height: u64,
732    ) -> anyhow::Result<Option<Certificate2<SeqTypes>>> {
733        self.inner().load_earliest_cert2(height).await
734    }
735
736    async fn get_leaf(&self, height: u64) -> anyhow::Result<Leaf2> {
737        self.inner().get_leaf(height).await
738    }
739}
740
/// A catchup provider that answers requests from a local data source `T` instead of the network.
#[derive(Debug)]
pub(crate) struct SqlStateCatchup<T> {
    /// Shared handle to the data source that requests are served from.
    db: Arc<T>,
    /// Backoff parameters stored for this provider.
    backoff: BackoffParams,
}

impl<T> SqlStateCatchup<T> {
    /// Create a provider that serves catchup from `db` with the given `backoff` parameters.
    pub(crate) fn new(db: Arc<T>, backoff: BackoffParams) -> Self {
        Self { db, backoff }
    }
}
752
753#[async_trait]
754impl<T> StateCatchup for SqlStateCatchup<T>
755where
756    T: CatchupStorage + RewardMerkleTreeDataSource + Send + Sync,
757{
758    async fn try_fetch_leaf(
759        &self,
760        _retry: usize,
761        height: u64,
762        _stake_table: HSStakeTable<SeqTypes>,
763        _success_threshold: U256,
764    ) -> anyhow::Result<Leaf2> {
765        // Leaves in our local DB were verified before they were stored, so we can return the leaf
766        // at `height` directly without re-verifying.
767        self.db
768            .get_leaf(height)
769            .await
770            .with_context(|| format!("failed to load leaf at height {height} from DB"))
771    }
772
773    // TODO: add a test for the account proof validation
774    // issue # 2102 (https://github.com/EspressoSystems/espresso-network/issues/2102)
775    #[tracing::instrument(skip(self, _retry, instance))]
776    async fn try_fetch_accounts(
777        &self,
778        _retry: usize,
779        instance: &NodeState,
780        block_height: u64,
781        view: ViewNumber,
782        fee_merkle_tree_root: FeeMerkleCommitment,
783        accounts: &[FeeAccount],
784    ) -> anyhow::Result<Vec<FeeAccountProof>> {
785        // Get the accounts
786        let (fee_merkle_tree_from_db, _) = self
787            .db
788            .get_accounts(instance, block_height, view, accounts)
789            .await
790            .with_context(|| "failed to get fee accounts from DB")?;
791
792        // Verify the accounts
793        let mut proofs = Vec::new();
794        for account in accounts {
795            let (proof, _) = FeeAccountProof::prove(&fee_merkle_tree_from_db, (*account).into())
796                .context(format!("response missing account {account}"))?;
797            proof.verify(&fee_merkle_tree_root).context(format!(
798                "invalid proof for fee account {account}, root: {fee_merkle_tree_root}"
799            ))?;
800            proofs.push(proof);
801        }
802
803        Ok(proofs)
804    }
805
806    #[tracing::instrument(skip(self, _retry, instance, mt))]
807    async fn try_remember_blocks_merkle_tree(
808        &self,
809        _retry: usize,
810        instance: &NodeState,
811        bh: u64,
812        view: ViewNumber,
813        mt: &mut BlockMerkleTree,
814    ) -> anyhow::Result<()> {
815        if bh == 0 {
816            return Ok(());
817        }
818
819        let proof = self.db.get_frontier(instance, bh, view).await?;
820        match proof
821            .proof
822            .first()
823            .context(format!("empty proof for frontier at height {bh}"))?
824        {
825            MerkleNode::Leaf { pos, elem, .. } => mt
826                .remember(pos, elem, proof.clone())
827                .context("failed to remember proof"),
828            _ => bail!("invalid proof"),
829        }
830    }
831
832    async fn try_fetch_chain_config(
833        &self,
834        _retry: usize,
835        commitment: Commitment<ChainConfig>,
836    ) -> anyhow::Result<ChainConfig> {
837        let cf = self.db.get_chain_config(commitment).await?;
838
839        if cf.commit() != commitment {
840            panic!(
841                "Critical error: Mismatched chain config detected. Expected chain config: {:?}, \
842                 but got: {:?}.
843                This may indicate a compromised database",
844                commitment,
845                cf.commit()
846            )
847        }
848
849        Ok(cf)
850    }
851
    /// Load the v2 reward Merkle tree stored for `height` from the local database and check it
    /// against the expected root.
    ///
    /// `_retry` and `_view` are not used by this provider.
    ///
    /// # Errors
    ///
    /// Fails if the tree cannot be loaded, if its commitment differs from
    /// `reward_merkle_tree_root`, or if `forgotten_accounts_include` returns `true` for the
    /// requested `accounts`.
    async fn try_fetch_reward_merkle_tree_v2(
        &self,
        _retry: usize,
        height: u64,
        _view: ViewNumber,
        reward_merkle_tree_root: RewardMerkleCommitmentV2,
        accounts: Arc<Vec<RewardAccountV2>>,
    ) -> anyhow::Result<PermittedRewardMerkleTreeV2> {
        let tree: PermittedRewardMerkleTreeV2 = self.db.load_reward_merkle_tree_v2(height).await?;

        // The loaded tree must commit to exactly the root the caller expects.
        ensure!(tree.tree.commitment() == reward_merkle_tree_root);
        // NOTE(review): presumably rejects a tree from which a requested account has been
        // forgotten -- confirm against `forgotten_accounts_include`.
        ensure!(!forgotten_accounts_include(&tree, &accounts));

        Ok(tree)
    }
867
868    #[tracing::instrument(skip(self, _retry, instance))]
869    async fn try_fetch_reward_accounts_v1(
870        &self,
871        _retry: usize,
872        instance: &NodeState,
873        block_height: u64,
874        view: ViewNumber,
875        reward_merkle_tree_root: RewardMerkleCommitmentV1,
876        accounts: &[RewardAccountV1],
877    ) -> anyhow::Result<Vec<RewardAccountProofV1>> {
878        // Get the accounts
879        let (reward_merkle_tree_from_db, _) = self
880            .db
881            .get_reward_accounts_v1(instance, block_height, view, accounts)
882            .await
883            .with_context(|| "failed to get reward accounts from DB")?;
884        // Verify the accounts
885        let mut proofs = Vec::new();
886        for account in accounts {
887            let (proof, _) =
888                RewardAccountProofV1::prove(&reward_merkle_tree_from_db, (*account).into())
889                    .context(format!("response missing account {account}"))?;
890            proof.verify(&reward_merkle_tree_root).context(format!(
891                "invalid proof for v1 reward account {account}, root: {reward_merkle_tree_root}"
892            ))?;
893            proofs.push(proof);
894        }
895
896        Ok(proofs)
897    }
898
899    async fn try_fetch_state_cert(
900        &self,
901        _retry: usize,
902        _epoch: u64,
903    ) -> anyhow::Result<LightClientStateUpdateCertificateV2<SeqTypes>> {
904        bail!("state cert catchup not supported for SqlStateCatchup");
905    }
906
    /// Borrow the backoff parameters stored on this provider.
    fn backoff(&self) -> &BackoffParams {
        &self.backoff
    }
910
911    fn name(&self) -> String {
912        "SqlStateCatchup".into()
913    }
914
    /// This provider always reports itself as local.
    fn is_local(&self) -> bool {
        true
    }
918}
919
/// Disable catchup entirely.
///
/// Every fetch fails with "state catchup is disabled", except chain config lookups, which are
/// answered from the configs registered through [`NullStateCatchup::add_chain_config`].
#[derive(Clone, Debug)]
pub struct NullStateCatchup {
    /// Backoff parameters returned by `backoff()`; `BackoffParams::disabled()` by default.
    backoff: BackoffParams,
    /// Chain config preimages, keyed by their commitment.
    chain_configs: HashMap<Commitment<ChainConfig>, ChainConfig>,
}
926
927impl Default for NullStateCatchup {
928    fn default() -> Self {
929        Self {
930            backoff: BackoffParams::disabled(),
931            chain_configs: Default::default(),
932        }
933    }
934}
935
936impl NullStateCatchup {
937    /// Add a chain config preimage which can be fetched by hash during STF evaluation.
938    ///
939    /// [`NullStateCatchup`] is used to disable catchup entirely when evaluating the STF, which
940    /// requires the [`ValidatedState`](espresso_types::ValidatedState) to be pre-seeded with all
941    /// the dependencies of STF evaluation. However, the STF also depends on having the preimage of
942    /// various [`ChainConfig`] commitments, which are not stored in the
943    /// [`ValidatedState`](espresso_types::ValidatedState), but which instead must be supplied by a
944    /// separate preimage oracle. Thus, [`NullStateCatchup`] may be populated with a set of
945    /// [`ChainConfig`]s, which it can feed to the STF during evaluation.
946    pub fn add_chain_config(&mut self, cf: ChainConfig) {
947        self.chain_configs.insert(cf.commit(), cf);
948    }
949}
950
951#[async_trait]
952impl StateCatchup for NullStateCatchup {
953    async fn try_fetch_leaf(
954        &self,
955        _retry: usize,
956        _height: u64,
957        _stake_table: HSStakeTable<SeqTypes>,
958        _success_threshold: U256,
959    ) -> anyhow::Result<Leaf2> {
960        bail!("state catchup is disabled")
961    }
962
963    async fn try_fetch_accounts(
964        &self,
965        _retry: usize,
966        _instance: &NodeState,
967        _height: u64,
968        _view: ViewNumber,
969        _fee_merkle_tree_root: FeeMerkleCommitment,
970        _account: &[FeeAccount],
971    ) -> anyhow::Result<Vec<FeeAccountProof>> {
972        bail!("state catchup is disabled");
973    }
974
975    async fn try_remember_blocks_merkle_tree(
976        &self,
977        _retry: usize,
978        _instance: &NodeState,
979        _height: u64,
980        _view: ViewNumber,
981        _mt: &mut BlockMerkleTree,
982    ) -> anyhow::Result<()> {
983        bail!("state catchup is disabled");
984    }
985
986    async fn try_fetch_chain_config(
987        &self,
988        _retry: usize,
989        commitment: Commitment<ChainConfig>,
990    ) -> anyhow::Result<ChainConfig> {
991        self.chain_configs
992            .get(&commitment)
993            .copied()
994            .context(format!("chain config {commitment} not available"))
995    }
996
997    async fn try_fetch_reward_merkle_tree_v2(
998        &self,
999        _retry: usize,
1000        _height: u64,
1001        _view: ViewNumber,
1002        _reward_merkle_tree_root: RewardMerkleCommitmentV2,
1003        _accounts: Arc<Vec<RewardAccountV2>>,
1004    ) -> anyhow::Result<PermittedRewardMerkleTreeV2> {
1005        bail!("state catchup is disabled");
1006    }
1007
1008    async fn try_fetch_reward_accounts_v1(
1009        &self,
1010        _retry: usize,
1011        _instance: &NodeState,
1012        _height: u64,
1013        _view: ViewNumber,
1014        _fee_merkle_tree_root: RewardMerkleCommitmentV1,
1015        _account: &[RewardAccountV1],
1016    ) -> anyhow::Result<Vec<RewardAccountProofV1>> {
1017        bail!("state catchup is disabled");
1018    }
1019
1020    async fn try_fetch_state_cert(
1021        &self,
1022        _retry: usize,
1023        _epoch: u64,
1024    ) -> anyhow::Result<LightClientStateUpdateCertificateV2<SeqTypes>> {
1025        bail!("state catchup is disabled");
1026    }
1027
1028    fn backoff(&self) -> &BackoffParams {
1029        &self.backoff
1030    }
1031
1032    fn name(&self) -> String {
1033        "NullStateCatchup".into()
1034    }
1035
1036    fn is_local(&self) -> bool {
1037        true
1038    }
1039}
1040
/// A catchup implementation that parallelizes requests to many providers.
/// It returns the result of the first non-erroring provider to complete.
#[derive(Clone)]
pub struct ParallelStateCatchup {
    /// The registered providers. The list sits behind a shared `Arc<Mutex<..>>`, so clones of
    /// this struct observe providers added later through `add_provider`.
    providers: Arc<Mutex<Vec<Arc<dyn StateCatchup>>>>,
    /// Backoff parameters returned by `backoff()`; `new` sets them to
    /// `BackoffParams::disabled()`.
    backoff: BackoffParams,
    /// Timeout for local provider requests
    local_timeout: Duration,
}
1050
1051impl ParallelStateCatchup {
1052    /// Create a new [`ParallelStateCatchup`] with the given providers and local timeout.
1053    pub fn new(providers: &[Arc<dyn StateCatchup>], local_timeout: Duration) -> Self {
1054        Self {
1055            providers: Arc::new(Mutex::new(providers.to_vec())),
1056            backoff: BackoffParams::disabled(),
1057            local_timeout,
1058        }
1059    }
1060
1061    /// Add a provider to the list of providers
1062    pub fn add_provider(&self, provider: Arc<dyn StateCatchup>) {
1063        self.providers.lock().push(provider);
1064    }
1065
1066    /// Perform an async operation on all local providers, returning the first result to succeed.
1067    ///
1068    /// A timeout is applied so that a slow local lookup does not prevent the node from
1069    /// falling back to remote providers in time to vote within the current view.
1070    pub async fn on_local_providers<C, F, RT>(&self, closure: C) -> anyhow::Result<RT>
1071    where
1072        C: Fn(Arc<dyn StateCatchup>) -> F + Clone + Send + Sync + 'static,
1073        F: Future<Output = anyhow::Result<RT>> + Send + 'static,
1074        RT: Send + Sync + 'static,
1075    {
1076        let local_timeout = self.local_timeout;
1077        match timeout(
1078            local_timeout,
1079            self.on_providers(|provider| provider.is_local(), closure),
1080        )
1081        .await
1082        {
1083            Ok(result) => result,
1084            Err(_) => {
1085                let err = format!("local provider timed out after {local_timeout:?}");
1086                tracing::warn!("{err}");
1087                Err(anyhow::anyhow!(err))
1088            },
1089        }
1090    }
1091
1092    /// Perform an async operation on all remote providers, returning the first result to succeed
1093    pub async fn on_remote_providers<C, F, RT>(&self, closure: C) -> anyhow::Result<RT>
1094    where
1095        C: Fn(Arc<dyn StateCatchup>) -> F + Clone + Send + Sync + 'static,
1096        F: Future<Output = anyhow::Result<RT>> + Send + 'static,
1097        RT: Send + Sync + 'static,
1098    {
1099        self.on_providers(|provider| !provider.is_local(), closure)
1100            .await
1101    }
1102
1103    /// Perform an async operation on all providers matching the given predicate, returning the first result to succeed
1104    pub async fn on_providers<P, C, F, RT>(&self, predicate: P, closure: C) -> anyhow::Result<RT>
1105    where
1106        P: Fn(&Arc<dyn StateCatchup>) -> bool + Clone + Send + Sync + 'static,
1107        C: Fn(Arc<dyn StateCatchup>) -> F + Clone + Send + Sync + 'static,
1108        F: Future<Output = anyhow::Result<RT>> + Send + 'static,
1109        RT: Send + Sync + 'static,
1110    {
1111        // Make sure we have at least one provider
1112        let providers = self.providers.lock().clone();
1113        if providers.is_empty() {
1114            return Err(anyhow::anyhow!("no providers were initialized"));
1115        }
1116
1117        // Filter the providers by the predicate
1118        let providers = providers.into_iter().filter(predicate).collect::<Vec<_>>();
1119        if providers.is_empty() {
1120            return Err(anyhow::anyhow!("no providers matched the given predicate"));
1121        }
1122
1123        // Spawn futures for each provider
1124        let mut futures = FuturesUnordered::new();
1125        for provider in providers {
1126            let closure = closure.clone();
1127            futures.push(AbortOnDropHandle::new(tokio::spawn(closure(provider))));
1128        }
1129
1130        let mut logs = vec![format!("No providers returned a successful result.\n")];
1131        // Return the first successful result
1132        while let Some(result) = futures.next().await {
1133            // Unwrap the inner (join) result
1134            let result = match result {
1135                Ok(res) => res,
1136                Err(err) => {
1137                    tracing::debug!("Failed to join on provider: {err:#}.");
1138                    logs.push(format!("Failed to join on provider: {err:#}."));
1139                    continue;
1140                },
1141            };
1142
1143            // If a provider fails, print why
1144            let result = match result {
1145                Ok(res) => res,
1146                Err(err) => {
1147                    tracing::debug!("Failed to fetch data: {err:#}.");
1148                    logs.push(format!("Failed to fetch data: {err:#}."));
1149                    continue;
1150                },
1151            };
1152
1153            return Ok(result);
1154        }
1155
1156        Err(anyhow::anyhow!(logs.join("\n")))
1157    }
1158}
1159
/// Evaluate an expression with fresh clones of the listed bindings in scope.
///
/// `clone! {(a, b) expr}` expands to a block that shadows `a` and `b` with `a.clone()` and
/// `b.clone()` and then evaluates `expr`. The original bindings stay usable afterwards, which
/// makes it convenient for handing owned copies to `move` closures and `async move` blocks.
macro_rules! clone {
    ( ( $( $name:ident ),* ) $body:expr ) => {{
        $( let $name = $name.clone(); )*
        $body
    }};
}
1168
/// [`StateCatchup`] for [`ParallelStateCatchup`]: each request is first sent to all local
/// providers in parallel and, if none of them succeeds, to all remote providers in parallel.
1171#[async_trait]
1172impl StateCatchup for ParallelStateCatchup {
1173    async fn try_fetch_leaf(
1174        &self,
1175        retry: usize,
1176        height: u64,
1177        stake_table: HSStakeTable<SeqTypes>,
1178        success_threshold: U256,
1179    ) -> anyhow::Result<Leaf2> {
1180        // Try fetching the leaf on the local providers first
1181        let local_result = self
1182            .on_local_providers(clone! {(stake_table) move |provider| {
1183                clone!{(stake_table) async move {
1184                    provider
1185                        .try_fetch_leaf(retry, height, stake_table, success_threshold)
1186                        .await
1187                }}
1188            }})
1189            .await;
1190
1191        // Check if we were successful locally
1192        match &local_result {
1193            Ok(_) => return local_result,
1194            Err(err) => tracing::debug!("{err:#}"),
1195        }
1196
1197        // If that fails, try the remote ones
1198        self.on_remote_providers(clone! {(stake_table) move |provider| {
1199            clone!{(stake_table) async move {
1200                provider
1201                    .try_fetch_leaf(retry, height, stake_table, success_threshold)
1202                    .await
1203            }}
1204        }})
1205        .await
1206    }
1207
1208    async fn try_fetch_accounts(
1209        &self,
1210        retry: usize,
1211        instance: &NodeState,
1212        height: u64,
1213        view: ViewNumber,
1214        fee_merkle_tree_root: FeeMerkleCommitment,
1215        accounts: &[FeeAccount],
1216    ) -> anyhow::Result<Vec<FeeAccountProof>> {
1217        // Try to get the accounts on local providers first
1218        let accounts_vec = accounts.to_vec();
1219        let local_result = self
1220            .on_local_providers(clone! {(instance, accounts_vec) move |provider| {
1221                clone! {(instance, accounts_vec) async move {
1222                    provider
1223                        .try_fetch_accounts(
1224                            retry,
1225                            &instance,
1226                            height,
1227                            view,
1228                            fee_merkle_tree_root,
1229                            &accounts_vec,
1230                        )
1231                        .await
1232                }}
1233            }})
1234            .await;
1235
1236        // Check if we were successful locally
1237        match &local_result {
1238            Ok(_) => return local_result,
1239            Err(err) => tracing::debug!("{err:#}"),
1240        }
1241
1242        // If that fails, try the remote ones
1243        self.on_remote_providers(clone! {(instance, accounts_vec) move |provider| {
1244            clone!{(instance, accounts_vec) async move {
1245                provider
1246                .try_fetch_accounts(
1247                    retry,
1248                    &instance,
1249                    height,
1250                    view,
1251                    fee_merkle_tree_root,
1252                    &accounts_vec,
1253                ).await
1254            }}
1255        }})
1256        .await
1257    }
1258
1259    async fn try_remember_blocks_merkle_tree(
1260        &self,
1261        retry: usize,
1262        instance: &NodeState,
1263        height: u64,
1264        view: ViewNumber,
1265        mt: &mut BlockMerkleTree,
1266    ) -> anyhow::Result<()> {
1267        // Try to remember the blocks merkle tree on local providers first
1268        let local_result = self
1269            .on_local_providers(clone! {(mt, instance) move |provider| {
1270                let mut mt = mt.clone();
1271                clone! {(instance) async move {
1272                    // Perform the call
1273                    provider
1274                        .try_remember_blocks_merkle_tree(
1275                            retry,
1276                            &instance,
1277                            height,
1278                            view,
1279                            &mut mt,
1280                        )
1281                        .await?;
1282
1283                    // Return the merkle tree so we can modify it
1284                    Ok(mt)
1285                }}
1286            }})
1287            .await;
1288
1289        // Check if we were successful locally
1290        if let Ok(modified_mt) = local_result {
1291            // Set the merkle tree to the output of the successful local call
1292            *mt = modified_mt;
1293
1294            return Ok(());
1295        }
1296
1297        // If that fails, try the remote ones
1298        let remote_result = self
1299            .on_remote_providers(clone! {(mt, instance) move |provider| {
1300                let mut mt = mt.clone();
1301                clone!{(instance) async move {
1302                    // Perform the call
1303                    provider
1304                    .try_remember_blocks_merkle_tree(
1305                        retry,
1306                        &instance,
1307                        height,
1308                        view,
1309                        &mut mt,
1310                    )
1311                    .await?;
1312
1313                    // Return the merkle tree
1314                    Ok(mt)
1315                }}
1316            }})
1317            .await?;
1318
1319        // Update the original, local merkle tree
1320        *mt = remote_result;
1321
1322        Ok(())
1323    }
1324
1325    async fn try_fetch_chain_config(
1326        &self,
1327        retry: usize,
1328        commitment: Commitment<ChainConfig>,
1329    ) -> anyhow::Result<ChainConfig> {
1330        // Try fetching the chain config on the local providers first
1331        let local_result = self
1332            .on_local_providers(move |provider| async move {
1333                provider.try_fetch_chain_config(retry, commitment).await
1334            })
1335            .await;
1336
1337        // Check if we were successful locally
1338        match &local_result {
1339            Ok(_) => return local_result,
1340            Err(err) => tracing::debug!("{err:#}"),
1341        }
1342
1343        // If that fails, try the remote ones
1344        self.on_remote_providers(move |provider| async move {
1345            provider.try_fetch_chain_config(retry, commitment).await
1346        })
1347        .await
1348    }
1349
1350    async fn try_fetch_reward_merkle_tree_v2(
1351        &self,
1352        retry: usize,
1353        height: u64,
1354        view: ViewNumber,
1355        reward_merkle_tree_root: RewardMerkleCommitmentV2,
1356        accounts: Arc<Vec<RewardAccountV2>>,
1357    ) -> anyhow::Result<PermittedRewardMerkleTreeV2> {
1358        let local_result = self
1359            .on_local_providers(clone! {(accounts) move |provider| {
1360                clone! {(accounts) async move {
1361                    provider
1362                        .try_fetch_reward_merkle_tree_v2(
1363                            retry,
1364                            height,
1365                            view,
1366                            reward_merkle_tree_root,
1367                            accounts,
1368                        )
1369                        .await
1370                }}
1371            }})
1372            .await;
1373
1374        // Check if we were successful locally
1375        match &local_result {
1376            Ok(_) => return local_result,
1377            Err(err) => tracing::debug!("{err:#}"),
1378        }
1379
1380        // If that fails, try the remote ones
1381        self.on_remote_providers(clone! {(accounts) move |provider| {
1382            clone!{(accounts) async move {
1383                provider
1384                .try_fetch_reward_merkle_tree_v2(
1385                    retry,
1386                    height,
1387                    view,
1388                    reward_merkle_tree_root,
1389                    accounts
1390                ).await
1391            }}
1392        }})
1393        .await
1394    }
1395
1396    async fn try_fetch_reward_accounts_v1(
1397        &self,
1398        retry: usize,
1399        instance: &NodeState,
1400        height: u64,
1401        view: ViewNumber,
1402        reward_merkle_tree_root: RewardMerkleCommitmentV1,
1403        accounts: &[RewardAccountV1],
1404    ) -> anyhow::Result<Vec<RewardAccountProofV1>> {
1405        // Try to get the accounts on local providers first
1406        let accounts_vec = accounts.to_vec();
1407        let local_result = self
1408            .on_local_providers(clone! {(instance, accounts_vec) move |provider| {
1409                clone! {(instance, accounts_vec) async move {
1410                    provider
1411                        .try_fetch_reward_accounts_v1(
1412                            retry,
1413                            &instance,
1414                            height,
1415                            view,
1416                            reward_merkle_tree_root,
1417                            &accounts_vec,
1418                        )
1419                        .await
1420                }}
1421            }})
1422            .await;
1423
1424        // Check if we were successful locally
1425        match &local_result {
1426            Ok(_) => return local_result,
1427            Err(err) => tracing::debug!("{err:#}"),
1428        }
1429
1430        // If that fails, try the remote ones
1431        self.on_remote_providers(clone! {(instance, accounts_vec) move |provider| {
1432            clone!{(instance, accounts_vec) async move {
1433                provider
1434                .try_fetch_reward_accounts_v1(
1435                    retry,
1436                    &instance,
1437                    height,
1438                    view,
1439                    reward_merkle_tree_root,
1440                    &accounts_vec,
1441                ).await
1442            }}
1443        }})
1444        .await
1445    }
1446
1447    async fn try_fetch_state_cert(
1448        &self,
1449        retry: usize,
1450        epoch: u64,
1451    ) -> anyhow::Result<LightClientStateUpdateCertificateV2<SeqTypes>> {
1452        // Try fetching the state cert on the local providers first
1453        let local_result = self
1454            .on_local_providers(move |provider| async move {
1455                provider.try_fetch_state_cert(retry, epoch).await
1456            })
1457            .await;
1458
1459        // Check if we were successful locally
1460        match &local_result {
1461            Ok(_) => return local_result,
1462            Err(err) => tracing::debug!("{err:#}"),
1463        }
1464
1465        // If that fails, try the remote ones
1466        self.on_remote_providers(move |provider| async move {
1467            provider.try_fetch_state_cert(retry, epoch).await
1468        })
1469        .await
1470    }
1471
    /// Borrow the backoff parameters stored on this provider.
    fn backoff(&self) -> &BackoffParams {
        &self.backoff
    }
1475
1476    fn name(&self) -> String {
1477        format!(
1478            "[{}]",
1479            self.providers
1480                .lock()
1481                .iter()
1482                .map(|p| p.name())
1483                .collect::<Vec<_>>()
1484                .join(", ")
1485        )
1486    }
1487
1488    async fn fetch_accounts(
1489        &self,
1490        instance: &NodeState,
1491        height: u64,
1492        view: ViewNumber,
1493        fee_merkle_tree_root: FeeMerkleCommitment,
1494        accounts: Vec<FeeAccount>,
1495    ) -> anyhow::Result<Vec<FeeAccountProof>> {
1496        // Try to get the accounts on local providers first
1497        let accounts_vec = accounts.to_vec();
1498        let local_result = self
1499            .on_local_providers(clone! {(instance, accounts_vec) move |provider| {
1500                clone! {(instance, accounts_vec) async move {
1501                    provider
1502                        .try_fetch_accounts(
1503                            0,
1504                            &instance,
1505                            height,
1506                            view,
1507                            fee_merkle_tree_root,
1508                            &accounts_vec,
1509                        )
1510                        .await
1511                }}
1512            }})
1513            .await;
1514
1515        // Check if we were successful locally
1516        match &local_result {
1517            Ok(_) => return local_result,
1518            Err(err) => tracing::debug!("{err:#}"),
1519        }
1520
1521        // If that fails, try the remote ones (with retry)
1522        self.on_remote_providers(clone! {(instance, accounts_vec) move |provider| {
1523            clone!{(instance, accounts_vec) async move {
1524                provider
1525                .fetch_accounts(
1526                    &instance,
1527                    height,
1528                    view,
1529                    fee_merkle_tree_root,
1530                    accounts_vec,
1531                ).await
1532            }}
1533        }})
1534        .await
1535    }
1536
1537    async fn fetch_leaf(
1538        &self,
1539        height: u64,
1540        stake_table: HSStakeTable<SeqTypes>,
1541        success_threshold: U256,
1542    ) -> anyhow::Result<Leaf2> {
1543        // Try fetching the leaf on the local providers first
1544        let local_result = self
1545            .on_local_providers(clone! {(stake_table) move |provider| {
1546                clone!{(stake_table) async move {
1547                    provider
1548                        .try_fetch_leaf(0, height, stake_table, success_threshold)
1549                        .await
1550                }}
1551            }})
1552            .await;
1553
1554        // Check if we were successful locally
1555        match &local_result {
1556            Ok(_) => return local_result,
1557            Err(err) => tracing::debug!("{err:#}"),
1558        }
1559
1560        // If that fails, try the remote ones (with retry)
1561        self.on_remote_providers(clone! {(stake_table) move |provider| {
1562        clone!{(stake_table) async move {
1563            provider
1564                .fetch_leaf(height, stake_table, success_threshold)
1565                .await
1566        }}
1567        }})
1568        .await
1569    }
1570
1571    async fn fetch_chain_config(
1572        &self,
1573        commitment: Commitment<ChainConfig>,
1574    ) -> anyhow::Result<ChainConfig> {
1575        // Try fetching the chain config on the local providers first
1576        let local_result = self
1577            .on_local_providers(move |provider| async move {
1578                provider.try_fetch_chain_config(0, commitment).await
1579            })
1580            .await;
1581
1582        // Check if we were successful locally
1583        match &local_result {
1584            Ok(_) => return local_result,
1585            Err(err) => tracing::debug!("{err:#}"),
1586        }
1587
1588        // If that fails, try the remote ones (with retry)
1589        self.on_remote_providers(move |provider| async move {
1590            provider.fetch_chain_config(commitment).await
1591        })
1592        .await
1593    }
1594
1595    async fn fetch_reward_accounts_v1(
1596        &self,
1597        instance: &NodeState,
1598        height: u64,
1599        view: ViewNumber,
1600        reward_merkle_tree_root: RewardMerkleCommitmentV1,
1601        accounts: Vec<RewardAccountV1>,
1602    ) -> anyhow::Result<Vec<RewardAccountProofV1>> {
1603        // Try to get the accounts on local providers first
1604        let accounts_vec = accounts.to_vec();
1605        let local_result = self
1606            .on_local_providers(clone! {(instance, accounts_vec) move |provider| {
1607                clone! {(instance, accounts_vec) async move {
1608                    provider
1609                        .try_fetch_reward_accounts_v1(
1610                            0,
1611                            &instance,
1612                            height,
1613                            view,
1614                            reward_merkle_tree_root,
1615                            &accounts_vec,
1616                        )
1617                        .await
1618                }}
1619            }})
1620            .await;
1621
1622        // Check if we were successful locally
1623        match &local_result {
1624            Ok(_) => return local_result,
1625            Err(err) => tracing::debug!("{err:#}"),
1626        }
1627
1628        // If that fails, try the remote ones (with retry)
1629        self.on_remote_providers(clone! {(instance, accounts_vec) move |provider| {
1630            clone!{(instance, accounts_vec) async move {
1631                provider
1632                .fetch_reward_accounts_v1(
1633                    &instance,
1634                    height,
1635                    view,
1636                    reward_merkle_tree_root,
1637                    accounts_vec,
1638                ).await
1639            }}
1640        }})
1641        .await
1642    }
1643
1644    async fn fetch_state_cert(
1645        &self,
1646        epoch: u64,
1647    ) -> anyhow::Result<LightClientStateUpdateCertificateV2<SeqTypes>> {
1648        let local_result = self
1649            .on_local_providers(move |provider| async move {
1650                provider.try_fetch_state_cert(0, epoch).await
1651            })
1652            .await;
1653
1654        // Check if we were successful locally
1655        match &local_result {
1656            Ok(_) => return local_result,
1657            Err(err) => tracing::debug!("{err:#}"),
1658        }
1659
1660        // If that fails, try the remote ones (with retry)
1661        self.on_remote_providers(
1662            move |provider| async move { provider.fetch_state_cert(epoch).await },
1663        )
1664        .await
1665    }
1666
1667    async fn remember_blocks_merkle_tree(
1668        &self,
1669        instance: &NodeState,
1670        height: u64,
1671        view: ViewNumber,
1672        mt: &mut BlockMerkleTree,
1673    ) -> anyhow::Result<()> {
1674        // Try to remember the blocks merkle tree on local providers first
1675        let local_result = self
1676            .on_local_providers(clone! {(mt, instance) move |provider| {
1677                let mut mt = mt.clone();
1678                clone! {(instance) async move {
1679                    // Perform the call
1680                    provider
1681                        .try_remember_blocks_merkle_tree(
1682                            0,
1683                            &instance,
1684                            height,
1685                            view,
1686                            &mut mt,
1687                        )
1688                        .await?;
1689
1690                    // Return the merkle tree so we can modify it
1691                    Ok(mt)
1692                }}
1693            }})
1694            .await;
1695
1696        // Check if we were successful locally
1697        if let Ok(modified_mt) = local_result {
1698            // Set the merkle tree to the one with the
1699            // successful call
1700            *mt = modified_mt;
1701
1702            return Ok(());
1703        }
1704
1705        // If that fails, try the remote ones (with retry)
1706        let remote_result = self
1707            .on_remote_providers(clone! {(mt, instance) move |provider| {
1708                let mut mt = mt.clone();
1709                clone!{(instance) async move {
1710                    // Perform the call
1711                    provider
1712                    .remember_blocks_merkle_tree(
1713                        &instance,
1714                        height,
1715                        view,
1716                        &mut mt,
1717                    )
1718                    .await?;
1719
1720                    // Return the merkle tree
1721                    Ok(mt)
1722                }}
1723            }})
1724            .await?;
1725
1726        // Update the original, local merkle tree
1727        *mt = remote_result;
1728
1729        Ok(())
1730    }
1731
1732    fn is_local(&self) -> bool {
1733        self.providers.lock().iter().all(|p| p.is_local())
1734    }
1735}
1736
1737/// Add accounts to the in-memory consensus state.
1738/// We use this during catchup after receiving verified accounts.
1739pub async fn add_fee_accounts_to_state<I: hotshot::traits::NodeImplementation<SeqTypes>>(
1740    consensus_handle: &ConsensusHandle<SeqTypes, I>,
1741    view: &ViewNumber,
1742    accounts: &[FeeAccount],
1743    tree: &FeeMerkleTree,
1744    leaf: Leaf2,
1745) -> anyhow::Result<()> {
1746    let (existing_state, delta) = consensus_handle.state_and_delta(*view).await;
1747    let (state, delta) = match existing_state {
1748        Some(existing) => {
1749            let mut state = (*existing).clone();
1750            for account in accounts {
1751                if let Some((proof, _)) = FeeAccountProof::prove(tree, (*account).into()) {
1752                    if let Err(err) = proof.remember(&mut state.fee_merkle_tree) {
1753                        tracing::warn!(
1754                            ?view,
1755                            %account,
1756                            "cannot update fetched account state: {err:#}"
1757                        );
1758                    }
1759                } else {
1760                    tracing::warn!(?view, %account, "cannot update fetched account state because account is not in the merkle tree");
1761                };
1762            }
1763            (Arc::new(state), delta)
1764        },
1765        None => {
1766            // If we don't already have a leaf for this view, or if we don't have the view
1767            // at all, we can create a new view based on the recovered leaf and add it to
1768            // our state map. In this case, we must also add the leaf to the saved leaves
1769            // map to ensure consistency.
1770            let mut state = ValidatedState::from_header(leaf.block_header());
1771            state.fee_merkle_tree = tree.clone();
1772            (Arc::new(state), None)
1773        },
1774    };
1775
1776    consensus_handle
1777        .update_leaf(leaf, state, delta)
1778        .await
1779        .with_context(|| "failed to update leaf")?;
1780
1781    Ok(())
1782}
1783
1784/// Add accounts to the in-memory consensus state.
1785/// We use this during catchup after receiving verified accounts.
1786pub async fn add_v2_reward_accounts_to_state<I: hotshot::traits::NodeImplementation<SeqTypes>>(
1787    consensus_handle: &ConsensusHandle<SeqTypes, I>,
1788    view: &ViewNumber,
1789    accounts: &[RewardAccountV2],
1790    tree: &RewardMerkleTreeV2,
1791    leaf: Leaf2,
1792) -> anyhow::Result<()> {
1793    let (existing_state, delta) = consensus_handle.state_and_delta(*view).await;
1794    let (state, delta) = match existing_state {
1795        Some(existing) => {
1796            let mut state = (*existing).clone();
1797            for account in accounts {
1798                if let Some((proof, _)) = RewardAccountProofV2::prove(tree, (*account).into()) {
1799                    if let Err(err) = proof.remember(&mut state.reward_merkle_tree_v2) {
1800                        tracing::warn!(
1801                            ?view,
1802                            %account,
1803                            "cannot update fetched account state: {err:#}"
1804                        );
1805                    }
1806                } else {
1807                    tracing::warn!(?view, %account, "cannot update fetched account state because account is not in the merkle tree");
1808                };
1809            }
1810            (Arc::new(state), delta)
1811        },
1812        None => {
1813            // If we don't already have a leaf for this view, or if we don't have the view
1814            // at all, we can create a new view based on the recovered leaf and add it to
1815            // our state map. In this case, we must also add the leaf to the saved leaves
1816            // map to ensure consistency.
1817            let mut state = ValidatedState::from_header(leaf.block_header());
1818            state.reward_merkle_tree_v2 = tree.clone();
1819            (Arc::new(state), None)
1820        },
1821    };
1822
1823    consensus_handle
1824        .update_leaf(leaf, state, delta)
1825        .await
1826        .with_context(|| "failed to update leaf")?;
1827
1828    Ok(())
1829}
1830
1831/// Add accounts to the in-memory consensus state.
1832/// We use this during catchup after receiving verified accounts.
1833pub async fn add_v1_reward_accounts_to_state<I: hotshot::traits::NodeImplementation<SeqTypes>>(
1834    consensus_handle: &ConsensusHandle<SeqTypes, I>,
1835    view: &ViewNumber,
1836    accounts: &[RewardAccountV1],
1837    tree: &RewardMerkleTreeV1,
1838    leaf: Leaf2,
1839) -> anyhow::Result<()> {
1840    let (existing_state, delta) = consensus_handle.state_and_delta(*view).await;
1841    let (state, delta) = match existing_state {
1842        Some(existing) => {
1843            let mut state = (*existing).clone();
1844            for account in accounts {
1845                if let Some((proof, _)) = RewardAccountProofV1::prove(tree, (*account).into()) {
1846                    if let Err(err) = proof.remember(&mut state.reward_merkle_tree_v1) {
1847                        tracing::warn!(
1848                            ?view,
1849                            %account,
1850                            "cannot update fetched account state: {err:#}"
1851                        );
1852                    }
1853                } else {
1854                    tracing::warn!(?view, %account, "cannot update fetched account state because account is not in the merkle tree");
1855                };
1856            }
1857            (Arc::new(state), delta)
1858        },
1859        None => {
1860            // If we don't already have a leaf for this view, or if we don't have the view
1861            // at all, we can create a new view based on the recovered leaf and add it to
1862            // our state map. In this case, we must also add the leaf to the saved leaves
1863            // map to ensure consistency.
1864            let mut state = ValidatedState::from_header(leaf.block_header());
1865            state.reward_merkle_tree_v1 = tree.clone();
1866            (Arc::new(state), None)
1867        },
1868    };
1869
1870    consensus_handle
1871        .update_leaf(leaf, state, delta)
1872        .await
1873        .with_context(|| "failed to update leaf")?;
1874
1875    Ok(())
1876}
1877
#[cfg(test)]
mod test {
    use super::*;

    /// A peer with 2 failures in 1000 requests must compare greater than a peer with
    /// 1 failure in 10 requests, and a `PriorityQueue` keyed on those scores must pop
    /// the greater one first.
    #[test]
    fn test_peer_priority() {
        let reliable = PeerScore {
            requests: 1000,
            failures: 2,
        };
        let flaky = PeerScore {
            requests: 10,
            failures: 1,
        };
        assert!(reliable > flaky);

        let mut queue: PriorityQueue<_, _> = [(0, reliable), (1, flaky)].into_iter().collect();
        // Entries must come out highest score first.
        for expected in [(0, reliable), (1, flaky)] {
            assert_eq!(queue.pop(), Some(expected));
        }
    }
}