espresso_node/
catchup.rs

1use std::{
2    cmp::Ordering,
3    collections::HashMap,
4    fmt::{Debug, Display},
5    sync::Arc,
6    time::Duration,
7};
8
9use alloy::primitives::U256;
10use anyhow::{Context, anyhow, bail, ensure};
11use async_lock::RwLock;
12use async_trait::async_trait;
13use committable::{Commitment, Committable};
14use espresso_types::{
15    BackoffParams, BlockMerkleTree, FeeAccount, FeeAccountProof, FeeMerkleCommitment,
16    FeeMerkleTree, Leaf2, NodeState, PubKey, SeqTypes, ValidatedState,
17    config::PublicNetworkConfig,
18    traits::SequencerPersistence,
19    v0::traits::StateCatchup,
20    v0_3::{
21        ChainConfig, RewardAccountProofV1, RewardAccountV1, RewardMerkleCommitmentV1,
22        RewardMerkleTreeV1,
23    },
24    v0_4::{
25        PermittedRewardMerkleTreeV2, RewardAccountProofV2, RewardAccountV2,
26        RewardMerkleCommitmentV2, RewardMerkleTreeV2, forgotten_accounts_include,
27    },
28};
29use futures::{
30    StreamExt,
31    future::{Future, FutureExt, TryFuture, TryFutureExt},
32    stream::FuturesUnordered,
33};
34use hotshot_types::{
35    ValidatorConfig,
36    consensus::Consensus,
37    data::ViewNumber,
38    message::UpgradeLock,
39    network::NetworkConfig,
40    simple_certificate::LightClientStateUpdateCertificateV2,
41    stake_table::HSStakeTable,
42    traits::{
43        ValidatedState as ValidatedStateTrait,
44        metrics::{Counter, CounterFamily, Metrics},
45        network::ConnectedNetwork,
46    },
47    utils::{View, ViewInner, verify_leaf_chain},
48};
49use itertools::Itertools;
50use jf_merkle_tree_compat::{ForgetableMerkleTreeScheme, MerkleTreeScheme, prelude::MerkleNode};
51use parking_lot::Mutex;
52use priority_queue::PriorityQueue;
53use serde::de::DeserializeOwned;
54use surf_disco::Request;
55use tide_disco::error::ServerError;
56use tokio::time::timeout;
57use tokio_util::task::AbortOnDropHandle;
58use url::Url;
59use vbs::version::StaticVersionType;
60use versions::EPOCH_VERSION;
61
62use crate::api::{BlocksFrontier, RewardMerkleTreeDataSource, RewardMerkleTreeV2Data};
63
64// This newtype is probably not worth having. It's only used to be able to log
65// URLs before doing requests.
66#[derive(Debug, Clone)]
67struct Client<ServerError, ApiVer: StaticVersionType> {
68    inner: surf_disco::Client<ServerError, ApiVer>,
69    url: Url,
70    requests: Arc<Box<dyn Counter>>,
71    failures: Arc<Box<dyn Counter>>,
72}
73
74impl<ApiVer: StaticVersionType> Client<ServerError, ApiVer> {
75    pub fn new(
76        url: Url,
77        requests: &(impl CounterFamily + ?Sized),
78        failures: &(impl CounterFamily + ?Sized),
79    ) -> Self {
80        Self {
81            inner: surf_disco::Client::new(url.clone()),
82            requests: Arc::new(requests.create(vec![url.to_string()])),
83            failures: Arc::new(failures.create(vec![url.to_string()])),
84            url,
85        }
86    }
87
88    pub fn get<T: DeserializeOwned>(&self, route: &str) -> Request<T, ServerError, ApiVer> {
89        self.inner.get(route)
90    }
91}
92
/// A score of a catchup peer, based on our interactions with that peer.
///
/// The score accounts for malicious peers -- i.e. peers that gave us an invalid response to a
/// verifiable request -- and faulty/unreliable peers -- those that fail to respond to requests at
/// all. The score has a comparison function where higher is better, or in other words `p1 > p2`
/// means we believe we are more likely to successfully catch up using `p1` than `p2`. This makes it
/// convenient and efficient to collect peers in a priority queue which we can easily convert to a
/// list sorted by reliability.
#[derive(Clone, Copy, Debug, Default)]
struct PeerScore {
    /// Total number of catchup requests made to this peer.
    requests: usize,
    /// Number of those requests that failed (error response or timeout).
    failures: usize,
}
106
107impl Ord for PeerScore {
108    fn cmp(&self, other: &Self) -> Ordering {
109        // Compare failure rates: `self` is better than `other` if
110        //      self.failures / self.requests < other.failures / other.requests
111        // or equivalently
112        //      other.failures * self.requests > self.failures * other.requests
113        (other.failures * self.requests).cmp(&(self.failures * other.requests))
114    }
115}
116
117impl PartialOrd for PeerScore {
118    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
119        Some(self.cmp(other))
120    }
121}
122
123impl PartialEq for PeerScore {
124    fn eq(&self, other: &Self) -> bool {
125        self.cmp(other).is_eq()
126    }
127}
128
129impl Eq for PeerScore {}
130
/// A [`StateCatchup`] provider that fetches state from a set of remote peers over HTTP,
/// preferring peers that have been reliable in the past.
#[derive(Debug, Clone, Default)]
pub struct StatePeers<ApiVer: StaticVersionType> {
    // Peer IDs, ordered by reliability score. Each ID is an index into `clients`.
    scores: Arc<RwLock<PriorityQueue<usize, PeerScore>>>,
    // One HTTP client per peer URL; indexed by the IDs stored in `scores`.
    clients: Vec<Client<ServerError, ApiVer>>,
    // Parameters controlling retry backoff across full passes over the peer set.
    backoff: BackoffParams,
    /// Base timeout for per peer catchup request
    base_timeout: Duration,
}
140
141impl<ApiVer: StaticVersionType> StatePeers<ApiVer> {
142    async fn fetch<Fut>(
143        &self,
144        retry: usize,
145        f: impl Fn(Client<ServerError, ApiVer>) -> Fut,
146    ) -> anyhow::Result<Fut::Ok>
147    where
148        Fut: TryFuture<Error: Display>,
149    {
150        // Since we have generally have multiple peers we can catch up from, we want a fairly
151        // aggressive timeout for requests: if a peer is not responding quickly, we're better off
152        // just trying the next one rather than waiting, and this prevents a malicious peer from
153        // delaying catchup for a long time.
154        //
155        // However, if we set the timeout _too_ aggressively, we might fail to catch up even from an
156        // honest peer, and thus never make progress. Thus, we start with a base timeout (default
157        // 2s), which is reasonable for an HTTP request. If that fails with all of our peers, we
158        // increase the timeout by the base amount for each successive retry, until we eventually
159        // succeed. The base timeout is configurable via ESPRESSO_SEQUENCER_CATCHUP_BASE_TIMEOUT.
160        let timeout_dur = self.base_timeout * (retry as u32 + 1);
161
162        // Keep track of which peers we make requests to and which succeed (`true`) or fail (`false`),
163        // so we can update reliability scores at the end.
164        let mut requests = HashMap::new();
165        let mut res = Err(anyhow!("failed fetching from every peer"));
166
167        // Try each peer in order of reliability score, until we succeed. We clone out of
168        // `self.scores` because it is small (contains only numeric IDs and scores), so this clone
169        // is a lot cheaper than holding the read lock the entire time we are making requests (which
170        // could be a while).
171        let mut scores = { (*self.scores.read().await).clone() };
172        let mut logs = vec![format!("Fetching failed.\n")];
173        while let Some((id, score)) = scores.pop() {
174            let client = &self.clients[id];
175            tracing::info!("fetching from {}", client.url);
176            match timeout(timeout_dur, TryFutureExt::into_future(f(client.clone()))).await {
177                Ok(Ok(t)) => {
178                    requests.insert(id, true);
179                    res = Ok(t);
180                    logs = Vec::new();
181                    break;
182                },
183                Ok(Err(err)) => {
184                    tracing::debug!(id, ?score, peer = %client.url, "error from peer: {err:#}");
185                    logs.push(format!(
186                        "Error from peer {} with id {id} and score {score:?}: {err:#}",
187                        client.url
188                    ));
189                    requests.insert(id, false);
190                },
191                Err(_) => {
192                    tracing::debug!(id, ?score, peer = %client.url, ?timeout_dur, "request timed out");
193                    logs.push(format!(
194                        "Error from peer {} with id {id} and score {score:?}: request timed out",
195                        client.url
196                    ));
197                    requests.insert(id, false);
198                },
199            }
200        }
201
202        if !logs.is_empty() {
203            tracing::warn!("{}", logs.join("\n"));
204        }
205
206        // Update client scores.
207        let mut scores = self.scores.write().await;
208        for (id, success) in requests {
209            scores.change_priority_by(&id, |score| {
210                score.requests += 1;
211                self.clients[id].requests.add(1);
212                if !success {
213                    score.failures += 1;
214                    self.clients[id].failures.add(1);
215                }
216            });
217        }
218
219        res
220    }
221
222    pub fn from_urls(
223        urls: Vec<Url>,
224        backoff: BackoffParams,
225        base_timeout: Duration,
226        metrics: &(impl Metrics + ?Sized),
227    ) -> Self {
228        if urls.is_empty() {
229            panic!("Cannot create StatePeers with no peers");
230        }
231
232        let metrics = metrics.subgroup("catchup".into());
233        let requests = metrics.counter_family("requests".into(), vec!["peer".into()]);
234        let failures = metrics.counter_family("request_failures".into(), vec!["peer".into()]);
235
236        let scores = urls
237            .iter()
238            .enumerate()
239            .map(|(i, _)| (i, PeerScore::default()))
240            .collect();
241        let clients = urls
242            .into_iter()
243            .map(|url| Client::new(url, &*requests, &*failures))
244            .collect();
245
246        Self {
247            clients,
248            scores: Arc::new(RwLock::new(scores)),
249            backoff,
250            base_timeout,
251        }
252    }
253
254    #[tracing::instrument(skip(self, my_own_validator_config))]
255    pub async fn fetch_config(
256        &self,
257        my_own_validator_config: ValidatorConfig<SeqTypes>,
258    ) -> anyhow::Result<NetworkConfig<SeqTypes>> {
259        self.backoff()
260            .retry(self, move |provider, retry| {
261                let my_own_validator_config = my_own_validator_config.clone();
262                async move {
263                    let cfg: PublicNetworkConfig = provider
264                        .fetch(retry, |client| {
265                            let url = client.url.join("config/hotshot").unwrap();
266
267                            reqwest::get(url.clone())
268                        })
269                        .await?
270                        .json()
271                        .await?;
272                    cfg.into_network_config(my_own_validator_config)
273                        .context("fetched config, but failed to convert to private config")
274                }
275                .boxed()
276            })
277            .await
278    }
279}
280
#[async_trait]
impl<ApiVer: StaticVersionType> StateCatchup for StatePeers<ApiVer> {
    /// Fetch fee accounts from peers, returning Merkle proofs verified against
    /// `fee_merkle_tree_root` so an untrusted peer cannot serve invalid state.
    #[tracing::instrument(skip(self, _instance))]
    async fn try_fetch_accounts(
        &self,
        retry: usize,
        _instance: &NodeState,
        height: u64,
        view: ViewNumber,
        fee_merkle_tree_root: FeeMerkleCommitment,
        accounts: &[FeeAccount],
    ) -> anyhow::Result<Vec<FeeAccountProof>> {
        self.fetch(retry, |client| async move {
            let tree = client
                .inner
                .post::<FeeMerkleTree>(&format!("catchup/{height}/{}/accounts", view.u64()))
                .body_binary(&accounts.to_vec())?
                .send()
                .await?;

            // Verify proofs: each requested account must be present in the returned tree and
            // its proof must check out against the trusted root.
            let mut proofs = Vec::new();
            for account in accounts {
                let (proof, _) = FeeAccountProof::prove(&tree, (*account).into())
                    .context(format!("response missing fee account {account}"))?;
                proof.verify(&fee_merkle_tree_root).context(format!(
                    "invalid proof for fee account {account}, root: {fee_merkle_tree_root}"
                ))?;
                proofs.push(proof);
            }

            anyhow::Ok(proofs)
        })
        .await
    }

    /// Fetch the block Merkle tree frontier from peers and remember it in `mt`.
    ///
    /// `mt` is only updated if the fetched frontier verifies against the local tree; on
    /// failure it is left unchanged.
    #[tracing::instrument(skip(self, _instance, mt))]
    async fn try_remember_blocks_merkle_tree(
        &self,
        retry: usize,
        _instance: &NodeState,
        height: u64,
        view: ViewNumber,
        mt: &mut BlockMerkleTree,
    ) -> anyhow::Result<()> {
        *mt = self
            .fetch(retry, |client| {
                // Work on a clone per attempt so a failed attempt cannot corrupt `mt`.
                let mut mt = mt.clone();
                async move {
                    let frontier = client
                        .get::<BlocksFrontier>(&format!("catchup/{height}/{}/blocks", view.u64()))
                        .send()
                        .await?;
                    let elem = frontier
                        .elem()
                        .context("provided frontier is missing leaf element")?;
                    mt.remember(mt.num_leaves() - 1, *elem, &frontier)
                        .context("verifying block proof")?;
                    anyhow::Ok(mt)
                }
            })
            .await?;
        Ok(())
    }

    /// Fetch the chain config preimage for `commitment` from peers, verifying that the
    /// returned config actually hashes to the requested commitment.
    async fn try_fetch_chain_config(
        &self,
        retry: usize,
        commitment: Commitment<ChainConfig>,
    ) -> anyhow::Result<ChainConfig> {
        self.fetch(retry, |client| async move {
            let cf = client
                .get::<ChainConfig>(&format!("catchup/chain-config/{commitment}"))
                .send()
                .await?;
            ensure!(
                cf.commit() == commitment,
                "received chain config with mismatched commitment: expected {commitment}, got {}",
                cf.commit()
            );
            Ok(cf)
        })
        .await
    }

    /// Fetch the leaf chain ending at `height` from peers and verify it against the given
    /// stake table before returning the leaf at that height.
    async fn try_fetch_leaf(
        &self,
        retry: usize,
        height: u64,
        stake_table: HSStakeTable<SeqTypes>,
        success_threshold: U256,
    ) -> anyhow::Result<Leaf2> {
        // Get the leaf chain
        let leaf_chain = self
            .fetch(retry, |client| async move {
                let leaf = client
                    .get::<Vec<Leaf2>>(&format!("catchup/{height}/leafchain"))
                    .send()
                    .await?;
                anyhow::Ok(leaf)
            })
            .await
            .with_context(|| format!("failed to fetch leaf chain at height {height}"))?;

        // Verify it, returning the leaf at the given height
        verify_leaf_chain(
            leaf_chain,
            &stake_table,
            success_threshold,
            height,
            &UpgradeLock::<SeqTypes>::new(versions::Upgrade::trivial(EPOCH_VERSION)),
        )
        .await
        .with_context(|| format!("failed to verify leaf chain at height {height}"))
    }

    /// Fetch the full v2 reward Merkle tree from peers, checking its commitment against
    /// `reward_merkle_tree_root` and that all requested `accounts` are present.
    async fn try_fetch_reward_merkle_tree_v2(
        &self,
        retry: usize,
        height: u64,
        view: ViewNumber,
        reward_merkle_tree_root: RewardMerkleCommitmentV2,
        accounts: Arc<Vec<RewardAccountV2>>,
    ) -> anyhow::Result<PermittedRewardMerkleTreeV2> {
        let result = self
            .fetch(retry, |client| async move {
                // Try the catchup endpoint first, which serves the tree from consensus's
                // decided state. If that fails (e.g. the view is no longer in memory), fall
                // back to the reward-state-v2 endpoint, which serves the decided state from
                // storage.
                let tree_bytes = match client
                    .inner
                    .get::<Vec<u8>>(&format!("catchup/reward-merkle-tree-v2/{height}/{}", *view))
                    .send()
                    .await
                {
                    Ok(bytes) => bytes,
                    Err(err) => {
                        tracing::info!(
                            "catchup endpoint failed, falling back to reward-state-v2: {err:#}"
                        );
                        client
                            .inner
                            .get::<Vec<u8>>(&format!(
                                "reward-state-v2/reward-merkle-tree-v2/{height}"
                            ))
                            .send()
                            .await?
                    },
                };

                Ok::<Vec<u8>, anyhow::Error>(tree_bytes)
            })
            .await
            .context("Fetching from peer failed")?;

        let tree_data = bincode::deserialize::<RewardMerkleTreeV2Data>(&result)
            .context("Failed to deserialize merkle tree from catchup")?;

        let tree: PermittedRewardMerkleTreeV2 =
            PermittedRewardMerkleTreeV2::try_from_kv_set(tree_data.balances).await?;

        // The commitment check makes the untrusted response verifiable: a peer cannot forge a
        // tree that matches the root we already know from consensus.
        ensure!(
            tree.tree.commitment() == reward_merkle_tree_root,
            "RewardMerkleTreeV2 from peer failed commitment check."
        );
        ensure!(!forgotten_accounts_include(&tree, &accounts));

        Ok(tree)
    }

    /// Fetch v1 reward accounts from peers, returning Merkle proofs verified against
    /// `reward_merkle_tree_root`.
    #[tracing::instrument(skip(self, _instance))]
    async fn try_fetch_reward_accounts_v1(
        &self,
        retry: usize,
        _instance: &NodeState,
        height: u64,
        view: ViewNumber,
        reward_merkle_tree_root: RewardMerkleCommitmentV1,
        accounts: &[RewardAccountV1],
    ) -> anyhow::Result<Vec<RewardAccountProofV1>> {
        self.fetch(retry, |client| async move {
            let tree = client
                .inner
                .post::<RewardMerkleTreeV1>(&format!(
                    "catchup/{height}/{}/reward-accounts",
                    view.u64()
                ))
                .body_binary(&accounts.to_vec())?
                .send()
                .await?;

            // Verify proofs.
            let mut proofs = Vec::new();
            for account in accounts {
                let (proof, _) = RewardAccountProofV1::prove(&tree, (*account).into())
                    .context(format!("response missing reward account {account}"))?;
                proof.verify(&reward_merkle_tree_root).context(format!(
                    "invalid proof for v1 reward account {account}, root: \
                     {reward_merkle_tree_root} height {height} view {view}"
                ))?;
                proofs.push(proof);
            }

            anyhow::Ok(proofs)
        })
        .await
    }

    /// Fetch the light client state update certificate for `epoch` from peers.
    ///
    /// NOTE(review): unlike the account fetchers above, the response is not independently
    /// verified here; callers presumably validate the certificate themselves — confirm.
    async fn try_fetch_state_cert(
        &self,
        retry: usize,
        epoch: u64,
    ) -> anyhow::Result<LightClientStateUpdateCertificateV2<SeqTypes>> {
        self.fetch(retry, |client| async move {
            client
                .get::<LightClientStateUpdateCertificateV2<SeqTypes>>(&format!(
                    "catchup/{epoch}/state-cert"
                ))
                .send()
                .await
        })
        .await
    }

    /// Backoff parameters governing retries across full passes over the peer set.
    fn backoff(&self) -> &BackoffParams {
        &self.backoff
    }

    /// Human-readable name listing the peer URLs, for logging.
    fn name(&self) -> String {
        format!(
            "StatePeers({})",
            self.clients
                .iter()
                .map(|client| client.url.to_string())
                .join(",")
        )
    }

    /// Remote provider: data comes from network peers, not local storage.
    fn is_local(&self) -> bool {
        false
    }
}
523
/// A local storage backend that can serve catchup requests (to peers, or to our own node).
///
/// Every method has a default implementation that fails, so backends only override the
/// lookups their storage actually supports.
pub(crate) trait CatchupStorage: Sync {
    /// Get the state of the requested `accounts`.
    ///
    /// The state is fetched from a snapshot at the given height and view, which _must_ correspond!
    /// `height` is provided to simplify lookups for backends where data is not indexed by view.
    /// This function is intended to be used for catchup, so `view` should be no older than the last
    /// decided view.
    ///
    /// If successful, this function also returns the leaf from `view`, if it is available. This can
    /// be used to add the recovered state to HotShot's state map, so that future requests can get
    /// the state from memory rather than storage.
    fn get_accounts(
        &self,
        _instance: &NodeState,
        _height: u64,
        _view: ViewNumber,
        _accounts: &[FeeAccount],
    ) -> impl Send + Future<Output = anyhow::Result<(FeeMerkleTree, Leaf2)>> {
        // Merklized state catchup is only supported by persistence backends that provide merklized
        // state storage. This default implementation is overridden for those that do. Otherwise,
        // catchup can still be provided by fetching undecided merklized state from consensus
        // memory.
        async {
            bail!("merklized state catchup is not supported for this data source");
        }
    }

    /// Get the state of the requested v1 reward `accounts`, plus the leaf from `_view`.
    ///
    /// Same height/view correspondence requirements as [`Self::get_accounts`].
    fn get_reward_accounts_v1(
        &self,
        _instance: &NodeState,
        _height: u64,
        _view: ViewNumber,
        _accounts: &[RewardAccountV1],
    ) -> impl Send + Future<Output = anyhow::Result<(RewardMerkleTreeV1, Leaf2)>> {
        async {
            bail!("merklized state catchup is not supported for this data source");
        }
    }

    /// Get the state of the requested v2 reward `accounts`, plus the leaf from `_view`.
    ///
    /// Same height/view correspondence requirements as [`Self::get_accounts`].
    fn get_reward_accounts_v2(
        &self,
        _instance: &NodeState,
        _height: u64,
        _view: ViewNumber,
        _accounts: &[RewardAccountV2],
    ) -> impl Send + Future<Output = anyhow::Result<(RewardMerkleTreeV2, Leaf2)>> {
        async {
            bail!("merklized state catchup is not supported for this data source");
        }
    }

    /// Get the blocks Merkle tree frontier.
    ///
    /// The state is fetched from a snapshot at the given height and view, which _must_ correspond!
    /// `height` is provided to simplify lookups for backends where data is not indexed by view.
    /// This function is intended to be used for catchup, so `view` should be no older than the last
    /// decided view.
    fn get_frontier(
        &self,
        _instance: &NodeState,
        _height: u64,
        _view: ViewNumber,
    ) -> impl Send + Future<Output = anyhow::Result<BlocksFrontier>> {
        // Merklized state catchup is only supported by persistence backends that provide merklized
        // state storage. This default implementation is overridden for those that do. Otherwise,
        // catchup can still be provided by fetching undecided merklized state from consensus
        // memory.
        async {
            bail!("merklized state catchup is not supported for this data source");
        }
    }

    /// Get the chain config preimage for `_commitment`.
    fn get_chain_config(
        &self,
        _commitment: Commitment<ChainConfig>,
    ) -> impl Send + Future<Output = anyhow::Result<ChainConfig>> {
        async {
            bail!("chain config catchup is not supported for this data source");
        }
    }

    /// Get the leaf chain ending at `_height`, suitable for verification against a stake table.
    fn get_leaf_chain(
        &self,
        _height: u64,
    ) -> impl Send + Future<Output = anyhow::Result<Vec<Leaf2>>> {
        async {
            bail!("leaf chain catchup is not supported for this data source");
        }
    }
}
614
615impl CatchupStorage for hotshot_query_service::data_source::MetricsDataSource {}
616
617impl<T, S> CatchupStorage for hotshot_query_service::data_source::ExtensibleDataSource<T, S>
618where
619    T: CatchupStorage,
620    S: Sync,
621{
622    async fn get_accounts(
623        &self,
624        instance: &NodeState,
625        height: u64,
626        view: ViewNumber,
627        accounts: &[FeeAccount],
628    ) -> anyhow::Result<(FeeMerkleTree, Leaf2)> {
629        self.inner()
630            .get_accounts(instance, height, view, accounts)
631            .await
632    }
633
634    async fn get_reward_accounts_v2(
635        &self,
636        instance: &NodeState,
637        height: u64,
638        view: ViewNumber,
639        accounts: &[RewardAccountV2],
640    ) -> anyhow::Result<(RewardMerkleTreeV2, Leaf2)> {
641        self.inner()
642            .get_reward_accounts_v2(instance, height, view, accounts)
643            .await
644    }
645
646    async fn get_reward_accounts_v1(
647        &self,
648        instance: &NodeState,
649        height: u64,
650        view: ViewNumber,
651        accounts: &[RewardAccountV1],
652    ) -> anyhow::Result<(RewardMerkleTreeV1, Leaf2)> {
653        self.inner()
654            .get_reward_accounts_v1(instance, height, view, accounts)
655            .await
656    }
657
658    async fn get_frontier(
659        &self,
660        instance: &NodeState,
661        height: u64,
662        view: ViewNumber,
663    ) -> anyhow::Result<BlocksFrontier> {
664        self.inner().get_frontier(instance, height, view).await
665    }
666
667    async fn get_chain_config(
668        &self,
669        commitment: Commitment<ChainConfig>,
670    ) -> anyhow::Result<ChainConfig> {
671        self.inner().get_chain_config(commitment).await
672    }
673    async fn get_leaf_chain(&self, height: u64) -> anyhow::Result<Vec<Leaf2>> {
674        self.inner().get_leaf_chain(height).await
675    }
676}
677
/// A [`StateCatchup`] provider backed by a local database handle.
#[derive(Debug)]
pub(crate) struct SqlStateCatchup<T> {
    // Shared handle to the storage backend catchup data is read from.
    db: Arc<T>,
    // Backoff parameters reported to callers driving retries.
    backoff: BackoffParams,
}
683
684impl<T> SqlStateCatchup<T> {
685    pub(crate) fn new(db: Arc<T>, backoff: BackoffParams) -> Self {
686        Self { db, backoff }
687    }
688}
689
690#[async_trait]
691impl<T> StateCatchup for SqlStateCatchup<T>
692where
693    T: CatchupStorage + RewardMerkleTreeDataSource + Send + Sync,
694{
695    async fn try_fetch_leaf(
696        &self,
697        _retry: usize,
698        height: u64,
699        stake_table: HSStakeTable<SeqTypes>,
700        success_threshold: U256,
701    ) -> anyhow::Result<Leaf2> {
702        // Get the leaf chain
703        let leaf_chain = self
704            .db
705            .get_leaf_chain(height)
706            .await
707            .with_context(|| "failed to get leaf chain from DB")?;
708
709        // Verify the leaf chain
710        let leaf = verify_leaf_chain(
711            leaf_chain,
712            &stake_table,
713            success_threshold,
714            height,
715            &UpgradeLock::<SeqTypes>::new(versions::Upgrade::trivial(EPOCH_VERSION)),
716        )
717        .await
718        .with_context(|| "failed to verify leaf chain")?;
719
720        Ok(leaf)
721    }
722
723    // TODO: add a test for the account proof validation
724    // issue # 2102 (https://github.com/EspressoSystems/espresso-network/issues/2102)
725    #[tracing::instrument(skip(self, _retry, instance))]
726    async fn try_fetch_accounts(
727        &self,
728        _retry: usize,
729        instance: &NodeState,
730        block_height: u64,
731        view: ViewNumber,
732        fee_merkle_tree_root: FeeMerkleCommitment,
733        accounts: &[FeeAccount],
734    ) -> anyhow::Result<Vec<FeeAccountProof>> {
735        // Get the accounts
736        let (fee_merkle_tree_from_db, _) = self
737            .db
738            .get_accounts(instance, block_height, view, accounts)
739            .await
740            .with_context(|| "failed to get fee accounts from DB")?;
741
742        // Verify the accounts
743        let mut proofs = Vec::new();
744        for account in accounts {
745            let (proof, _) = FeeAccountProof::prove(&fee_merkle_tree_from_db, (*account).into())
746                .context(format!("response missing account {account}"))?;
747            proof.verify(&fee_merkle_tree_root).context(format!(
748                "invalid proof for fee account {account}, root: {fee_merkle_tree_root}"
749            ))?;
750            proofs.push(proof);
751        }
752
753        Ok(proofs)
754    }
755
756    #[tracing::instrument(skip(self, _retry, instance, mt))]
757    async fn try_remember_blocks_merkle_tree(
758        &self,
759        _retry: usize,
760        instance: &NodeState,
761        bh: u64,
762        view: ViewNumber,
763        mt: &mut BlockMerkleTree,
764    ) -> anyhow::Result<()> {
765        if bh == 0 {
766            return Ok(());
767        }
768
769        let proof = self.db.get_frontier(instance, bh, view).await?;
770        match proof
771            .proof
772            .first()
773            .context(format!("empty proof for frontier at height {bh}"))?
774        {
775            MerkleNode::Leaf { pos, elem, .. } => mt
776                .remember(pos, elem, proof.clone())
777                .context("failed to remember proof"),
778            _ => bail!("invalid proof"),
779        }
780    }
781
782    async fn try_fetch_chain_config(
783        &self,
784        _retry: usize,
785        commitment: Commitment<ChainConfig>,
786    ) -> anyhow::Result<ChainConfig> {
787        let cf = self.db.get_chain_config(commitment).await?;
788
789        if cf.commit() != commitment {
790            panic!(
791                "Critical error: Mismatched chain config detected. Expected chain config: {:?}, \
792                 but got: {:?}.
793                This may indicate a compromised database",
794                commitment,
795                cf.commit()
796            )
797        }
798
799        Ok(cf)
800    }
801
802    async fn try_fetch_reward_merkle_tree_v2(
803        &self,
804        _retry: usize,
805        height: u64,
806        _view: ViewNumber,
807        reward_merkle_tree_root: RewardMerkleCommitmentV2,
808        accounts: Arc<Vec<RewardAccountV2>>,
809    ) -> anyhow::Result<PermittedRewardMerkleTreeV2> {
810        let tree: PermittedRewardMerkleTreeV2 = self.db.load_reward_merkle_tree_v2(height).await?;
811
812        ensure!(tree.tree.commitment() == reward_merkle_tree_root);
813        ensure!(!forgotten_accounts_include(&tree, &accounts));
814
815        Ok(tree)
816    }
817
818    #[tracing::instrument(skip(self, _retry, instance))]
819    async fn try_fetch_reward_accounts_v1(
820        &self,
821        _retry: usize,
822        instance: &NodeState,
823        block_height: u64,
824        view: ViewNumber,
825        reward_merkle_tree_root: RewardMerkleCommitmentV1,
826        accounts: &[RewardAccountV1],
827    ) -> anyhow::Result<Vec<RewardAccountProofV1>> {
828        // Get the accounts
829        let (reward_merkle_tree_from_db, _) = self
830            .db
831            .get_reward_accounts_v1(instance, block_height, view, accounts)
832            .await
833            .with_context(|| "failed to get reward accounts from DB")?;
834        // Verify the accounts
835        let mut proofs = Vec::new();
836        for account in accounts {
837            let (proof, _) =
838                RewardAccountProofV1::prove(&reward_merkle_tree_from_db, (*account).into())
839                    .context(format!("response missing account {account}"))?;
840            proof.verify(&reward_merkle_tree_root).context(format!(
841                "invalid proof for v1 reward account {account}, root: {reward_merkle_tree_root}"
842            ))?;
843            proofs.push(proof);
844        }
845
846        Ok(proofs)
847    }
848
849    async fn try_fetch_state_cert(
850        &self,
851        _retry: usize,
852        _epoch: u64,
853    ) -> anyhow::Result<LightClientStateUpdateCertificateV2<SeqTypes>> {
854        bail!("state cert catchup not supported for SqlStateCatchup");
855    }
856
    /// The backoff parameters configured for this provider.
    fn backoff(&self) -> &BackoffParams {
        &self.backoff
    }
860
861    fn name(&self) -> String {
862        "SqlStateCatchup".into()
863    }
864
    /// This provider serves data from the node's own database (`self.db`), so
    /// it is always considered local.
    fn is_local(&self) -> bool {
        true
    }
868}
869
/// Disable catchup entirely.
#[derive(Clone, Debug)]
pub struct NullStateCatchup {
    // Backoff policy reported via `StateCatchup::backoff`; disabled by default
    // (see the `Default` impl).
    backoff: BackoffParams,
    // Pre-seeded chain config preimages, keyed by commitment. These are the
    // only requests this provider can answer (see `add_chain_config`).
    chain_configs: HashMap<Commitment<ChainConfig>, ChainConfig>,
}
876
877impl Default for NullStateCatchup {
878    fn default() -> Self {
879        Self {
880            backoff: BackoffParams::disabled(),
881            chain_configs: Default::default(),
882        }
883    }
884}
885
886impl NullStateCatchup {
887    /// Add a chain config preimage which can be fetched by hash during STF evaluation.
888    ///
889    /// [`NullStateCatchup`] is used to disable catchup entirely when evaluating the STF, which
890    /// requires the [`ValidatedState`](espresso_types::ValidatedState) to be pre-seeded with all
891    /// the dependencies of STF evaluation. However, the STF also depends on having the preimage of
892    /// various [`ChainConfig`] commitments, which are not stored in the
893    /// [`ValidatedState`](espresso_types::ValidatedState), but which instead must be supplied by a
894    /// separate preimage oracle. Thus, [`NullStateCatchup`] may be populated with a set of
895    /// [`ChainConfig`]s, which it can feed to the STF during evaluation.
896    pub fn add_chain_config(&mut self, cf: ChainConfig) {
897        self.chain_configs.insert(cf.commit(), cf);
898    }
899}
900
901#[async_trait]
902impl StateCatchup for NullStateCatchup {
903    async fn try_fetch_leaf(
904        &self,
905        _retry: usize,
906        _height: u64,
907        _stake_table: HSStakeTable<SeqTypes>,
908        _success_threshold: U256,
909    ) -> anyhow::Result<Leaf2> {
910        bail!("state catchup is disabled")
911    }
912
913    async fn try_fetch_accounts(
914        &self,
915        _retry: usize,
916        _instance: &NodeState,
917        _height: u64,
918        _view: ViewNumber,
919        _fee_merkle_tree_root: FeeMerkleCommitment,
920        _account: &[FeeAccount],
921    ) -> anyhow::Result<Vec<FeeAccountProof>> {
922        bail!("state catchup is disabled");
923    }
924
925    async fn try_remember_blocks_merkle_tree(
926        &self,
927        _retry: usize,
928        _instance: &NodeState,
929        _height: u64,
930        _view: ViewNumber,
931        _mt: &mut BlockMerkleTree,
932    ) -> anyhow::Result<()> {
933        bail!("state catchup is disabled");
934    }
935
936    async fn try_fetch_chain_config(
937        &self,
938        _retry: usize,
939        commitment: Commitment<ChainConfig>,
940    ) -> anyhow::Result<ChainConfig> {
941        self.chain_configs
942            .get(&commitment)
943            .copied()
944            .context(format!("chain config {commitment} not available"))
945    }
946
947    async fn try_fetch_reward_merkle_tree_v2(
948        &self,
949        _retry: usize,
950        _height: u64,
951        _view: ViewNumber,
952        _reward_merkle_tree_root: RewardMerkleCommitmentV2,
953        _accounts: Arc<Vec<RewardAccountV2>>,
954    ) -> anyhow::Result<PermittedRewardMerkleTreeV2> {
955        bail!("state catchup is disabled");
956    }
957
958    async fn try_fetch_reward_accounts_v1(
959        &self,
960        _retry: usize,
961        _instance: &NodeState,
962        _height: u64,
963        _view: ViewNumber,
964        _fee_merkle_tree_root: RewardMerkleCommitmentV1,
965        _account: &[RewardAccountV1],
966    ) -> anyhow::Result<Vec<RewardAccountProofV1>> {
967        bail!("state catchup is disabled");
968    }
969
970    async fn try_fetch_state_cert(
971        &self,
972        _retry: usize,
973        _epoch: u64,
974    ) -> anyhow::Result<LightClientStateUpdateCertificateV2<SeqTypes>> {
975        bail!("state catchup is disabled");
976    }
977
978    fn backoff(&self) -> &BackoffParams {
979        &self.backoff
980    }
981
982    fn name(&self) -> String {
983        "NullStateCatchup".into()
984    }
985
986    fn is_local(&self) -> bool {
987        true
988    }
989}
990
/// A catchup implementation that parallelizes requests to many providers.
/// It returns the result of the first non-erroring provider to complete.
#[derive(Clone)]
pub struct ParallelStateCatchup {
    // Underlying providers. Shared behind a mutex so providers can be added
    // at runtime via `add_provider` even after this handle has been cloned.
    providers: Arc<Mutex<Vec<Arc<dyn StateCatchup>>>>,
    // Backoff reported via `StateCatchup::backoff`; this wrapper itself does
    // not retry (constructed disabled in `new`).
    backoff: BackoffParams,
    /// Timeout for local provider requests
    local_timeout: Duration,
}
1000
1001impl ParallelStateCatchup {
1002    /// Create a new [`ParallelStateCatchup`] with the given providers and local timeout.
1003    pub fn new(providers: &[Arc<dyn StateCatchup>], local_timeout: Duration) -> Self {
1004        Self {
1005            providers: Arc::new(Mutex::new(providers.to_vec())),
1006            backoff: BackoffParams::disabled(),
1007            local_timeout,
1008        }
1009    }
1010
1011    /// Add a provider to the list of providers
1012    pub fn add_provider(&self, provider: Arc<dyn StateCatchup>) {
1013        self.providers.lock().push(provider);
1014    }
1015
1016    /// Perform an async operation on all local providers, returning the first result to succeed.
1017    ///
1018    /// A timeout is applied so that a slow local lookup does not prevent the node from
1019    /// falling back to remote providers in time to vote within the current view.
1020    pub async fn on_local_providers<C, F, RT>(&self, closure: C) -> anyhow::Result<RT>
1021    where
1022        C: Fn(Arc<dyn StateCatchup>) -> F + Clone + Send + Sync + 'static,
1023        F: Future<Output = anyhow::Result<RT>> + Send + 'static,
1024        RT: Send + Sync + 'static,
1025    {
1026        let local_timeout = self.local_timeout;
1027        match timeout(
1028            local_timeout,
1029            self.on_providers(|provider| provider.is_local(), closure),
1030        )
1031        .await
1032        {
1033            Ok(result) => result,
1034            Err(_) => {
1035                let err = format!("local provider timed out after {local_timeout:?}");
1036                tracing::warn!("{err}");
1037                Err(anyhow::anyhow!(err))
1038            },
1039        }
1040    }
1041
1042    /// Perform an async operation on all remote providers, returning the first result to succeed
1043    pub async fn on_remote_providers<C, F, RT>(&self, closure: C) -> anyhow::Result<RT>
1044    where
1045        C: Fn(Arc<dyn StateCatchup>) -> F + Clone + Send + Sync + 'static,
1046        F: Future<Output = anyhow::Result<RT>> + Send + 'static,
1047        RT: Send + Sync + 'static,
1048    {
1049        self.on_providers(|provider| !provider.is_local(), closure)
1050            .await
1051    }
1052
1053    /// Perform an async operation on all providers matching the given predicate, returning the first result to succeed
1054    pub async fn on_providers<P, C, F, RT>(&self, predicate: P, closure: C) -> anyhow::Result<RT>
1055    where
1056        P: Fn(&Arc<dyn StateCatchup>) -> bool + Clone + Send + Sync + 'static,
1057        C: Fn(Arc<dyn StateCatchup>) -> F + Clone + Send + Sync + 'static,
1058        F: Future<Output = anyhow::Result<RT>> + Send + 'static,
1059        RT: Send + Sync + 'static,
1060    {
1061        // Make sure we have at least one provider
1062        let providers = self.providers.lock().clone();
1063        if providers.is_empty() {
1064            return Err(anyhow::anyhow!("no providers were initialized"));
1065        }
1066
1067        // Filter the providers by the predicate
1068        let providers = providers.into_iter().filter(predicate).collect::<Vec<_>>();
1069        if providers.is_empty() {
1070            return Err(anyhow::anyhow!("no providers matched the given predicate"));
1071        }
1072
1073        // Spawn futures for each provider
1074        let mut futures = FuturesUnordered::new();
1075        for provider in providers {
1076            let closure = closure.clone();
1077            futures.push(AbortOnDropHandle::new(tokio::spawn(closure(provider))));
1078        }
1079
1080        let mut logs = vec![format!("No providers returned a successful result.\n")];
1081        // Return the first successful result
1082        while let Some(result) = futures.next().await {
1083            // Unwrap the inner (join) result
1084            let result = match result {
1085                Ok(res) => res,
1086                Err(err) => {
1087                    tracing::debug!("Failed to join on provider: {err:#}.");
1088                    logs.push(format!("Failed to join on provider: {err:#}."));
1089                    continue;
1090                },
1091            };
1092
1093            // If a provider fails, print why
1094            let result = match result {
1095                Ok(res) => res,
1096                Err(err) => {
1097                    tracing::debug!("Failed to fetch data: {err:#}.");
1098                    logs.push(format!("Failed to fetch data: {err:#}."));
1099                    continue;
1100                },
1101            };
1102
1103            return Ok(result);
1104        }
1105
1106        Err(anyhow::anyhow!(logs.join("\n")))
1107    }
1108}
1109
/// Shadow each listed identifier with a `.clone()` of itself in a new scope,
/// then evaluate the expression. This lets `move` closures/async blocks
/// capture the clones while the caller keeps the originals, e.g.
/// `clone! {(a, b) async move { ... }}`.
macro_rules! clone {
    ( ($( $x:ident ),*) $y:expr ) => {
        {
            $(let $x = $x.clone();)*
            $y
        }
    };
}
1118
/// A catchup implementation that parallelizes requests to a local and remote provider.
/// It returns the result of the first provider to complete.
///
/// Every method follows the same pattern: race the local providers first
/// (bounded by `local_timeout`), and only if that fails fall back to racing
/// the remote providers.
#[async_trait]
impl StateCatchup for ParallelStateCatchup {
    /// Fetch a leaf: local providers first, then remote on failure.
    async fn try_fetch_leaf(
        &self,
        retry: usize,
        height: u64,
        stake_table: HSStakeTable<SeqTypes>,
        success_threshold: U256,
    ) -> anyhow::Result<Leaf2> {
        // Try fetching the leaf on the local providers first
        let local_result = self
            .on_local_providers(clone! {(stake_table) move |provider| {
                clone!{(stake_table) async move {
                    provider
                        .try_fetch_leaf(retry, height, stake_table, success_threshold)
                        .await
                }}
            }})
            .await;

        // Check if we were successful locally
        match &local_result {
            Ok(_) => return local_result,
            Err(err) => tracing::debug!("{err:#}"),
        }

        // If that fails, try the remote ones
        self.on_remote_providers(clone! {(stake_table) move |provider| {
            clone!{(stake_table) async move {
                provider
                    .try_fetch_leaf(retry, height, stake_table, success_threshold)
                    .await
            }}
        }})
        .await
    }

    /// Fetch fee account proofs: local providers first, then remote on failure.
    async fn try_fetch_accounts(
        &self,
        retry: usize,
        instance: &NodeState,
        height: u64,
        view: ViewNumber,
        fee_merkle_tree_root: FeeMerkleCommitment,
        accounts: &[FeeAccount],
    ) -> anyhow::Result<Vec<FeeAccountProof>> {
        // Try to get the accounts on local providers first
        // (owned copy so the slice can be moved into the spawned closures)
        let accounts_vec = accounts.to_vec();
        let local_result = self
            .on_local_providers(clone! {(instance, accounts_vec) move |provider| {
                clone! {(instance, accounts_vec) async move {
                    provider
                        .try_fetch_accounts(
                            retry,
                            &instance,
                            height,
                            view,
                            fee_merkle_tree_root,
                            &accounts_vec,
                        )
                        .await
                }}
            }})
            .await;

        // Check if we were successful locally
        match &local_result {
            Ok(_) => return local_result,
            Err(err) => tracing::debug!("{err:#}"),
        }

        // If that fails, try the remote ones
        self.on_remote_providers(clone! {(instance, accounts_vec) move |provider| {
            clone!{(instance, accounts_vec) async move {
                provider
                .try_fetch_accounts(
                    retry,
                    &instance,
                    height,
                    view,
                    fee_merkle_tree_root,
                    &accounts_vec,
                ).await
            }}
        }})
        .await
    }

    /// Restore forgotten block-merkle-tree nodes in `mt`, writing back the
    /// tree returned by the first successful provider.
    async fn try_remember_blocks_merkle_tree(
        &self,
        retry: usize,
        instance: &NodeState,
        height: u64,
        view: ViewNumber,
        mt: &mut BlockMerkleTree,
    ) -> anyhow::Result<()> {
        // Try to remember the blocks merkle tree on local providers first
        let local_result = self
            .on_local_providers(clone! {(mt, instance) move |provider| {
                // Each provider works on its own clone; the winner's tree is
                // copied back into `mt` below.
                let mut mt = mt.clone();
                clone! {(instance) async move {
                    // Perform the call
                    provider
                        .try_remember_blocks_merkle_tree(
                            retry,
                            &instance,
                            height,
                            view,
                            &mut mt,
                        )
                        .await?;

                    // Return the merkle tree so we can modify it
                    Ok(mt)
                }}
            }})
            .await;

        // Check if we were successful locally
        if let Ok(modified_mt) = local_result {
            // Set the merkle tree to the output of the successful local call
            *mt = modified_mt;

            return Ok(());
        }

        // If that fails, try the remote ones
        let remote_result = self
            .on_remote_providers(clone! {(mt, instance) move |provider| {
                let mut mt = mt.clone();
                clone!{(instance) async move {
                    // Perform the call
                    provider
                    .try_remember_blocks_merkle_tree(
                        retry,
                        &instance,
                        height,
                        view,
                        &mut mt,
                    )
                    .await?;

                    // Return the merkle tree
                    Ok(mt)
                }}
            }})
            .await?;

        // Update the original, local merkle tree
        *mt = remote_result;

        Ok(())
    }

    /// Fetch a chain config preimage: local providers first, then remote.
    async fn try_fetch_chain_config(
        &self,
        retry: usize,
        commitment: Commitment<ChainConfig>,
    ) -> anyhow::Result<ChainConfig> {
        // Try fetching the chain config on the local providers first
        let local_result = self
            .on_local_providers(move |provider| async move {
                provider.try_fetch_chain_config(retry, commitment).await
            })
            .await;

        // Check if we were successful locally
        match &local_result {
            Ok(_) => return local_result,
            Err(err) => tracing::debug!("{err:#}"),
        }

        // If that fails, try the remote ones
        self.on_remote_providers(move |provider| async move {
            provider.try_fetch_chain_config(retry, commitment).await
        })
        .await
    }

    /// Fetch the v2 reward merkle tree: local providers first, then remote.
    async fn try_fetch_reward_merkle_tree_v2(
        &self,
        retry: usize,
        height: u64,
        view: ViewNumber,
        reward_merkle_tree_root: RewardMerkleCommitmentV2,
        accounts: Arc<Vec<RewardAccountV2>>,
    ) -> anyhow::Result<PermittedRewardMerkleTreeV2> {
        let local_result = self
            .on_local_providers(clone! {(accounts) move |provider| {
                clone! {(accounts) async move {
                    provider
                        .try_fetch_reward_merkle_tree_v2(
                            retry,
                            height,
                            view,
                            reward_merkle_tree_root,
                            accounts,
                        )
                        .await
                }}
            }})
            .await;

        // Check if we were successful locally
        match &local_result {
            Ok(_) => return local_result,
            Err(err) => tracing::debug!("{err:#}"),
        }

        // If that fails, try the remote ones
        self.on_remote_providers(clone! {(accounts) move |provider| {
            clone!{(accounts) async move {
                provider
                .try_fetch_reward_merkle_tree_v2(
                    retry,
                    height,
                    view,
                    reward_merkle_tree_root,
                    accounts
                ).await
            }}
        }})
        .await
    }

    /// Fetch v1 reward account proofs: local providers first, then remote.
    async fn try_fetch_reward_accounts_v1(
        &self,
        retry: usize,
        instance: &NodeState,
        height: u64,
        view: ViewNumber,
        reward_merkle_tree_root: RewardMerkleCommitmentV1,
        accounts: &[RewardAccountV1],
    ) -> anyhow::Result<Vec<RewardAccountProofV1>> {
        // Try to get the accounts on local providers first
        let accounts_vec = accounts.to_vec();
        let local_result = self
            .on_local_providers(clone! {(instance, accounts_vec) move |provider| {
                clone! {(instance, accounts_vec) async move {
                    provider
                        .try_fetch_reward_accounts_v1(
                            retry,
                            &instance,
                            height,
                            view,
                            reward_merkle_tree_root,
                            &accounts_vec,
                        )
                        .await
                }}
            }})
            .await;

        // Check if we were successful locally
        match &local_result {
            Ok(_) => return local_result,
            Err(err) => tracing::debug!("{err:#}"),
        }

        // If that fails, try the remote ones
        self.on_remote_providers(clone! {(instance, accounts_vec) move |provider| {
            clone!{(instance, accounts_vec) async move {
                provider
                .try_fetch_reward_accounts_v1(
                    retry,
                    &instance,
                    height,
                    view,
                    reward_merkle_tree_root,
                    &accounts_vec,
                ).await
            }}
        }})
        .await
    }

    /// Fetch a light client state update cert: local providers first, then remote.
    async fn try_fetch_state_cert(
        &self,
        retry: usize,
        epoch: u64,
    ) -> anyhow::Result<LightClientStateUpdateCertificateV2<SeqTypes>> {
        // Try fetching the state cert on the local providers first
        let local_result = self
            .on_local_providers(move |provider| async move {
                provider.try_fetch_state_cert(retry, epoch).await
            })
            .await;

        // Check if we were successful locally
        match &local_result {
            Ok(_) => return local_result,
            Err(err) => tracing::debug!("{err:#}"),
        }

        // If that fails, try the remote ones
        self.on_remote_providers(move |provider| async move {
            provider.try_fetch_state_cert(retry, epoch).await
        })
        .await
    }

    /// This wrapper's own backoff (disabled); retry policy lives in the providers.
    fn backoff(&self) -> &BackoffParams {
        &self.backoff
    }

    /// Comma-separated list of the underlying providers' names, e.g. `[a, b]`.
    fn name(&self) -> String {
        format!(
            "[{}]",
            self.providers
                .lock()
                .iter()
                .map(|p| p.name())
                .collect::<Vec<_>>()
                .join(", ")
        )
    }

    /// Retrying variant: local providers get a single attempt (`retry = 0`),
    /// remote providers use their own retrying `fetch_accounts`.
    async fn fetch_accounts(
        &self,
        instance: &NodeState,
        height: u64,
        view: ViewNumber,
        fee_merkle_tree_root: FeeMerkleCommitment,
        accounts: Vec<FeeAccount>,
    ) -> anyhow::Result<Vec<FeeAccountProof>> {
        // Try to get the accounts on local providers first
        let accounts_vec = accounts.to_vec();
        let local_result = self
            .on_local_providers(clone! {(instance, accounts_vec) move |provider| {
                clone! {(instance, accounts_vec) async move {
                    provider
                        .try_fetch_accounts(
                            0,
                            &instance,
                            height,
                            view,
                            fee_merkle_tree_root,
                            &accounts_vec,
                        )
                        .await
                }}
            }})
            .await;

        // Check if we were successful locally
        match &local_result {
            Ok(_) => return local_result,
            Err(err) => tracing::debug!("{err:#}"),
        }

        // If that fails, try the remote ones (with retry)
        self.on_remote_providers(clone! {(instance, accounts_vec) move |provider| {
            clone!{(instance, accounts_vec) async move {
                provider
                .fetch_accounts(
                    &instance,
                    height,
                    view,
                    fee_merkle_tree_root,
                    accounts_vec,
                ).await
            }}
        }})
        .await
    }

    /// Retrying variant: single local attempt, then remote `fetch_leaf` with retries.
    async fn fetch_leaf(
        &self,
        height: u64,
        stake_table: HSStakeTable<SeqTypes>,
        success_threshold: U256,
    ) -> anyhow::Result<Leaf2> {
        // Try fetching the leaf on the local providers first
        let local_result = self
            .on_local_providers(clone! {(stake_table) move |provider| {
                clone!{(stake_table) async move {
                    provider
                        .try_fetch_leaf(0, height, stake_table, success_threshold)
                        .await
                }}
            }})
            .await;

        // Check if we were successful locally
        match &local_result {
            Ok(_) => return local_result,
            Err(err) => tracing::debug!("{err:#}"),
        }

        // If that fails, try the remote ones (with retry)
        self.on_remote_providers(clone! {(stake_table) move |provider| {
        clone!{(stake_table) async move {
            provider
                .fetch_leaf(height, stake_table, success_threshold)
                .await
        }}
        }})
        .await
    }

    /// Retrying variant: single local attempt, then remote `fetch_chain_config`.
    async fn fetch_chain_config(
        &self,
        commitment: Commitment<ChainConfig>,
    ) -> anyhow::Result<ChainConfig> {
        // Try fetching the chain config on the local providers first
        let local_result = self
            .on_local_providers(move |provider| async move {
                provider.try_fetch_chain_config(0, commitment).await
            })
            .await;

        // Check if we were successful locally
        match &local_result {
            Ok(_) => return local_result,
            Err(err) => tracing::debug!("{err:#}"),
        }

        // If that fails, try the remote ones (with retry)
        self.on_remote_providers(move |provider| async move {
            provider.fetch_chain_config(commitment).await
        })
        .await
    }

    /// Retrying variant: single local attempt, then remote `fetch_reward_accounts_v1`.
    async fn fetch_reward_accounts_v1(
        &self,
        instance: &NodeState,
        height: u64,
        view: ViewNumber,
        reward_merkle_tree_root: RewardMerkleCommitmentV1,
        accounts: Vec<RewardAccountV1>,
    ) -> anyhow::Result<Vec<RewardAccountProofV1>> {
        // Try to get the accounts on local providers first
        let accounts_vec = accounts.to_vec();
        let local_result = self
            .on_local_providers(clone! {(instance, accounts_vec) move |provider| {
                clone! {(instance, accounts_vec) async move {
                    provider
                        .try_fetch_reward_accounts_v1(
                            0,
                            &instance,
                            height,
                            view,
                            reward_merkle_tree_root,
                            &accounts_vec,
                        )
                        .await
                }}
            }})
            .await;

        // Check if we were successful locally
        match &local_result {
            Ok(_) => return local_result,
            Err(err) => tracing::debug!("{err:#}"),
        }

        // If that fails, try the remote ones (with retry)
        self.on_remote_providers(clone! {(instance, accounts_vec) move |provider| {
            clone!{(instance, accounts_vec) async move {
                provider
                .fetch_reward_accounts_v1(
                    &instance,
                    height,
                    view,
                    reward_merkle_tree_root,
                    accounts_vec,
                ).await
            }}
        }})
        .await
    }

    /// Retrying variant: single local attempt, then remote `fetch_state_cert`.
    async fn fetch_state_cert(
        &self,
        epoch: u64,
    ) -> anyhow::Result<LightClientStateUpdateCertificateV2<SeqTypes>> {
        let local_result = self
            .on_local_providers(move |provider| async move {
                provider.try_fetch_state_cert(0, epoch).await
            })
            .await;

        // Check if we were successful locally
        match &local_result {
            Ok(_) => return local_result,
            Err(err) => tracing::debug!("{err:#}"),
        }

        // If that fails, try the remote ones (with retry)
        self.on_remote_providers(
            move |provider| async move { provider.fetch_state_cert(epoch).await },
        )
        .await
    }

    /// Retrying variant of [`Self::try_remember_blocks_merkle_tree`]: single
    /// local attempt, then remote `remember_blocks_merkle_tree` with retries.
    async fn remember_blocks_merkle_tree(
        &self,
        instance: &NodeState,
        height: u64,
        view: ViewNumber,
        mt: &mut BlockMerkleTree,
    ) -> anyhow::Result<()> {
        // Try to remember the blocks merkle tree on local providers first
        let local_result = self
            .on_local_providers(clone! {(mt, instance) move |provider| {
                let mut mt = mt.clone();
                clone! {(instance) async move {
                    // Perform the call
                    provider
                        .try_remember_blocks_merkle_tree(
                            0,
                            &instance,
                            height,
                            view,
                            &mut mt,
                        )
                        .await?;

                    // Return the merkle tree so we can modify it
                    Ok(mt)
                }}
            }})
            .await;

        // Check if we were successful locally
        if let Ok(modified_mt) = local_result {
            // Set the merkle tree to the one with the
            // successful call
            *mt = modified_mt;

            return Ok(());
        }

        // If that fails, try the remote ones (with retry)
        let remote_result = self
            .on_remote_providers(clone! {(mt, instance) move |provider| {
                let mut mt = mt.clone();
                clone!{(instance) async move {
                    // Perform the call
                    provider
                    .remember_blocks_merkle_tree(
                        &instance,
                        height,
                        view,
                        &mut mt,
                    )
                    .await?;

                    // Return the merkle tree
                    Ok(mt)
                }}
            }})
            .await?;

        // Update the original, local merkle tree
        *mt = remote_result;

        Ok(())
    }

    /// Local only if *every* underlying provider is local.
    fn is_local(&self) -> bool {
        self.providers.lock().iter().all(|p| p.is_local())
    }
}
1686
1687/// Add accounts to the in-memory consensus state.
1688/// We use this during catchup after receiving verified accounts.
1689#[allow(clippy::type_complexity)]
1690pub async fn add_fee_accounts_to_state<N: ConnectedNetwork<PubKey>, P: SequencerPersistence>(
1691    consensus: &Arc<RwLock<Consensus<SeqTypes>>>,
1692    view: &ViewNumber,
1693    accounts: &[FeeAccount],
1694    tree: &FeeMerkleTree,
1695    leaf: Leaf2,
1696) -> anyhow::Result<()> {
1697    // Get the consensus handle
1698    let mut consensus = consensus.write().await;
1699
1700    let (state, delta) = match consensus.validated_state_map().get(view) {
1701        Some(View {
1702            view_inner: ViewInner::Leaf { state, delta, .. },
1703        }) => {
1704            let mut state = (**state).clone();
1705
1706            // Add the fetched accounts to the state.
1707            for account in accounts {
1708                if let Some((proof, _)) = FeeAccountProof::prove(tree, (*account).into()) {
1709                    if let Err(err) = proof.remember(&mut state.fee_merkle_tree) {
1710                        tracing::warn!(
1711                            ?view,
1712                            %account,
1713                            "cannot update fetched account state: {err:#}"
1714                        );
1715                    }
1716                } else {
1717                    tracing::warn!(?view, %account, "cannot update fetched account state because account is not in the merkle tree");
1718                };
1719            }
1720
1721            (Arc::new(state), delta.clone())
1722        },
1723        _ => {
1724            // If we don't already have a leaf for this view, or if we don't have the view
1725            // at all, we can create a new view based on the recovered leaf and add it to
1726            // our state map. In this case, we must also add the leaf to the saved leaves
1727            // map to ensure consistency.
1728            let mut state = ValidatedState::from_header(leaf.block_header());
1729            state.fee_merkle_tree = tree.clone();
1730            (Arc::new(state), None)
1731        },
1732    };
1733
1734    consensus
1735        .update_leaf(leaf, Arc::clone(&state), delta)
1736        .with_context(|| "failed to update leaf")?;
1737
1738    Ok(())
1739}
1740
1741/// Add accounts to the in-memory consensus state.
1742/// We use this during catchup after receiving verified accounts.
1743#[allow(clippy::type_complexity)]
1744pub async fn add_v2_reward_accounts_to_state<
1745    N: ConnectedNetwork<PubKey>,
1746    P: SequencerPersistence,
1747>(
1748    consensus: &Arc<RwLock<Consensus<SeqTypes>>>,
1749    view: &ViewNumber,
1750    accounts: &[RewardAccountV2],
1751    tree: &RewardMerkleTreeV2,
1752    leaf: Leaf2,
1753) -> anyhow::Result<()> {
1754    // Get the consensus handle
1755    let mut consensus = consensus.write().await;
1756
1757    let (state, delta) = match consensus.validated_state_map().get(view) {
1758        Some(View {
1759            view_inner: ViewInner::Leaf { state, delta, .. },
1760        }) => {
1761            let mut state = (**state).clone();
1762
1763            // Add the fetched accounts to the state.
1764            for account in accounts {
1765                if let Some((proof, _)) = RewardAccountProofV2::prove(tree, (*account).into()) {
1766                    if let Err(err) = proof.remember(&mut state.reward_merkle_tree_v2) {
1767                        tracing::warn!(
1768                            ?view,
1769                            %account,
1770                            "cannot update fetched account state: {err:#}"
1771                        );
1772                    }
1773                } else {
1774                    tracing::warn!(?view, %account, "cannot update fetched account state because account is not in the merkle tree");
1775                };
1776            }
1777
1778            (Arc::new(state), delta.clone())
1779        },
1780        _ => {
1781            // If we don't already have a leaf for this view, or if we don't have the view
1782            // at all, we can create a new view based on the recovered leaf and add it to
1783            // our state map. In this case, we must also add the leaf to the saved leaves
1784            // map to ensure consistency.
1785            let mut state = ValidatedState::from_header(leaf.block_header());
1786            state.reward_merkle_tree_v2 = tree.clone();
1787            (Arc::new(state), None)
1788        },
1789    };
1790
1791    consensus
1792        .update_leaf(leaf, Arc::clone(&state), delta)
1793        .with_context(|| "failed to update leaf")?;
1794
1795    Ok(())
1796}
1797
1798/// Add accounts to the in-memory consensus state.
1799/// We use this during catchup after receiving verified accounts.
1800#[allow(clippy::type_complexity)]
1801pub async fn add_v1_reward_accounts_to_state<
1802    N: ConnectedNetwork<PubKey>,
1803    P: SequencerPersistence,
1804>(
1805    consensus: &Arc<RwLock<Consensus<SeqTypes>>>,
1806    view: &ViewNumber,
1807    accounts: &[RewardAccountV1],
1808    tree: &RewardMerkleTreeV1,
1809    leaf: Leaf2,
1810) -> anyhow::Result<()> {
1811    // Get the consensus handle
1812    let mut consensus = consensus.write().await;
1813
1814    let (state, delta) = match consensus.validated_state_map().get(view) {
1815        Some(View {
1816            view_inner: ViewInner::Leaf { state, delta, .. },
1817        }) => {
1818            let mut state = (**state).clone();
1819
1820            // Add the fetched accounts to the state.
1821            for account in accounts {
1822                if let Some((proof, _)) = RewardAccountProofV1::prove(tree, (*account).into()) {
1823                    if let Err(err) = proof.remember(&mut state.reward_merkle_tree_v1) {
1824                        tracing::warn!(
1825                            ?view,
1826                            %account,
1827                            "cannot update fetched account state: {err:#}"
1828                        );
1829                    }
1830                } else {
1831                    tracing::warn!(?view, %account, "cannot update fetched account state because account is not in the merkle tree");
1832                };
1833            }
1834
1835            (Arc::new(state), delta.clone())
1836        },
1837        _ => {
1838            // If we don't already have a leaf for this view, or if we don't have the view
1839            // at all, we can create a new view based on the recovered leaf and add it to
1840            // our state map. In this case, we must also add the leaf to the saved leaves
1841            // map to ensure consistency.
1842            let mut state = ValidatedState::from_header(leaf.block_header());
1843            state.reward_merkle_tree_v1 = tree.clone();
1844            (Arc::new(state), None)
1845        },
1846    };
1847
1848    consensus
1849        .update_leaf(leaf, Arc::clone(&state), delta)
1850        .with_context(|| "failed to update leaf")?;
1851
1852    Ok(())
1853}
1854
#[cfg(test)]
mod test {
    use super::*;

    #[test]
    fn test_peer_priority() {
        // A peer with a strong success ratio must outrank a flakier one.
        let reliable = PeerScore {
            requests: 1000,
            failures: 2,
        };
        let flaky = PeerScore {
            requests: 10,
            failures: 1,
        };
        assert!(reliable > flaky);

        // The priority queue should yield the higher-scoring peer first.
        let mut queue: PriorityQueue<_, _> =
            [(0, reliable), (1, flaky)].into_iter().collect();
        assert_eq!(queue.pop(), Some((0, reliable)));
        assert_eq!(queue.pop(), Some((1, flaky)));
    }
}