Skip to main content

espresso_node/request_response/catchup/
state.rs

1use std::sync::Arc;
2
3use alloy::primitives::U256;
4use anyhow::Context;
5use async_trait::async_trait;
6use committable::{Commitment, Committable};
7use espresso_types::{
8    BackoffParams, BlockMerkleTree, FeeAccount, FeeAccountProof, FeeMerkleCommitment, Leaf2,
9    NodeState, PubKey, SeqTypes,
10    traits::{SequencerPersistence, StateCatchup},
11    v0_3::{ChainConfig, RewardAccountProofV1, RewardAccountV1, RewardMerkleCommitmentV1},
12    v0_4::{
13        PermittedRewardMerkleTreeV2, RewardAccountV2, RewardMerkleCommitmentV2,
14        forgotten_accounts_include,
15    },
16};
17use hotshot::traits::NodeImplementation;
18use hotshot_new_protocol::utils::verify_leaf_chain_with_cert2;
19use hotshot_types::{
20    data::ViewNumber, message::UpgradeLock,
21    simple_certificate::LightClientStateUpdateCertificateV2, stake_table::HSStakeTable,
22    traits::network::ConnectedNetwork, utils::verify_leaf_chain,
23};
24use jf_merkle_tree_compat::{ForgetableMerkleTreeScheme, MerkleTreeScheme};
25use request_response::RequestType;
26use tokio::time::timeout;
27use versions::{EPOCH_VERSION, NEW_PROTOCOL_VERSION};
28
29use crate::{
30    api::RewardMerkleTreeV2Data,
31    request_response::{
32        RequestResponseProtocol,
33        request::{Request, Response},
34    },
35};
36
37#[async_trait]
38impl<I: NodeImplementation<SeqTypes>, N: ConnectedNetwork<PubKey>, P: SequencerPersistence>
39    StateCatchup for RequestResponseProtocol<I, N, P>
40{
41    async fn try_fetch_leaf(
42        &self,
43        _retry: usize,
44        height: u64,
45        stake_table: HSStakeTable<SeqTypes>,
46        success_threshold: U256,
47    ) -> anyhow::Result<Leaf2> {
48        // Timeout after a few batches
49        let timeout_duration = self.config.request_batch_interval * 3;
50
51        // Fetch the leaf
52        timeout(
53            timeout_duration,
54            self.fetch_leaf(height, stake_table, success_threshold),
55        )
56        .await
57        .with_context(|| "timed out while fetching leaf")?
58    }
59
60    async fn try_fetch_accounts(
61        &self,
62        _retry: usize,
63        instance: &NodeState,
64        height: u64,
65        view: ViewNumber,
66        fee_merkle_tree_root: FeeMerkleCommitment,
67        accounts: &[FeeAccount],
68    ) -> anyhow::Result<Vec<FeeAccountProof>> {
69        // Timeout after a few batches
70        let timeout_duration = self.config.request_batch_interval * 3;
71
72        // Fetch the accounts
73        timeout(
74            timeout_duration,
75            self.fetch_accounts(
76                instance,
77                height,
78                view,
79                fee_merkle_tree_root,
80                accounts.to_vec(),
81            ),
82        )
83        .await
84        .with_context(|| "timed out while fetching accounts")?
85    }
86
87    async fn try_remember_blocks_merkle_tree(
88        &self,
89        _retry: usize,
90        instance: &NodeState,
91        height: u64,
92        view: ViewNumber,
93        mt: &mut BlockMerkleTree,
94    ) -> anyhow::Result<()> {
95        // Timeout after a few batches
96        let timeout_duration = self.config.request_batch_interval * 3;
97
98        // Remember the blocks merkle tree
99        timeout(
100            timeout_duration,
101            self.remember_blocks_merkle_tree(instance, height, view, mt),
102        )
103        .await
104        .with_context(|| "timed out while remembering blocks merkle tree")?
105    }
106
107    async fn try_fetch_chain_config(
108        &self,
109        _retry: usize,
110        commitment: Commitment<ChainConfig>,
111    ) -> anyhow::Result<ChainConfig> {
112        // Timeout after a few batches
113        let timeout_duration = self.config.request_batch_interval * 3;
114
115        // Fetch the chain config
116        timeout(timeout_duration, self.fetch_chain_config(commitment))
117            .await
118            .with_context(|| "timed out while fetching chain config")?
119    }
120
121    async fn try_fetch_reward_merkle_tree_v2(
122        &self,
123        _retry: usize,
124        height: u64,
125        view: ViewNumber,
126        reward_merkle_tree_root: RewardMerkleCommitmentV2,
127        accounts: Arc<Vec<RewardAccountV2>>,
128    ) -> anyhow::Result<PermittedRewardMerkleTreeV2> {
129        // Timeout after a few batches
130        let timeout_duration = self.config.request_batch_interval * 3;
131
132        // Fetch the reward accounts
133        timeout(
134            timeout_duration,
135            self.fetch_reward_merkle_tree_v2(height, view, reward_merkle_tree_root, accounts),
136        )
137        .await
138        .with_context(|| "timed out while fetching reward merkle tree v2")?
139    }
140
141    async fn try_fetch_reward_accounts_v1(
142        &self,
143        _retry: usize,
144        instance: &NodeState,
145        height: u64,
146        view: ViewNumber,
147        reward_merkle_tree_root: RewardMerkleCommitmentV1,
148        accounts: &[RewardAccountV1],
149    ) -> anyhow::Result<Vec<RewardAccountProofV1>> {
150        // Timeout after a few batches
151        let timeout_duration = self.config.request_batch_interval * 3;
152
153        // Fetch the reward accounts
154        timeout(
155            timeout_duration,
156            self.fetch_reward_accounts_v1(
157                instance,
158                height,
159                view,
160                reward_merkle_tree_root,
161                accounts.to_vec(),
162            ),
163        )
164        .await
165        .with_context(|| "timed out while fetching reward accounts")?
166    }
167
168    async fn try_fetch_state_cert(
169        &self,
170        _retry: usize,
171        epoch: u64,
172    ) -> anyhow::Result<LightClientStateUpdateCertificateV2<SeqTypes>> {
173        let timeout_duration = self.config.request_batch_interval * 3;
174
175        timeout(timeout_duration, self.fetch_state_cert(epoch))
176            .await
177            .with_context(|| "timed out while fetching state cert")?
178    }
179
180    fn backoff(&self) -> &BackoffParams {
181        unreachable!()
182    }
183
184    fn name(&self) -> String {
185        "request-response".to_string()
186    }
187
188    async fn fetch_accounts(
189        &self,
190        _instance: &NodeState,
191        height: u64,
192        view: ViewNumber,
193        fee_merkle_tree_root: FeeMerkleCommitment,
194        accounts: Vec<FeeAccount>,
195    ) -> anyhow::Result<Vec<FeeAccountProof>> {
196        tracing::info!("Fetching accounts for height: {height}, view: {view}");
197
198        // Clone things we need in the first closure
199        let accounts_clone = accounts.clone();
200        let response_validation_fn = move |_request: &Request, response: Response| {
201            // Clone again
202            let accounts_clone = accounts_clone.clone();
203
204            async move {
205                // Make sure the response is an accounts response
206                let Response::Accounts(fee_merkle_tree) = response else {
207                    return Err(anyhow::anyhow!("expected accounts response"));
208                };
209
210                // Verify the merkle proofs
211                let mut proofs = Vec::new();
212                for account in accounts_clone {
213                    let (proof, _) = FeeAccountProof::prove(&fee_merkle_tree, account.into())
214                        .with_context(|| format!("response was missing account {account}"))?;
215                    proof.verify(&fee_merkle_tree_root).with_context(|| {
216                        format!(
217                            "invalid proof for fee account {account}, root: {fee_merkle_tree_root}"
218                        )
219                    })?;
220                    proofs.push(proof);
221                }
222
223                Ok(proofs)
224            }
225        };
226
227        // Wait for the protocol to send us the accounts
228        let response = self
229            .request_indefinitely(
230                Request::Accounts(height, *view, accounts),
231                RequestType::Batched,
232                response_validation_fn,
233            )
234            .await
235            .with_context(|| "failed to request accounts")?;
236
237        tracing::info!("Fetched accounts for height: {height}, view: {view}");
238
239        Ok(response)
240    }
241
242    async fn fetch_leaf(
243        &self,
244        height: u64,
245        stake_table: HSStakeTable<SeqTypes>,
246        success_threshold: U256,
247    ) -> anyhow::Result<Leaf2> {
248        tracing::info!("Fetching leaf for height: {height}");
249
250        // Fetch the leaf chain. For new-protocol heights the responder returns the leaf range
251        // `[height..=cert2_height]`; for legacy-protocol heights, it returns a 3-chain.
252        let leaf_chain = self
253            .request_indefinitely(
254                Request::Leaf(height),
255                RequestType::Batched,
256                move |_request: &Request, response: Response| async move {
257                    let Response::Leaf(leaves) = response else {
258                        return Err(anyhow::anyhow!("expected leaf response"));
259                    };
260                    if leaves.is_empty() {
261                        return Err(anyhow::anyhow!(
262                            "received empty leaf chain for height {height}"
263                        ));
264                    }
265                    Ok(leaves)
266                },
267            )
268            .await
269            .with_context(|| "failed to request leaf chain")?;
270
271        let result = if leaf_chain[0].block_header().version() >= NEW_PROTOCOL_VERSION {
272            // New protocol: pair the leaf range with its finalizing cert2 for verification.
273            let upgrade_lock =
274                UpgradeLock::<SeqTypes>::new(versions::Upgrade::trivial(NEW_PROTOCOL_VERSION));
275            let cert2 = self
276                .request_indefinitely(
277                    Request::Cert2(height),
278                    RequestType::Batched,
279                    move |_request: &Request, response: Response| async move {
280                        let Response::Cert2(cert2) = response else {
281                            return Err(anyhow::anyhow!("expected cert2 response"));
282                        };
283                        if cert2.data.block_number < height {
284                            return Err(anyhow::anyhow!(
285                                "received cert2 at height {} below requested height {height}",
286                                cert2.data.block_number
287                            ));
288                        }
289                        Ok(cert2)
290                    },
291                )
292                .await
293                .with_context(|| format!("failed to request cert2 at or above height {height}"))?;
294
295            verify_leaf_chain_with_cert2(
296                leaf_chain,
297                &stake_table,
298                success_threshold,
299                height,
300                &upgrade_lock,
301                cert2,
302            )
303            .await
304            .with_context(|| "leaf chain verification with cert2 failed")?
305        } else {
306            let upgrade_lock =
307                UpgradeLock::<SeqTypes>::new(versions::Upgrade::trivial(EPOCH_VERSION));
308            verify_leaf_chain(
309                leaf_chain,
310                &stake_table,
311                success_threshold,
312                height,
313                &upgrade_lock,
314            )
315            .await
316            .with_context(|| "leaf chain verification failed")?
317        };
318
319        tracing::info!("Fetched leaf for height: {height}");
320        Ok(result)
321    }
322
323    async fn fetch_chain_config(
324        &self,
325        commitment: Commitment<ChainConfig>,
326    ) -> anyhow::Result<ChainConfig> {
327        tracing::info!("Fetching chain config with commitment: {commitment}");
328
329        // Create the response validation function
330        let response_validation_fn = move |_request: &Request, response: Response| {
331            async move {
332                // Make sure the response is a chain config response
333                let Response::ChainConfig(chain_config) = response else {
334                    return Err(anyhow::anyhow!("expected chain config response"));
335                };
336
337                // Make sure the commitments match
338                if commitment != chain_config.commit() {
339                    return Err(anyhow::anyhow!("chain config commitment mismatch"));
340                }
341
342                Ok(chain_config)
343            }
344        };
345
346        // Wait for the protocol to send us the chain config
347        let response = self
348            .request_indefinitely(
349                Request::ChainConfig(commitment),
350                RequestType::Batched,
351                response_validation_fn,
352            )
353            .await
354            .with_context(|| "failed to request chain config")?;
355
356        tracing::info!("Fetched chain config with commitment: {commitment}");
357
358        Ok(response)
359    }
360
361    async fn remember_blocks_merkle_tree(
362        &self,
363        _instance: &NodeState,
364        height: u64,
365        view: ViewNumber,
366        mt: &mut BlockMerkleTree,
367    ) -> anyhow::Result<()> {
368        tracing::info!("Fetching blocks frontier for height: {height}, view: {view}");
369
370        // Clone the merkle tree
371        let mt_clone = mt.clone();
372
373        // Create the response validation function
374        let response_validation_fn = move |_request: &Request, response: Response| {
375            // Clone the merkle tree
376            let mut block_merkle_tree = mt_clone.clone();
377
378            async move {
379                // Make sure the response is a blocks frontier response
380                let Response::BlocksFrontier(blocks_frontier) = response else {
381                    return Err(anyhow::anyhow!("expected blocks frontier response"));
382                };
383
384                // Get the leaf element associated with the proof
385                let leaf_elem = blocks_frontier
386                    .elem()
387                    .with_context(|| "provided frontier is missing leaf element")?;
388
389                // Verify the block proof
390                block_merkle_tree
391                    .remember(
392                        block_merkle_tree.num_leaves() - 1,
393                        *leaf_elem,
394                        blocks_frontier,
395                    )
396                    .with_context(|| "merkle tree verification failed")?;
397
398                // Return the verified merkle tree
399                Ok(block_merkle_tree)
400            }
401        };
402
403        // Wait for the protocol to send us the blocks frontier
404        let response = self
405            .request_indefinitely(
406                Request::BlocksFrontier(height, *view),
407                RequestType::Batched,
408                response_validation_fn,
409            )
410            .await
411            .with_context(|| "failed to request blocks frontier")?;
412
413        // Replace the merkle tree
414        *mt = response;
415
416        tracing::info!("Fetched blocks frontier for height: {height}, view: {view}");
417
418        Ok(())
419    }
420
421    async fn fetch_reward_merkle_tree_v2(
422        &self,
423        height: u64,
424        view: ViewNumber,
425        reward_merkle_tree_root: RewardMerkleCommitmentV2,
426        accounts: Arc<Vec<RewardAccountV2>>,
427    ) -> anyhow::Result<PermittedRewardMerkleTreeV2> {
428        tracing::info!("Fetching RewardMerkleTreeV2 for height: {height}");
429
430        // Create the response validation function
431        let response_validation_fn = move |_request: &Request, response: Response| {
432            let accounts = accounts.clone();
433            async move {
434                // Make sure the response is a reward accounts response
435                let Response::RewardMerkleTreeV2(tree_bytes) = response else {
436                    return Err(anyhow::anyhow!("expected reward accounts response"));
437                };
438
439                let tree_data = bincode::deserialize::<RewardMerkleTreeV2Data>(&tree_bytes)
440                    .context(
441                        "Failed to deserialize RewardMerkleTreeV2 for height {height} from remote",
442                    )?;
443
444                // Verify the tree's commitment
445                let reward_merkle_tree: PermittedRewardMerkleTreeV2 =
446                    PermittedRewardMerkleTreeV2::try_from_kv_set(tree_data.balances).await?;
447
448                anyhow::ensure!(reward_merkle_tree.tree.commitment() == reward_merkle_tree_root);
449                anyhow::ensure!(!forgotten_accounts_include(&reward_merkle_tree, &accounts));
450
451                Ok(reward_merkle_tree)
452            }
453        };
454
455        // Wait for the protocol to send us the reward accounts
456        let response = self
457            .request_indefinitely(
458                Request::RewardMerkleTreeV2(height, *view),
459                RequestType::Batched,
460                response_validation_fn,
461            )
462            .await
463            .with_context(|| "failed to request reward accounts")?;
464
465        tracing::info!("Fetched RewardMerkleTreeV2 for height: {height}");
466
467        Ok(response)
468    }
469
470    async fn fetch_reward_accounts_v1(
471        &self,
472        _instance: &NodeState,
473        height: u64,
474        view: ViewNumber,
475        reward_merkle_tree_root: RewardMerkleCommitmentV1,
476        accounts: Vec<RewardAccountV1>,
477    ) -> anyhow::Result<Vec<RewardAccountProofV1>> {
478        tracing::info!("Fetching v1 reward accounts for height: {height}, view: {view}");
479
480        // Clone things we need in the first closure
481        let accounts_clone = accounts.clone();
482
483        // Create the response validation function
484        let response_validation_fn = move |_request: &Request, response: Response| {
485            // Clone again
486            let accounts_clone = accounts_clone.clone();
487
488            async move {
489                // Make sure the response is a reward accounts response
490                let Response::RewardAccountsV1(reward_merkle_tree) = response else {
491                    return Err(anyhow::anyhow!("expected v1 reward accounts response"));
492                };
493
494                // Verify the merkle proofs
495                let mut proofs = Vec::new();
496                for account in accounts_clone {
497                    let (proof, _) =
498                        RewardAccountProofV1::prove(&reward_merkle_tree, account.into())
499                            .with_context(|| format!("response was missing account {account}"))?;
500                    proof.verify(&reward_merkle_tree_root).with_context(|| {
501                        format!(
502                            "invalid proof for v1 reward account {account}, root: \
503                             {reward_merkle_tree_root}"
504                        )
505                    })?;
506                    proofs.push(proof);
507                }
508
509                Ok(proofs)
510            }
511        };
512
513        // Wait for the protocol to send us the reward accounts
514        let response = self
515            .request_indefinitely(
516                Request::RewardAccountsV1(height, *view, accounts),
517                RequestType::Batched,
518                response_validation_fn,
519            )
520            .await
521            .with_context(|| "failed to request v1 reward accounts")?;
522
523        tracing::info!("Fetched v1 reward accounts for height: {height}, view: {view}");
524
525        Ok(response)
526    }
527
528    async fn fetch_state_cert(
529        &self,
530        epoch: u64,
531    ) -> anyhow::Result<LightClientStateUpdateCertificateV2<SeqTypes>> {
532        tracing::info!("Fetching state cert for epoch: {epoch}");
533
534        // Create the response validation function
535        let response_validation_fn = move |_request: &Request, response: Response| async move {
536            // Make sure the response is a state cert response
537            let Response::StateCert(state_cert) = response else {
538                return Err(anyhow::anyhow!("expected state cert response"));
539            };
540
541            Ok(state_cert)
542        };
543
544        // Wait for the protocol to send us the state cert
545        let response = self
546            .request_indefinitely(
547                Request::StateCert(epoch),
548                RequestType::Batched,
549                response_validation_fn,
550            )
551            .await
552            .with_context(|| "failed to request state cert")?;
553
554        tracing::info!("Fetched state cert for epoch: {epoch}");
555
556        Ok(response)
557    }
558
559    fn is_local(&self) -> bool {
560        false
561    }
562}