espresso_node/request_response/catchup/
state.rs

1use std::sync::Arc;
2
3use alloy::primitives::U256;
4use anyhow::Context;
5use async_trait::async_trait;
6use committable::{Commitment, Committable};
7use espresso_types::{
8    BackoffParams, BlockMerkleTree, FeeAccount, FeeAccountProof, FeeMerkleCommitment, Leaf2,
9    NodeState, PubKey, SeqTypes,
10    traits::{SequencerPersistence, StateCatchup},
11    v0_3::{ChainConfig, RewardAccountProofV1, RewardAccountV1, RewardMerkleCommitmentV1},
12    v0_4::{
13        PermittedRewardMerkleTreeV2, RewardAccountV2, RewardMerkleCommitmentV2,
14        forgotten_accounts_include,
15    },
16};
17use hotshot::traits::NodeImplementation;
18use hotshot_types::{
19    data::ViewNumber, message::UpgradeLock,
20    simple_certificate::LightClientStateUpdateCertificateV2, stake_table::HSStakeTable,
21    traits::network::ConnectedNetwork, utils::verify_leaf_chain,
22};
23use jf_merkle_tree_compat::{ForgetableMerkleTreeScheme, MerkleTreeScheme};
24use request_response::RequestType;
25use tokio::time::timeout;
26use versions::EPOCH_VERSION;
27
28use crate::{
29    api::RewardMerkleTreeV2Data,
30    request_response::{
31        RequestResponseProtocol,
32        request::{Request, Response},
33    },
34};
35
36#[async_trait]
37impl<I: NodeImplementation<SeqTypes>, N: ConnectedNetwork<PubKey>, P: SequencerPersistence>
38    StateCatchup for RequestResponseProtocol<I, N, P>
39{
40    async fn try_fetch_leaf(
41        &self,
42        _retry: usize,
43        height: u64,
44        stake_table: HSStakeTable<SeqTypes>,
45        success_threshold: U256,
46    ) -> anyhow::Result<Leaf2> {
47        // Timeout after a few batches
48        let timeout_duration = self.config.request_batch_interval * 3;
49
50        // Fetch the leaf
51        timeout(
52            timeout_duration,
53            self.fetch_leaf(height, stake_table, success_threshold),
54        )
55        .await
56        .with_context(|| "timed out while fetching leaf")?
57    }
58
59    async fn try_fetch_accounts(
60        &self,
61        _retry: usize,
62        instance: &NodeState,
63        height: u64,
64        view: ViewNumber,
65        fee_merkle_tree_root: FeeMerkleCommitment,
66        accounts: &[FeeAccount],
67    ) -> anyhow::Result<Vec<FeeAccountProof>> {
68        // Timeout after a few batches
69        let timeout_duration = self.config.request_batch_interval * 3;
70
71        // Fetch the accounts
72        timeout(
73            timeout_duration,
74            self.fetch_accounts(
75                instance,
76                height,
77                view,
78                fee_merkle_tree_root,
79                accounts.to_vec(),
80            ),
81        )
82        .await
83        .with_context(|| "timed out while fetching accounts")?
84    }
85
86    async fn try_remember_blocks_merkle_tree(
87        &self,
88        _retry: usize,
89        instance: &NodeState,
90        height: u64,
91        view: ViewNumber,
92        mt: &mut BlockMerkleTree,
93    ) -> anyhow::Result<()> {
94        // Timeout after a few batches
95        let timeout_duration = self.config.request_batch_interval * 3;
96
97        // Remember the blocks merkle tree
98        timeout(
99            timeout_duration,
100            self.remember_blocks_merkle_tree(instance, height, view, mt),
101        )
102        .await
103        .with_context(|| "timed out while remembering blocks merkle tree")?
104    }
105
106    async fn try_fetch_chain_config(
107        &self,
108        _retry: usize,
109        commitment: Commitment<ChainConfig>,
110    ) -> anyhow::Result<ChainConfig> {
111        // Timeout after a few batches
112        let timeout_duration = self.config.request_batch_interval * 3;
113
114        // Fetch the chain config
115        timeout(timeout_duration, self.fetch_chain_config(commitment))
116            .await
117            .with_context(|| "timed out while fetching chain config")?
118    }
119
120    async fn try_fetch_reward_merkle_tree_v2(
121        &self,
122        _retry: usize,
123        height: u64,
124        view: ViewNumber,
125        reward_merkle_tree_root: RewardMerkleCommitmentV2,
126        accounts: Arc<Vec<RewardAccountV2>>,
127    ) -> anyhow::Result<PermittedRewardMerkleTreeV2> {
128        // Timeout after a few batches
129        let timeout_duration = self.config.request_batch_interval * 3;
130
131        // Fetch the reward accounts
132        timeout(
133            timeout_duration,
134            self.fetch_reward_merkle_tree_v2(height, view, reward_merkle_tree_root, accounts),
135        )
136        .await
137        .with_context(|| "timed out while fetching reward merkle tree v2")?
138    }
139
140    async fn try_fetch_reward_accounts_v1(
141        &self,
142        _retry: usize,
143        instance: &NodeState,
144        height: u64,
145        view: ViewNumber,
146        reward_merkle_tree_root: RewardMerkleCommitmentV1,
147        accounts: &[RewardAccountV1],
148    ) -> anyhow::Result<Vec<RewardAccountProofV1>> {
149        // Timeout after a few batches
150        let timeout_duration = self.config.request_batch_interval * 3;
151
152        // Fetch the reward accounts
153        timeout(
154            timeout_duration,
155            self.fetch_reward_accounts_v1(
156                instance,
157                height,
158                view,
159                reward_merkle_tree_root,
160                accounts.to_vec(),
161            ),
162        )
163        .await
164        .with_context(|| "timed out while fetching reward accounts")?
165    }
166
167    async fn try_fetch_state_cert(
168        &self,
169        _retry: usize,
170        epoch: u64,
171    ) -> anyhow::Result<LightClientStateUpdateCertificateV2<SeqTypes>> {
172        let timeout_duration = self.config.request_batch_interval * 3;
173
174        timeout(timeout_duration, self.fetch_state_cert(epoch))
175            .await
176            .with_context(|| "timed out while fetching state cert")?
177    }
178
179    fn backoff(&self) -> &BackoffParams {
180        unreachable!()
181    }
182
183    fn name(&self) -> String {
184        "request-response".to_string()
185    }
186
187    async fn fetch_accounts(
188        &self,
189        _instance: &NodeState,
190        height: u64,
191        view: ViewNumber,
192        fee_merkle_tree_root: FeeMerkleCommitment,
193        accounts: Vec<FeeAccount>,
194    ) -> anyhow::Result<Vec<FeeAccountProof>> {
195        tracing::info!("Fetching accounts for height: {height}, view: {view}");
196
197        // Clone things we need in the first closure
198        let accounts_clone = accounts.clone();
199        let response_validation_fn = move |_request: &Request, response: Response| {
200            // Clone again
201            let accounts_clone = accounts_clone.clone();
202
203            async move {
204                // Make sure the response is an accounts response
205                let Response::Accounts(fee_merkle_tree) = response else {
206                    return Err(anyhow::anyhow!("expected accounts response"));
207                };
208
209                // Verify the merkle proofs
210                let mut proofs = Vec::new();
211                for account in accounts_clone {
212                    let (proof, _) = FeeAccountProof::prove(&fee_merkle_tree, account.into())
213                        .with_context(|| format!("response was missing account {account}"))?;
214                    proof.verify(&fee_merkle_tree_root).with_context(|| {
215                        format!(
216                            "invalid proof for fee account {account}, root: {fee_merkle_tree_root}"
217                        )
218                    })?;
219                    proofs.push(proof);
220                }
221
222                Ok(proofs)
223            }
224        };
225
226        // Wait for the protocol to send us the accounts
227        let response = self
228            .request_indefinitely(
229                Request::Accounts(height, *view, accounts),
230                RequestType::Batched,
231                response_validation_fn,
232            )
233            .await
234            .with_context(|| "failed to request accounts")?;
235
236        tracing::info!("Fetched accounts for height: {height}, view: {view}");
237
238        Ok(response)
239    }
240
241    async fn fetch_leaf(
242        &self,
243        height: u64,
244        stake_table: HSStakeTable<SeqTypes>,
245        success_threshold: U256,
246    ) -> anyhow::Result<Leaf2> {
247        tracing::info!("Fetching leaf for height: {height}");
248
249        // Clone things we need in the first closure
250        let stake_table_clone = stake_table.clone();
251        let response_validation_fn = move |_request: &Request, response: Response| {
252            // Clone again
253            let stake_table_clone = stake_table_clone.clone();
254
255            async move {
256                // Make sure the response is a leaf response
257                let Response::Leaf(leaf_chain) = response else {
258                    return Err(anyhow::anyhow!("expected leaf response"));
259                };
260
261                // Verify the leaf chain
262                let leaf = verify_leaf_chain(
263                    leaf_chain,
264                    &stake_table_clone,
265                    success_threshold,
266                    height,
267                    &UpgradeLock::<SeqTypes>::new(versions::Upgrade::trivial(EPOCH_VERSION)),
268                )
269                .await
270                .with_context(|| "leaf chain verification failed")?;
271
272                Ok(leaf)
273            }
274        };
275
276        // Wait for the protocol to send us the accounts
277        let response = self
278            .request_indefinitely(
279                Request::Leaf(height),
280                RequestType::Batched,
281                response_validation_fn,
282            )
283            .await
284            .with_context(|| "failed to request leaf")?;
285
286        tracing::info!("Fetched leaf for height: {height}");
287
288        Ok(response)
289    }
290
291    async fn fetch_chain_config(
292        &self,
293        commitment: Commitment<ChainConfig>,
294    ) -> anyhow::Result<ChainConfig> {
295        tracing::info!("Fetching chain config with commitment: {commitment}");
296
297        // Create the response validation function
298        let response_validation_fn = move |_request: &Request, response: Response| {
299            async move {
300                // Make sure the response is a chain config response
301                let Response::ChainConfig(chain_config) = response else {
302                    return Err(anyhow::anyhow!("expected chain config response"));
303                };
304
305                // Make sure the commitments match
306                if commitment != chain_config.commit() {
307                    return Err(anyhow::anyhow!("chain config commitment mismatch"));
308                }
309
310                Ok(chain_config)
311            }
312        };
313
314        // Wait for the protocol to send us the chain config
315        let response = self
316            .request_indefinitely(
317                Request::ChainConfig(commitment),
318                RequestType::Batched,
319                response_validation_fn,
320            )
321            .await
322            .with_context(|| "failed to request chain config")?;
323
324        tracing::info!("Fetched chain config with commitment: {commitment}");
325
326        Ok(response)
327    }
328
329    async fn remember_blocks_merkle_tree(
330        &self,
331        _instance: &NodeState,
332        height: u64,
333        view: ViewNumber,
334        mt: &mut BlockMerkleTree,
335    ) -> anyhow::Result<()> {
336        tracing::info!("Fetching blocks frontier for height: {height}, view: {view}");
337
338        // Clone the merkle tree
339        let mt_clone = mt.clone();
340
341        // Create the response validation function
342        let response_validation_fn = move |_request: &Request, response: Response| {
343            // Clone the merkle tree
344            let mut block_merkle_tree = mt_clone.clone();
345
346            async move {
347                // Make sure the response is a blocks frontier response
348                let Response::BlocksFrontier(blocks_frontier) = response else {
349                    return Err(anyhow::anyhow!("expected blocks frontier response"));
350                };
351
352                // Get the leaf element associated with the proof
353                let leaf_elem = blocks_frontier
354                    .elem()
355                    .with_context(|| "provided frontier is missing leaf element")?;
356
357                // Verify the block proof
358                block_merkle_tree
359                    .remember(
360                        block_merkle_tree.num_leaves() - 1,
361                        *leaf_elem,
362                        blocks_frontier,
363                    )
364                    .with_context(|| "merkle tree verification failed")?;
365
366                // Return the verified merkle tree
367                Ok(block_merkle_tree)
368            }
369        };
370
371        // Wait for the protocol to send us the blocks frontier
372        let response = self
373            .request_indefinitely(
374                Request::BlocksFrontier(height, *view),
375                RequestType::Batched,
376                response_validation_fn,
377            )
378            .await
379            .with_context(|| "failed to request blocks frontier")?;
380
381        // Replace the merkle tree
382        *mt = response;
383
384        tracing::info!("Fetched blocks frontier for height: {height}, view: {view}");
385
386        Ok(())
387    }
388
389    async fn fetch_reward_merkle_tree_v2(
390        &self,
391        height: u64,
392        view: ViewNumber,
393        reward_merkle_tree_root: RewardMerkleCommitmentV2,
394        accounts: Arc<Vec<RewardAccountV2>>,
395    ) -> anyhow::Result<PermittedRewardMerkleTreeV2> {
396        tracing::info!("Fetching RewardMerkleTreeV2 for height: {height}");
397
398        // Create the response validation function
399        let response_validation_fn = move |_request: &Request, response: Response| {
400            let accounts = accounts.clone();
401            async move {
402                // Make sure the response is a reward accounts response
403                let Response::RewardMerkleTreeV2(tree_bytes) = response else {
404                    return Err(anyhow::anyhow!("expected reward accounts response"));
405                };
406
407                let tree_data = bincode::deserialize::<RewardMerkleTreeV2Data>(&tree_bytes)
408                    .context(
409                        "Failed to deserialize RewardMerkleTreeV2 for height {height} from remote",
410                    )?;
411
412                // Verify the tree's commitment
413                let reward_merkle_tree: PermittedRewardMerkleTreeV2 =
414                    PermittedRewardMerkleTreeV2::try_from_kv_set(tree_data.balances).await?;
415
416                anyhow::ensure!(reward_merkle_tree.tree.commitment() == reward_merkle_tree_root);
417                anyhow::ensure!(!forgotten_accounts_include(&reward_merkle_tree, &accounts));
418
419                Ok(reward_merkle_tree)
420            }
421        };
422
423        // Wait for the protocol to send us the reward accounts
424        let response = self
425            .request_indefinitely(
426                Request::RewardMerkleTreeV2(height, *view),
427                RequestType::Batched,
428                response_validation_fn,
429            )
430            .await
431            .with_context(|| "failed to request reward accounts")?;
432
433        tracing::info!("Fetched RewardMerkleTreeV2 for height: {height}");
434
435        Ok(response)
436    }
437
438    async fn fetch_reward_accounts_v1(
439        &self,
440        _instance: &NodeState,
441        height: u64,
442        view: ViewNumber,
443        reward_merkle_tree_root: RewardMerkleCommitmentV1,
444        accounts: Vec<RewardAccountV1>,
445    ) -> anyhow::Result<Vec<RewardAccountProofV1>> {
446        tracing::info!("Fetching v1 reward accounts for height: {height}, view: {view}");
447
448        // Clone things we need in the first closure
449        let accounts_clone = accounts.clone();
450
451        // Create the response validation function
452        let response_validation_fn = move |_request: &Request, response: Response| {
453            // Clone again
454            let accounts_clone = accounts_clone.clone();
455
456            async move {
457                // Make sure the response is a reward accounts response
458                let Response::RewardAccountsV1(reward_merkle_tree) = response else {
459                    return Err(anyhow::anyhow!("expected v1 reward accounts response"));
460                };
461
462                // Verify the merkle proofs
463                let mut proofs = Vec::new();
464                for account in accounts_clone {
465                    let (proof, _) =
466                        RewardAccountProofV1::prove(&reward_merkle_tree, account.into())
467                            .with_context(|| format!("response was missing account {account}"))?;
468                    proof.verify(&reward_merkle_tree_root).with_context(|| {
469                        format!(
470                            "invalid proof for v1 reward account {account}, root: \
471                             {reward_merkle_tree_root}"
472                        )
473                    })?;
474                    proofs.push(proof);
475                }
476
477                Ok(proofs)
478            }
479        };
480
481        // Wait for the protocol to send us the reward accounts
482        let response = self
483            .request_indefinitely(
484                Request::RewardAccountsV1(height, *view, accounts),
485                RequestType::Batched,
486                response_validation_fn,
487            )
488            .await
489            .with_context(|| "failed to request v1 reward accounts")?;
490
491        tracing::info!("Fetched v1 reward accounts for height: {height}, view: {view}");
492
493        Ok(response)
494    }
495
496    async fn fetch_state_cert(
497        &self,
498        epoch: u64,
499    ) -> anyhow::Result<LightClientStateUpdateCertificateV2<SeqTypes>> {
500        tracing::info!("Fetching state cert for epoch: {epoch}");
501
502        // Create the response validation function
503        let response_validation_fn = move |_request: &Request, response: Response| async move {
504            // Make sure the response is a state cert response
505            let Response::StateCert(state_cert) = response else {
506                return Err(anyhow::anyhow!("expected state cert response"));
507            };
508
509            Ok(state_cert)
510        };
511
512        // Wait for the protocol to send us the state cert
513        let response = self
514            .request_indefinitely(
515                Request::StateCert(epoch),
516                RequestType::Batched,
517                response_validation_fn,
518            )
519            .await
520            .with_context(|| "failed to request state cert")?;
521
522        tracing::info!("Fetched state cert for epoch: {epoch}");
523
524        Ok(response)
525    }
526
527    fn is_local(&self) -> bool {
528        false
529    }
530}