Skip to main content

espresso_node/request_response/
data_source.rs

1//! This file contains the [`DataSource`] trait. This trait allows the [`RequestResponseProtocol`]
2//! to calculate/derive a response for a specific request. In the confirmation layer the implementer
3//! would be something like a [`FeeMerkleTree`] for fee catchup
4
5use std::{marker::PhantomData, sync::Arc};
6
7use anyhow::{Context, Result, bail};
8use async_trait::async_trait;
9use espresso_types::{
10    NodeState, PubKey, SeqTypes, retain_accounts,
11    traits::SequencerPersistence,
12    v0_3::{RewardAccountV1, RewardMerkleTreeV1},
13    v0_4::{RewardAccountV2, RewardMerkleTreeV2},
14};
15use hotshot::traits::NodeImplementation;
16use hotshot_query_service::{
17    data_source::{
18        VersionedDataSource,
19        storage::{FileSystemStorage, NodeStorage, SqlStorage},
20    },
21    node::BlockId,
22};
23use hotshot_types::{data::ViewNumber, traits::network::ConnectedNetwork, vote::HasViewNumber};
24use itertools::Itertools;
25use jf_merkle_tree_compat::{
26    ForgetableMerkleTreeScheme, ForgetableUniversalMerkleTreeScheme, LookupResult,
27    MerkleTreeScheme, UniversalMerkleTreeScheme,
28};
29use request_response::data_source::DataSource as DataSourceTrait;
30
31use super::request::{Request, Response};
32use crate::{
33    api::{BlocksFrontier, RewardMerkleTreeDataSource, RewardMerkleTreeV2Data},
34    catchup::{
35        CatchupStorage, add_fee_accounts_to_state, add_v1_reward_accounts_to_state,
36        add_v2_reward_accounts_to_state,
37    },
38    consensus_handle::ConsensusHandle,
39};
40
41/// Query Service Storage types that can be used for request-response data source
42#[derive(Clone)]
43pub enum Storage {
44    Sql(Arc<SqlStorage>),
45    Fs(Arc<FileSystemStorage<SeqTypes>>),
46}
47
48#[derive(Clone)]
49pub struct DataSource<
50    I: NodeImplementation<SeqTypes>,
51    N: ConnectedNetwork<PubKey>,
52    P: SequencerPersistence,
53> {
54    /// The consensus adapter handle
55    pub consensus_handle: Arc<ConsensusHandle<SeqTypes, I>>,
56    /// The node's state
57    pub node_state: NodeState,
58    /// The storage
59    pub storage: Option<Storage>,
60    /// sequencer persistence
61    pub persistence: Arc<P>,
62    /// Phantom data
63    pub phantom: PhantomData<N>,
64}
65
66/// Implement the trait that allows the [`RequestResponseProtocol`] to calculate/derive a response for a specific request
67#[async_trait]
68impl<I: NodeImplementation<SeqTypes>, N: ConnectedNetwork<PubKey>, P: SequencerPersistence>
69    DataSourceTrait<Request> for DataSource<I, N, P>
70{
71    async fn derive_response_for(&self, request: &Request) -> Result<Response> {
72        match request {
73            Request::Accounts(height, view, accounts) => {
74                // Try to get accounts from memory first, then fall back to storage
75                if let Some(state) = self.consensus_handle.state(ViewNumber::new(*view)).await
76                    && let Ok(accounts) =
77                        retain_accounts(&state.fee_merkle_tree, accounts.iter().copied())
78                {
79                    return Ok(Response::Accounts(accounts));
80                }
81
82                // Fall back to storage
83                let (merkle_tree, leaf) = match &self.storage {
84                    Some(Storage::Sql(storage)) => storage
85                        .get_accounts(&self.node_state, *height, ViewNumber::new(*view), accounts)
86                        .await
87                        .with_context(|| "failed to get accounts from sql storage")?,
88                    Some(Storage::Fs(_)) => bail!("fs storage not supported for accounts"),
89                    _ => bail!("storage was not initialized"),
90                };
91
92                // If we successfully fetched accounts from storage, try to add them back into the in-memory
93                // state.
94                if let Err(err) = add_fee_accounts_to_state(
95                    &*self.consensus_handle,
96                    &ViewNumber::new(*view),
97                    accounts,
98                    &merkle_tree,
99                    leaf,
100                )
101                .await
102                {
103                    tracing::warn!(?view, "Cannot update fetched account state: {err:#}");
104                }
105
106                Ok(Response::Accounts(merkle_tree))
107            },
108
109            Request::Leaf(height) => {
110                // Legacy heights can be served from in-memory undecided leaves; new-protocol
111                // heights always fall through to storage.
112                if let Ok(leaf_chain) =
113                    legacy_leaf_chain_from_memory(&*self.consensus_handle, *height).await
114                {
115                    return Ok(Response::Leaf(leaf_chain));
116                }
117
118                let leaf_chain = match &self.storage {
119                    Some(Storage::Sql(storage)) => storage
120                        .get_leaf_chain(*height)
121                        .await
122                        .with_context(|| "failed to get leaf from sql storage")?,
123                    Some(Storage::Fs(_)) => bail!("fs storage not supported for leaf"),
124                    _ => bail!("storage was not initialized"),
125                };
126
127                Ok(Response::Leaf(leaf_chain))
128            },
129            Request::ChainConfig(commitment) => {
130                // Try to get the chain config from memory first, then fall back to storage
131                if let Some(state) = self.consensus_handle.decided_state().await {
132                    let chain_config_from_memory = state.chain_config;
133                    if chain_config_from_memory.commit() == *commitment
134                        && let Some(chain_config) = chain_config_from_memory.resolve()
135                    {
136                        return Ok(Response::ChainConfig(chain_config));
137                    }
138                }
139
140                // Fall back to storage
141                Ok(Response::ChainConfig(match &self.storage {
142                    Some(Storage::Sql(storage)) => storage
143                        .get_chain_config(*commitment)
144                        .await
145                        .with_context(|| "failed to get chain config from sql storage")?,
146                    Some(Storage::Fs(_)) => {
147                        bail!("fs storage not supported for chain config")
148                    },
149                    _ => bail!("storage was not initialized"),
150                }))
151            },
152            Request::BlocksFrontier(height, view) => {
153                // First try to respond from memory
154                let blocks_frontier_from_memory: Option<Result<BlocksFrontier>> = self
155                    .consensus_handle
156                    .state(ViewNumber::new(*view))
157                    .await
158                    .map(|state| {
159                        let tree = &state.block_merkle_tree;
160                        let frontier = tree.lookup(tree.num_leaves() - 1).expect_ok()?.1;
161                        Ok(frontier)
162                    });
163
164                if let Some(Ok(blocks_frontier_from_memory)) = blocks_frontier_from_memory {
165                    return Ok(Response::BlocksFrontier(blocks_frontier_from_memory));
166                } else {
167                    // If we can't get the blocks frontier from memory, fall through to storage
168                    let blocks_frontier_from_storage = match &self.storage {
169                        Some(Storage::Sql(storage)) => storage
170                            .get_frontier(&self.node_state, *height, ViewNumber::new(*view))
171                            .await
172                            .with_context(|| "failed to get blocks frontier from sql storage")?,
173                        Some(Storage::Fs(_)) => {
174                            bail!("fs storage not supported for blocks frontier")
175                        },
176                        _ => bail!("storage was not initialized"),
177                    };
178
179                    Ok(Response::BlocksFrontier(blocks_frontier_from_storage))
180                }
181            },
182            Request::RewardAccountsV2(height, view, accounts) => {
183                // Try to get the reward accounts from memory first, then fall back to storage
184                if let Some(state) = self.consensus_handle.state(ViewNumber::new(*view)).await
185                    && let Ok(reward_accounts) = retain_v2_reward_accounts(
186                        &state.reward_merkle_tree_v2,
187                        accounts.iter().copied(),
188                    )
189                {
190                    return Ok(Response::RewardAccountsV2(reward_accounts));
191                }
192
193                // Fall back to storage
194                let (merkle_tree, leaf) = match &self.storage {
195                    Some(Storage::Sql(storage)) => storage
196                        .get_reward_accounts_v2(
197                            &self.node_state,
198                            *height,
199                            ViewNumber::new(*view),
200                            accounts,
201                        )
202                        .await
203                        .with_context(|| "failed to get accounts from sql storage")?,
204                    Some(Storage::Fs(_)) => {
205                        bail!("fs storage not supported for reward accounts")
206                    },
207                    _ => bail!("storage was not initialized"),
208                };
209
210                // If we successfully fetched accounts from storage, try to add them back into the in-memory
211                // state.
212                if let Err(err) = add_v2_reward_accounts_to_state(
213                    &*self.consensus_handle,
214                    &ViewNumber::new(*view),
215                    accounts,
216                    &merkle_tree,
217                    leaf,
218                )
219                .await
220                {
221                    tracing::warn!(?view, "Cannot update fetched account state: {err:#}");
222                }
223
224                Ok(Response::RewardAccountsV2(merkle_tree))
225            },
226
227            Request::RewardAccountsV1(height, view, accounts) => {
228                // Try to get the reward accounts from memory first, then fall back to storage
229                if let Some(state) = self.consensus_handle.state(ViewNumber::new(*view)).await
230                    && let Ok(reward_accounts) = retain_v1_reward_accounts(
231                        &state.reward_merkle_tree_v1,
232                        accounts.iter().copied(),
233                    )
234                {
235                    return Ok(Response::RewardAccountsV1(reward_accounts));
236                }
237
238                // Fall back to storage
239                let (merkle_tree, leaf) = match &self.storage {
240                    Some(Storage::Sql(storage)) => storage
241                        .get_reward_accounts_v1(
242                            &self.node_state,
243                            *height,
244                            ViewNumber::new(*view),
245                            accounts,
246                        )
247                        .await
248                        .with_context(|| "failed to get v1 reward accounts from sql storage")?,
249                    Some(Storage::Fs(_)) => {
250                        bail!("fs storage not supported for v1 reward accounts")
251                    },
252                    _ => bail!("storage was not initialized"),
253                };
254
255                // If we successfully fetched accounts from storage, try to add them back into the in-memory
256                // state.
257                if let Err(err) = add_v1_reward_accounts_to_state(
258                    &*self.consensus_handle,
259                    &ViewNumber::new(*view),
260                    accounts,
261                    &merkle_tree,
262                    leaf,
263                )
264                .await
265                {
266                    tracing::warn!(
267                        ?view,
268                        "Cannot update fetched v1 reward account state: {err:#}"
269                    );
270                }
271
272                Ok(Response::RewardAccountsV1(merkle_tree))
273            },
274            Request::VidShare(block_number, _request_id) => {
275                // Load the VID share from storage
276                let vid_share = match &self.storage {
277                    Some(Storage::Sql(storage)) => storage
278                        .get_vid_share::<SeqTypes>(BlockId::Number(*block_number as usize))
279                        .await
280                        .with_context(|| "failed to get vid share from sql storage")?,
281                    Some(Storage::Fs(storage)) => {
282                        // Open a read transaction
283                        let mut transaction = storage
284                            .read()
285                            .await
286                            .with_context(|| "failed to open fs storage transaction")?;
287
288                        // Get the VID share
289                        transaction
290                            .vid_share(BlockId::Number(*block_number as usize))
291                            .await
292                            .with_context(|| "failed to get vid share from fs storage")?
293                    },
294                    _ => bail!("storage was not initialized"),
295                };
296
297                Ok(Response::VidShare(vid_share))
298            },
299            Request::StateCert(epoch) => {
300                let state_cert = self
301                    .persistence
302                    .get_state_cert_by_epoch(*epoch)
303                    .await
304                    .with_context(|| {
305                        format!("failed to get state cert for epoch {epoch} from persistence")
306                    })?;
307
308                match state_cert {
309                    Some(cert) => Ok(Response::StateCert(cert)),
310                    None => bail!("State certificate for epoch {epoch} not found"),
311                }
312            },
313            Request::Cert2(height) => {
314                let cert2 = match &self.storage {
315                    Some(Storage::Sql(storage)) => storage
316                        .load_earliest_cert2(*height)
317                        .await
318                        .with_context(|| "failed to load cert2 from sql storage")?,
319                    Some(Storage::Fs(_)) => bail!("fs storage not supported for cert2"),
320                    _ => bail!("storage was not initialized"),
321                };
322
323                match cert2 {
324                    Some(cert2) => Ok(Response::Cert2(cert2)),
325                    None => bail!("no cert2 available for height {height}"),
326                }
327            },
328            Request::RewardMerkleTreeV2(height, view) => {
329                // Try to get the reward merkle tree from memory first, then fall back to storage
330                if let Some(state) = self.consensus_handle.state(ViewNumber::new(*view)).await {
331                    let merkle_tree_bytes = bincode::serialize(
332                        &TryInto::<RewardMerkleTreeV2Data>::try_into(&state.reward_merkle_tree_v2)?,
333                    )
334                    .context("Merkle tree serialization failed; this should never happen.")?;
335
336                    return Ok(Response::RewardMerkleTreeV2(merkle_tree_bytes));
337                }
338
339                // Fall back to storage
340                let merkle_tree_bytes = match &self.storage {
341                    Some(Storage::Sql(storage)) => storage
342                        .load_tree(*height)
343                        .await
344                        .with_context(|| "failed to get reward merkle tree from sql storage")?,
345                    Some(Storage::Fs(_)) => {
346                        bail!("fs storage not supported for reward merkle tree catchup")
347                    },
348                    _ => bail!("storage was not initialized"),
349                };
350
351                Ok(Response::RewardMerkleTreeV2(merkle_tree_bytes))
352            },
353        }
354    }
355}
356
357/// Build a legacy-protocol 3-chain leaf chain decided at `height` from in-memory undecided leaves.
358///
359/// Returns an error if the chain cannot be assembled from memory (e.g. the height is below the
360/// latest decided leaf).
361async fn legacy_leaf_chain_from_memory<I: NodeImplementation<SeqTypes>>(
362    consensus_handle: &ConsensusHandle<SeqTypes, I>,
363    height: u64,
364) -> anyhow::Result<Vec<espresso_types::Leaf2>> {
365    let mut leaves = consensus_handle.undecided_leaves().await;
366    leaves.sort_by_key(|l| l.view_number());
367
368    let (position, mut last_leaf) = leaves
369        .iter()
370        .find_position(|l| l.height() == height)
371        .ok_or_else(|| anyhow::anyhow!("leaf at height {height} not in memory"))?;
372
373    let mut leaf_chain = vec![last_leaf.clone()];
374    for leaf in leaves.iter().skip(position + 1) {
375        if leaf.justify_qc().view_number() == last_leaf.view_number() {
376            leaf_chain.push(leaf.clone());
377        } else {
378            continue;
379        }
380        if leaf.view_number() == last_leaf.view_number() + 1 {
381            last_leaf = leaf;
382            break;
383        }
384        last_leaf = leaf;
385    }
386
387    for leaf in leaves
388        .iter()
389        .skip_while(|l| l.view_number() <= last_leaf.view_number())
390    {
391        if leaf.justify_qc().view_number() == last_leaf.view_number() {
392            leaf_chain.push(leaf.clone());
393            return Ok(leaf_chain);
394        }
395    }
396
397    anyhow::bail!("incomplete leaf chain in memory for height {height}")
398}
399
400/// Get a partial snapshot of the given reward state, which contains only the specified accounts.
401///
402/// Fails if one of the requested accounts is not represented in the original `state`.
403pub fn retain_v2_reward_accounts(
404    state: &RewardMerkleTreeV2,
405    accounts: impl IntoIterator<Item = RewardAccountV2>,
406) -> anyhow::Result<RewardMerkleTreeV2> {
407    let mut snapshot = RewardMerkleTreeV2::from_commitment(state.commitment());
408    for account in accounts {
409        match state.universal_lookup(account) {
410            LookupResult::Ok(elem, proof) => {
411                // This remember cannot fail, since we just constructed a valid proof, and are
412                // remembering into a tree with the same commitment.
413                snapshot.remember(account, elem, proof).unwrap();
414            },
415            LookupResult::NotFound(proof) => {
416                // Likewise this cannot fail.
417                snapshot.non_membership_remember(account, proof).unwrap()
418            },
419            LookupResult::NotInMemory => {
420                bail!("missing account {account}");
421            },
422        }
423    }
424
425    Ok(snapshot)
426}
427
428/// Get a partial snapshot of the given reward state, which contains only the specified accounts.
429///
430/// Fails if one of the requested accounts is not represented in the original `state`.
431pub fn retain_v1_reward_accounts(
432    state: &RewardMerkleTreeV1,
433    accounts: impl IntoIterator<Item = RewardAccountV1>,
434) -> anyhow::Result<RewardMerkleTreeV1> {
435    let mut snapshot = RewardMerkleTreeV1::from_commitment(state.commitment());
436    for account in accounts {
437        match state.universal_lookup(account) {
438            LookupResult::Ok(elem, proof) => {
439                // This remember cannot fail, since we just constructed a valid proof, and are
440                // remembering into a tree with the same commitment.
441                snapshot.remember(account, elem, proof).unwrap();
442            },
443            LookupResult::NotFound(proof) => {
444                // Likewise this cannot fail.
445                snapshot.non_membership_remember(account, proof).unwrap()
446            },
447            LookupResult::NotInMemory => {
448                bail!("missing account {account}");
449            },
450        }
451    }
452
453    Ok(snapshot)
454}