espresso_node/request_response/
data_source.rs

1//! This file contains the [`DataSource`] trait. This trait allows the [`RequestResponseProtocol`]
2//! to calculate/derive a response for a specific request. In the confirmation layer the implementer
3//! would be something like a [`FeeMerkleTree`] for fee catchup
4
5use std::{marker::PhantomData, sync::Arc};
6
7use anyhow::{Context, Result, bail};
8use async_trait::async_trait;
9use espresso_types::{
10    NodeState, PubKey, SeqTypes, retain_accounts,
11    traits::SequencerPersistence,
12    v0_3::{RewardAccountV1, RewardMerkleTreeV1},
13    v0_4::{RewardAccountV2, RewardMerkleTreeV2},
14};
15use hotshot::{SystemContext, traits::NodeImplementation};
16use hotshot_query_service::{
17    data_source::{
18        VersionedDataSource,
19        storage::{FileSystemStorage, NodeStorage, SqlStorage},
20    },
21    node::BlockId,
22};
23use hotshot_types::{data::ViewNumber, traits::network::ConnectedNetwork, vote::HasViewNumber};
24use itertools::Itertools;
25use jf_merkle_tree_compat::{
26    ForgetableMerkleTreeScheme, ForgetableUniversalMerkleTreeScheme, LookupResult,
27    MerkleTreeScheme, UniversalMerkleTreeScheme,
28};
29use request_response::data_source::DataSource as DataSourceTrait;
30
31use super::request::{Request, Response};
32use crate::{
33    api::{BlocksFrontier, RewardMerkleTreeDataSource, RewardMerkleTreeV2Data},
34    catchup::{
35        CatchupStorage, add_fee_accounts_to_state, add_v1_reward_accounts_to_state,
36        add_v2_reward_accounts_to_state,
37    },
38};
39
40/// Query Service Storage types that can be used for request-response data source
41#[derive(Clone)]
42pub enum Storage {
43    Sql(Arc<SqlStorage>),
44    Fs(Arc<FileSystemStorage<SeqTypes>>),
45}
46
47/// A type alias for the consensus handle
48type Consensus<I> = Arc<SystemContext<SeqTypes, I>>;
49
50#[derive(Clone)]
51pub struct DataSource<
52    I: NodeImplementation<SeqTypes>,
53    N: ConnectedNetwork<PubKey>,
54    P: SequencerPersistence,
55> {
56    /// The consensus handle
57    pub consensus: Consensus<I>,
58    /// The node's state
59    pub node_state: NodeState,
60    /// The storage
61    pub storage: Option<Storage>,
62    /// sequencer persistence
63    pub persistence: Arc<P>,
64    /// Phantom data
65    pub phantom: PhantomData<N>,
66}
67
68/// Implement the trait that allows the [`RequestResponseProtocol`] to calculate/derive a response for a specific request
69#[async_trait]
70impl<I: NodeImplementation<SeqTypes>, N: ConnectedNetwork<PubKey>, P: SequencerPersistence>
71    DataSourceTrait<Request> for DataSource<I, N, P>
72{
73    async fn derive_response_for(&self, request: &Request) -> Result<Response> {
74        match request {
75            Request::Accounts(height, view, accounts) => {
76                // Try to get accounts from memory first, then fall back to storage
77                if let Some(state) = self.consensus.state(ViewNumber::new(*view)).await
78                    && let Ok(accounts) =
79                        retain_accounts(&state.fee_merkle_tree, accounts.iter().copied())
80                {
81                    return Ok(Response::Accounts(accounts));
82                }
83
84                // Fall back to storage
85                let (merkle_tree, leaf) = match &self.storage {
86                    Some(Storage::Sql(storage)) => storage
87                        .get_accounts(&self.node_state, *height, ViewNumber::new(*view), accounts)
88                        .await
89                        .with_context(|| "failed to get accounts from sql storage")?,
90                    Some(Storage::Fs(_)) => bail!("fs storage not supported for accounts"),
91                    _ => bail!("storage was not initialized"),
92                };
93
94                // If we successfully fetched accounts from storage, try to add them back into the in-memory
95                // state.
96                if let Err(err) = add_fee_accounts_to_state::<N, P>(
97                    &self.consensus.consensus(),
98                    &ViewNumber::new(*view),
99                    accounts,
100                    &merkle_tree,
101                    leaf,
102                )
103                .await
104                {
105                    tracing::warn!(?view, "Cannot update fetched account state: {err:#}");
106                }
107
108                Ok(Response::Accounts(merkle_tree))
109            },
110
111            Request::Leaf(height) => {
112                // Try to get the leaves from memory first, then fall back to storage
113                let mut leaves = self.consensus.consensus().read().await.undecided_leaves();
114                leaves.sort_by_key(|l| l.view_number());
115
116                if let Some((position, mut last_leaf)) =
117                    leaves.iter().find_position(|l| l.height() == *height)
118                {
119                    let mut leaf_chain = vec![last_leaf.clone()];
120                    for leaf in leaves.iter().skip(position + 1) {
121                        if leaf.justify_qc().view_number() == last_leaf.view_number() {
122                            leaf_chain.push(leaf.clone());
123                        } else {
124                            continue;
125                        }
126                        if leaf.view_number() == last_leaf.view_number() + 1 {
127                            // one away from decide
128                            last_leaf = leaf;
129                            break;
130                        }
131                        last_leaf = leaf;
132                    }
133
134                    // Make sure we got one more leaf to confirm the decide
135                    for leaf in leaves
136                        .iter()
137                        .skip_while(|l| l.view_number() <= last_leaf.view_number())
138                    {
139                        if leaf.justify_qc().view_number() == last_leaf.view_number() {
140                            leaf_chain.push(leaf.clone());
141                            return Ok(Response::Leaf(leaf_chain));
142                        }
143                    }
144                }
145
146                // Fall back to storage
147                let leaf_chain = match &self.storage {
148                    Some(Storage::Sql(storage)) => storage
149                        .get_leaf_chain(*height)
150                        .await
151                        .with_context(|| "failed to get leaf from sql storage")?,
152                    // TODO: Actually implement FS storage for some of these
153                    Some(Storage::Fs(_)) => bail!("fs storage not supported for leaf"),
154                    _ => bail!("storage was not initialized"),
155                };
156
157                Ok(Response::Leaf(leaf_chain))
158            },
159            Request::ChainConfig(commitment) => {
160                // Try to get the chain config from memory first, then fall back to storage
161                let chain_config_from_memory = self.consensus.decided_state().await.chain_config;
162                if chain_config_from_memory.commit() == *commitment
163                    && let Some(chain_config) = chain_config_from_memory.resolve()
164                {
165                    return Ok(Response::ChainConfig(chain_config));
166                }
167
168                // Fall back to storage
169                Ok(Response::ChainConfig(match &self.storage {
170                    Some(Storage::Sql(storage)) => storage
171                        .get_chain_config(*commitment)
172                        .await
173                        .with_context(|| "failed to get chain config from sql storage")?,
174                    Some(Storage::Fs(_)) => {
175                        bail!("fs storage not supported for chain config")
176                    },
177                    _ => bail!("storage was not initialized"),
178                }))
179            },
180            Request::BlocksFrontier(height, view) => {
181                // First try to respond from memory
182                let blocks_frontier_from_memory: Option<Result<BlocksFrontier>> = self
183                    .consensus
184                    .state(ViewNumber::new(*view))
185                    .await
186                    .map(|state| {
187                        let tree = &state.block_merkle_tree;
188                        let frontier = tree.lookup(tree.num_leaves() - 1).expect_ok()?.1;
189                        Ok(frontier)
190                    });
191
192                if let Some(Ok(blocks_frontier_from_memory)) = blocks_frontier_from_memory {
193                    return Ok(Response::BlocksFrontier(blocks_frontier_from_memory));
194                } else {
195                    // If we can't get the blocks frontier from memory, fall through to storage
196                    let blocks_frontier_from_storage = match &self.storage {
197                        Some(Storage::Sql(storage)) => storage
198                            .get_frontier(&self.node_state, *height, ViewNumber::new(*view))
199                            .await
200                            .with_context(|| "failed to get blocks frontier from sql storage")?,
201                        Some(Storage::Fs(_)) => {
202                            bail!("fs storage not supported for blocks frontier")
203                        },
204                        _ => bail!("storage was not initialized"),
205                    };
206
207                    Ok(Response::BlocksFrontier(blocks_frontier_from_storage))
208                }
209            },
210            Request::RewardAccountsV2(height, view, accounts) => {
211                // Try to get the reward accounts from memory first, then fall back to storage
212                if let Some(state) = self.consensus.state(ViewNumber::new(*view)).await
213                    && let Ok(reward_accounts) = retain_v2_reward_accounts(
214                        &state.reward_merkle_tree_v2,
215                        accounts.iter().copied(),
216                    )
217                {
218                    return Ok(Response::RewardAccountsV2(reward_accounts));
219                }
220
221                // Fall back to storage
222                let (merkle_tree, leaf) = match &self.storage {
223                    Some(Storage::Sql(storage)) => storage
224                        .get_reward_accounts_v2(
225                            &self.node_state,
226                            *height,
227                            ViewNumber::new(*view),
228                            accounts,
229                        )
230                        .await
231                        .with_context(|| "failed to get accounts from sql storage")?,
232                    Some(Storage::Fs(_)) => {
233                        bail!("fs storage not supported for reward accounts")
234                    },
235                    _ => bail!("storage was not initialized"),
236                };
237
238                // If we successfully fetched accounts from storage, try to add them back into the in-memory
239                // state.
240                if let Err(err) = add_v2_reward_accounts_to_state::<N, P>(
241                    &self.consensus.consensus(),
242                    &ViewNumber::new(*view),
243                    accounts,
244                    &merkle_tree,
245                    leaf,
246                )
247                .await
248                {
249                    tracing::warn!(?view, "Cannot update fetched account state: {err:#}");
250                }
251
252                Ok(Response::RewardAccountsV2(merkle_tree))
253            },
254
255            Request::RewardAccountsV1(height, view, accounts) => {
256                // Try to get the reward accounts from memory first, then fall back to storage
257                if let Some(state) = self.consensus.state(ViewNumber::new(*view)).await
258                    && let Ok(reward_accounts) = retain_v1_reward_accounts(
259                        &state.reward_merkle_tree_v1,
260                        accounts.iter().copied(),
261                    )
262                {
263                    return Ok(Response::RewardAccountsV1(reward_accounts));
264                }
265
266                // Fall back to storage
267                let (merkle_tree, leaf) = match &self.storage {
268                    Some(Storage::Sql(storage)) => storage
269                        .get_reward_accounts_v1(
270                            &self.node_state,
271                            *height,
272                            ViewNumber::new(*view),
273                            accounts,
274                        )
275                        .await
276                        .with_context(|| "failed to get v1 reward accounts from sql storage")?,
277                    Some(Storage::Fs(_)) => {
278                        bail!("fs storage not supported for v1 reward accounts")
279                    },
280                    _ => bail!("storage was not initialized"),
281                };
282
283                // If we successfully fetched accounts from storage, try to add them back into the in-memory
284                // state.
285                if let Err(err) = add_v1_reward_accounts_to_state::<N, P>(
286                    &self.consensus.consensus(),
287                    &ViewNumber::new(*view),
288                    accounts,
289                    &merkle_tree,
290                    leaf,
291                )
292                .await
293                {
294                    tracing::warn!(
295                        ?view,
296                        "Cannot update fetched v1 reward account state: {err:#}"
297                    );
298                }
299
300                Ok(Response::RewardAccountsV1(merkle_tree))
301            },
302            Request::VidShare(block_number, _request_id) => {
303                // Load the VID share from storage
304                let vid_share = match &self.storage {
305                    Some(Storage::Sql(storage)) => storage
306                        .get_vid_share::<SeqTypes>(BlockId::Number(*block_number as usize))
307                        .await
308                        .with_context(|| "failed to get vid share from sql storage")?,
309                    Some(Storage::Fs(storage)) => {
310                        // Open a read transaction
311                        let mut transaction = storage
312                            .read()
313                            .await
314                            .with_context(|| "failed to open fs storage transaction")?;
315
316                        // Get the VID share
317                        transaction
318                            .vid_share(BlockId::Number(*block_number as usize))
319                            .await
320                            .with_context(|| "failed to get vid share from fs storage")?
321                    },
322                    _ => bail!("storage was not initialized"),
323                };
324
325                Ok(Response::VidShare(vid_share))
326            },
327            Request::StateCert(epoch) => {
328                let state_cert = self
329                    .persistence
330                    .get_state_cert_by_epoch(*epoch)
331                    .await
332                    .with_context(|| {
333                        format!("failed to get state cert for epoch {epoch} from persistence")
334                    })?;
335
336                match state_cert {
337                    Some(cert) => Ok(Response::StateCert(cert)),
338                    None => bail!("State certificate for epoch {epoch} not found"),
339                }
340            },
341            Request::RewardMerkleTreeV2(height, view) => {
342                // Try to get the reward merkle tree from memory first, then fall back to storage
343                if let Some(state) = self.consensus.state(ViewNumber::new(*view)).await {
344                    let merkle_tree_bytes = bincode::serialize(
345                        &TryInto::<RewardMerkleTreeV2Data>::try_into(&state.reward_merkle_tree_v2)?,
346                    )
347                    .context("Merkle tree serialization failed; this should never happen.")?;
348
349                    return Ok(Response::RewardMerkleTreeV2(merkle_tree_bytes));
350                }
351
352                // Fall back to storage
353                let merkle_tree_bytes = match &self.storage {
354                    Some(Storage::Sql(storage)) => storage
355                        .load_tree(*height)
356                        .await
357                        .with_context(|| "failed to get reward merkle tree from sql storage")?,
358                    Some(Storage::Fs(_)) => {
359                        bail!("fs storage not supported for reward merkle tree catchup")
360                    },
361                    _ => bail!("storage was not initialized"),
362                };
363
364                Ok(Response::RewardMerkleTreeV2(merkle_tree_bytes))
365            },
366        }
367    }
368}
369
370/// Get a partial snapshot of the given reward state, which contains only the specified accounts.
371///
372/// Fails if one of the requested accounts is not represented in the original `state`.
373pub fn retain_v2_reward_accounts(
374    state: &RewardMerkleTreeV2,
375    accounts: impl IntoIterator<Item = RewardAccountV2>,
376) -> anyhow::Result<RewardMerkleTreeV2> {
377    let mut snapshot = RewardMerkleTreeV2::from_commitment(state.commitment());
378    for account in accounts {
379        match state.universal_lookup(account) {
380            LookupResult::Ok(elem, proof) => {
381                // This remember cannot fail, since we just constructed a valid proof, and are
382                // remembering into a tree with the same commitment.
383                snapshot.remember(account, elem, proof).unwrap();
384            },
385            LookupResult::NotFound(proof) => {
386                // Likewise this cannot fail.
387                snapshot.non_membership_remember(account, proof).unwrap()
388            },
389            LookupResult::NotInMemory => {
390                bail!("missing account {account}");
391            },
392        }
393    }
394
395    Ok(snapshot)
396}
397
398/// Get a partial snapshot of the given reward state, which contains only the specified accounts.
399///
400/// Fails if one of the requested accounts is not represented in the original `state`.
401pub fn retain_v1_reward_accounts(
402    state: &RewardMerkleTreeV1,
403    accounts: impl IntoIterator<Item = RewardAccountV1>,
404) -> anyhow::Result<RewardMerkleTreeV1> {
405    let mut snapshot = RewardMerkleTreeV1::from_commitment(state.commitment());
406    for account in accounts {
407        match state.universal_lookup(account) {
408            LookupResult::Ok(elem, proof) => {
409                // This remember cannot fail, since we just constructed a valid proof, and are
410                // remembering into a tree with the same commitment.
411                snapshot.remember(account, elem, proof).unwrap();
412            },
413            LookupResult::NotFound(proof) => {
414                // Likewise this cannot fail.
415                snapshot.non_membership_remember(account, proof).unwrap()
416            },
417            LookupResult::NotInMemory => {
418                bail!("missing account {account}");
419            },
420        }
421    }
422
423    Ok(snapshot)
424}