1use std::{
2 cmp::Ordering,
3 collections::HashMap,
4 fmt::{Debug, Display},
5 sync::Arc,
6 time::Duration,
7};
8
9use alloy::primitives::U256;
10use anyhow::{Context, anyhow, bail, ensure};
11use async_lock::RwLock;
12use async_trait::async_trait;
13use committable::{Commitment, Committable};
14use espresso_types::{
15 BackoffParams, BlockMerkleTree, Certificate2, FeeAccount, FeeAccountProof, FeeMerkleCommitment,
16 FeeMerkleTree, Leaf2, NodeState, SeqTypes, ValidatedState,
17 config::PublicNetworkConfig,
18 v0::traits::StateCatchup,
19 v0_3::{
20 ChainConfig, RewardAccountProofV1, RewardAccountV1, RewardMerkleCommitmentV1,
21 RewardMerkleTreeV1,
22 },
23 v0_4::{
24 PermittedRewardMerkleTreeV2, RewardAccountProofV2, RewardAccountV2,
25 RewardMerkleCommitmentV2, RewardMerkleTreeV2, forgotten_accounts_include,
26 },
27};
28use futures::{
29 StreamExt,
30 future::{Future, FutureExt, TryFuture, TryFutureExt},
31 stream::FuturesUnordered,
32};
33use hotshot_new_protocol::utils::verify_leaf_chain_with_cert2;
34use hotshot_types::{
35 ValidatorConfig,
36 data::ViewNumber,
37 message::UpgradeLock,
38 network::NetworkConfig,
39 simple_certificate::LightClientStateUpdateCertificateV2,
40 stake_table::HSStakeTable,
41 traits::{
42 ValidatedState as ValidatedStateTrait,
43 metrics::{Counter, CounterFamily, Metrics},
44 },
45 utils::verify_leaf_chain,
46};
47use itertools::Itertools;
48use jf_merkle_tree_compat::{ForgetableMerkleTreeScheme, MerkleTreeScheme, prelude::MerkleNode};
49use parking_lot::Mutex;
50use priority_queue::PriorityQueue;
51use serde::de::DeserializeOwned;
52use surf_disco::Request;
53use tide_disco::error::ServerError;
54use tokio::time::timeout;
55use tokio_util::task::AbortOnDropHandle;
56use url::Url;
57use vbs::version::StaticVersionType;
58use versions::{EPOCH_VERSION, NEW_PROTOCOL_VERSION};
59
60use crate::{
61 api::{BlocksFrontier, RewardMerkleTreeDataSource, RewardMerkleTreeV2Data},
62 consensus_handle::ConsensusHandle,
63};
64
/// A catchup client for a single peer: the HTTP client, the peer's URL, and the two metric
/// counters associated with that peer.
#[derive(Debug, Clone)]
struct Client<ServerError, ApiVer: StaticVersionType> {
    /// Underlying `surf_disco` client used to issue requests to the peer.
    inner: surf_disco::Client<ServerError, ApiVer>,
    /// URL this client was created for.
    url: Url,
    /// Counter incremented once for every request recorded against this peer.
    requests: Arc<Box<dyn Counter>>,
    /// Counter incremented once for every failed request recorded against this peer.
    failures: Arc<Box<dyn Counter>>,
}
74
75impl<ApiVer: StaticVersionType> Client<ServerError, ApiVer> {
76 pub fn new(
77 url: Url,
78 requests: &(impl CounterFamily + ?Sized),
79 failures: &(impl CounterFamily + ?Sized),
80 ) -> Self {
81 Self {
82 inner: surf_disco::Client::new(url.clone()),
83 requests: Arc::new(requests.create(vec![url.to_string()])),
84 failures: Arc::new(failures.create(vec![url.to_string()])),
85 url,
86 }
87 }
88
89 pub fn get<T: DeserializeOwned>(&self, route: &str) -> Request<T, ServerError, ApiVer> {
90 self.inner.get(route)
91 }
92}
93
/// Reliability record for one peer: how many requests were recorded and how many of them failed.
///
/// Scores are ordered by failure rate: a score with a *lower* `failures / requests` ratio
/// compares as *greater*. A score whose fields are both zero compares equal to every other score.
#[derive(Clone, Copy, Debug, Default)]
struct PeerScore {
    /// Total number of requests recorded.
    requests: usize,
    /// Number of recorded requests that failed.
    failures: usize,
}

impl Ord for PeerScore {
    fn cmp(&self, other: &Self) -> Ordering {
        // Compare `other.failures / other.requests` against `self.failures / self.requests` by
        // cross-multiplication, which avoids division (and division by zero). The products are
        // computed in `u128` so that large counters cannot overflow `usize`.
        let lhs = other.failures as u128 * self.requests as u128;
        let rhs = self.failures as u128 * other.requests as u128;
        lhs.cmp(&rhs)
    }
}

impl PartialOrd for PeerScore {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl PartialEq for PeerScore {
    /// Two scores are equal when they have the same failure rate according to [`Ord::cmp`].
    fn eq(&self, other: &Self) -> bool {
        self.cmp(other).is_eq()
    }
}

impl Eq for PeerScore {}
131
/// A catchup provider backed by a list of remote peers, each reached through its own [`Client`].
#[derive(Debug, Clone, Default)]
pub struct StatePeers<ApiVer: StaticVersionType> {
    /// Score of each peer, keyed by the peer's index in `clients`. `fetch` pops peers from a copy
    /// of this queue to decide the order in which they are tried.
    scores: Arc<RwLock<PriorityQueue<usize, PeerScore>>>,
    /// One client per peer URL.
    clients: Vec<Client<ServerError, ApiVer>>,
    /// Backoff parameters returned by `backoff()`.
    backoff: BackoffParams,
    /// Per-peer request timeout for the first attempt; `fetch` scales it with the retry count.
    base_timeout: Duration,
}
141
142impl<ApiVer: StaticVersionType> StatePeers<ApiVer> {
143 async fn fetch<Fut>(
144 &self,
145 retry: usize,
146 f: impl Fn(Client<ServerError, ApiVer>) -> Fut,
147 ) -> anyhow::Result<Fut::Ok>
148 where
149 Fut: TryFuture<Error: Display>,
150 {
151 let timeout_dur = self.base_timeout * (retry as u32 + 1);
162
163 let mut requests = HashMap::new();
166 let mut res = Err(anyhow!("failed fetching from every peer"));
167
168 let mut scores = { (*self.scores.read().await).clone() };
173 let mut logs = vec![format!("Fetching failed.\n")];
174 while let Some((id, score)) = scores.pop() {
175 let client = &self.clients[id];
176 tracing::info!("fetching from {}", client.url);
177 match timeout(timeout_dur, TryFutureExt::into_future(f(client.clone()))).await {
178 Ok(Ok(t)) => {
179 requests.insert(id, true);
180 res = Ok(t);
181 logs = Vec::new();
182 break;
183 },
184 Ok(Err(err)) => {
185 tracing::debug!(id, ?score, peer = %client.url, "error from peer: {err:#}");
186 logs.push(format!(
187 "Error from peer {} with id {id} and score {score:?}: {err:#}",
188 client.url
189 ));
190 requests.insert(id, false);
191 },
192 Err(_) => {
193 tracing::debug!(id, ?score, peer = %client.url, ?timeout_dur, "request timed out");
194 logs.push(format!(
195 "Error from peer {} with id {id} and score {score:?}: request timed out",
196 client.url
197 ));
198 requests.insert(id, false);
199 },
200 }
201 }
202
203 if !logs.is_empty() {
204 tracing::warn!("{}", logs.join("\n"));
205 }
206
207 let mut scores = self.scores.write().await;
209 for (id, success) in requests {
210 scores.change_priority_by(&id, |score| {
211 score.requests += 1;
212 self.clients[id].requests.add(1);
213 if !success {
214 score.failures += 1;
215 self.clients[id].failures.add(1);
216 }
217 });
218 }
219
220 res
221 }
222
223 pub fn from_urls(
224 urls: Vec<Url>,
225 backoff: BackoffParams,
226 base_timeout: Duration,
227 metrics: &(impl Metrics + ?Sized),
228 ) -> Self {
229 if urls.is_empty() {
230 panic!("Cannot create StatePeers with no peers");
231 }
232
233 let metrics = metrics.subgroup("catchup".into());
234 let requests = metrics.counter_family("requests".into(), vec!["peer".into()]);
235 let failures = metrics.counter_family("request_failures".into(), vec!["peer".into()]);
236
237 let scores = urls
238 .iter()
239 .enumerate()
240 .map(|(i, _)| (i, PeerScore::default()))
241 .collect();
242 let clients = urls
243 .into_iter()
244 .map(|url| Client::new(url, &*requests, &*failures))
245 .collect();
246
247 Self {
248 clients,
249 scores: Arc::new(RwLock::new(scores)),
250 backoff,
251 base_timeout,
252 }
253 }
254
255 #[tracing::instrument(skip(self, my_own_validator_config))]
256 pub async fn fetch_config(
257 &self,
258 my_own_validator_config: ValidatorConfig<SeqTypes>,
259 ) -> anyhow::Result<NetworkConfig<SeqTypes>> {
260 self.backoff()
261 .retry(self, move |provider, retry| {
262 let my_own_validator_config = my_own_validator_config.clone();
263 async move {
264 let cfg: PublicNetworkConfig = provider
265 .fetch(retry, |client| {
266 let url = client.url.join("config/hotshot").unwrap();
267
268 reqwest::get(url.clone())
269 })
270 .await?
271 .json()
272 .await?;
273 cfg.into_network_config(my_own_validator_config)
274 .context("fetched config, but failed to convert to private config")
275 }
276 .boxed()
277 })
278 .await
279 }
280}
281
282#[async_trait]
283impl<ApiVer: StaticVersionType> StateCatchup for StatePeers<ApiVer> {
284 #[tracing::instrument(skip(self, _instance))]
285 async fn try_fetch_accounts(
286 &self,
287 retry: usize,
288 _instance: &NodeState,
289 height: u64,
290 view: ViewNumber,
291 fee_merkle_tree_root: FeeMerkleCommitment,
292 accounts: &[FeeAccount],
293 ) -> anyhow::Result<Vec<FeeAccountProof>> {
294 self.fetch(retry, |client| async move {
295 let tree = client
296 .inner
297 .post::<FeeMerkleTree>(&format!("catchup/{height}/{}/accounts", view.u64()))
298 .body_binary(&accounts.to_vec())?
299 .send()
300 .await?;
301
302 let mut proofs = Vec::new();
304 for account in accounts {
305 let (proof, _) = FeeAccountProof::prove(&tree, (*account).into())
306 .context(format!("response missing fee account {account}"))?;
307 proof.verify(&fee_merkle_tree_root).context(format!(
308 "invalid proof for fee account {account}, root: {fee_merkle_tree_root}"
309 ))?;
310 proofs.push(proof);
311 }
312
313 anyhow::Ok(proofs)
314 })
315 .await
316 }
317
318 #[tracing::instrument(skip(self, _instance, mt))]
319 async fn try_remember_blocks_merkle_tree(
320 &self,
321 retry: usize,
322 _instance: &NodeState,
323 height: u64,
324 view: ViewNumber,
325 mt: &mut BlockMerkleTree,
326 ) -> anyhow::Result<()> {
327 *mt = self
328 .fetch(retry, |client| {
329 let mut mt = mt.clone();
330 async move {
331 let frontier = client
332 .get::<BlocksFrontier>(&format!("catchup/{height}/{}/blocks", view.u64()))
333 .send()
334 .await?;
335 let elem = frontier
336 .elem()
337 .context("provided frontier is missing leaf element")?;
338 mt.remember(mt.num_leaves() - 1, *elem, &frontier)
339 .context("verifying block proof")?;
340 anyhow::Ok(mt)
341 }
342 })
343 .await?;
344 Ok(())
345 }
346
347 async fn try_fetch_chain_config(
348 &self,
349 retry: usize,
350 commitment: Commitment<ChainConfig>,
351 ) -> anyhow::Result<ChainConfig> {
352 self.fetch(retry, |client| async move {
353 let cf = client
354 .get::<ChainConfig>(&format!("catchup/chain-config/{commitment}"))
355 .send()
356 .await?;
357 ensure!(
358 cf.commit() == commitment,
359 "received chain config with mismatched commitment: expected {commitment}, got {}",
360 cf.commit()
361 );
362 Ok(cf)
363 })
364 .await
365 }
366
367 async fn try_fetch_leaf(
368 &self,
369 retry: usize,
370 height: u64,
371 stake_table: HSStakeTable<SeqTypes>,
372 success_threshold: U256,
373 ) -> anyhow::Result<Leaf2> {
374 let leaf_chain = self
378 .fetch(retry, |client| async move {
379 let chain = client
380 .get::<Vec<Leaf2>>(&format!("catchup/{height}/leafchain"))
381 .send()
382 .await?;
383 anyhow::Ok(chain)
384 })
385 .await
386 .with_context(|| format!("failed to fetch leaf chain at height {height}"))?;
387
388 let first = leaf_chain
389 .first()
390 .ok_or_else(|| anyhow!("empty leaf chain returned for height {height}"))?;
391
392 if first.block_header().version() >= NEW_PROTOCOL_VERSION {
393 let upgrade_lock =
394 UpgradeLock::<SeqTypes>::new(versions::Upgrade::trivial(NEW_PROTOCOL_VERSION));
395 let cert2 = self
396 .fetch(retry, |client| async move {
397 let cert2 = client
398 .get::<Certificate2<SeqTypes>>(&format!("catchup/{height}/cert2"))
399 .send()
400 .await?;
401 anyhow::Ok(cert2)
402 })
403 .await
404 .with_context(|| format!("failed to fetch cert2 for height {height}"))?;
405
406 verify_leaf_chain_with_cert2(
407 leaf_chain,
408 &stake_table,
409 success_threshold,
410 height,
411 &upgrade_lock,
412 cert2,
413 )
414 .await
415 .with_context(|| format!("failed to verify leaf chain with cert2 at height {height}"))
416 } else {
417 let upgrade_lock =
418 UpgradeLock::<SeqTypes>::new(versions::Upgrade::trivial(EPOCH_VERSION));
419 verify_leaf_chain(
420 leaf_chain,
421 &stake_table,
422 success_threshold,
423 height,
424 &upgrade_lock,
425 )
426 .await
427 .with_context(|| format!("failed to verify leaf chain at height {height}"))
428 }
429 }
430
431 async fn try_fetch_reward_merkle_tree_v2(
432 &self,
433 retry: usize,
434 height: u64,
435 view: ViewNumber,
436 reward_merkle_tree_root: RewardMerkleCommitmentV2,
437 accounts: Arc<Vec<RewardAccountV2>>,
438 ) -> anyhow::Result<PermittedRewardMerkleTreeV2> {
439 let result = self
440 .fetch(retry, |client| async move {
441 let tree_bytes = match client
445 .inner
446 .get::<Vec<u8>>(&format!("catchup/reward-merkle-tree-v2/{height}/{}", *view))
447 .send()
448 .await
449 {
450 Ok(bytes) => bytes,
451 Err(err) => {
452 tracing::info!(
453 "catchup endpoint failed, falling back to reward-state-v2: {err:#}"
454 );
455 client
456 .inner
457 .get::<Vec<u8>>(&format!(
458 "reward-state-v2/reward-merkle-tree-v2/{height}"
459 ))
460 .send()
461 .await?
462 },
463 };
464
465 Ok::<Vec<u8>, anyhow::Error>(tree_bytes)
466 })
467 .await
468 .context("Fetching from peer failed")?;
469
470 let tree_data = bincode::deserialize::<RewardMerkleTreeV2Data>(&result)
471 .context("Failed to deserialize merkle tree from catchup")?;
472
473 let tree: PermittedRewardMerkleTreeV2 =
474 PermittedRewardMerkleTreeV2::try_from_kv_set(tree_data.balances).await?;
475
476 ensure!(
477 tree.tree.commitment() == reward_merkle_tree_root,
478 "RewardMerkleTreeV2 from peer failed commitment check."
479 );
480 ensure!(!forgotten_accounts_include(&tree, &accounts));
481
482 Ok(tree)
483 }
484
485 #[tracing::instrument(skip(self, _instance))]
486 async fn try_fetch_reward_accounts_v1(
487 &self,
488 retry: usize,
489 _instance: &NodeState,
490 height: u64,
491 view: ViewNumber,
492 reward_merkle_tree_root: RewardMerkleCommitmentV1,
493 accounts: &[RewardAccountV1],
494 ) -> anyhow::Result<Vec<RewardAccountProofV1>> {
495 self.fetch(retry, |client| async move {
496 let tree = client
497 .inner
498 .post::<RewardMerkleTreeV1>(&format!(
499 "catchup/{height}/{}/reward-accounts",
500 view.u64()
501 ))
502 .body_binary(&accounts.to_vec())?
503 .send()
504 .await?;
505
506 let mut proofs = Vec::new();
508 for account in accounts {
509 let (proof, _) = RewardAccountProofV1::prove(&tree, (*account).into())
510 .context(format!("response missing reward account {account}"))?;
511 proof.verify(&reward_merkle_tree_root).context(format!(
512 "invalid proof for v1 reward account {account}, root: \
513 {reward_merkle_tree_root} height {height} view {view}"
514 ))?;
515 proofs.push(proof);
516 }
517
518 anyhow::Ok(proofs)
519 })
520 .await
521 }
522
523 async fn try_fetch_state_cert(
524 &self,
525 retry: usize,
526 epoch: u64,
527 ) -> anyhow::Result<LightClientStateUpdateCertificateV2<SeqTypes>> {
528 self.fetch(retry, |client| async move {
529 client
530 .get::<LightClientStateUpdateCertificateV2<SeqTypes>>(&format!(
531 "catchup/{epoch}/state-cert"
532 ))
533 .send()
534 .await
535 })
536 .await
537 }
538
    /// Returns the backoff parameters this provider was constructed with.
    fn backoff(&self) -> &BackoffParams {
        &self.backoff
    }
542
543 fn name(&self) -> String {
544 format!(
545 "StatePeers({})",
546 self.clients
547 .iter()
548 .map(|client| client.url.to_string())
549 .join(",")
550 )
551 }
552
    /// Always `false`: this provider fetches from remote peers.
    fn is_local(&self) -> bool {
        false
    }
556}
557
558pub(crate) trait CatchupStorage: Sync {
559 fn get_accounts(
570 &self,
571 _instance: &NodeState,
572 _height: u64,
573 _view: ViewNumber,
574 _accounts: &[FeeAccount],
575 ) -> impl Send + Future<Output = anyhow::Result<(FeeMerkleTree, Leaf2)>> {
576 async {
581 bail!("merklized state catchup is not supported for this data source");
582 }
583 }
584
585 fn get_reward_accounts_v1(
586 &self,
587 _instance: &NodeState,
588 _height: u64,
589 _view: ViewNumber,
590 _accounts: &[RewardAccountV1],
591 ) -> impl Send + Future<Output = anyhow::Result<(RewardMerkleTreeV1, Leaf2)>> {
592 async {
593 bail!("merklized state catchup is not supported for this data source");
594 }
595 }
596
597 fn get_reward_accounts_v2(
598 &self,
599 _instance: &NodeState,
600 _height: u64,
601 _view: ViewNumber,
602 _accounts: &[RewardAccountV2],
603 ) -> impl Send + Future<Output = anyhow::Result<(RewardMerkleTreeV2, Leaf2)>> {
604 async {
605 bail!("merklized state catchup is not supported for this data source");
606 }
607 }
608
609 fn get_frontier(
616 &self,
617 _instance: &NodeState,
618 _height: u64,
619 _view: ViewNumber,
620 ) -> impl Send + Future<Output = anyhow::Result<BlocksFrontier>> {
621 async {
626 bail!("merklized state catchup is not supported for this data source");
627 }
628 }
629
630 fn get_chain_config(
631 &self,
632 _commitment: Commitment<ChainConfig>,
633 ) -> impl Send + Future<Output = anyhow::Result<ChainConfig>> {
634 async {
635 bail!("chain config catchup is not supported for this data source");
636 }
637 }
638
639 fn get_leaf_chain(
640 &self,
641 _height: u64,
642 ) -> impl Send + Future<Output = anyhow::Result<Vec<Leaf2>>> {
643 async {
644 bail!("leaf chain catchup is not supported for this data source");
645 }
646 }
647
648 fn load_earliest_cert2(
653 &self,
654 _height: u64,
655 ) -> impl Send + Future<Output = anyhow::Result<Option<Certificate2<SeqTypes>>>> {
656 async { Ok(None) }
657 }
658
659 fn get_leaf(&self, _height: u64) -> impl Send + Future<Output = anyhow::Result<Leaf2>> {
661 async {
662 bail!("leaf fetch is not supported for this data source");
663 }
664 }
665}
666
// `MetricsDataSource` overrides nothing, so it uses every default method of `CatchupStorage`.
impl CatchupStorage for hotshot_query_service::data_source::MetricsDataSource {}
668
669impl<T, S> CatchupStorage for hotshot_query_service::data_source::ExtensibleDataSource<T, S>
670where
671 T: CatchupStorage,
672 S: Sync,
673{
674 async fn get_accounts(
675 &self,
676 instance: &NodeState,
677 height: u64,
678 view: ViewNumber,
679 accounts: &[FeeAccount],
680 ) -> anyhow::Result<(FeeMerkleTree, Leaf2)> {
681 self.inner()
682 .get_accounts(instance, height, view, accounts)
683 .await
684 }
685
686 async fn get_reward_accounts_v2(
687 &self,
688 instance: &NodeState,
689 height: u64,
690 view: ViewNumber,
691 accounts: &[RewardAccountV2],
692 ) -> anyhow::Result<(RewardMerkleTreeV2, Leaf2)> {
693 self.inner()
694 .get_reward_accounts_v2(instance, height, view, accounts)
695 .await
696 }
697
698 async fn get_reward_accounts_v1(
699 &self,
700 instance: &NodeState,
701 height: u64,
702 view: ViewNumber,
703 accounts: &[RewardAccountV1],
704 ) -> anyhow::Result<(RewardMerkleTreeV1, Leaf2)> {
705 self.inner()
706 .get_reward_accounts_v1(instance, height, view, accounts)
707 .await
708 }
709
710 async fn get_frontier(
711 &self,
712 instance: &NodeState,
713 height: u64,
714 view: ViewNumber,
715 ) -> anyhow::Result<BlocksFrontier> {
716 self.inner().get_frontier(instance, height, view).await
717 }
718
719 async fn get_chain_config(
720 &self,
721 commitment: Commitment<ChainConfig>,
722 ) -> anyhow::Result<ChainConfig> {
723 self.inner().get_chain_config(commitment).await
724 }
725 async fn get_leaf_chain(&self, height: u64) -> anyhow::Result<Vec<Leaf2>> {
726 self.inner().get_leaf_chain(height).await
727 }
728
729 async fn load_earliest_cert2(
730 &self,
731 height: u64,
732 ) -> anyhow::Result<Option<Certificate2<SeqTypes>>> {
733 self.inner().load_earliest_cert2(height).await
734 }
735
736 async fn get_leaf(&self, height: u64) -> anyhow::Result<Leaf2> {
737 self.inner().get_leaf(height).await
738 }
739}
740
/// A catchup provider that answers requests from a shared database handle.
#[derive(Debug)]
pub(crate) struct SqlStateCatchup<T> {
    /// Shared handle to the storage that catchup data is loaded from.
    db: Arc<T>,
    /// Backoff parameters returned by `backoff()`.
    backoff: BackoffParams,
}
746
747impl<T> SqlStateCatchup<T> {
748 pub(crate) fn new(db: Arc<T>, backoff: BackoffParams) -> Self {
749 Self { db, backoff }
750 }
751}
752
753#[async_trait]
754impl<T> StateCatchup for SqlStateCatchup<T>
755where
756 T: CatchupStorage + RewardMerkleTreeDataSource + Send + Sync,
757{
758 async fn try_fetch_leaf(
759 &self,
760 _retry: usize,
761 height: u64,
762 _stake_table: HSStakeTable<SeqTypes>,
763 _success_threshold: U256,
764 ) -> anyhow::Result<Leaf2> {
765 self.db
768 .get_leaf(height)
769 .await
770 .with_context(|| format!("failed to load leaf at height {height} from DB"))
771 }
772
773 #[tracing::instrument(skip(self, _retry, instance))]
776 async fn try_fetch_accounts(
777 &self,
778 _retry: usize,
779 instance: &NodeState,
780 block_height: u64,
781 view: ViewNumber,
782 fee_merkle_tree_root: FeeMerkleCommitment,
783 accounts: &[FeeAccount],
784 ) -> anyhow::Result<Vec<FeeAccountProof>> {
785 let (fee_merkle_tree_from_db, _) = self
787 .db
788 .get_accounts(instance, block_height, view, accounts)
789 .await
790 .with_context(|| "failed to get fee accounts from DB")?;
791
792 let mut proofs = Vec::new();
794 for account in accounts {
795 let (proof, _) = FeeAccountProof::prove(&fee_merkle_tree_from_db, (*account).into())
796 .context(format!("response missing account {account}"))?;
797 proof.verify(&fee_merkle_tree_root).context(format!(
798 "invalid proof for fee account {account}, root: {fee_merkle_tree_root}"
799 ))?;
800 proofs.push(proof);
801 }
802
803 Ok(proofs)
804 }
805
806 #[tracing::instrument(skip(self, _retry, instance, mt))]
807 async fn try_remember_blocks_merkle_tree(
808 &self,
809 _retry: usize,
810 instance: &NodeState,
811 bh: u64,
812 view: ViewNumber,
813 mt: &mut BlockMerkleTree,
814 ) -> anyhow::Result<()> {
815 if bh == 0 {
816 return Ok(());
817 }
818
819 let proof = self.db.get_frontier(instance, bh, view).await?;
820 match proof
821 .proof
822 .first()
823 .context(format!("empty proof for frontier at height {bh}"))?
824 {
825 MerkleNode::Leaf { pos, elem, .. } => mt
826 .remember(pos, elem, proof.clone())
827 .context("failed to remember proof"),
828 _ => bail!("invalid proof"),
829 }
830 }
831
832 async fn try_fetch_chain_config(
833 &self,
834 _retry: usize,
835 commitment: Commitment<ChainConfig>,
836 ) -> anyhow::Result<ChainConfig> {
837 let cf = self.db.get_chain_config(commitment).await?;
838
839 if cf.commit() != commitment {
840 panic!(
841 "Critical error: Mismatched chain config detected. Expected chain config: {:?}, \
842 but got: {:?}.
843 This may indicate a compromised database",
844 commitment,
845 cf.commit()
846 )
847 }
848
849 Ok(cf)
850 }
851
    /// Loads the v2 reward Merkle tree for `height` from the database.
    ///
    /// The loaded tree must commit to `reward_merkle_tree_root` and pass the
    /// `forgotten_accounts_include` check for `accounts`; otherwise an error is returned.
    async fn try_fetch_reward_merkle_tree_v2(
        &self,
        _retry: usize,
        height: u64,
        _view: ViewNumber,
        reward_merkle_tree_root: RewardMerkleCommitmentV2,
        accounts: Arc<Vec<RewardAccountV2>>,
    ) -> anyhow::Result<PermittedRewardMerkleTreeV2> {
        let tree: PermittedRewardMerkleTreeV2 = self.db.load_reward_merkle_tree_v2(height).await?;

        // These `ensure!` calls have no message, so the condition text itself is the error text.
        ensure!(tree.tree.commitment() == reward_merkle_tree_root);
        ensure!(!forgotten_accounts_include(&tree, &accounts));

        Ok(tree)
    }
867
868 #[tracing::instrument(skip(self, _retry, instance))]
869 async fn try_fetch_reward_accounts_v1(
870 &self,
871 _retry: usize,
872 instance: &NodeState,
873 block_height: u64,
874 view: ViewNumber,
875 reward_merkle_tree_root: RewardMerkleCommitmentV1,
876 accounts: &[RewardAccountV1],
877 ) -> anyhow::Result<Vec<RewardAccountProofV1>> {
878 let (reward_merkle_tree_from_db, _) = self
880 .db
881 .get_reward_accounts_v1(instance, block_height, view, accounts)
882 .await
883 .with_context(|| "failed to get reward accounts from DB")?;
884 let mut proofs = Vec::new();
886 for account in accounts {
887 let (proof, _) =
888 RewardAccountProofV1::prove(&reward_merkle_tree_from_db, (*account).into())
889 .context(format!("response missing account {account}"))?;
890 proof.verify(&reward_merkle_tree_root).context(format!(
891 "invalid proof for v1 reward account {account}, root: {reward_merkle_tree_root}"
892 ))?;
893 proofs.push(proof);
894 }
895
896 Ok(proofs)
897 }
898
899 async fn try_fetch_state_cert(
900 &self,
901 _retry: usize,
902 _epoch: u64,
903 ) -> anyhow::Result<LightClientStateUpdateCertificateV2<SeqTypes>> {
904 bail!("state cert catchup not supported for SqlStateCatchup");
905 }
906
    /// Returns the backoff parameters this provider was constructed with.
    fn backoff(&self) -> &BackoffParams {
        &self.backoff
    }
910
    /// Returns the fixed provider name `SqlStateCatchup`.
    fn name(&self) -> String {
        "SqlStateCatchup".into()
    }
914
    /// Always `true`: this provider reads from the database handle it owns.
    fn is_local(&self) -> bool {
        true
    }
918}
919
/// A catchup provider with catchup disabled: the only thing it can return is a chain config that
/// was registered beforehand.
#[derive(Clone, Debug)]
pub struct NullStateCatchup {
    /// Backoff parameters returned by `backoff()`.
    backoff: BackoffParams,
    /// Chain configs registered via `add_chain_config`, keyed by their commitment.
    chain_configs: HashMap<Commitment<ChainConfig>, ChainConfig>,
}
926
927impl Default for NullStateCatchup {
928 fn default() -> Self {
929 Self {
930 backoff: BackoffParams::disabled(),
931 chain_configs: Default::default(),
932 }
933 }
934}
935
936impl NullStateCatchup {
937 pub fn add_chain_config(&mut self, cf: ChainConfig) {
947 self.chain_configs.insert(cf.commit(), cf);
948 }
949}
950
951#[async_trait]
952impl StateCatchup for NullStateCatchup {
953 async fn try_fetch_leaf(
954 &self,
955 _retry: usize,
956 _height: u64,
957 _stake_table: HSStakeTable<SeqTypes>,
958 _success_threshold: U256,
959 ) -> anyhow::Result<Leaf2> {
960 bail!("state catchup is disabled")
961 }
962
963 async fn try_fetch_accounts(
964 &self,
965 _retry: usize,
966 _instance: &NodeState,
967 _height: u64,
968 _view: ViewNumber,
969 _fee_merkle_tree_root: FeeMerkleCommitment,
970 _account: &[FeeAccount],
971 ) -> anyhow::Result<Vec<FeeAccountProof>> {
972 bail!("state catchup is disabled");
973 }
974
975 async fn try_remember_blocks_merkle_tree(
976 &self,
977 _retry: usize,
978 _instance: &NodeState,
979 _height: u64,
980 _view: ViewNumber,
981 _mt: &mut BlockMerkleTree,
982 ) -> anyhow::Result<()> {
983 bail!("state catchup is disabled");
984 }
985
986 async fn try_fetch_chain_config(
987 &self,
988 _retry: usize,
989 commitment: Commitment<ChainConfig>,
990 ) -> anyhow::Result<ChainConfig> {
991 self.chain_configs
992 .get(&commitment)
993 .copied()
994 .context(format!("chain config {commitment} not available"))
995 }
996
997 async fn try_fetch_reward_merkle_tree_v2(
998 &self,
999 _retry: usize,
1000 _height: u64,
1001 _view: ViewNumber,
1002 _reward_merkle_tree_root: RewardMerkleCommitmentV2,
1003 _accounts: Arc<Vec<RewardAccountV2>>,
1004 ) -> anyhow::Result<PermittedRewardMerkleTreeV2> {
1005 bail!("state catchup is disabled");
1006 }
1007
1008 async fn try_fetch_reward_accounts_v1(
1009 &self,
1010 _retry: usize,
1011 _instance: &NodeState,
1012 _height: u64,
1013 _view: ViewNumber,
1014 _fee_merkle_tree_root: RewardMerkleCommitmentV1,
1015 _account: &[RewardAccountV1],
1016 ) -> anyhow::Result<Vec<RewardAccountProofV1>> {
1017 bail!("state catchup is disabled");
1018 }
1019
1020 async fn try_fetch_state_cert(
1021 &self,
1022 _retry: usize,
1023 _epoch: u64,
1024 ) -> anyhow::Result<LightClientStateUpdateCertificateV2<SeqTypes>> {
1025 bail!("state catchup is disabled");
1026 }
1027
1028 fn backoff(&self) -> &BackoffParams {
1029 &self.backoff
1030 }
1031
1032 fn name(&self) -> String {
1033 "NullStateCatchup".into()
1034 }
1035
1036 fn is_local(&self) -> bool {
1037 true
1038 }
1039}
1040
/// A catchup provider that fans a request out to several underlying providers at once and takes
/// the first successful answer.
#[derive(Clone)]
pub struct ParallelStateCatchup {
    /// The underlying providers. The list is shared between clones and can grow through
    /// `add_provider`.
    providers: Arc<Mutex<Vec<Arc<dyn StateCatchup>>>>,
    /// Backoff parameters; `new` sets these to disabled.
    backoff: BackoffParams,
    /// Upper bound on how long `on_local_providers` waits for the local providers.
    local_timeout: Duration,
}
1050
1051impl ParallelStateCatchup {
1052 pub fn new(providers: &[Arc<dyn StateCatchup>], local_timeout: Duration) -> Self {
1054 Self {
1055 providers: Arc::new(Mutex::new(providers.to_vec())),
1056 backoff: BackoffParams::disabled(),
1057 local_timeout,
1058 }
1059 }
1060
    /// Appends `provider` to the shared provider list.
    pub fn add_provider(&self, provider: Arc<dyn StateCatchup>) {
        self.providers.lock().push(provider);
    }
1065
1066 pub async fn on_local_providers<C, F, RT>(&self, closure: C) -> anyhow::Result<RT>
1071 where
1072 C: Fn(Arc<dyn StateCatchup>) -> F + Clone + Send + Sync + 'static,
1073 F: Future<Output = anyhow::Result<RT>> + Send + 'static,
1074 RT: Send + Sync + 'static,
1075 {
1076 let local_timeout = self.local_timeout;
1077 match timeout(
1078 local_timeout,
1079 self.on_providers(|provider| provider.is_local(), closure),
1080 )
1081 .await
1082 {
1083 Ok(result) => result,
1084 Err(_) => {
1085 let err = format!("local provider timed out after {local_timeout:?}");
1086 tracing::warn!("{err}");
1087 Err(anyhow::anyhow!(err))
1088 },
1089 }
1090 }
1091
    /// Runs `closure` on every provider whose `is_local()` is false and returns the first
    /// successful result. Unlike `on_local_providers`, no timeout is applied here.
    pub async fn on_remote_providers<C, F, RT>(&self, closure: C) -> anyhow::Result<RT>
    where
        C: Fn(Arc<dyn StateCatchup>) -> F + Clone + Send + Sync + 'static,
        F: Future<Output = anyhow::Result<RT>> + Send + 'static,
        RT: Send + Sync + 'static,
    {
        self.on_providers(|provider| !provider.is_local(), closure)
            .await
    }
1102
1103 pub async fn on_providers<P, C, F, RT>(&self, predicate: P, closure: C) -> anyhow::Result<RT>
1105 where
1106 P: Fn(&Arc<dyn StateCatchup>) -> bool + Clone + Send + Sync + 'static,
1107 C: Fn(Arc<dyn StateCatchup>) -> F + Clone + Send + Sync + 'static,
1108 F: Future<Output = anyhow::Result<RT>> + Send + 'static,
1109 RT: Send + Sync + 'static,
1110 {
1111 let providers = self.providers.lock().clone();
1113 if providers.is_empty() {
1114 return Err(anyhow::anyhow!("no providers were initialized"));
1115 }
1116
1117 let providers = providers.into_iter().filter(predicate).collect::<Vec<_>>();
1119 if providers.is_empty() {
1120 return Err(anyhow::anyhow!("no providers matched the given predicate"));
1121 }
1122
1123 let mut futures = FuturesUnordered::new();
1125 for provider in providers {
1126 let closure = closure.clone();
1127 futures.push(AbortOnDropHandle::new(tokio::spawn(closure(provider))));
1128 }
1129
1130 let mut logs = vec![format!("No providers returned a successful result.\n")];
1131 while let Some(result) = futures.next().await {
1133 let result = match result {
1135 Ok(res) => res,
1136 Err(err) => {
1137 tracing::debug!("Failed to join on provider: {err:#}.");
1138 logs.push(format!("Failed to join on provider: {err:#}."));
1139 continue;
1140 },
1141 };
1142
1143 let result = match result {
1145 Ok(res) => res,
1146 Err(err) => {
1147 tracing::debug!("Failed to fetch data: {err:#}.");
1148 logs.push(format!("Failed to fetch data: {err:#}."));
1149 continue;
1150 },
1151 };
1152
1153 return Ok(result);
1154 }
1155
1156 Err(anyhow::anyhow!(logs.join("\n")))
1157 }
1158}
1159
// `clone! { (a, b) expr }` expands to a block that shadows each listed identifier with a clone of
// itself (`let a = a.clone();`) and then evaluates `expr`. The shadowing is confined to that
// block, so the original bindings stay usable afterwards.
macro_rules! clone {
    ( ($( $x:ident ),*) $y:expr ) => {
        {
            $(let $x = $x.clone();)*
            $y
        }
    };
}
1168
1169#[async_trait]
1172impl StateCatchup for ParallelStateCatchup {
1173 async fn try_fetch_leaf(
1174 &self,
1175 retry: usize,
1176 height: u64,
1177 stake_table: HSStakeTable<SeqTypes>,
1178 success_threshold: U256,
1179 ) -> anyhow::Result<Leaf2> {
1180 let local_result = self
1182 .on_local_providers(clone! {(stake_table) move |provider| {
1183 clone!{(stake_table) async move {
1184 provider
1185 .try_fetch_leaf(retry, height, stake_table, success_threshold)
1186 .await
1187 }}
1188 }})
1189 .await;
1190
1191 match &local_result {
1193 Ok(_) => return local_result,
1194 Err(err) => tracing::debug!("{err:#}"),
1195 }
1196
1197 self.on_remote_providers(clone! {(stake_table) move |provider| {
1199 clone!{(stake_table) async move {
1200 provider
1201 .try_fetch_leaf(retry, height, stake_table, success_threshold)
1202 .await
1203 }}
1204 }})
1205 .await
1206 }
1207
1208 async fn try_fetch_accounts(
1209 &self,
1210 retry: usize,
1211 instance: &NodeState,
1212 height: u64,
1213 view: ViewNumber,
1214 fee_merkle_tree_root: FeeMerkleCommitment,
1215 accounts: &[FeeAccount],
1216 ) -> anyhow::Result<Vec<FeeAccountProof>> {
1217 let accounts_vec = accounts.to_vec();
1219 let local_result = self
1220 .on_local_providers(clone! {(instance, accounts_vec) move |provider| {
1221 clone! {(instance, accounts_vec) async move {
1222 provider
1223 .try_fetch_accounts(
1224 retry,
1225 &instance,
1226 height,
1227 view,
1228 fee_merkle_tree_root,
1229 &accounts_vec,
1230 )
1231 .await
1232 }}
1233 }})
1234 .await;
1235
1236 match &local_result {
1238 Ok(_) => return local_result,
1239 Err(err) => tracing::debug!("{err:#}"),
1240 }
1241
1242 self.on_remote_providers(clone! {(instance, accounts_vec) move |provider| {
1244 clone!{(instance, accounts_vec) async move {
1245 provider
1246 .try_fetch_accounts(
1247 retry,
1248 &instance,
1249 height,
1250 view,
1251 fee_merkle_tree_root,
1252 &accounts_vec,
1253 ).await
1254 }}
1255 }})
1256 .await
1257 }
1258
1259 async fn try_remember_blocks_merkle_tree(
1260 &self,
1261 retry: usize,
1262 instance: &NodeState,
1263 height: u64,
1264 view: ViewNumber,
1265 mt: &mut BlockMerkleTree,
1266 ) -> anyhow::Result<()> {
1267 let local_result = self
1269 .on_local_providers(clone! {(mt, instance) move |provider| {
1270 let mut mt = mt.clone();
1271 clone! {(instance) async move {
1272 provider
1274 .try_remember_blocks_merkle_tree(
1275 retry,
1276 &instance,
1277 height,
1278 view,
1279 &mut mt,
1280 )
1281 .await?;
1282
1283 Ok(mt)
1285 }}
1286 }})
1287 .await;
1288
1289 if let Ok(modified_mt) = local_result {
1291 *mt = modified_mt;
1293
1294 return Ok(());
1295 }
1296
1297 let remote_result = self
1299 .on_remote_providers(clone! {(mt, instance) move |provider| {
1300 let mut mt = mt.clone();
1301 clone!{(instance) async move {
1302 provider
1304 .try_remember_blocks_merkle_tree(
1305 retry,
1306 &instance,
1307 height,
1308 view,
1309 &mut mt,
1310 )
1311 .await?;
1312
1313 Ok(mt)
1315 }}
1316 }})
1317 .await?;
1318
1319 *mt = remote_result;
1321
1322 Ok(())
1323 }
1324
1325 async fn try_fetch_chain_config(
1326 &self,
1327 retry: usize,
1328 commitment: Commitment<ChainConfig>,
1329 ) -> anyhow::Result<ChainConfig> {
1330 let local_result = self
1332 .on_local_providers(move |provider| async move {
1333 provider.try_fetch_chain_config(retry, commitment).await
1334 })
1335 .await;
1336
1337 match &local_result {
1339 Ok(_) => return local_result,
1340 Err(err) => tracing::debug!("{err:#}"),
1341 }
1342
1343 self.on_remote_providers(move |provider| async move {
1345 provider.try_fetch_chain_config(retry, commitment).await
1346 })
1347 .await
1348 }
1349
1350 async fn try_fetch_reward_merkle_tree_v2(
1351 &self,
1352 retry: usize,
1353 height: u64,
1354 view: ViewNumber,
1355 reward_merkle_tree_root: RewardMerkleCommitmentV2,
1356 accounts: Arc<Vec<RewardAccountV2>>,
1357 ) -> anyhow::Result<PermittedRewardMerkleTreeV2> {
1358 let local_result = self
1359 .on_local_providers(clone! {(accounts) move |provider| {
1360 clone! {(accounts) async move {
1361 provider
1362 .try_fetch_reward_merkle_tree_v2(
1363 retry,
1364 height,
1365 view,
1366 reward_merkle_tree_root,
1367 accounts,
1368 )
1369 .await
1370 }}
1371 }})
1372 .await;
1373
1374 match &local_result {
1376 Ok(_) => return local_result,
1377 Err(err) => tracing::debug!("{err:#}"),
1378 }
1379
1380 self.on_remote_providers(clone! {(accounts) move |provider| {
1382 clone!{(accounts) async move {
1383 provider
1384 .try_fetch_reward_merkle_tree_v2(
1385 retry,
1386 height,
1387 view,
1388 reward_merkle_tree_root,
1389 accounts
1390 ).await
1391 }}
1392 }})
1393 .await
1394 }
1395
1396 async fn try_fetch_reward_accounts_v1(
1397 &self,
1398 retry: usize,
1399 instance: &NodeState,
1400 height: u64,
1401 view: ViewNumber,
1402 reward_merkle_tree_root: RewardMerkleCommitmentV1,
1403 accounts: &[RewardAccountV1],
1404 ) -> anyhow::Result<Vec<RewardAccountProofV1>> {
1405 let accounts_vec = accounts.to_vec();
1407 let local_result = self
1408 .on_local_providers(clone! {(instance, accounts_vec) move |provider| {
1409 clone! {(instance, accounts_vec) async move {
1410 provider
1411 .try_fetch_reward_accounts_v1(
1412 retry,
1413 &instance,
1414 height,
1415 view,
1416 reward_merkle_tree_root,
1417 &accounts_vec,
1418 )
1419 .await
1420 }}
1421 }})
1422 .await;
1423
1424 match &local_result {
1426 Ok(_) => return local_result,
1427 Err(err) => tracing::debug!("{err:#}"),
1428 }
1429
1430 self.on_remote_providers(clone! {(instance, accounts_vec) move |provider| {
1432 clone!{(instance, accounts_vec) async move {
1433 provider
1434 .try_fetch_reward_accounts_v1(
1435 retry,
1436 &instance,
1437 height,
1438 view,
1439 reward_merkle_tree_root,
1440 &accounts_vec,
1441 ).await
1442 }}
1443 }})
1444 .await
1445 }
1446
1447 async fn try_fetch_state_cert(
1448 &self,
1449 retry: usize,
1450 epoch: u64,
1451 ) -> anyhow::Result<LightClientStateUpdateCertificateV2<SeqTypes>> {
1452 let local_result = self
1454 .on_local_providers(move |provider| async move {
1455 provider.try_fetch_state_cert(retry, epoch).await
1456 })
1457 .await;
1458
1459 match &local_result {
1461 Ok(_) => return local_result,
1462 Err(err) => tracing::debug!("{err:#}"),
1463 }
1464
1465 self.on_remote_providers(move |provider| async move {
1467 provider.try_fetch_state_cert(retry, epoch).await
1468 })
1469 .await
1470 }
1471
1472 fn backoff(&self) -> &BackoffParams {
1473 &self.backoff
1474 }
1475
1476 fn name(&self) -> String {
1477 format!(
1478 "[{}]",
1479 self.providers
1480 .lock()
1481 .iter()
1482 .map(|p| p.name())
1483 .collect::<Vec<_>>()
1484 .join(", ")
1485 )
1486 }
1487
1488 async fn fetch_accounts(
1489 &self,
1490 instance: &NodeState,
1491 height: u64,
1492 view: ViewNumber,
1493 fee_merkle_tree_root: FeeMerkleCommitment,
1494 accounts: Vec<FeeAccount>,
1495 ) -> anyhow::Result<Vec<FeeAccountProof>> {
1496 let accounts_vec = accounts.to_vec();
1498 let local_result = self
1499 .on_local_providers(clone! {(instance, accounts_vec) move |provider| {
1500 clone! {(instance, accounts_vec) async move {
1501 provider
1502 .try_fetch_accounts(
1503 0,
1504 &instance,
1505 height,
1506 view,
1507 fee_merkle_tree_root,
1508 &accounts_vec,
1509 )
1510 .await
1511 }}
1512 }})
1513 .await;
1514
1515 match &local_result {
1517 Ok(_) => return local_result,
1518 Err(err) => tracing::debug!("{err:#}"),
1519 }
1520
1521 self.on_remote_providers(clone! {(instance, accounts_vec) move |provider| {
1523 clone!{(instance, accounts_vec) async move {
1524 provider
1525 .fetch_accounts(
1526 &instance,
1527 height,
1528 view,
1529 fee_merkle_tree_root,
1530 accounts_vec,
1531 ).await
1532 }}
1533 }})
1534 .await
1535 }
1536
1537 async fn fetch_leaf(
1538 &self,
1539 height: u64,
1540 stake_table: HSStakeTable<SeqTypes>,
1541 success_threshold: U256,
1542 ) -> anyhow::Result<Leaf2> {
1543 let local_result = self
1545 .on_local_providers(clone! {(stake_table) move |provider| {
1546 clone!{(stake_table) async move {
1547 provider
1548 .try_fetch_leaf(0, height, stake_table, success_threshold)
1549 .await
1550 }}
1551 }})
1552 .await;
1553
1554 match &local_result {
1556 Ok(_) => return local_result,
1557 Err(err) => tracing::debug!("{err:#}"),
1558 }
1559
1560 self.on_remote_providers(clone! {(stake_table) move |provider| {
1562 clone!{(stake_table) async move {
1563 provider
1564 .fetch_leaf(height, stake_table, success_threshold)
1565 .await
1566 }}
1567 }})
1568 .await
1569 }
1570
1571 async fn fetch_chain_config(
1572 &self,
1573 commitment: Commitment<ChainConfig>,
1574 ) -> anyhow::Result<ChainConfig> {
1575 let local_result = self
1577 .on_local_providers(move |provider| async move {
1578 provider.try_fetch_chain_config(0, commitment).await
1579 })
1580 .await;
1581
1582 match &local_result {
1584 Ok(_) => return local_result,
1585 Err(err) => tracing::debug!("{err:#}"),
1586 }
1587
1588 self.on_remote_providers(move |provider| async move {
1590 provider.fetch_chain_config(commitment).await
1591 })
1592 .await
1593 }
1594
1595 async fn fetch_reward_accounts_v1(
1596 &self,
1597 instance: &NodeState,
1598 height: u64,
1599 view: ViewNumber,
1600 reward_merkle_tree_root: RewardMerkleCommitmentV1,
1601 accounts: Vec<RewardAccountV1>,
1602 ) -> anyhow::Result<Vec<RewardAccountProofV1>> {
1603 let accounts_vec = accounts.to_vec();
1605 let local_result = self
1606 .on_local_providers(clone! {(instance, accounts_vec) move |provider| {
1607 clone! {(instance, accounts_vec) async move {
1608 provider
1609 .try_fetch_reward_accounts_v1(
1610 0,
1611 &instance,
1612 height,
1613 view,
1614 reward_merkle_tree_root,
1615 &accounts_vec,
1616 )
1617 .await
1618 }}
1619 }})
1620 .await;
1621
1622 match &local_result {
1624 Ok(_) => return local_result,
1625 Err(err) => tracing::debug!("{err:#}"),
1626 }
1627
1628 self.on_remote_providers(clone! {(instance, accounts_vec) move |provider| {
1630 clone!{(instance, accounts_vec) async move {
1631 provider
1632 .fetch_reward_accounts_v1(
1633 &instance,
1634 height,
1635 view,
1636 reward_merkle_tree_root,
1637 accounts_vec,
1638 ).await
1639 }}
1640 }})
1641 .await
1642 }
1643
1644 async fn fetch_state_cert(
1645 &self,
1646 epoch: u64,
1647 ) -> anyhow::Result<LightClientStateUpdateCertificateV2<SeqTypes>> {
1648 let local_result = self
1649 .on_local_providers(move |provider| async move {
1650 provider.try_fetch_state_cert(0, epoch).await
1651 })
1652 .await;
1653
1654 match &local_result {
1656 Ok(_) => return local_result,
1657 Err(err) => tracing::debug!("{err:#}"),
1658 }
1659
1660 self.on_remote_providers(
1662 move |provider| async move { provider.fetch_state_cert(epoch).await },
1663 )
1664 .await
1665 }
1666
1667 async fn remember_blocks_merkle_tree(
1668 &self,
1669 instance: &NodeState,
1670 height: u64,
1671 view: ViewNumber,
1672 mt: &mut BlockMerkleTree,
1673 ) -> anyhow::Result<()> {
1674 let local_result = self
1676 .on_local_providers(clone! {(mt, instance) move |provider| {
1677 let mut mt = mt.clone();
1678 clone! {(instance) async move {
1679 provider
1681 .try_remember_blocks_merkle_tree(
1682 0,
1683 &instance,
1684 height,
1685 view,
1686 &mut mt,
1687 )
1688 .await?;
1689
1690 Ok(mt)
1692 }}
1693 }})
1694 .await;
1695
1696 if let Ok(modified_mt) = local_result {
1698 *mt = modified_mt;
1701
1702 return Ok(());
1703 }
1704
1705 let remote_result = self
1707 .on_remote_providers(clone! {(mt, instance) move |provider| {
1708 let mut mt = mt.clone();
1709 clone!{(instance) async move {
1710 provider
1712 .remember_blocks_merkle_tree(
1713 &instance,
1714 height,
1715 view,
1716 &mut mt,
1717 )
1718 .await?;
1719
1720 Ok(mt)
1722 }}
1723 }})
1724 .await?;
1725
1726 *mt = remote_result;
1728
1729 Ok(())
1730 }
1731
1732 fn is_local(&self) -> bool {
1733 self.providers.lock().iter().all(|p| p.is_local())
1734 }
1735}
1736
1737pub async fn add_fee_accounts_to_state<I: hotshot::traits::NodeImplementation<SeqTypes>>(
1740 consensus_handle: &ConsensusHandle<SeqTypes, I>,
1741 view: &ViewNumber,
1742 accounts: &[FeeAccount],
1743 tree: &FeeMerkleTree,
1744 leaf: Leaf2,
1745) -> anyhow::Result<()> {
1746 let (existing_state, delta) = consensus_handle.state_and_delta(*view).await;
1747 let (state, delta) = match existing_state {
1748 Some(existing) => {
1749 let mut state = (*existing).clone();
1750 for account in accounts {
1751 if let Some((proof, _)) = FeeAccountProof::prove(tree, (*account).into()) {
1752 if let Err(err) = proof.remember(&mut state.fee_merkle_tree) {
1753 tracing::warn!(
1754 ?view,
1755 %account,
1756 "cannot update fetched account state: {err:#}"
1757 );
1758 }
1759 } else {
1760 tracing::warn!(?view, %account, "cannot update fetched account state because account is not in the merkle tree");
1761 };
1762 }
1763 (Arc::new(state), delta)
1764 },
1765 None => {
1766 let mut state = ValidatedState::from_header(leaf.block_header());
1771 state.fee_merkle_tree = tree.clone();
1772 (Arc::new(state), None)
1773 },
1774 };
1775
1776 consensus_handle
1777 .update_leaf(leaf, state, delta)
1778 .await
1779 .with_context(|| "failed to update leaf")?;
1780
1781 Ok(())
1782}
1783
1784pub async fn add_v2_reward_accounts_to_state<I: hotshot::traits::NodeImplementation<SeqTypes>>(
1787 consensus_handle: &ConsensusHandle<SeqTypes, I>,
1788 view: &ViewNumber,
1789 accounts: &[RewardAccountV2],
1790 tree: &RewardMerkleTreeV2,
1791 leaf: Leaf2,
1792) -> anyhow::Result<()> {
1793 let (existing_state, delta) = consensus_handle.state_and_delta(*view).await;
1794 let (state, delta) = match existing_state {
1795 Some(existing) => {
1796 let mut state = (*existing).clone();
1797 for account in accounts {
1798 if let Some((proof, _)) = RewardAccountProofV2::prove(tree, (*account).into()) {
1799 if let Err(err) = proof.remember(&mut state.reward_merkle_tree_v2) {
1800 tracing::warn!(
1801 ?view,
1802 %account,
1803 "cannot update fetched account state: {err:#}"
1804 );
1805 }
1806 } else {
1807 tracing::warn!(?view, %account, "cannot update fetched account state because account is not in the merkle tree");
1808 };
1809 }
1810 (Arc::new(state), delta)
1811 },
1812 None => {
1813 let mut state = ValidatedState::from_header(leaf.block_header());
1818 state.reward_merkle_tree_v2 = tree.clone();
1819 (Arc::new(state), None)
1820 },
1821 };
1822
1823 consensus_handle
1824 .update_leaf(leaf, state, delta)
1825 .await
1826 .with_context(|| "failed to update leaf")?;
1827
1828 Ok(())
1829}
1830
1831pub async fn add_v1_reward_accounts_to_state<I: hotshot::traits::NodeImplementation<SeqTypes>>(
1834 consensus_handle: &ConsensusHandle<SeqTypes, I>,
1835 view: &ViewNumber,
1836 accounts: &[RewardAccountV1],
1837 tree: &RewardMerkleTreeV1,
1838 leaf: Leaf2,
1839) -> anyhow::Result<()> {
1840 let (existing_state, delta) = consensus_handle.state_and_delta(*view).await;
1841 let (state, delta) = match existing_state {
1842 Some(existing) => {
1843 let mut state = (*existing).clone();
1844 for account in accounts {
1845 if let Some((proof, _)) = RewardAccountProofV1::prove(tree, (*account).into()) {
1846 if let Err(err) = proof.remember(&mut state.reward_merkle_tree_v1) {
1847 tracing::warn!(
1848 ?view,
1849 %account,
1850 "cannot update fetched account state: {err:#}"
1851 );
1852 }
1853 } else {
1854 tracing::warn!(?view, %account, "cannot update fetched account state because account is not in the merkle tree");
1855 };
1856 }
1857 (Arc::new(state), delta)
1858 },
1859 None => {
1860 let mut state = ValidatedState::from_header(leaf.block_header());
1865 state.reward_merkle_tree_v1 = tree.clone();
1866 (Arc::new(state), None)
1867 },
1868 };
1869
1870 consensus_handle
1871 .update_leaf(leaf, state, delta)
1872 .await
1873 .with_context(|| "failed to update leaf")?;
1874
1875 Ok(())
1876}
1877
#[cfg(test)]
mod test {
    use super::*;

    /// A peer with 2 failures in 1000 requests must compare greater than a
    /// peer with 1 failure in 10 requests, and a priority queue keyed on the
    /// score must pop the former first.
    #[test]
    fn test_peer_priority() {
        let reliable = PeerScore {
            requests: 1000,
            failures: 2,
        };
        let flaky = PeerScore {
            requests: 10,
            failures: 1,
        };
        assert!(reliable > flaky);

        // Insert one at a time; pop order must follow priority.
        let mut queue = PriorityQueue::new();
        queue.push(0, reliable);
        queue.push(1, flaky);

        assert_eq!(queue.pop(), Some((0, reliable)));
        assert_eq!(queue.pop(), Some((1, flaky)));
    }
}