espresso_node/request_response/
data_source.rs1use std::{marker::PhantomData, sync::Arc};
6
7use anyhow::{Context, Result, bail};
8use async_trait::async_trait;
9use espresso_types::{
10 NodeState, PubKey, SeqTypes, retain_accounts,
11 traits::SequencerPersistence,
12 v0_3::{RewardAccountV1, RewardMerkleTreeV1},
13 v0_4::{RewardAccountV2, RewardMerkleTreeV2},
14};
15use hotshot::{SystemContext, traits::NodeImplementation};
16use hotshot_query_service::{
17 data_source::{
18 VersionedDataSource,
19 storage::{FileSystemStorage, NodeStorage, SqlStorage},
20 },
21 node::BlockId,
22};
23use hotshot_types::{data::ViewNumber, traits::network::ConnectedNetwork, vote::HasViewNumber};
24use itertools::Itertools;
25use jf_merkle_tree_compat::{
26 ForgetableMerkleTreeScheme, ForgetableUniversalMerkleTreeScheme, LookupResult,
27 MerkleTreeScheme, UniversalMerkleTreeScheme,
28};
29use request_response::data_source::DataSource as DataSourceTrait;
30
31use super::request::{Request, Response};
32use crate::{
33 api::{BlocksFrontier, RewardMerkleTreeDataSource, RewardMerkleTreeV2Data},
34 catchup::{
35 CatchupStorage, add_fee_accounts_to_state, add_v1_reward_accounts_to_state,
36 add_v2_reward_accounts_to_state,
37 },
38};
39
40#[derive(Clone)]
42pub enum Storage {
43 Sql(Arc<SqlStorage>),
44 Fs(Arc<FileSystemStorage<SeqTypes>>),
45}
46
47type Consensus<I> = Arc<SystemContext<SeqTypes, I>>;
49
50#[derive(Clone)]
51pub struct DataSource<
52 I: NodeImplementation<SeqTypes>,
53 N: ConnectedNetwork<PubKey>,
54 P: SequencerPersistence,
55> {
56 pub consensus: Consensus<I>,
58 pub node_state: NodeState,
60 pub storage: Option<Storage>,
62 pub persistence: Arc<P>,
64 pub phantom: PhantomData<N>,
66}
67
68#[async_trait]
70impl<I: NodeImplementation<SeqTypes>, N: ConnectedNetwork<PubKey>, P: SequencerPersistence>
71 DataSourceTrait<Request> for DataSource<I, N, P>
72{
73 async fn derive_response_for(&self, request: &Request) -> Result<Response> {
74 match request {
75 Request::Accounts(height, view, accounts) => {
76 if let Some(state) = self.consensus.state(ViewNumber::new(*view)).await
78 && let Ok(accounts) =
79 retain_accounts(&state.fee_merkle_tree, accounts.iter().copied())
80 {
81 return Ok(Response::Accounts(accounts));
82 }
83
84 let (merkle_tree, leaf) = match &self.storage {
86 Some(Storage::Sql(storage)) => storage
87 .get_accounts(&self.node_state, *height, ViewNumber::new(*view), accounts)
88 .await
89 .with_context(|| "failed to get accounts from sql storage")?,
90 Some(Storage::Fs(_)) => bail!("fs storage not supported for accounts"),
91 _ => bail!("storage was not initialized"),
92 };
93
94 if let Err(err) = add_fee_accounts_to_state::<N, P>(
97 &self.consensus.consensus(),
98 &ViewNumber::new(*view),
99 accounts,
100 &merkle_tree,
101 leaf,
102 )
103 .await
104 {
105 tracing::warn!(?view, "Cannot update fetched account state: {err:#}");
106 }
107
108 Ok(Response::Accounts(merkle_tree))
109 },
110
111 Request::Leaf(height) => {
112 let mut leaves = self.consensus.consensus().read().await.undecided_leaves();
114 leaves.sort_by_key(|l| l.view_number());
115
116 if let Some((position, mut last_leaf)) =
117 leaves.iter().find_position(|l| l.height() == *height)
118 {
119 let mut leaf_chain = vec![last_leaf.clone()];
120 for leaf in leaves.iter().skip(position + 1) {
121 if leaf.justify_qc().view_number() == last_leaf.view_number() {
122 leaf_chain.push(leaf.clone());
123 } else {
124 continue;
125 }
126 if leaf.view_number() == last_leaf.view_number() + 1 {
127 last_leaf = leaf;
129 break;
130 }
131 last_leaf = leaf;
132 }
133
134 for leaf in leaves
136 .iter()
137 .skip_while(|l| l.view_number() <= last_leaf.view_number())
138 {
139 if leaf.justify_qc().view_number() == last_leaf.view_number() {
140 leaf_chain.push(leaf.clone());
141 return Ok(Response::Leaf(leaf_chain));
142 }
143 }
144 }
145
146 let leaf_chain = match &self.storage {
148 Some(Storage::Sql(storage)) => storage
149 .get_leaf_chain(*height)
150 .await
151 .with_context(|| "failed to get leaf from sql storage")?,
152 Some(Storage::Fs(_)) => bail!("fs storage not supported for leaf"),
154 _ => bail!("storage was not initialized"),
155 };
156
157 Ok(Response::Leaf(leaf_chain))
158 },
159 Request::ChainConfig(commitment) => {
160 let chain_config_from_memory = self.consensus.decided_state().await.chain_config;
162 if chain_config_from_memory.commit() == *commitment
163 && let Some(chain_config) = chain_config_from_memory.resolve()
164 {
165 return Ok(Response::ChainConfig(chain_config));
166 }
167
168 Ok(Response::ChainConfig(match &self.storage {
170 Some(Storage::Sql(storage)) => storage
171 .get_chain_config(*commitment)
172 .await
173 .with_context(|| "failed to get chain config from sql storage")?,
174 Some(Storage::Fs(_)) => {
175 bail!("fs storage not supported for chain config")
176 },
177 _ => bail!("storage was not initialized"),
178 }))
179 },
180 Request::BlocksFrontier(height, view) => {
181 let blocks_frontier_from_memory: Option<Result<BlocksFrontier>> = self
183 .consensus
184 .state(ViewNumber::new(*view))
185 .await
186 .map(|state| {
187 let tree = &state.block_merkle_tree;
188 let frontier = tree.lookup(tree.num_leaves() - 1).expect_ok()?.1;
189 Ok(frontier)
190 });
191
192 if let Some(Ok(blocks_frontier_from_memory)) = blocks_frontier_from_memory {
193 return Ok(Response::BlocksFrontier(blocks_frontier_from_memory));
194 } else {
195 let blocks_frontier_from_storage = match &self.storage {
197 Some(Storage::Sql(storage)) => storage
198 .get_frontier(&self.node_state, *height, ViewNumber::new(*view))
199 .await
200 .with_context(|| "failed to get blocks frontier from sql storage")?,
201 Some(Storage::Fs(_)) => {
202 bail!("fs storage not supported for blocks frontier")
203 },
204 _ => bail!("storage was not initialized"),
205 };
206
207 Ok(Response::BlocksFrontier(blocks_frontier_from_storage))
208 }
209 },
210 Request::RewardAccountsV2(height, view, accounts) => {
211 if let Some(state) = self.consensus.state(ViewNumber::new(*view)).await
213 && let Ok(reward_accounts) = retain_v2_reward_accounts(
214 &state.reward_merkle_tree_v2,
215 accounts.iter().copied(),
216 )
217 {
218 return Ok(Response::RewardAccountsV2(reward_accounts));
219 }
220
221 let (merkle_tree, leaf) = match &self.storage {
223 Some(Storage::Sql(storage)) => storage
224 .get_reward_accounts_v2(
225 &self.node_state,
226 *height,
227 ViewNumber::new(*view),
228 accounts,
229 )
230 .await
231 .with_context(|| "failed to get accounts from sql storage")?,
232 Some(Storage::Fs(_)) => {
233 bail!("fs storage not supported for reward accounts")
234 },
235 _ => bail!("storage was not initialized"),
236 };
237
238 if let Err(err) = add_v2_reward_accounts_to_state::<N, P>(
241 &self.consensus.consensus(),
242 &ViewNumber::new(*view),
243 accounts,
244 &merkle_tree,
245 leaf,
246 )
247 .await
248 {
249 tracing::warn!(?view, "Cannot update fetched account state: {err:#}");
250 }
251
252 Ok(Response::RewardAccountsV2(merkle_tree))
253 },
254
255 Request::RewardAccountsV1(height, view, accounts) => {
256 if let Some(state) = self.consensus.state(ViewNumber::new(*view)).await
258 && let Ok(reward_accounts) = retain_v1_reward_accounts(
259 &state.reward_merkle_tree_v1,
260 accounts.iter().copied(),
261 )
262 {
263 return Ok(Response::RewardAccountsV1(reward_accounts));
264 }
265
266 let (merkle_tree, leaf) = match &self.storage {
268 Some(Storage::Sql(storage)) => storage
269 .get_reward_accounts_v1(
270 &self.node_state,
271 *height,
272 ViewNumber::new(*view),
273 accounts,
274 )
275 .await
276 .with_context(|| "failed to get v1 reward accounts from sql storage")?,
277 Some(Storage::Fs(_)) => {
278 bail!("fs storage not supported for v1 reward accounts")
279 },
280 _ => bail!("storage was not initialized"),
281 };
282
283 if let Err(err) = add_v1_reward_accounts_to_state::<N, P>(
286 &self.consensus.consensus(),
287 &ViewNumber::new(*view),
288 accounts,
289 &merkle_tree,
290 leaf,
291 )
292 .await
293 {
294 tracing::warn!(
295 ?view,
296 "Cannot update fetched v1 reward account state: {err:#}"
297 );
298 }
299
300 Ok(Response::RewardAccountsV1(merkle_tree))
301 },
302 Request::VidShare(block_number, _request_id) => {
303 let vid_share = match &self.storage {
305 Some(Storage::Sql(storage)) => storage
306 .get_vid_share::<SeqTypes>(BlockId::Number(*block_number as usize))
307 .await
308 .with_context(|| "failed to get vid share from sql storage")?,
309 Some(Storage::Fs(storage)) => {
310 let mut transaction = storage
312 .read()
313 .await
314 .with_context(|| "failed to open fs storage transaction")?;
315
316 transaction
318 .vid_share(BlockId::Number(*block_number as usize))
319 .await
320 .with_context(|| "failed to get vid share from fs storage")?
321 },
322 _ => bail!("storage was not initialized"),
323 };
324
325 Ok(Response::VidShare(vid_share))
326 },
327 Request::StateCert(epoch) => {
328 let state_cert = self
329 .persistence
330 .get_state_cert_by_epoch(*epoch)
331 .await
332 .with_context(|| {
333 format!("failed to get state cert for epoch {epoch} from persistence")
334 })?;
335
336 match state_cert {
337 Some(cert) => Ok(Response::StateCert(cert)),
338 None => bail!("State certificate for epoch {epoch} not found"),
339 }
340 },
341 Request::RewardMerkleTreeV2(height, view) => {
342 if let Some(state) = self.consensus.state(ViewNumber::new(*view)).await {
344 let merkle_tree_bytes = bincode::serialize(
345 &TryInto::<RewardMerkleTreeV2Data>::try_into(&state.reward_merkle_tree_v2)?,
346 )
347 .context("Merkle tree serialization failed; this should never happen.")?;
348
349 return Ok(Response::RewardMerkleTreeV2(merkle_tree_bytes));
350 }
351
352 let merkle_tree_bytes = match &self.storage {
354 Some(Storage::Sql(storage)) => storage
355 .load_tree(*height)
356 .await
357 .with_context(|| "failed to get reward merkle tree from sql storage")?,
358 Some(Storage::Fs(_)) => {
359 bail!("fs storage not supported for reward merkle tree catchup")
360 },
361 _ => bail!("storage was not initialized"),
362 };
363
364 Ok(Response::RewardMerkleTreeV2(merkle_tree_bytes))
365 },
366 }
367 }
368}
369
370pub fn retain_v2_reward_accounts(
374 state: &RewardMerkleTreeV2,
375 accounts: impl IntoIterator<Item = RewardAccountV2>,
376) -> anyhow::Result<RewardMerkleTreeV2> {
377 let mut snapshot = RewardMerkleTreeV2::from_commitment(state.commitment());
378 for account in accounts {
379 match state.universal_lookup(account) {
380 LookupResult::Ok(elem, proof) => {
381 snapshot.remember(account, elem, proof).unwrap();
384 },
385 LookupResult::NotFound(proof) => {
386 snapshot.non_membership_remember(account, proof).unwrap()
388 },
389 LookupResult::NotInMemory => {
390 bail!("missing account {account}");
391 },
392 }
393 }
394
395 Ok(snapshot)
396}
397
398pub fn retain_v1_reward_accounts(
402 state: &RewardMerkleTreeV1,
403 accounts: impl IntoIterator<Item = RewardAccountV1>,
404) -> anyhow::Result<RewardMerkleTreeV1> {
405 let mut snapshot = RewardMerkleTreeV1::from_commitment(state.commitment());
406 for account in accounts {
407 match state.universal_lookup(account) {
408 LookupResult::Ok(elem, proof) => {
409 snapshot.remember(account, elem, proof).unwrap();
412 },
413 LookupResult::NotFound(proof) => {
414 snapshot.non_membership_remember(account, proof).unwrap()
416 },
417 LookupResult::NotInMemory => {
418 bail!("missing account {account}");
419 },
420 }
421 }
422
423 Ok(snapshot)
424}