1use std::{
2 cmp::Ordering,
3 collections::HashMap,
4 fmt::{Debug, Display},
5 sync::Arc,
6 time::Duration,
7};
8
9use alloy::primitives::U256;
10use anyhow::{Context, anyhow, bail, ensure};
11use async_lock::RwLock;
12use async_trait::async_trait;
13use committable::{Commitment, Committable};
14use espresso_types::{
15 BackoffParams, BlockMerkleTree, FeeAccount, FeeAccountProof, FeeMerkleCommitment,
16 FeeMerkleTree, Leaf2, NodeState, PubKey, SeqTypes, ValidatedState,
17 config::PublicNetworkConfig,
18 traits::SequencerPersistence,
19 v0::traits::StateCatchup,
20 v0_3::{
21 ChainConfig, RewardAccountProofV1, RewardAccountV1, RewardMerkleCommitmentV1,
22 RewardMerkleTreeV1,
23 },
24 v0_4::{
25 PermittedRewardMerkleTreeV2, RewardAccountProofV2, RewardAccountV2,
26 RewardMerkleCommitmentV2, RewardMerkleTreeV2, forgotten_accounts_include,
27 },
28};
29use futures::{
30 StreamExt,
31 future::{Future, FutureExt, TryFuture, TryFutureExt},
32 stream::FuturesUnordered,
33};
34use hotshot_types::{
35 ValidatorConfig,
36 consensus::Consensus,
37 data::ViewNumber,
38 message::UpgradeLock,
39 network::NetworkConfig,
40 simple_certificate::LightClientStateUpdateCertificateV2,
41 stake_table::HSStakeTable,
42 traits::{
43 ValidatedState as ValidatedStateTrait,
44 metrics::{Counter, CounterFamily, Metrics},
45 network::ConnectedNetwork,
46 },
47 utils::{View, ViewInner, verify_leaf_chain},
48};
49use itertools::Itertools;
50use jf_merkle_tree_compat::{ForgetableMerkleTreeScheme, MerkleTreeScheme, prelude::MerkleNode};
51use parking_lot::Mutex;
52use priority_queue::PriorityQueue;
53use serde::de::DeserializeOwned;
54use surf_disco::Request;
55use tide_disco::error::ServerError;
56use tokio::time::timeout;
57use tokio_util::task::AbortOnDropHandle;
58use url::Url;
59use vbs::version::StaticVersionType;
60use versions::EPOCH_VERSION;
61
62use crate::api::{BlocksFrontier, RewardMerkleTreeDataSource, RewardMerkleTreeV2Data};
63
/// One catchup peer: an HTTP client for a single URL plus that peer's metrics counters.
#[derive(Debug, Clone)]
struct Client<ServerError, ApiVer: StaticVersionType> {
    /// `surf_disco` client that actually issues the requests.
    inner: surf_disco::Client<ServerError, ApiVer>,
    /// URL this client was constructed with.
    url: Url,
    /// Counter created from the `requests` counter family, labelled with this peer's URL.
    requests: Arc<Box<dyn Counter>>,
    /// Counter created from the `failures` counter family, labelled with this peer's URL.
    failures: Arc<Box<dyn Counter>>,
}
73
74impl<ApiVer: StaticVersionType> Client<ServerError, ApiVer> {
75 pub fn new(
76 url: Url,
77 requests: &(impl CounterFamily + ?Sized),
78 failures: &(impl CounterFamily + ?Sized),
79 ) -> Self {
80 Self {
81 inner: surf_disco::Client::new(url.clone()),
82 requests: Arc::new(requests.create(vec![url.to_string()])),
83 failures: Arc::new(failures.create(vec![url.to_string()])),
84 url,
85 }
86 }
87
88 pub fn get<T: DeserializeOwned>(&self, route: &str) -> Request<T, ServerError, ApiVer> {
89 self.inner.get(route)
90 }
91}
92
/// Request/failure tally for a peer, ordered so that a lower failure rate compares greater.
#[derive(Clone, Copy, Debug, Default)]
struct PeerScore {
    /// Total number of requests recorded for the peer.
    requests: usize,
    /// Number of those requests that failed.
    failures: usize,
}

impl Ord for PeerScore {
    /// Compare failure rates (`failures / requests`) by cross-multiplication.
    ///
    /// `self > other` exactly when `self` has the strictly lower failure rate. A score with
    /// zero requests and zero failures makes both products zero and so compares equal to
    /// every other score.
    fn cmp(&self, other: &Self) -> Ordering {
        // Widen before multiplying: the product of two `usize` values always fits in a
        // `u128`, whereas multiplying in `usize` could overflow for large counters.
        let lhs = other.failures as u128 * self.requests as u128;
        let rhs = self.failures as u128 * other.requests as u128;
        lhs.cmp(&rhs)
    }
}

impl PartialOrd for PeerScore {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl PartialEq for PeerScore {
    /// Two scores are equal when their failure rates compare equal under [`Ord::cmp`].
    fn eq(&self, other: &Self) -> bool {
        self.cmp(other).is_eq()
    }
}

impl Eq for PeerScore {}
130
/// Catchup provider that fetches state from a fixed set of remote peers over HTTP.
#[derive(Debug, Clone, Default)]
pub struct StatePeers<ApiVer: StaticVersionType> {
    /// Priority queue of peer scores, keyed by the peer's index into `clients`.
    scores: Arc<RwLock<PriorityQueue<usize, PeerScore>>>,
    /// One client per peer URL.
    clients: Vec<Client<ServerError, ApiVer>>,
    /// Backoff parameters returned by `backoff()`.
    backoff: BackoffParams,
    /// Per-request timeout for the first attempt; `fetch` scales it by the retry count.
    base_timeout: Duration,
}
140
141impl<ApiVer: StaticVersionType> StatePeers<ApiVer> {
142 async fn fetch<Fut>(
143 &self,
144 retry: usize,
145 f: impl Fn(Client<ServerError, ApiVer>) -> Fut,
146 ) -> anyhow::Result<Fut::Ok>
147 where
148 Fut: TryFuture<Error: Display>,
149 {
150 let timeout_dur = self.base_timeout * (retry as u32 + 1);
161
162 let mut requests = HashMap::new();
165 let mut res = Err(anyhow!("failed fetching from every peer"));
166
167 let mut scores = { (*self.scores.read().await).clone() };
172 let mut logs = vec![format!("Fetching failed.\n")];
173 while let Some((id, score)) = scores.pop() {
174 let client = &self.clients[id];
175 tracing::info!("fetching from {}", client.url);
176 match timeout(timeout_dur, TryFutureExt::into_future(f(client.clone()))).await {
177 Ok(Ok(t)) => {
178 requests.insert(id, true);
179 res = Ok(t);
180 logs = Vec::new();
181 break;
182 },
183 Ok(Err(err)) => {
184 tracing::debug!(id, ?score, peer = %client.url, "error from peer: {err:#}");
185 logs.push(format!(
186 "Error from peer {} with id {id} and score {score:?}: {err:#}",
187 client.url
188 ));
189 requests.insert(id, false);
190 },
191 Err(_) => {
192 tracing::debug!(id, ?score, peer = %client.url, ?timeout_dur, "request timed out");
193 logs.push(format!(
194 "Error from peer {} with id {id} and score {score:?}: request timed out",
195 client.url
196 ));
197 requests.insert(id, false);
198 },
199 }
200 }
201
202 if !logs.is_empty() {
203 tracing::warn!("{}", logs.join("\n"));
204 }
205
206 let mut scores = self.scores.write().await;
208 for (id, success) in requests {
209 scores.change_priority_by(&id, |score| {
210 score.requests += 1;
211 self.clients[id].requests.add(1);
212 if !success {
213 score.failures += 1;
214 self.clients[id].failures.add(1);
215 }
216 });
217 }
218
219 res
220 }
221
222 pub fn from_urls(
223 urls: Vec<Url>,
224 backoff: BackoffParams,
225 base_timeout: Duration,
226 metrics: &(impl Metrics + ?Sized),
227 ) -> Self {
228 if urls.is_empty() {
229 panic!("Cannot create StatePeers with no peers");
230 }
231
232 let metrics = metrics.subgroup("catchup".into());
233 let requests = metrics.counter_family("requests".into(), vec!["peer".into()]);
234 let failures = metrics.counter_family("request_failures".into(), vec!["peer".into()]);
235
236 let scores = urls
237 .iter()
238 .enumerate()
239 .map(|(i, _)| (i, PeerScore::default()))
240 .collect();
241 let clients = urls
242 .into_iter()
243 .map(|url| Client::new(url, &*requests, &*failures))
244 .collect();
245
246 Self {
247 clients,
248 scores: Arc::new(RwLock::new(scores)),
249 backoff,
250 base_timeout,
251 }
252 }
253
254 #[tracing::instrument(skip(self, my_own_validator_config))]
255 pub async fn fetch_config(
256 &self,
257 my_own_validator_config: ValidatorConfig<SeqTypes>,
258 ) -> anyhow::Result<NetworkConfig<SeqTypes>> {
259 self.backoff()
260 .retry(self, move |provider, retry| {
261 let my_own_validator_config = my_own_validator_config.clone();
262 async move {
263 let cfg: PublicNetworkConfig = provider
264 .fetch(retry, |client| {
265 let url = client.url.join("config/hotshot").unwrap();
266
267 reqwest::get(url.clone())
268 })
269 .await?
270 .json()
271 .await?;
272 cfg.into_network_config(my_own_validator_config)
273 .context("fetched config, but failed to convert to private config")
274 }
275 .boxed()
276 })
277 .await
278 }
279}
280
281#[async_trait]
282impl<ApiVer: StaticVersionType> StateCatchup for StatePeers<ApiVer> {
283 #[tracing::instrument(skip(self, _instance))]
284 async fn try_fetch_accounts(
285 &self,
286 retry: usize,
287 _instance: &NodeState,
288 height: u64,
289 view: ViewNumber,
290 fee_merkle_tree_root: FeeMerkleCommitment,
291 accounts: &[FeeAccount],
292 ) -> anyhow::Result<Vec<FeeAccountProof>> {
293 self.fetch(retry, |client| async move {
294 let tree = client
295 .inner
296 .post::<FeeMerkleTree>(&format!("catchup/{height}/{}/accounts", view.u64()))
297 .body_binary(&accounts.to_vec())?
298 .send()
299 .await?;
300
301 let mut proofs = Vec::new();
303 for account in accounts {
304 let (proof, _) = FeeAccountProof::prove(&tree, (*account).into())
305 .context(format!("response missing fee account {account}"))?;
306 proof.verify(&fee_merkle_tree_root).context(format!(
307 "invalid proof for fee account {account}, root: {fee_merkle_tree_root}"
308 ))?;
309 proofs.push(proof);
310 }
311
312 anyhow::Ok(proofs)
313 })
314 .await
315 }
316
317 #[tracing::instrument(skip(self, _instance, mt))]
318 async fn try_remember_blocks_merkle_tree(
319 &self,
320 retry: usize,
321 _instance: &NodeState,
322 height: u64,
323 view: ViewNumber,
324 mt: &mut BlockMerkleTree,
325 ) -> anyhow::Result<()> {
326 *mt = self
327 .fetch(retry, |client| {
328 let mut mt = mt.clone();
329 async move {
330 let frontier = client
331 .get::<BlocksFrontier>(&format!("catchup/{height}/{}/blocks", view.u64()))
332 .send()
333 .await?;
334 let elem = frontier
335 .elem()
336 .context("provided frontier is missing leaf element")?;
337 mt.remember(mt.num_leaves() - 1, *elem, &frontier)
338 .context("verifying block proof")?;
339 anyhow::Ok(mt)
340 }
341 })
342 .await?;
343 Ok(())
344 }
345
346 async fn try_fetch_chain_config(
347 &self,
348 retry: usize,
349 commitment: Commitment<ChainConfig>,
350 ) -> anyhow::Result<ChainConfig> {
351 self.fetch(retry, |client| async move {
352 let cf = client
353 .get::<ChainConfig>(&format!("catchup/chain-config/{commitment}"))
354 .send()
355 .await?;
356 ensure!(
357 cf.commit() == commitment,
358 "received chain config with mismatched commitment: expected {commitment}, got {}",
359 cf.commit()
360 );
361 Ok(cf)
362 })
363 .await
364 }
365
366 async fn try_fetch_leaf(
367 &self,
368 retry: usize,
369 height: u64,
370 stake_table: HSStakeTable<SeqTypes>,
371 success_threshold: U256,
372 ) -> anyhow::Result<Leaf2> {
373 let leaf_chain = self
375 .fetch(retry, |client| async move {
376 let leaf = client
377 .get::<Vec<Leaf2>>(&format!("catchup/{height}/leafchain"))
378 .send()
379 .await?;
380 anyhow::Ok(leaf)
381 })
382 .await
383 .with_context(|| format!("failed to fetch leaf chain at height {height}"))?;
384
385 verify_leaf_chain(
387 leaf_chain,
388 &stake_table,
389 success_threshold,
390 height,
391 &UpgradeLock::<SeqTypes>::new(versions::Upgrade::trivial(EPOCH_VERSION)),
392 )
393 .await
394 .with_context(|| format!("failed to verify leaf chain at height {height}"))
395 }
396
397 async fn try_fetch_reward_merkle_tree_v2(
398 &self,
399 retry: usize,
400 height: u64,
401 view: ViewNumber,
402 reward_merkle_tree_root: RewardMerkleCommitmentV2,
403 accounts: Arc<Vec<RewardAccountV2>>,
404 ) -> anyhow::Result<PermittedRewardMerkleTreeV2> {
405 let result = self
406 .fetch(retry, |client| async move {
407 let tree_bytes = match client
411 .inner
412 .get::<Vec<u8>>(&format!("catchup/reward-merkle-tree-v2/{height}/{}", *view))
413 .send()
414 .await
415 {
416 Ok(bytes) => bytes,
417 Err(err) => {
418 tracing::info!(
419 "catchup endpoint failed, falling back to reward-state-v2: {err:#}"
420 );
421 client
422 .inner
423 .get::<Vec<u8>>(&format!(
424 "reward-state-v2/reward-merkle-tree-v2/{height}"
425 ))
426 .send()
427 .await?
428 },
429 };
430
431 Ok::<Vec<u8>, anyhow::Error>(tree_bytes)
432 })
433 .await
434 .context("Fetching from peer failed")?;
435
436 let tree_data = bincode::deserialize::<RewardMerkleTreeV2Data>(&result)
437 .context("Failed to deserialize merkle tree from catchup")?;
438
439 let tree: PermittedRewardMerkleTreeV2 =
440 PermittedRewardMerkleTreeV2::try_from_kv_set(tree_data.balances).await?;
441
442 ensure!(
443 tree.tree.commitment() == reward_merkle_tree_root,
444 "RewardMerkleTreeV2 from peer failed commitment check."
445 );
446 ensure!(!forgotten_accounts_include(&tree, &accounts));
447
448 Ok(tree)
449 }
450
451 #[tracing::instrument(skip(self, _instance))]
452 async fn try_fetch_reward_accounts_v1(
453 &self,
454 retry: usize,
455 _instance: &NodeState,
456 height: u64,
457 view: ViewNumber,
458 reward_merkle_tree_root: RewardMerkleCommitmentV1,
459 accounts: &[RewardAccountV1],
460 ) -> anyhow::Result<Vec<RewardAccountProofV1>> {
461 self.fetch(retry, |client| async move {
462 let tree = client
463 .inner
464 .post::<RewardMerkleTreeV1>(&format!(
465 "catchup/{height}/{}/reward-accounts",
466 view.u64()
467 ))
468 .body_binary(&accounts.to_vec())?
469 .send()
470 .await?;
471
472 let mut proofs = Vec::new();
474 for account in accounts {
475 let (proof, _) = RewardAccountProofV1::prove(&tree, (*account).into())
476 .context(format!("response missing reward account {account}"))?;
477 proof.verify(&reward_merkle_tree_root).context(format!(
478 "invalid proof for v1 reward account {account}, root: \
479 {reward_merkle_tree_root} height {height} view {view}"
480 ))?;
481 proofs.push(proof);
482 }
483
484 anyhow::Ok(proofs)
485 })
486 .await
487 }
488
489 async fn try_fetch_state_cert(
490 &self,
491 retry: usize,
492 epoch: u64,
493 ) -> anyhow::Result<LightClientStateUpdateCertificateV2<SeqTypes>> {
494 self.fetch(retry, |client| async move {
495 client
496 .get::<LightClientStateUpdateCertificateV2<SeqTypes>>(&format!(
497 "catchup/{epoch}/state-cert"
498 ))
499 .send()
500 .await
501 })
502 .await
503 }
504
/// Backoff parameters this provider was constructed with.
fn backoff(&self) -> &BackoffParams {
    &self.backoff
}
508
509 fn name(&self) -> String {
510 format!(
511 "StatePeers({})",
512 self.clients
513 .iter()
514 .map(|client| client.url.to_string())
515 .join(",")
516 )
517 }
518
/// Always `false`: this provider is not treated as a local one.
fn is_local(&self) -> bool {
    false
}
522}
523
524pub(crate) trait CatchupStorage: Sync {
525 fn get_accounts(
536 &self,
537 _instance: &NodeState,
538 _height: u64,
539 _view: ViewNumber,
540 _accounts: &[FeeAccount],
541 ) -> impl Send + Future<Output = anyhow::Result<(FeeMerkleTree, Leaf2)>> {
542 async {
547 bail!("merklized state catchup is not supported for this data source");
548 }
549 }
550
551 fn get_reward_accounts_v1(
552 &self,
553 _instance: &NodeState,
554 _height: u64,
555 _view: ViewNumber,
556 _accounts: &[RewardAccountV1],
557 ) -> impl Send + Future<Output = anyhow::Result<(RewardMerkleTreeV1, Leaf2)>> {
558 async {
559 bail!("merklized state catchup is not supported for this data source");
560 }
561 }
562
563 fn get_reward_accounts_v2(
564 &self,
565 _instance: &NodeState,
566 _height: u64,
567 _view: ViewNumber,
568 _accounts: &[RewardAccountV2],
569 ) -> impl Send + Future<Output = anyhow::Result<(RewardMerkleTreeV2, Leaf2)>> {
570 async {
571 bail!("merklized state catchup is not supported for this data source");
572 }
573 }
574
575 fn get_frontier(
582 &self,
583 _instance: &NodeState,
584 _height: u64,
585 _view: ViewNumber,
586 ) -> impl Send + Future<Output = anyhow::Result<BlocksFrontier>> {
587 async {
592 bail!("merklized state catchup is not supported for this data source");
593 }
594 }
595
596 fn get_chain_config(
597 &self,
598 _commitment: Commitment<ChainConfig>,
599 ) -> impl Send + Future<Output = anyhow::Result<ChainConfig>> {
600 async {
601 bail!("chain config catchup is not supported for this data source");
602 }
603 }
604
605 fn get_leaf_chain(
606 &self,
607 _height: u64,
608 ) -> impl Send + Future<Output = anyhow::Result<Vec<Leaf2>>> {
609 async {
610 bail!("leaf chain catchup is not supported for this data source");
611 }
612 }
613}
614
// No overrides: every `CatchupStorage` method on this type uses the trait's default body,
// which returns an error.
impl CatchupStorage for hotshot_query_service::data_source::MetricsDataSource {}
616
617impl<T, S> CatchupStorage for hotshot_query_service::data_source::ExtensibleDataSource<T, S>
618where
619 T: CatchupStorage,
620 S: Sync,
621{
622 async fn get_accounts(
623 &self,
624 instance: &NodeState,
625 height: u64,
626 view: ViewNumber,
627 accounts: &[FeeAccount],
628 ) -> anyhow::Result<(FeeMerkleTree, Leaf2)> {
629 self.inner()
630 .get_accounts(instance, height, view, accounts)
631 .await
632 }
633
634 async fn get_reward_accounts_v2(
635 &self,
636 instance: &NodeState,
637 height: u64,
638 view: ViewNumber,
639 accounts: &[RewardAccountV2],
640 ) -> anyhow::Result<(RewardMerkleTreeV2, Leaf2)> {
641 self.inner()
642 .get_reward_accounts_v2(instance, height, view, accounts)
643 .await
644 }
645
646 async fn get_reward_accounts_v1(
647 &self,
648 instance: &NodeState,
649 height: u64,
650 view: ViewNumber,
651 accounts: &[RewardAccountV1],
652 ) -> anyhow::Result<(RewardMerkleTreeV1, Leaf2)> {
653 self.inner()
654 .get_reward_accounts_v1(instance, height, view, accounts)
655 .await
656 }
657
658 async fn get_frontier(
659 &self,
660 instance: &NodeState,
661 height: u64,
662 view: ViewNumber,
663 ) -> anyhow::Result<BlocksFrontier> {
664 self.inner().get_frontier(instance, height, view).await
665 }
666
667 async fn get_chain_config(
668 &self,
669 commitment: Commitment<ChainConfig>,
670 ) -> anyhow::Result<ChainConfig> {
671 self.inner().get_chain_config(commitment).await
672 }
673 async fn get_leaf_chain(&self, height: u64) -> anyhow::Result<Vec<Leaf2>> {
674 self.inner().get_leaf_chain(height).await
675 }
676}
677
/// Catchup provider that reads from a shared database handle instead of the network.
#[derive(Debug)]
pub(crate) struct SqlStateCatchup<T> {
    /// Shared handle to the storage that catchup data is read from.
    db: Arc<T>,
    /// Backoff parameters returned by `backoff()`.
    backoff: BackoffParams,
}
683
684impl<T> SqlStateCatchup<T> {
685 pub(crate) fn new(db: Arc<T>, backoff: BackoffParams) -> Self {
686 Self { db, backoff }
687 }
688}
689
690#[async_trait]
691impl<T> StateCatchup for SqlStateCatchup<T>
692where
693 T: CatchupStorage + RewardMerkleTreeDataSource + Send + Sync,
694{
695 async fn try_fetch_leaf(
696 &self,
697 _retry: usize,
698 height: u64,
699 stake_table: HSStakeTable<SeqTypes>,
700 success_threshold: U256,
701 ) -> anyhow::Result<Leaf2> {
702 let leaf_chain = self
704 .db
705 .get_leaf_chain(height)
706 .await
707 .with_context(|| "failed to get leaf chain from DB")?;
708
709 let leaf = verify_leaf_chain(
711 leaf_chain,
712 &stake_table,
713 success_threshold,
714 height,
715 &UpgradeLock::<SeqTypes>::new(versions::Upgrade::trivial(EPOCH_VERSION)),
716 )
717 .await
718 .with_context(|| "failed to verify leaf chain")?;
719
720 Ok(leaf)
721 }
722
723 #[tracing::instrument(skip(self, _retry, instance))]
726 async fn try_fetch_accounts(
727 &self,
728 _retry: usize,
729 instance: &NodeState,
730 block_height: u64,
731 view: ViewNumber,
732 fee_merkle_tree_root: FeeMerkleCommitment,
733 accounts: &[FeeAccount],
734 ) -> anyhow::Result<Vec<FeeAccountProof>> {
735 let (fee_merkle_tree_from_db, _) = self
737 .db
738 .get_accounts(instance, block_height, view, accounts)
739 .await
740 .with_context(|| "failed to get fee accounts from DB")?;
741
742 let mut proofs = Vec::new();
744 for account in accounts {
745 let (proof, _) = FeeAccountProof::prove(&fee_merkle_tree_from_db, (*account).into())
746 .context(format!("response missing account {account}"))?;
747 proof.verify(&fee_merkle_tree_root).context(format!(
748 "invalid proof for fee account {account}, root: {fee_merkle_tree_root}"
749 ))?;
750 proofs.push(proof);
751 }
752
753 Ok(proofs)
754 }
755
756 #[tracing::instrument(skip(self, _retry, instance, mt))]
757 async fn try_remember_blocks_merkle_tree(
758 &self,
759 _retry: usize,
760 instance: &NodeState,
761 bh: u64,
762 view: ViewNumber,
763 mt: &mut BlockMerkleTree,
764 ) -> anyhow::Result<()> {
765 if bh == 0 {
766 return Ok(());
767 }
768
769 let proof = self.db.get_frontier(instance, bh, view).await?;
770 match proof
771 .proof
772 .first()
773 .context(format!("empty proof for frontier at height {bh}"))?
774 {
775 MerkleNode::Leaf { pos, elem, .. } => mt
776 .remember(pos, elem, proof.clone())
777 .context("failed to remember proof"),
778 _ => bail!("invalid proof"),
779 }
780 }
781
/// Load the chain config for `commitment` from the database.
///
/// # Panics
///
/// Panics if the commitment of the loaded config differs from `commitment`.
// NOTE(review): the peer-based provider reports the same mismatch as an error rather than
// panicking; the panic here looks deliberate (possible database corruption) — confirm.
async fn try_fetch_chain_config(
    &self,
    _retry: usize,
    commitment: Commitment<ChainConfig>,
) -> anyhow::Result<ChainConfig> {
    let cf = self.db.get_chain_config(commitment).await?;

    if cf.commit() != commitment {
        panic!(
            "Critical error: Mismatched chain config detected. Expected chain config: {:?}, \
             but got: {:?}.
 This may indicate a compromised database",
            commitment,
            cf.commit()
        )
    }

    Ok(cf)
}
801
/// Load the v2 reward merkle tree at `height` from the database and check it.
///
/// # Errors
///
/// Fails if loading fails, if the tree's commitment differs from
/// `reward_merkle_tree_root`, or if `forgotten_accounts_include(&tree, &accounts)` is
/// `true`.
async fn try_fetch_reward_merkle_tree_v2(
    &self,
    _retry: usize,
    height: u64,
    _view: ViewNumber,
    reward_merkle_tree_root: RewardMerkleCommitmentV2,
    accounts: Arc<Vec<RewardAccountV2>>,
) -> anyhow::Result<PermittedRewardMerkleTreeV2> {
    let tree: PermittedRewardMerkleTreeV2 = self.db.load_reward_merkle_tree_v2(height).await?;

    // Neither check supplies a message, so the error text is derived from the expression.
    ensure!(tree.tree.commitment() == reward_merkle_tree_root);
    ensure!(!forgotten_accounts_include(&tree, &accounts));

    Ok(tree)
}
817
818 #[tracing::instrument(skip(self, _retry, instance))]
819 async fn try_fetch_reward_accounts_v1(
820 &self,
821 _retry: usize,
822 instance: &NodeState,
823 block_height: u64,
824 view: ViewNumber,
825 reward_merkle_tree_root: RewardMerkleCommitmentV1,
826 accounts: &[RewardAccountV1],
827 ) -> anyhow::Result<Vec<RewardAccountProofV1>> {
828 let (reward_merkle_tree_from_db, _) = self
830 .db
831 .get_reward_accounts_v1(instance, block_height, view, accounts)
832 .await
833 .with_context(|| "failed to get reward accounts from DB")?;
834 let mut proofs = Vec::new();
836 for account in accounts {
837 let (proof, _) =
838 RewardAccountProofV1::prove(&reward_merkle_tree_from_db, (*account).into())
839 .context(format!("response missing account {account}"))?;
840 proof.verify(&reward_merkle_tree_root).context(format!(
841 "invalid proof for v1 reward account {account}, root: {reward_merkle_tree_root}"
842 ))?;
843 proofs.push(proof);
844 }
845
846 Ok(proofs)
847 }
848
849 async fn try_fetch_state_cert(
850 &self,
851 _retry: usize,
852 _epoch: u64,
853 ) -> anyhow::Result<LightClientStateUpdateCertificateV2<SeqTypes>> {
854 bail!("state cert catchup not supported for SqlStateCatchup");
855 }
856
/// Backoff parameters this provider was constructed with.
fn backoff(&self) -> &BackoffParams {
    &self.backoff
}
860
861 fn name(&self) -> String {
862 "SqlStateCatchup".into()
863 }
864
/// Always `true`: this provider is treated as a local one.
fn is_local(&self) -> bool {
    true
}
868}
869
/// Catchup provider with catchup disabled: it can only return chain configs that were
/// registered with it in memory.
#[derive(Clone, Debug)]
pub struct NullStateCatchup {
    /// Backoff parameters returned by `backoff()`.
    backoff: BackoffParams,
    /// Chain configs known to this provider, keyed by their commitment.
    chain_configs: HashMap<Commitment<ChainConfig>, ChainConfig>,
}
876
877impl Default for NullStateCatchup {
878 fn default() -> Self {
879 Self {
880 backoff: BackoffParams::disabled(),
881 chain_configs: Default::default(),
882 }
883 }
884}
885
886impl NullStateCatchup {
887 pub fn add_chain_config(&mut self, cf: ChainConfig) {
897 self.chain_configs.insert(cf.commit(), cf);
898 }
899}
900
901#[async_trait]
902impl StateCatchup for NullStateCatchup {
903 async fn try_fetch_leaf(
904 &self,
905 _retry: usize,
906 _height: u64,
907 _stake_table: HSStakeTable<SeqTypes>,
908 _success_threshold: U256,
909 ) -> anyhow::Result<Leaf2> {
910 bail!("state catchup is disabled")
911 }
912
913 async fn try_fetch_accounts(
914 &self,
915 _retry: usize,
916 _instance: &NodeState,
917 _height: u64,
918 _view: ViewNumber,
919 _fee_merkle_tree_root: FeeMerkleCommitment,
920 _account: &[FeeAccount],
921 ) -> anyhow::Result<Vec<FeeAccountProof>> {
922 bail!("state catchup is disabled");
923 }
924
925 async fn try_remember_blocks_merkle_tree(
926 &self,
927 _retry: usize,
928 _instance: &NodeState,
929 _height: u64,
930 _view: ViewNumber,
931 _mt: &mut BlockMerkleTree,
932 ) -> anyhow::Result<()> {
933 bail!("state catchup is disabled");
934 }
935
936 async fn try_fetch_chain_config(
937 &self,
938 _retry: usize,
939 commitment: Commitment<ChainConfig>,
940 ) -> anyhow::Result<ChainConfig> {
941 self.chain_configs
942 .get(&commitment)
943 .copied()
944 .context(format!("chain config {commitment} not available"))
945 }
946
947 async fn try_fetch_reward_merkle_tree_v2(
948 &self,
949 _retry: usize,
950 _height: u64,
951 _view: ViewNumber,
952 _reward_merkle_tree_root: RewardMerkleCommitmentV2,
953 _accounts: Arc<Vec<RewardAccountV2>>,
954 ) -> anyhow::Result<PermittedRewardMerkleTreeV2> {
955 bail!("state catchup is disabled");
956 }
957
958 async fn try_fetch_reward_accounts_v1(
959 &self,
960 _retry: usize,
961 _instance: &NodeState,
962 _height: u64,
963 _view: ViewNumber,
964 _fee_merkle_tree_root: RewardMerkleCommitmentV1,
965 _account: &[RewardAccountV1],
966 ) -> anyhow::Result<Vec<RewardAccountProofV1>> {
967 bail!("state catchup is disabled");
968 }
969
970 async fn try_fetch_state_cert(
971 &self,
972 _retry: usize,
973 _epoch: u64,
974 ) -> anyhow::Result<LightClientStateUpdateCertificateV2<SeqTypes>> {
975 bail!("state catchup is disabled");
976 }
977
978 fn backoff(&self) -> &BackoffParams {
979 &self.backoff
980 }
981
982 fn name(&self) -> String {
983 "NullStateCatchup".into()
984 }
985
986 fn is_local(&self) -> bool {
987 true
988 }
989}
990
/// Catchup provider that fans each request out to a set of other providers, asking the
/// local ones first and the remote ones only if that fails.
#[derive(Clone)]
pub struct ParallelStateCatchup {
    /// Providers to query; shared between clones and extendable via `add_provider`.
    providers: Arc<Mutex<Vec<Arc<dyn StateCatchup>>>>,
    /// Backoff parameters returned by `backoff()`.
    backoff: BackoffParams,
    /// Upper bound on how long a round of local providers may take.
    local_timeout: Duration,
}
1000
1001impl ParallelStateCatchup {
1002 pub fn new(providers: &[Arc<dyn StateCatchup>], local_timeout: Duration) -> Self {
1004 Self {
1005 providers: Arc::new(Mutex::new(providers.to_vec())),
1006 backoff: BackoffParams::disabled(),
1007 local_timeout,
1008 }
1009 }
1010
1011 pub fn add_provider(&self, provider: Arc<dyn StateCatchup>) {
1013 self.providers.lock().push(provider);
1014 }
1015
1016 pub async fn on_local_providers<C, F, RT>(&self, closure: C) -> anyhow::Result<RT>
1021 where
1022 C: Fn(Arc<dyn StateCatchup>) -> F + Clone + Send + Sync + 'static,
1023 F: Future<Output = anyhow::Result<RT>> + Send + 'static,
1024 RT: Send + Sync + 'static,
1025 {
1026 let local_timeout = self.local_timeout;
1027 match timeout(
1028 local_timeout,
1029 self.on_providers(|provider| provider.is_local(), closure),
1030 )
1031 .await
1032 {
1033 Ok(result) => result,
1034 Err(_) => {
1035 let err = format!("local provider timed out after {local_timeout:?}");
1036 tracing::warn!("{err}");
1037 Err(anyhow::anyhow!(err))
1038 },
1039 }
1040 }
1041
1042 pub async fn on_remote_providers<C, F, RT>(&self, closure: C) -> anyhow::Result<RT>
1044 where
1045 C: Fn(Arc<dyn StateCatchup>) -> F + Clone + Send + Sync + 'static,
1046 F: Future<Output = anyhow::Result<RT>> + Send + 'static,
1047 RT: Send + Sync + 'static,
1048 {
1049 self.on_providers(|provider| !provider.is_local(), closure)
1050 .await
1051 }
1052
1053 pub async fn on_providers<P, C, F, RT>(&self, predicate: P, closure: C) -> anyhow::Result<RT>
1055 where
1056 P: Fn(&Arc<dyn StateCatchup>) -> bool + Clone + Send + Sync + 'static,
1057 C: Fn(Arc<dyn StateCatchup>) -> F + Clone + Send + Sync + 'static,
1058 F: Future<Output = anyhow::Result<RT>> + Send + 'static,
1059 RT: Send + Sync + 'static,
1060 {
1061 let providers = self.providers.lock().clone();
1063 if providers.is_empty() {
1064 return Err(anyhow::anyhow!("no providers were initialized"));
1065 }
1066
1067 let providers = providers.into_iter().filter(predicate).collect::<Vec<_>>();
1069 if providers.is_empty() {
1070 return Err(anyhow::anyhow!("no providers matched the given predicate"));
1071 }
1072
1073 let mut futures = FuturesUnordered::new();
1075 for provider in providers {
1076 let closure = closure.clone();
1077 futures.push(AbortOnDropHandle::new(tokio::spawn(closure(provider))));
1078 }
1079
1080 let mut logs = vec![format!("No providers returned a successful result.\n")];
1081 while let Some(result) = futures.next().await {
1083 let result = match result {
1085 Ok(res) => res,
1086 Err(err) => {
1087 tracing::debug!("Failed to join on provider: {err:#}.");
1088 logs.push(format!("Failed to join on provider: {err:#}."));
1089 continue;
1090 },
1091 };
1092
1093 let result = match result {
1095 Ok(res) => res,
1096 Err(err) => {
1097 tracing::debug!("Failed to fetch data: {err:#}.");
1098 logs.push(format!("Failed to fetch data: {err:#}."));
1099 continue;
1100 },
1101 };
1102
1103 return Ok(result);
1104 }
1105
1106 Err(anyhow::anyhow!(logs.join("\n")))
1107 }
1108}
1109
/// Shadow each listed identifier with a clone of itself, then evaluate the expression.
///
/// `clone! { (a, b) expr }` expands to `{ let a = a.clone(); let b = b.clone(); expr }`,
/// so `expr` (typically a `move` closure or `async move` block) captures the clones rather
/// than the originals.
macro_rules! clone {
    ( ($( $x:ident ),*) $y:expr ) => {
        {
            $(let $x = $x.clone();)*
            $y
        }
    };
}
1118
1119#[async_trait]
1122impl StateCatchup for ParallelStateCatchup {
1123 async fn try_fetch_leaf(
1124 &self,
1125 retry: usize,
1126 height: u64,
1127 stake_table: HSStakeTable<SeqTypes>,
1128 success_threshold: U256,
1129 ) -> anyhow::Result<Leaf2> {
1130 let local_result = self
1132 .on_local_providers(clone! {(stake_table) move |provider| {
1133 clone!{(stake_table) async move {
1134 provider
1135 .try_fetch_leaf(retry, height, stake_table, success_threshold)
1136 .await
1137 }}
1138 }})
1139 .await;
1140
1141 match &local_result {
1143 Ok(_) => return local_result,
1144 Err(err) => tracing::debug!("{err:#}"),
1145 }
1146
1147 self.on_remote_providers(clone! {(stake_table) move |provider| {
1149 clone!{(stake_table) async move {
1150 provider
1151 .try_fetch_leaf(retry, height, stake_table, success_threshold)
1152 .await
1153 }}
1154 }})
1155 .await
1156 }
1157
1158 async fn try_fetch_accounts(
1159 &self,
1160 retry: usize,
1161 instance: &NodeState,
1162 height: u64,
1163 view: ViewNumber,
1164 fee_merkle_tree_root: FeeMerkleCommitment,
1165 accounts: &[FeeAccount],
1166 ) -> anyhow::Result<Vec<FeeAccountProof>> {
1167 let accounts_vec = accounts.to_vec();
1169 let local_result = self
1170 .on_local_providers(clone! {(instance, accounts_vec) move |provider| {
1171 clone! {(instance, accounts_vec) async move {
1172 provider
1173 .try_fetch_accounts(
1174 retry,
1175 &instance,
1176 height,
1177 view,
1178 fee_merkle_tree_root,
1179 &accounts_vec,
1180 )
1181 .await
1182 }}
1183 }})
1184 .await;
1185
1186 match &local_result {
1188 Ok(_) => return local_result,
1189 Err(err) => tracing::debug!("{err:#}"),
1190 }
1191
1192 self.on_remote_providers(clone! {(instance, accounts_vec) move |provider| {
1194 clone!{(instance, accounts_vec) async move {
1195 provider
1196 .try_fetch_accounts(
1197 retry,
1198 &instance,
1199 height,
1200 view,
1201 fee_merkle_tree_root,
1202 &accounts_vec,
1203 ).await
1204 }}
1205 }})
1206 .await
1207 }
1208
1209 async fn try_remember_blocks_merkle_tree(
1210 &self,
1211 retry: usize,
1212 instance: &NodeState,
1213 height: u64,
1214 view: ViewNumber,
1215 mt: &mut BlockMerkleTree,
1216 ) -> anyhow::Result<()> {
1217 let local_result = self
1219 .on_local_providers(clone! {(mt, instance) move |provider| {
1220 let mut mt = mt.clone();
1221 clone! {(instance) async move {
1222 provider
1224 .try_remember_blocks_merkle_tree(
1225 retry,
1226 &instance,
1227 height,
1228 view,
1229 &mut mt,
1230 )
1231 .await?;
1232
1233 Ok(mt)
1235 }}
1236 }})
1237 .await;
1238
1239 if let Ok(modified_mt) = local_result {
1241 *mt = modified_mt;
1243
1244 return Ok(());
1245 }
1246
1247 let remote_result = self
1249 .on_remote_providers(clone! {(mt, instance) move |provider| {
1250 let mut mt = mt.clone();
1251 clone!{(instance) async move {
1252 provider
1254 .try_remember_blocks_merkle_tree(
1255 retry,
1256 &instance,
1257 height,
1258 view,
1259 &mut mt,
1260 )
1261 .await?;
1262
1263 Ok(mt)
1265 }}
1266 }})
1267 .await?;
1268
1269 *mt = remote_result;
1271
1272 Ok(())
1273 }
1274
1275 async fn try_fetch_chain_config(
1276 &self,
1277 retry: usize,
1278 commitment: Commitment<ChainConfig>,
1279 ) -> anyhow::Result<ChainConfig> {
1280 let local_result = self
1282 .on_local_providers(move |provider| async move {
1283 provider.try_fetch_chain_config(retry, commitment).await
1284 })
1285 .await;
1286
1287 match &local_result {
1289 Ok(_) => return local_result,
1290 Err(err) => tracing::debug!("{err:#}"),
1291 }
1292
1293 self.on_remote_providers(move |provider| async move {
1295 provider.try_fetch_chain_config(retry, commitment).await
1296 })
1297 .await
1298 }
1299
1300 async fn try_fetch_reward_merkle_tree_v2(
1301 &self,
1302 retry: usize,
1303 height: u64,
1304 view: ViewNumber,
1305 reward_merkle_tree_root: RewardMerkleCommitmentV2,
1306 accounts: Arc<Vec<RewardAccountV2>>,
1307 ) -> anyhow::Result<PermittedRewardMerkleTreeV2> {
1308 let local_result = self
1309 .on_local_providers(clone! {(accounts) move |provider| {
1310 clone! {(accounts) async move {
1311 provider
1312 .try_fetch_reward_merkle_tree_v2(
1313 retry,
1314 height,
1315 view,
1316 reward_merkle_tree_root,
1317 accounts,
1318 )
1319 .await
1320 }}
1321 }})
1322 .await;
1323
1324 match &local_result {
1326 Ok(_) => return local_result,
1327 Err(err) => tracing::debug!("{err:#}"),
1328 }
1329
1330 self.on_remote_providers(clone! {(accounts) move |provider| {
1332 clone!{(accounts) async move {
1333 provider
1334 .try_fetch_reward_merkle_tree_v2(
1335 retry,
1336 height,
1337 view,
1338 reward_merkle_tree_root,
1339 accounts
1340 ).await
1341 }}
1342 }})
1343 .await
1344 }
1345
1346 async fn try_fetch_reward_accounts_v1(
1347 &self,
1348 retry: usize,
1349 instance: &NodeState,
1350 height: u64,
1351 view: ViewNumber,
1352 reward_merkle_tree_root: RewardMerkleCommitmentV1,
1353 accounts: &[RewardAccountV1],
1354 ) -> anyhow::Result<Vec<RewardAccountProofV1>> {
1355 let accounts_vec = accounts.to_vec();
1357 let local_result = self
1358 .on_local_providers(clone! {(instance, accounts_vec) move |provider| {
1359 clone! {(instance, accounts_vec) async move {
1360 provider
1361 .try_fetch_reward_accounts_v1(
1362 retry,
1363 &instance,
1364 height,
1365 view,
1366 reward_merkle_tree_root,
1367 &accounts_vec,
1368 )
1369 .await
1370 }}
1371 }})
1372 .await;
1373
1374 match &local_result {
1376 Ok(_) => return local_result,
1377 Err(err) => tracing::debug!("{err:#}"),
1378 }
1379
1380 self.on_remote_providers(clone! {(instance, accounts_vec) move |provider| {
1382 clone!{(instance, accounts_vec) async move {
1383 provider
1384 .try_fetch_reward_accounts_v1(
1385 retry,
1386 &instance,
1387 height,
1388 view,
1389 reward_merkle_tree_root,
1390 &accounts_vec,
1391 ).await
1392 }}
1393 }})
1394 .await
1395 }
1396
1397 async fn try_fetch_state_cert(
1398 &self,
1399 retry: usize,
1400 epoch: u64,
1401 ) -> anyhow::Result<LightClientStateUpdateCertificateV2<SeqTypes>> {
1402 let local_result = self
1404 .on_local_providers(move |provider| async move {
1405 provider.try_fetch_state_cert(retry, epoch).await
1406 })
1407 .await;
1408
1409 match &local_result {
1411 Ok(_) => return local_result,
1412 Err(err) => tracing::debug!("{err:#}"),
1413 }
1414
1415 self.on_remote_providers(move |provider| async move {
1417 provider.try_fetch_state_cert(retry, epoch).await
1418 })
1419 .await
1420 }
1421
1422 fn backoff(&self) -> &BackoffParams {
1423 &self.backoff
1424 }
1425
1426 fn name(&self) -> String {
1427 format!(
1428 "[{}]",
1429 self.providers
1430 .lock()
1431 .iter()
1432 .map(|p| p.name())
1433 .collect::<Vec<_>>()
1434 .join(", ")
1435 )
1436 }
1437
1438 async fn fetch_accounts(
1439 &self,
1440 instance: &NodeState,
1441 height: u64,
1442 view: ViewNumber,
1443 fee_merkle_tree_root: FeeMerkleCommitment,
1444 accounts: Vec<FeeAccount>,
1445 ) -> anyhow::Result<Vec<FeeAccountProof>> {
1446 let accounts_vec = accounts.to_vec();
1448 let local_result = self
1449 .on_local_providers(clone! {(instance, accounts_vec) move |provider| {
1450 clone! {(instance, accounts_vec) async move {
1451 provider
1452 .try_fetch_accounts(
1453 0,
1454 &instance,
1455 height,
1456 view,
1457 fee_merkle_tree_root,
1458 &accounts_vec,
1459 )
1460 .await
1461 }}
1462 }})
1463 .await;
1464
1465 match &local_result {
1467 Ok(_) => return local_result,
1468 Err(err) => tracing::debug!("{err:#}"),
1469 }
1470
1471 self.on_remote_providers(clone! {(instance, accounts_vec) move |provider| {
1473 clone!{(instance, accounts_vec) async move {
1474 provider
1475 .fetch_accounts(
1476 &instance,
1477 height,
1478 view,
1479 fee_merkle_tree_root,
1480 accounts_vec,
1481 ).await
1482 }}
1483 }})
1484 .await
1485 }
1486
1487 async fn fetch_leaf(
1488 &self,
1489 height: u64,
1490 stake_table: HSStakeTable<SeqTypes>,
1491 success_threshold: U256,
1492 ) -> anyhow::Result<Leaf2> {
1493 let local_result = self
1495 .on_local_providers(clone! {(stake_table) move |provider| {
1496 clone!{(stake_table) async move {
1497 provider
1498 .try_fetch_leaf(0, height, stake_table, success_threshold)
1499 .await
1500 }}
1501 }})
1502 .await;
1503
1504 match &local_result {
1506 Ok(_) => return local_result,
1507 Err(err) => tracing::debug!("{err:#}"),
1508 }
1509
1510 self.on_remote_providers(clone! {(stake_table) move |provider| {
1512 clone!{(stake_table) async move {
1513 provider
1514 .fetch_leaf(height, stake_table, success_threshold)
1515 .await
1516 }}
1517 }})
1518 .await
1519 }
1520
1521 async fn fetch_chain_config(
1522 &self,
1523 commitment: Commitment<ChainConfig>,
1524 ) -> anyhow::Result<ChainConfig> {
1525 let local_result = self
1527 .on_local_providers(move |provider| async move {
1528 provider.try_fetch_chain_config(0, commitment).await
1529 })
1530 .await;
1531
1532 match &local_result {
1534 Ok(_) => return local_result,
1535 Err(err) => tracing::debug!("{err:#}"),
1536 }
1537
1538 self.on_remote_providers(move |provider| async move {
1540 provider.fetch_chain_config(commitment).await
1541 })
1542 .await
1543 }
1544
1545 async fn fetch_reward_accounts_v1(
1546 &self,
1547 instance: &NodeState,
1548 height: u64,
1549 view: ViewNumber,
1550 reward_merkle_tree_root: RewardMerkleCommitmentV1,
1551 accounts: Vec<RewardAccountV1>,
1552 ) -> anyhow::Result<Vec<RewardAccountProofV1>> {
1553 let accounts_vec = accounts.to_vec();
1555 let local_result = self
1556 .on_local_providers(clone! {(instance, accounts_vec) move |provider| {
1557 clone! {(instance, accounts_vec) async move {
1558 provider
1559 .try_fetch_reward_accounts_v1(
1560 0,
1561 &instance,
1562 height,
1563 view,
1564 reward_merkle_tree_root,
1565 &accounts_vec,
1566 )
1567 .await
1568 }}
1569 }})
1570 .await;
1571
1572 match &local_result {
1574 Ok(_) => return local_result,
1575 Err(err) => tracing::debug!("{err:#}"),
1576 }
1577
1578 self.on_remote_providers(clone! {(instance, accounts_vec) move |provider| {
1580 clone!{(instance, accounts_vec) async move {
1581 provider
1582 .fetch_reward_accounts_v1(
1583 &instance,
1584 height,
1585 view,
1586 reward_merkle_tree_root,
1587 accounts_vec,
1588 ).await
1589 }}
1590 }})
1591 .await
1592 }
1593
1594 async fn fetch_state_cert(
1595 &self,
1596 epoch: u64,
1597 ) -> anyhow::Result<LightClientStateUpdateCertificateV2<SeqTypes>> {
1598 let local_result = self
1599 .on_local_providers(move |provider| async move {
1600 provider.try_fetch_state_cert(0, epoch).await
1601 })
1602 .await;
1603
1604 match &local_result {
1606 Ok(_) => return local_result,
1607 Err(err) => tracing::debug!("{err:#}"),
1608 }
1609
1610 self.on_remote_providers(
1612 move |provider| async move { provider.fetch_state_cert(epoch).await },
1613 )
1614 .await
1615 }
1616
1617 async fn remember_blocks_merkle_tree(
1618 &self,
1619 instance: &NodeState,
1620 height: u64,
1621 view: ViewNumber,
1622 mt: &mut BlockMerkleTree,
1623 ) -> anyhow::Result<()> {
1624 let local_result = self
1626 .on_local_providers(clone! {(mt, instance) move |provider| {
1627 let mut mt = mt.clone();
1628 clone! {(instance) async move {
1629 provider
1631 .try_remember_blocks_merkle_tree(
1632 0,
1633 &instance,
1634 height,
1635 view,
1636 &mut mt,
1637 )
1638 .await?;
1639
1640 Ok(mt)
1642 }}
1643 }})
1644 .await;
1645
1646 if let Ok(modified_mt) = local_result {
1648 *mt = modified_mt;
1651
1652 return Ok(());
1653 }
1654
1655 let remote_result = self
1657 .on_remote_providers(clone! {(mt, instance) move |provider| {
1658 let mut mt = mt.clone();
1659 clone!{(instance) async move {
1660 provider
1662 .remember_blocks_merkle_tree(
1663 &instance,
1664 height,
1665 view,
1666 &mut mt,
1667 )
1668 .await?;
1669
1670 Ok(mt)
1672 }}
1673 }})
1674 .await?;
1675
1676 *mt = remote_result;
1678
1679 Ok(())
1680 }
1681
1682 fn is_local(&self) -> bool {
1683 self.providers.lock().iter().all(|p| p.is_local())
1684 }
1685}
1686
1687#[allow(clippy::type_complexity)]
1690pub async fn add_fee_accounts_to_state<N: ConnectedNetwork<PubKey>, P: SequencerPersistence>(
1691 consensus: &Arc<RwLock<Consensus<SeqTypes>>>,
1692 view: &ViewNumber,
1693 accounts: &[FeeAccount],
1694 tree: &FeeMerkleTree,
1695 leaf: Leaf2,
1696) -> anyhow::Result<()> {
1697 let mut consensus = consensus.write().await;
1699
1700 let (state, delta) = match consensus.validated_state_map().get(view) {
1701 Some(View {
1702 view_inner: ViewInner::Leaf { state, delta, .. },
1703 }) => {
1704 let mut state = (**state).clone();
1705
1706 for account in accounts {
1708 if let Some((proof, _)) = FeeAccountProof::prove(tree, (*account).into()) {
1709 if let Err(err) = proof.remember(&mut state.fee_merkle_tree) {
1710 tracing::warn!(
1711 ?view,
1712 %account,
1713 "cannot update fetched account state: {err:#}"
1714 );
1715 }
1716 } else {
1717 tracing::warn!(?view, %account, "cannot update fetched account state because account is not in the merkle tree");
1718 };
1719 }
1720
1721 (Arc::new(state), delta.clone())
1722 },
1723 _ => {
1724 let mut state = ValidatedState::from_header(leaf.block_header());
1729 state.fee_merkle_tree = tree.clone();
1730 (Arc::new(state), None)
1731 },
1732 };
1733
1734 consensus
1735 .update_leaf(leaf, Arc::clone(&state), delta)
1736 .with_context(|| "failed to update leaf")?;
1737
1738 Ok(())
1739}
1740
1741#[allow(clippy::type_complexity)]
1744pub async fn add_v2_reward_accounts_to_state<
1745 N: ConnectedNetwork<PubKey>,
1746 P: SequencerPersistence,
1747>(
1748 consensus: &Arc<RwLock<Consensus<SeqTypes>>>,
1749 view: &ViewNumber,
1750 accounts: &[RewardAccountV2],
1751 tree: &RewardMerkleTreeV2,
1752 leaf: Leaf2,
1753) -> anyhow::Result<()> {
1754 let mut consensus = consensus.write().await;
1756
1757 let (state, delta) = match consensus.validated_state_map().get(view) {
1758 Some(View {
1759 view_inner: ViewInner::Leaf { state, delta, .. },
1760 }) => {
1761 let mut state = (**state).clone();
1762
1763 for account in accounts {
1765 if let Some((proof, _)) = RewardAccountProofV2::prove(tree, (*account).into()) {
1766 if let Err(err) = proof.remember(&mut state.reward_merkle_tree_v2) {
1767 tracing::warn!(
1768 ?view,
1769 %account,
1770 "cannot update fetched account state: {err:#}"
1771 );
1772 }
1773 } else {
1774 tracing::warn!(?view, %account, "cannot update fetched account state because account is not in the merkle tree");
1775 };
1776 }
1777
1778 (Arc::new(state), delta.clone())
1779 },
1780 _ => {
1781 let mut state = ValidatedState::from_header(leaf.block_header());
1786 state.reward_merkle_tree_v2 = tree.clone();
1787 (Arc::new(state), None)
1788 },
1789 };
1790
1791 consensus
1792 .update_leaf(leaf, Arc::clone(&state), delta)
1793 .with_context(|| "failed to update leaf")?;
1794
1795 Ok(())
1796}
1797
1798#[allow(clippy::type_complexity)]
1801pub async fn add_v1_reward_accounts_to_state<
1802 N: ConnectedNetwork<PubKey>,
1803 P: SequencerPersistence,
1804>(
1805 consensus: &Arc<RwLock<Consensus<SeqTypes>>>,
1806 view: &ViewNumber,
1807 accounts: &[RewardAccountV1],
1808 tree: &RewardMerkleTreeV1,
1809 leaf: Leaf2,
1810) -> anyhow::Result<()> {
1811 let mut consensus = consensus.write().await;
1813
1814 let (state, delta) = match consensus.validated_state_map().get(view) {
1815 Some(View {
1816 view_inner: ViewInner::Leaf { state, delta, .. },
1817 }) => {
1818 let mut state = (**state).clone();
1819
1820 for account in accounts {
1822 if let Some((proof, _)) = RewardAccountProofV1::prove(tree, (*account).into()) {
1823 if let Err(err) = proof.remember(&mut state.reward_merkle_tree_v1) {
1824 tracing::warn!(
1825 ?view,
1826 %account,
1827 "cannot update fetched account state: {err:#}"
1828 );
1829 }
1830 } else {
1831 tracing::warn!(?view, %account, "cannot update fetched account state because account is not in the merkle tree");
1832 };
1833 }
1834
1835 (Arc::new(state), delta.clone())
1836 },
1837 _ => {
1838 let mut state = ValidatedState::from_header(leaf.block_header());
1843 state.reward_merkle_tree_v1 = tree.clone();
1844 (Arc::new(state), None)
1845 },
1846 };
1847
1848 consensus
1849 .update_leaf(leaf, Arc::clone(&state), delta)
1850 .with_context(|| "failed to update leaf")?;
1851
1852 Ok(())
1853}
1854
#[cfg(test)]
mod test {
    use super::*;

    /// A peer with 2 failures in 1000 requests must compare greater than a
    /// peer with 1 failure in 10 requests, and must therefore be popped from
    /// the priority queue first.
    #[test]
    fn test_peer_priority() {
        let reliable = PeerScore {
            requests: 1000,
            failures: 2,
        };
        let flaky = PeerScore {
            requests: 10,
            failures: 1,
        };
        assert!(reliable > flaky);

        let mut queue = PriorityQueue::new();
        queue.push(0, reliable);
        queue.push(1, flaky);

        // Highest priority comes out first.
        assert_eq!(queue.pop(), Some((0, reliable)));
        assert_eq!(queue.pop(), Some((1, flaky)));
    }
}