1use std::sync::Arc;
2
3use alloy::primitives::U256;
4use anyhow::Context;
5use async_trait::async_trait;
6use committable::{Commitment, Committable};
7use espresso_types::{
8 BackoffParams, BlockMerkleTree, FeeAccount, FeeAccountProof, FeeMerkleCommitment, Leaf2,
9 NodeState, PubKey, SeqTypes,
10 traits::{SequencerPersistence, StateCatchup},
11 v0_3::{ChainConfig, RewardAccountProofV1, RewardAccountV1, RewardMerkleCommitmentV1},
12 v0_4::{
13 PermittedRewardMerkleTreeV2, RewardAccountV2, RewardMerkleCommitmentV2,
14 forgotten_accounts_include,
15 },
16};
17use hotshot::traits::NodeImplementation;
18use hotshot_types::{
19 data::ViewNumber, message::UpgradeLock,
20 simple_certificate::LightClientStateUpdateCertificateV2, stake_table::HSStakeTable,
21 traits::network::ConnectedNetwork, utils::verify_leaf_chain,
22};
23use jf_merkle_tree_compat::{ForgetableMerkleTreeScheme, MerkleTreeScheme};
24use request_response::RequestType;
25use tokio::time::timeout;
26use versions::EPOCH_VERSION;
27
28use crate::{
29 api::RewardMerkleTreeV2Data,
30 request_response::{
31 RequestResponseProtocol,
32 request::{Request, Response},
33 },
34};
35
36#[async_trait]
37impl<I: NodeImplementation<SeqTypes>, N: ConnectedNetwork<PubKey>, P: SequencerPersistence>
38 StateCatchup for RequestResponseProtocol<I, N, P>
39{
40 async fn try_fetch_leaf(
41 &self,
42 _retry: usize,
43 height: u64,
44 stake_table: HSStakeTable<SeqTypes>,
45 success_threshold: U256,
46 ) -> anyhow::Result<Leaf2> {
47 let timeout_duration = self.config.request_batch_interval * 3;
49
50 timeout(
52 timeout_duration,
53 self.fetch_leaf(height, stake_table, success_threshold),
54 )
55 .await
56 .with_context(|| "timed out while fetching leaf")?
57 }
58
59 async fn try_fetch_accounts(
60 &self,
61 _retry: usize,
62 instance: &NodeState,
63 height: u64,
64 view: ViewNumber,
65 fee_merkle_tree_root: FeeMerkleCommitment,
66 accounts: &[FeeAccount],
67 ) -> anyhow::Result<Vec<FeeAccountProof>> {
68 let timeout_duration = self.config.request_batch_interval * 3;
70
71 timeout(
73 timeout_duration,
74 self.fetch_accounts(
75 instance,
76 height,
77 view,
78 fee_merkle_tree_root,
79 accounts.to_vec(),
80 ),
81 )
82 .await
83 .with_context(|| "timed out while fetching accounts")?
84 }
85
86 async fn try_remember_blocks_merkle_tree(
87 &self,
88 _retry: usize,
89 instance: &NodeState,
90 height: u64,
91 view: ViewNumber,
92 mt: &mut BlockMerkleTree,
93 ) -> anyhow::Result<()> {
94 let timeout_duration = self.config.request_batch_interval * 3;
96
97 timeout(
99 timeout_duration,
100 self.remember_blocks_merkle_tree(instance, height, view, mt),
101 )
102 .await
103 .with_context(|| "timed out while remembering blocks merkle tree")?
104 }
105
106 async fn try_fetch_chain_config(
107 &self,
108 _retry: usize,
109 commitment: Commitment<ChainConfig>,
110 ) -> anyhow::Result<ChainConfig> {
111 let timeout_duration = self.config.request_batch_interval * 3;
113
114 timeout(timeout_duration, self.fetch_chain_config(commitment))
116 .await
117 .with_context(|| "timed out while fetching chain config")?
118 }
119
120 async fn try_fetch_reward_merkle_tree_v2(
121 &self,
122 _retry: usize,
123 height: u64,
124 view: ViewNumber,
125 reward_merkle_tree_root: RewardMerkleCommitmentV2,
126 accounts: Arc<Vec<RewardAccountV2>>,
127 ) -> anyhow::Result<PermittedRewardMerkleTreeV2> {
128 let timeout_duration = self.config.request_batch_interval * 3;
130
131 timeout(
133 timeout_duration,
134 self.fetch_reward_merkle_tree_v2(height, view, reward_merkle_tree_root, accounts),
135 )
136 .await
137 .with_context(|| "timed out while fetching reward merkle tree v2")?
138 }
139
140 async fn try_fetch_reward_accounts_v1(
141 &self,
142 _retry: usize,
143 instance: &NodeState,
144 height: u64,
145 view: ViewNumber,
146 reward_merkle_tree_root: RewardMerkleCommitmentV1,
147 accounts: &[RewardAccountV1],
148 ) -> anyhow::Result<Vec<RewardAccountProofV1>> {
149 let timeout_duration = self.config.request_batch_interval * 3;
151
152 timeout(
154 timeout_duration,
155 self.fetch_reward_accounts_v1(
156 instance,
157 height,
158 view,
159 reward_merkle_tree_root,
160 accounts.to_vec(),
161 ),
162 )
163 .await
164 .with_context(|| "timed out while fetching reward accounts")?
165 }
166
167 async fn try_fetch_state_cert(
168 &self,
169 _retry: usize,
170 epoch: u64,
171 ) -> anyhow::Result<LightClientStateUpdateCertificateV2<SeqTypes>> {
172 let timeout_duration = self.config.request_batch_interval * 3;
173
174 timeout(timeout_duration, self.fetch_state_cert(epoch))
175 .await
176 .with_context(|| "timed out while fetching state cert")?
177 }
178
179 fn backoff(&self) -> &BackoffParams {
180 unreachable!()
181 }
182
183 fn name(&self) -> String {
184 "request-response".to_string()
185 }
186
187 async fn fetch_accounts(
188 &self,
189 _instance: &NodeState,
190 height: u64,
191 view: ViewNumber,
192 fee_merkle_tree_root: FeeMerkleCommitment,
193 accounts: Vec<FeeAccount>,
194 ) -> anyhow::Result<Vec<FeeAccountProof>> {
195 tracing::info!("Fetching accounts for height: {height}, view: {view}");
196
197 let accounts_clone = accounts.clone();
199 let response_validation_fn = move |_request: &Request, response: Response| {
200 let accounts_clone = accounts_clone.clone();
202
203 async move {
204 let Response::Accounts(fee_merkle_tree) = response else {
206 return Err(anyhow::anyhow!("expected accounts response"));
207 };
208
209 let mut proofs = Vec::new();
211 for account in accounts_clone {
212 let (proof, _) = FeeAccountProof::prove(&fee_merkle_tree, account.into())
213 .with_context(|| format!("response was missing account {account}"))?;
214 proof.verify(&fee_merkle_tree_root).with_context(|| {
215 format!(
216 "invalid proof for fee account {account}, root: {fee_merkle_tree_root}"
217 )
218 })?;
219 proofs.push(proof);
220 }
221
222 Ok(proofs)
223 }
224 };
225
226 let response = self
228 .request_indefinitely(
229 Request::Accounts(height, *view, accounts),
230 RequestType::Batched,
231 response_validation_fn,
232 )
233 .await
234 .with_context(|| "failed to request accounts")?;
235
236 tracing::info!("Fetched accounts for height: {height}, view: {view}");
237
238 Ok(response)
239 }
240
241 async fn fetch_leaf(
242 &self,
243 height: u64,
244 stake_table: HSStakeTable<SeqTypes>,
245 success_threshold: U256,
246 ) -> anyhow::Result<Leaf2> {
247 tracing::info!("Fetching leaf for height: {height}");
248
249 let stake_table_clone = stake_table.clone();
251 let response_validation_fn = move |_request: &Request, response: Response| {
252 let stake_table_clone = stake_table_clone.clone();
254
255 async move {
256 let Response::Leaf(leaf_chain) = response else {
258 return Err(anyhow::anyhow!("expected leaf response"));
259 };
260
261 let leaf = verify_leaf_chain(
263 leaf_chain,
264 &stake_table_clone,
265 success_threshold,
266 height,
267 &UpgradeLock::<SeqTypes>::new(versions::Upgrade::trivial(EPOCH_VERSION)),
268 )
269 .await
270 .with_context(|| "leaf chain verification failed")?;
271
272 Ok(leaf)
273 }
274 };
275
276 let response = self
278 .request_indefinitely(
279 Request::Leaf(height),
280 RequestType::Batched,
281 response_validation_fn,
282 )
283 .await
284 .with_context(|| "failed to request leaf")?;
285
286 tracing::info!("Fetched leaf for height: {height}");
287
288 Ok(response)
289 }
290
291 async fn fetch_chain_config(
292 &self,
293 commitment: Commitment<ChainConfig>,
294 ) -> anyhow::Result<ChainConfig> {
295 tracing::info!("Fetching chain config with commitment: {commitment}");
296
297 let response_validation_fn = move |_request: &Request, response: Response| {
299 async move {
300 let Response::ChainConfig(chain_config) = response else {
302 return Err(anyhow::anyhow!("expected chain config response"));
303 };
304
305 if commitment != chain_config.commit() {
307 return Err(anyhow::anyhow!("chain config commitment mismatch"));
308 }
309
310 Ok(chain_config)
311 }
312 };
313
314 let response = self
316 .request_indefinitely(
317 Request::ChainConfig(commitment),
318 RequestType::Batched,
319 response_validation_fn,
320 )
321 .await
322 .with_context(|| "failed to request chain config")?;
323
324 tracing::info!("Fetched chain config with commitment: {commitment}");
325
326 Ok(response)
327 }
328
329 async fn remember_blocks_merkle_tree(
330 &self,
331 _instance: &NodeState,
332 height: u64,
333 view: ViewNumber,
334 mt: &mut BlockMerkleTree,
335 ) -> anyhow::Result<()> {
336 tracing::info!("Fetching blocks frontier for height: {height}, view: {view}");
337
338 let mt_clone = mt.clone();
340
341 let response_validation_fn = move |_request: &Request, response: Response| {
343 let mut block_merkle_tree = mt_clone.clone();
345
346 async move {
347 let Response::BlocksFrontier(blocks_frontier) = response else {
349 return Err(anyhow::anyhow!("expected blocks frontier response"));
350 };
351
352 let leaf_elem = blocks_frontier
354 .elem()
355 .with_context(|| "provided frontier is missing leaf element")?;
356
357 block_merkle_tree
359 .remember(
360 block_merkle_tree.num_leaves() - 1,
361 *leaf_elem,
362 blocks_frontier,
363 )
364 .with_context(|| "merkle tree verification failed")?;
365
366 Ok(block_merkle_tree)
368 }
369 };
370
371 let response = self
373 .request_indefinitely(
374 Request::BlocksFrontier(height, *view),
375 RequestType::Batched,
376 response_validation_fn,
377 )
378 .await
379 .with_context(|| "failed to request blocks frontier")?;
380
381 *mt = response;
383
384 tracing::info!("Fetched blocks frontier for height: {height}, view: {view}");
385
386 Ok(())
387 }
388
389 async fn fetch_reward_merkle_tree_v2(
390 &self,
391 height: u64,
392 view: ViewNumber,
393 reward_merkle_tree_root: RewardMerkleCommitmentV2,
394 accounts: Arc<Vec<RewardAccountV2>>,
395 ) -> anyhow::Result<PermittedRewardMerkleTreeV2> {
396 tracing::info!("Fetching RewardMerkleTreeV2 for height: {height}");
397
398 let response_validation_fn = move |_request: &Request, response: Response| {
400 let accounts = accounts.clone();
401 async move {
402 let Response::RewardMerkleTreeV2(tree_bytes) = response else {
404 return Err(anyhow::anyhow!("expected reward accounts response"));
405 };
406
407 let tree_data = bincode::deserialize::<RewardMerkleTreeV2Data>(&tree_bytes)
408 .context(
409 "Failed to deserialize RewardMerkleTreeV2 for height {height} from remote",
410 )?;
411
412 let reward_merkle_tree: PermittedRewardMerkleTreeV2 =
414 PermittedRewardMerkleTreeV2::try_from_kv_set(tree_data.balances).await?;
415
416 anyhow::ensure!(reward_merkle_tree.tree.commitment() == reward_merkle_tree_root);
417 anyhow::ensure!(!forgotten_accounts_include(&reward_merkle_tree, &accounts));
418
419 Ok(reward_merkle_tree)
420 }
421 };
422
423 let response = self
425 .request_indefinitely(
426 Request::RewardMerkleTreeV2(height, *view),
427 RequestType::Batched,
428 response_validation_fn,
429 )
430 .await
431 .with_context(|| "failed to request reward accounts")?;
432
433 tracing::info!("Fetched RewardMerkleTreeV2 for height: {height}");
434
435 Ok(response)
436 }
437
438 async fn fetch_reward_accounts_v1(
439 &self,
440 _instance: &NodeState,
441 height: u64,
442 view: ViewNumber,
443 reward_merkle_tree_root: RewardMerkleCommitmentV1,
444 accounts: Vec<RewardAccountV1>,
445 ) -> anyhow::Result<Vec<RewardAccountProofV1>> {
446 tracing::info!("Fetching v1 reward accounts for height: {height}, view: {view}");
447
448 let accounts_clone = accounts.clone();
450
451 let response_validation_fn = move |_request: &Request, response: Response| {
453 let accounts_clone = accounts_clone.clone();
455
456 async move {
457 let Response::RewardAccountsV1(reward_merkle_tree) = response else {
459 return Err(anyhow::anyhow!("expected v1 reward accounts response"));
460 };
461
462 let mut proofs = Vec::new();
464 for account in accounts_clone {
465 let (proof, _) =
466 RewardAccountProofV1::prove(&reward_merkle_tree, account.into())
467 .with_context(|| format!("response was missing account {account}"))?;
468 proof.verify(&reward_merkle_tree_root).with_context(|| {
469 format!(
470 "invalid proof for v1 reward account {account}, root: \
471 {reward_merkle_tree_root}"
472 )
473 })?;
474 proofs.push(proof);
475 }
476
477 Ok(proofs)
478 }
479 };
480
481 let response = self
483 .request_indefinitely(
484 Request::RewardAccountsV1(height, *view, accounts),
485 RequestType::Batched,
486 response_validation_fn,
487 )
488 .await
489 .with_context(|| "failed to request v1 reward accounts")?;
490
491 tracing::info!("Fetched v1 reward accounts for height: {height}, view: {view}");
492
493 Ok(response)
494 }
495
496 async fn fetch_state_cert(
497 &self,
498 epoch: u64,
499 ) -> anyhow::Result<LightClientStateUpdateCertificateV2<SeqTypes>> {
500 tracing::info!("Fetching state cert for epoch: {epoch}");
501
502 let response_validation_fn = move |_request: &Request, response: Response| async move {
504 let Response::StateCert(state_cert) = response else {
506 return Err(anyhow::anyhow!("expected state cert response"));
507 };
508
509 Ok(state_cert)
510 };
511
512 let response = self
514 .request_indefinitely(
515 Request::StateCert(epoch),
516 RequestType::Batched,
517 response_validation_fn,
518 )
519 .await
520 .with_context(|| "failed to request state cert")?;
521
522 tracing::info!("Fetched state cert for epoch: {epoch}");
523
524 Ok(response)
525 }
526
527 fn is_local(&self) -> bool {
528 false
529 }
530}