1use std::sync::Arc;
2
3use alloy::primitives::U256;
4use anyhow::Context;
5use async_trait::async_trait;
6use committable::{Commitment, Committable};
7use espresso_types::{
8 BackoffParams, BlockMerkleTree, FeeAccount, FeeAccountProof, FeeMerkleCommitment, Leaf2,
9 NodeState, PubKey, SeqTypes,
10 traits::{SequencerPersistence, StateCatchup},
11 v0_3::{ChainConfig, RewardAccountProofV1, RewardAccountV1, RewardMerkleCommitmentV1},
12 v0_4::{
13 PermittedRewardMerkleTreeV2, RewardAccountV2, RewardMerkleCommitmentV2,
14 forgotten_accounts_include,
15 },
16};
17use hotshot::traits::NodeImplementation;
18use hotshot_new_protocol::utils::verify_leaf_chain_with_cert2;
19use hotshot_types::{
20 data::ViewNumber, message::UpgradeLock,
21 simple_certificate::LightClientStateUpdateCertificateV2, stake_table::HSStakeTable,
22 traits::network::ConnectedNetwork, utils::verify_leaf_chain,
23};
24use jf_merkle_tree_compat::{ForgetableMerkleTreeScheme, MerkleTreeScheme};
25use request_response::RequestType;
26use tokio::time::timeout;
27use versions::{EPOCH_VERSION, NEW_PROTOCOL_VERSION};
28
29use crate::{
30 api::RewardMerkleTreeV2Data,
31 request_response::{
32 RequestResponseProtocol,
33 request::{Request, Response},
34 },
35};
36
37#[async_trait]
38impl<I: NodeImplementation<SeqTypes>, N: ConnectedNetwork<PubKey>, P: SequencerPersistence>
39 StateCatchup for RequestResponseProtocol<I, N, P>
40{
41 async fn try_fetch_leaf(
42 &self,
43 _retry: usize,
44 height: u64,
45 stake_table: HSStakeTable<SeqTypes>,
46 success_threshold: U256,
47 ) -> anyhow::Result<Leaf2> {
48 let timeout_duration = self.config.request_batch_interval * 3;
50
51 timeout(
53 timeout_duration,
54 self.fetch_leaf(height, stake_table, success_threshold),
55 )
56 .await
57 .with_context(|| "timed out while fetching leaf")?
58 }
59
60 async fn try_fetch_accounts(
61 &self,
62 _retry: usize,
63 instance: &NodeState,
64 height: u64,
65 view: ViewNumber,
66 fee_merkle_tree_root: FeeMerkleCommitment,
67 accounts: &[FeeAccount],
68 ) -> anyhow::Result<Vec<FeeAccountProof>> {
69 let timeout_duration = self.config.request_batch_interval * 3;
71
72 timeout(
74 timeout_duration,
75 self.fetch_accounts(
76 instance,
77 height,
78 view,
79 fee_merkle_tree_root,
80 accounts.to_vec(),
81 ),
82 )
83 .await
84 .with_context(|| "timed out while fetching accounts")?
85 }
86
87 async fn try_remember_blocks_merkle_tree(
88 &self,
89 _retry: usize,
90 instance: &NodeState,
91 height: u64,
92 view: ViewNumber,
93 mt: &mut BlockMerkleTree,
94 ) -> anyhow::Result<()> {
95 let timeout_duration = self.config.request_batch_interval * 3;
97
98 timeout(
100 timeout_duration,
101 self.remember_blocks_merkle_tree(instance, height, view, mt),
102 )
103 .await
104 .with_context(|| "timed out while remembering blocks merkle tree")?
105 }
106
107 async fn try_fetch_chain_config(
108 &self,
109 _retry: usize,
110 commitment: Commitment<ChainConfig>,
111 ) -> anyhow::Result<ChainConfig> {
112 let timeout_duration = self.config.request_batch_interval * 3;
114
115 timeout(timeout_duration, self.fetch_chain_config(commitment))
117 .await
118 .with_context(|| "timed out while fetching chain config")?
119 }
120
121 async fn try_fetch_reward_merkle_tree_v2(
122 &self,
123 _retry: usize,
124 height: u64,
125 view: ViewNumber,
126 reward_merkle_tree_root: RewardMerkleCommitmentV2,
127 accounts: Arc<Vec<RewardAccountV2>>,
128 ) -> anyhow::Result<PermittedRewardMerkleTreeV2> {
129 let timeout_duration = self.config.request_batch_interval * 3;
131
132 timeout(
134 timeout_duration,
135 self.fetch_reward_merkle_tree_v2(height, view, reward_merkle_tree_root, accounts),
136 )
137 .await
138 .with_context(|| "timed out while fetching reward merkle tree v2")?
139 }
140
141 async fn try_fetch_reward_accounts_v1(
142 &self,
143 _retry: usize,
144 instance: &NodeState,
145 height: u64,
146 view: ViewNumber,
147 reward_merkle_tree_root: RewardMerkleCommitmentV1,
148 accounts: &[RewardAccountV1],
149 ) -> anyhow::Result<Vec<RewardAccountProofV1>> {
150 let timeout_duration = self.config.request_batch_interval * 3;
152
153 timeout(
155 timeout_duration,
156 self.fetch_reward_accounts_v1(
157 instance,
158 height,
159 view,
160 reward_merkle_tree_root,
161 accounts.to_vec(),
162 ),
163 )
164 .await
165 .with_context(|| "timed out while fetching reward accounts")?
166 }
167
168 async fn try_fetch_state_cert(
169 &self,
170 _retry: usize,
171 epoch: u64,
172 ) -> anyhow::Result<LightClientStateUpdateCertificateV2<SeqTypes>> {
173 let timeout_duration = self.config.request_batch_interval * 3;
174
175 timeout(timeout_duration, self.fetch_state_cert(epoch))
176 .await
177 .with_context(|| "timed out while fetching state cert")?
178 }
179
180 fn backoff(&self) -> &BackoffParams {
181 unreachable!()
182 }
183
184 fn name(&self) -> String {
185 "request-response".to_string()
186 }
187
188 async fn fetch_accounts(
189 &self,
190 _instance: &NodeState,
191 height: u64,
192 view: ViewNumber,
193 fee_merkle_tree_root: FeeMerkleCommitment,
194 accounts: Vec<FeeAccount>,
195 ) -> anyhow::Result<Vec<FeeAccountProof>> {
196 tracing::info!("Fetching accounts for height: {height}, view: {view}");
197
198 let accounts_clone = accounts.clone();
200 let response_validation_fn = move |_request: &Request, response: Response| {
201 let accounts_clone = accounts_clone.clone();
203
204 async move {
205 let Response::Accounts(fee_merkle_tree) = response else {
207 return Err(anyhow::anyhow!("expected accounts response"));
208 };
209
210 let mut proofs = Vec::new();
212 for account in accounts_clone {
213 let (proof, _) = FeeAccountProof::prove(&fee_merkle_tree, account.into())
214 .with_context(|| format!("response was missing account {account}"))?;
215 proof.verify(&fee_merkle_tree_root).with_context(|| {
216 format!(
217 "invalid proof for fee account {account}, root: {fee_merkle_tree_root}"
218 )
219 })?;
220 proofs.push(proof);
221 }
222
223 Ok(proofs)
224 }
225 };
226
227 let response = self
229 .request_indefinitely(
230 Request::Accounts(height, *view, accounts),
231 RequestType::Batched,
232 response_validation_fn,
233 )
234 .await
235 .with_context(|| "failed to request accounts")?;
236
237 tracing::info!("Fetched accounts for height: {height}, view: {view}");
238
239 Ok(response)
240 }
241
242 async fn fetch_leaf(
243 &self,
244 height: u64,
245 stake_table: HSStakeTable<SeqTypes>,
246 success_threshold: U256,
247 ) -> anyhow::Result<Leaf2> {
248 tracing::info!("Fetching leaf for height: {height}");
249
250 let leaf_chain = self
253 .request_indefinitely(
254 Request::Leaf(height),
255 RequestType::Batched,
256 move |_request: &Request, response: Response| async move {
257 let Response::Leaf(leaves) = response else {
258 return Err(anyhow::anyhow!("expected leaf response"));
259 };
260 if leaves.is_empty() {
261 return Err(anyhow::anyhow!(
262 "received empty leaf chain for height {height}"
263 ));
264 }
265 Ok(leaves)
266 },
267 )
268 .await
269 .with_context(|| "failed to request leaf chain")?;
270
271 let result = if leaf_chain[0].block_header().version() >= NEW_PROTOCOL_VERSION {
272 let upgrade_lock =
274 UpgradeLock::<SeqTypes>::new(versions::Upgrade::trivial(NEW_PROTOCOL_VERSION));
275 let cert2 = self
276 .request_indefinitely(
277 Request::Cert2(height),
278 RequestType::Batched,
279 move |_request: &Request, response: Response| async move {
280 let Response::Cert2(cert2) = response else {
281 return Err(anyhow::anyhow!("expected cert2 response"));
282 };
283 if cert2.data.block_number < height {
284 return Err(anyhow::anyhow!(
285 "received cert2 at height {} below requested height {height}",
286 cert2.data.block_number
287 ));
288 }
289 Ok(cert2)
290 },
291 )
292 .await
293 .with_context(|| format!("failed to request cert2 at or above height {height}"))?;
294
295 verify_leaf_chain_with_cert2(
296 leaf_chain,
297 &stake_table,
298 success_threshold,
299 height,
300 &upgrade_lock,
301 cert2,
302 )
303 .await
304 .with_context(|| "leaf chain verification with cert2 failed")?
305 } else {
306 let upgrade_lock =
307 UpgradeLock::<SeqTypes>::new(versions::Upgrade::trivial(EPOCH_VERSION));
308 verify_leaf_chain(
309 leaf_chain,
310 &stake_table,
311 success_threshold,
312 height,
313 &upgrade_lock,
314 )
315 .await
316 .with_context(|| "leaf chain verification failed")?
317 };
318
319 tracing::info!("Fetched leaf for height: {height}");
320 Ok(result)
321 }
322
323 async fn fetch_chain_config(
324 &self,
325 commitment: Commitment<ChainConfig>,
326 ) -> anyhow::Result<ChainConfig> {
327 tracing::info!("Fetching chain config with commitment: {commitment}");
328
329 let response_validation_fn = move |_request: &Request, response: Response| {
331 async move {
332 let Response::ChainConfig(chain_config) = response else {
334 return Err(anyhow::anyhow!("expected chain config response"));
335 };
336
337 if commitment != chain_config.commit() {
339 return Err(anyhow::anyhow!("chain config commitment mismatch"));
340 }
341
342 Ok(chain_config)
343 }
344 };
345
346 let response = self
348 .request_indefinitely(
349 Request::ChainConfig(commitment),
350 RequestType::Batched,
351 response_validation_fn,
352 )
353 .await
354 .with_context(|| "failed to request chain config")?;
355
356 tracing::info!("Fetched chain config with commitment: {commitment}");
357
358 Ok(response)
359 }
360
361 async fn remember_blocks_merkle_tree(
362 &self,
363 _instance: &NodeState,
364 height: u64,
365 view: ViewNumber,
366 mt: &mut BlockMerkleTree,
367 ) -> anyhow::Result<()> {
368 tracing::info!("Fetching blocks frontier for height: {height}, view: {view}");
369
370 let mt_clone = mt.clone();
372
373 let response_validation_fn = move |_request: &Request, response: Response| {
375 let mut block_merkle_tree = mt_clone.clone();
377
378 async move {
379 let Response::BlocksFrontier(blocks_frontier) = response else {
381 return Err(anyhow::anyhow!("expected blocks frontier response"));
382 };
383
384 let leaf_elem = blocks_frontier
386 .elem()
387 .with_context(|| "provided frontier is missing leaf element")?;
388
389 block_merkle_tree
391 .remember(
392 block_merkle_tree.num_leaves() - 1,
393 *leaf_elem,
394 blocks_frontier,
395 )
396 .with_context(|| "merkle tree verification failed")?;
397
398 Ok(block_merkle_tree)
400 }
401 };
402
403 let response = self
405 .request_indefinitely(
406 Request::BlocksFrontier(height, *view),
407 RequestType::Batched,
408 response_validation_fn,
409 )
410 .await
411 .with_context(|| "failed to request blocks frontier")?;
412
413 *mt = response;
415
416 tracing::info!("Fetched blocks frontier for height: {height}, view: {view}");
417
418 Ok(())
419 }
420
421 async fn fetch_reward_merkle_tree_v2(
422 &self,
423 height: u64,
424 view: ViewNumber,
425 reward_merkle_tree_root: RewardMerkleCommitmentV2,
426 accounts: Arc<Vec<RewardAccountV2>>,
427 ) -> anyhow::Result<PermittedRewardMerkleTreeV2> {
428 tracing::info!("Fetching RewardMerkleTreeV2 for height: {height}");
429
430 let response_validation_fn = move |_request: &Request, response: Response| {
432 let accounts = accounts.clone();
433 async move {
434 let Response::RewardMerkleTreeV2(tree_bytes) = response else {
436 return Err(anyhow::anyhow!("expected reward accounts response"));
437 };
438
439 let tree_data = bincode::deserialize::<RewardMerkleTreeV2Data>(&tree_bytes)
440 .context(
441 "Failed to deserialize RewardMerkleTreeV2 for height {height} from remote",
442 )?;
443
444 let reward_merkle_tree: PermittedRewardMerkleTreeV2 =
446 PermittedRewardMerkleTreeV2::try_from_kv_set(tree_data.balances).await?;
447
448 anyhow::ensure!(reward_merkle_tree.tree.commitment() == reward_merkle_tree_root);
449 anyhow::ensure!(!forgotten_accounts_include(&reward_merkle_tree, &accounts));
450
451 Ok(reward_merkle_tree)
452 }
453 };
454
455 let response = self
457 .request_indefinitely(
458 Request::RewardMerkleTreeV2(height, *view),
459 RequestType::Batched,
460 response_validation_fn,
461 )
462 .await
463 .with_context(|| "failed to request reward accounts")?;
464
465 tracing::info!("Fetched RewardMerkleTreeV2 for height: {height}");
466
467 Ok(response)
468 }
469
470 async fn fetch_reward_accounts_v1(
471 &self,
472 _instance: &NodeState,
473 height: u64,
474 view: ViewNumber,
475 reward_merkle_tree_root: RewardMerkleCommitmentV1,
476 accounts: Vec<RewardAccountV1>,
477 ) -> anyhow::Result<Vec<RewardAccountProofV1>> {
478 tracing::info!("Fetching v1 reward accounts for height: {height}, view: {view}");
479
480 let accounts_clone = accounts.clone();
482
483 let response_validation_fn = move |_request: &Request, response: Response| {
485 let accounts_clone = accounts_clone.clone();
487
488 async move {
489 let Response::RewardAccountsV1(reward_merkle_tree) = response else {
491 return Err(anyhow::anyhow!("expected v1 reward accounts response"));
492 };
493
494 let mut proofs = Vec::new();
496 for account in accounts_clone {
497 let (proof, _) =
498 RewardAccountProofV1::prove(&reward_merkle_tree, account.into())
499 .with_context(|| format!("response was missing account {account}"))?;
500 proof.verify(&reward_merkle_tree_root).with_context(|| {
501 format!(
502 "invalid proof for v1 reward account {account}, root: \
503 {reward_merkle_tree_root}"
504 )
505 })?;
506 proofs.push(proof);
507 }
508
509 Ok(proofs)
510 }
511 };
512
513 let response = self
515 .request_indefinitely(
516 Request::RewardAccountsV1(height, *view, accounts),
517 RequestType::Batched,
518 response_validation_fn,
519 )
520 .await
521 .with_context(|| "failed to request v1 reward accounts")?;
522
523 tracing::info!("Fetched v1 reward accounts for height: {height}, view: {view}");
524
525 Ok(response)
526 }
527
528 async fn fetch_state_cert(
529 &self,
530 epoch: u64,
531 ) -> anyhow::Result<LightClientStateUpdateCertificateV2<SeqTypes>> {
532 tracing::info!("Fetching state cert for epoch: {epoch}");
533
534 let response_validation_fn = move |_request: &Request, response: Response| async move {
536 let Response::StateCert(state_cert) = response else {
538 return Err(anyhow::anyhow!("expected state cert response"));
539 };
540
541 Ok(state_cert)
542 };
543
544 let response = self
546 .request_indefinitely(
547 Request::StateCert(epoch),
548 RequestType::Batched,
549 response_validation_fn,
550 )
551 .await
552 .with_context(|| "failed to request state cert")?;
553
554 tracing::info!("Fetched state cert for epoch: {epoch}");
555
556 Ok(response)
557 }
558
559 fn is_local(&self) -> bool {
560 false
561 }
562}