1use std::{marker::PhantomData, sync::Arc};
6
7use anyhow::{Context, Result, bail};
8use async_trait::async_trait;
9use espresso_types::{
10 NodeState, PubKey, SeqTypes, retain_accounts,
11 traits::SequencerPersistence,
12 v0_3::{RewardAccountV1, RewardMerkleTreeV1},
13 v0_4::{RewardAccountV2, RewardMerkleTreeV2},
14};
15use hotshot::traits::NodeImplementation;
16use hotshot_query_service::{
17 data_source::{
18 VersionedDataSource,
19 storage::{FileSystemStorage, NodeStorage, SqlStorage},
20 },
21 node::BlockId,
22};
23use hotshot_types::{data::ViewNumber, traits::network::ConnectedNetwork, vote::HasViewNumber};
24use itertools::Itertools;
25use jf_merkle_tree_compat::{
26 ForgetableMerkleTreeScheme, ForgetableUniversalMerkleTreeScheme, LookupResult,
27 MerkleTreeScheme, UniversalMerkleTreeScheme,
28};
29use request_response::data_source::DataSource as DataSourceTrait;
30
31use super::request::{Request, Response};
32use crate::{
33 api::{BlocksFrontier, RewardMerkleTreeDataSource, RewardMerkleTreeV2Data},
34 catchup::{
35 CatchupStorage, add_fee_accounts_to_state, add_v1_reward_accounts_to_state,
36 add_v2_reward_accounts_to_state,
37 },
38 consensus_handle::ConsensusHandle,
39};
40
/// Storage backend that catchup requests fall back to when the answer is not
/// available from in-memory consensus state.
///
/// Both variants wrap their backend in an [`Arc`], so cloning a `Storage` only
/// clones the shared handle.
#[derive(Clone)]
pub enum Storage {
    /// Shared handle to a [`SqlStorage`] backend.
    Sql(Arc<SqlStorage>),
    /// Shared handle to a [`FileSystemStorage`] backend.
    ///
    /// NOTE(review): within this file only VID share requests are served from
    /// this variant; every other storage-backed request bails for `Fs`.
    Fs(Arc<FileSystemStorage<SeqTypes>>),
}
47
48#[derive(Clone)]
49pub struct DataSource<
50 I: NodeImplementation<SeqTypes>,
51 N: ConnectedNetwork<PubKey>,
52 P: SequencerPersistence,
53> {
54 pub consensus_handle: Arc<ConsensusHandle<SeqTypes, I>>,
56 pub node_state: NodeState,
58 pub storage: Option<Storage>,
60 pub persistence: Arc<P>,
62 pub phantom: PhantomData<N>,
64}
65
66#[async_trait]
68impl<I: NodeImplementation<SeqTypes>, N: ConnectedNetwork<PubKey>, P: SequencerPersistence>
69 DataSourceTrait<Request> for DataSource<I, N, P>
70{
71 async fn derive_response_for(&self, request: &Request) -> Result<Response> {
72 match request {
73 Request::Accounts(height, view, accounts) => {
74 if let Some(state) = self.consensus_handle.state(ViewNumber::new(*view)).await
76 && let Ok(accounts) =
77 retain_accounts(&state.fee_merkle_tree, accounts.iter().copied())
78 {
79 return Ok(Response::Accounts(accounts));
80 }
81
82 let (merkle_tree, leaf) = match &self.storage {
84 Some(Storage::Sql(storage)) => storage
85 .get_accounts(&self.node_state, *height, ViewNumber::new(*view), accounts)
86 .await
87 .with_context(|| "failed to get accounts from sql storage")?,
88 Some(Storage::Fs(_)) => bail!("fs storage not supported for accounts"),
89 _ => bail!("storage was not initialized"),
90 };
91
92 if let Err(err) = add_fee_accounts_to_state(
95 &*self.consensus_handle,
96 &ViewNumber::new(*view),
97 accounts,
98 &merkle_tree,
99 leaf,
100 )
101 .await
102 {
103 tracing::warn!(?view, "Cannot update fetched account state: {err:#}");
104 }
105
106 Ok(Response::Accounts(merkle_tree))
107 },
108
109 Request::Leaf(height) => {
110 if let Ok(leaf_chain) =
113 legacy_leaf_chain_from_memory(&*self.consensus_handle, *height).await
114 {
115 return Ok(Response::Leaf(leaf_chain));
116 }
117
118 let leaf_chain = match &self.storage {
119 Some(Storage::Sql(storage)) => storage
120 .get_leaf_chain(*height)
121 .await
122 .with_context(|| "failed to get leaf from sql storage")?,
123 Some(Storage::Fs(_)) => bail!("fs storage not supported for leaf"),
124 _ => bail!("storage was not initialized"),
125 };
126
127 Ok(Response::Leaf(leaf_chain))
128 },
129 Request::ChainConfig(commitment) => {
130 if let Some(state) = self.consensus_handle.decided_state().await {
132 let chain_config_from_memory = state.chain_config;
133 if chain_config_from_memory.commit() == *commitment
134 && let Some(chain_config) = chain_config_from_memory.resolve()
135 {
136 return Ok(Response::ChainConfig(chain_config));
137 }
138 }
139
140 Ok(Response::ChainConfig(match &self.storage {
142 Some(Storage::Sql(storage)) => storage
143 .get_chain_config(*commitment)
144 .await
145 .with_context(|| "failed to get chain config from sql storage")?,
146 Some(Storage::Fs(_)) => {
147 bail!("fs storage not supported for chain config")
148 },
149 _ => bail!("storage was not initialized"),
150 }))
151 },
152 Request::BlocksFrontier(height, view) => {
153 let blocks_frontier_from_memory: Option<Result<BlocksFrontier>> = self
155 .consensus_handle
156 .state(ViewNumber::new(*view))
157 .await
158 .map(|state| {
159 let tree = &state.block_merkle_tree;
160 let frontier = tree.lookup(tree.num_leaves() - 1).expect_ok()?.1;
161 Ok(frontier)
162 });
163
164 if let Some(Ok(blocks_frontier_from_memory)) = blocks_frontier_from_memory {
165 return Ok(Response::BlocksFrontier(blocks_frontier_from_memory));
166 } else {
167 let blocks_frontier_from_storage = match &self.storage {
169 Some(Storage::Sql(storage)) => storage
170 .get_frontier(&self.node_state, *height, ViewNumber::new(*view))
171 .await
172 .with_context(|| "failed to get blocks frontier from sql storage")?,
173 Some(Storage::Fs(_)) => {
174 bail!("fs storage not supported for blocks frontier")
175 },
176 _ => bail!("storage was not initialized"),
177 };
178
179 Ok(Response::BlocksFrontier(blocks_frontier_from_storage))
180 }
181 },
182 Request::RewardAccountsV2(height, view, accounts) => {
183 if let Some(state) = self.consensus_handle.state(ViewNumber::new(*view)).await
185 && let Ok(reward_accounts) = retain_v2_reward_accounts(
186 &state.reward_merkle_tree_v2,
187 accounts.iter().copied(),
188 )
189 {
190 return Ok(Response::RewardAccountsV2(reward_accounts));
191 }
192
193 let (merkle_tree, leaf) = match &self.storage {
195 Some(Storage::Sql(storage)) => storage
196 .get_reward_accounts_v2(
197 &self.node_state,
198 *height,
199 ViewNumber::new(*view),
200 accounts,
201 )
202 .await
203 .with_context(|| "failed to get accounts from sql storage")?,
204 Some(Storage::Fs(_)) => {
205 bail!("fs storage not supported for reward accounts")
206 },
207 _ => bail!("storage was not initialized"),
208 };
209
210 if let Err(err) = add_v2_reward_accounts_to_state(
213 &*self.consensus_handle,
214 &ViewNumber::new(*view),
215 accounts,
216 &merkle_tree,
217 leaf,
218 )
219 .await
220 {
221 tracing::warn!(?view, "Cannot update fetched account state: {err:#}");
222 }
223
224 Ok(Response::RewardAccountsV2(merkle_tree))
225 },
226
227 Request::RewardAccountsV1(height, view, accounts) => {
228 if let Some(state) = self.consensus_handle.state(ViewNumber::new(*view)).await
230 && let Ok(reward_accounts) = retain_v1_reward_accounts(
231 &state.reward_merkle_tree_v1,
232 accounts.iter().copied(),
233 )
234 {
235 return Ok(Response::RewardAccountsV1(reward_accounts));
236 }
237
238 let (merkle_tree, leaf) = match &self.storage {
240 Some(Storage::Sql(storage)) => storage
241 .get_reward_accounts_v1(
242 &self.node_state,
243 *height,
244 ViewNumber::new(*view),
245 accounts,
246 )
247 .await
248 .with_context(|| "failed to get v1 reward accounts from sql storage")?,
249 Some(Storage::Fs(_)) => {
250 bail!("fs storage not supported for v1 reward accounts")
251 },
252 _ => bail!("storage was not initialized"),
253 };
254
255 if let Err(err) = add_v1_reward_accounts_to_state(
258 &*self.consensus_handle,
259 &ViewNumber::new(*view),
260 accounts,
261 &merkle_tree,
262 leaf,
263 )
264 .await
265 {
266 tracing::warn!(
267 ?view,
268 "Cannot update fetched v1 reward account state: {err:#}"
269 );
270 }
271
272 Ok(Response::RewardAccountsV1(merkle_tree))
273 },
274 Request::VidShare(block_number, _request_id) => {
275 let vid_share = match &self.storage {
277 Some(Storage::Sql(storage)) => storage
278 .get_vid_share::<SeqTypes>(BlockId::Number(*block_number as usize))
279 .await
280 .with_context(|| "failed to get vid share from sql storage")?,
281 Some(Storage::Fs(storage)) => {
282 let mut transaction = storage
284 .read()
285 .await
286 .with_context(|| "failed to open fs storage transaction")?;
287
288 transaction
290 .vid_share(BlockId::Number(*block_number as usize))
291 .await
292 .with_context(|| "failed to get vid share from fs storage")?
293 },
294 _ => bail!("storage was not initialized"),
295 };
296
297 Ok(Response::VidShare(vid_share))
298 },
299 Request::StateCert(epoch) => {
300 let state_cert = self
301 .persistence
302 .get_state_cert_by_epoch(*epoch)
303 .await
304 .with_context(|| {
305 format!("failed to get state cert for epoch {epoch} from persistence")
306 })?;
307
308 match state_cert {
309 Some(cert) => Ok(Response::StateCert(cert)),
310 None => bail!("State certificate for epoch {epoch} not found"),
311 }
312 },
313 Request::Cert2(height) => {
314 let cert2 = match &self.storage {
315 Some(Storage::Sql(storage)) => storage
316 .load_earliest_cert2(*height)
317 .await
318 .with_context(|| "failed to load cert2 from sql storage")?,
319 Some(Storage::Fs(_)) => bail!("fs storage not supported for cert2"),
320 _ => bail!("storage was not initialized"),
321 };
322
323 match cert2 {
324 Some(cert2) => Ok(Response::Cert2(cert2)),
325 None => bail!("no cert2 available for height {height}"),
326 }
327 },
328 Request::RewardMerkleTreeV2(height, view) => {
329 if let Some(state) = self.consensus_handle.state(ViewNumber::new(*view)).await {
331 let merkle_tree_bytes = bincode::serialize(
332 &TryInto::<RewardMerkleTreeV2Data>::try_into(&state.reward_merkle_tree_v2)?,
333 )
334 .context("Merkle tree serialization failed; this should never happen.")?;
335
336 return Ok(Response::RewardMerkleTreeV2(merkle_tree_bytes));
337 }
338
339 let merkle_tree_bytes = match &self.storage {
341 Some(Storage::Sql(storage)) => storage
342 .load_tree(*height)
343 .await
344 .with_context(|| "failed to get reward merkle tree from sql storage")?,
345 Some(Storage::Fs(_)) => {
346 bail!("fs storage not supported for reward merkle tree catchup")
347 },
348 _ => bail!("storage was not initialized"),
349 };
350
351 Ok(Response::RewardMerkleTreeV2(merkle_tree_bytes))
352 },
353 }
354 }
355}
356
357async fn legacy_leaf_chain_from_memory<I: NodeImplementation<SeqTypes>>(
362 consensus_handle: &ConsensusHandle<SeqTypes, I>,
363 height: u64,
364) -> anyhow::Result<Vec<espresso_types::Leaf2>> {
365 let mut leaves = consensus_handle.undecided_leaves().await;
366 leaves.sort_by_key(|l| l.view_number());
367
368 let (position, mut last_leaf) = leaves
369 .iter()
370 .find_position(|l| l.height() == height)
371 .ok_or_else(|| anyhow::anyhow!("leaf at height {height} not in memory"))?;
372
373 let mut leaf_chain = vec![last_leaf.clone()];
374 for leaf in leaves.iter().skip(position + 1) {
375 if leaf.justify_qc().view_number() == last_leaf.view_number() {
376 leaf_chain.push(leaf.clone());
377 } else {
378 continue;
379 }
380 if leaf.view_number() == last_leaf.view_number() + 1 {
381 last_leaf = leaf;
382 break;
383 }
384 last_leaf = leaf;
385 }
386
387 for leaf in leaves
388 .iter()
389 .skip_while(|l| l.view_number() <= last_leaf.view_number())
390 {
391 if leaf.justify_qc().view_number() == last_leaf.view_number() {
392 leaf_chain.push(leaf.clone());
393 return Ok(leaf_chain);
394 }
395 }
396
397 anyhow::bail!("incomplete leaf chain in memory for height {height}")
398}
399
400pub fn retain_v2_reward_accounts(
404 state: &RewardMerkleTreeV2,
405 accounts: impl IntoIterator<Item = RewardAccountV2>,
406) -> anyhow::Result<RewardMerkleTreeV2> {
407 let mut snapshot = RewardMerkleTreeV2::from_commitment(state.commitment());
408 for account in accounts {
409 match state.universal_lookup(account) {
410 LookupResult::Ok(elem, proof) => {
411 snapshot.remember(account, elem, proof).unwrap();
414 },
415 LookupResult::NotFound(proof) => {
416 snapshot.non_membership_remember(account, proof).unwrap()
418 },
419 LookupResult::NotInMemory => {
420 bail!("missing account {account}");
421 },
422 }
423 }
424
425 Ok(snapshot)
426}
427
428pub fn retain_v1_reward_accounts(
432 state: &RewardMerkleTreeV1,
433 accounts: impl IntoIterator<Item = RewardAccountV1>,
434) -> anyhow::Result<RewardMerkleTreeV1> {
435 let mut snapshot = RewardMerkleTreeV1::from_commitment(state.commitment());
436 for account in accounts {
437 match state.universal_lookup(account) {
438 LookupResult::Ok(elem, proof) => {
439 snapshot.remember(account, elem, proof).unwrap();
442 },
443 LookupResult::NotFound(proof) => {
444 snapshot.non_membership_remember(account, proof).unwrap()
446 },
447 LookupResult::NotInMemory => {
448 bail!("missing account {account}");
449 },
450 }
451 }
452
453 Ok(snapshot)
454}