1use std::{collections::VecDeque, num::NonZeroUsize};
16
17use async_trait::async_trait;
18use committable::{Commitment, Committable};
19use futures::stream::{self, StreamExt, TryStreamExt};
20use hotshot_types::traits::node_implementation::NodeType;
21use itertools::Itertools;
22use sqlx::{FromRow, Row};
23use tagged_base64::{Tagged, TaggedBase64};
24
25use super::{
26 super::transaction::{Transaction, TransactionMode, query},
27 BLOCK_COLUMNS,
28};
29use crate::{
30 Header, Payload, QueryError, QueryResult, Transaction as HotshotTransaction,
31 availability::{BlockQueryData, QueryableHeader, QueryablePayload},
32 data_source::storage::{ExplorerStorage, NodeStorage},
33 explorer::{
34 self, BalanceAmount, BlockDetail, BlockIdentifier, BlockRange, BlockSummary,
35 ExplorerHistograms, ExplorerSummary, GenesisOverview, GetBlockDetailError,
36 GetBlockSummariesError, GetBlockSummariesRequest, GetExplorerSummaryError,
37 GetSearchResultsError, GetTransactionDetailError, GetTransactionSummariesError,
38 GetTransactionSummariesRequest, SearchResult, TransactionIdentifier, TransactionRange,
39 TransactionSummary, TransactionSummaryFilter,
40 errors::{self, NotFound},
41 query_data::TransactionDetailResponse,
42 traits::ExplorerHeader,
43 },
44 types::HeightIndexed,
45};
46
47lazy_static::lazy_static! {
48 static ref GET_BLOCK_SUMMARIES_QUERY_FOR_LATEST: String = {
49 format!(
50 "SELECT {BLOCK_COLUMNS}
51 FROM header AS h
52 JOIN payload AS p ON (h.payload_hash, h.ns_table) = (p.hash, p.ns_table)
53 ORDER BY h.height DESC
54 LIMIT $1"
55 )
56 };
57
58 static ref GET_BLOCK_SUMMARIES_QUERY_FOR_HEIGHT: String = {
59 format!(
60 "SELECT {BLOCK_COLUMNS}
61 FROM header AS h
62 JOIN payload AS p ON (h.payload_hash, h.ns_table) = (p.hash, p.ns_table)
63 WHERE h.height <= $1
64 ORDER BY h.height DESC
65 LIMIT $2"
66 )
67 };
68
69 static ref GET_BLOCK_SUMMARIES_QUERY_FOR_HASH: String = {
75 format!(
76 "SELECT {BLOCK_COLUMNS}
77 FROM header AS h
78 JOIN payload AS p ON (h.payload_hash, h.ns_table) = (p.hash, p.ns_table)
79 WHERE h.height <= (SELECT h1.height FROM header AS h1 WHERE h1.hash = $1)
80 ORDER BY h.height DESC
81 LIMIT $2",
82 )
83 };
84
85 static ref GET_BLOCK_DETAIL_QUERY_FOR_LATEST: String = {
86 format!(
87 "SELECT {BLOCK_COLUMNS}
88 FROM header AS h
89 JOIN payload AS p ON (h.payload_hash, h.ns_table) = (p.hash, p.ns_table)
90 ORDER BY h.height DESC
91 LIMIT 1"
92 )
93 };
94
95 static ref GET_BLOCK_DETAIL_QUERY_FOR_HEIGHT: String = {
96 format!(
97 "SELECT {BLOCK_COLUMNS}
98 FROM header AS h
99 JOIN payload AS p ON (h.payload_hash, h.ns_table) = (p.hash, p.ns_table)
100 WHERE h.height = $1
101 ORDER BY h.height DESC
102 LIMIT 1"
103 )
104 };
105
106 static ref GET_BLOCK_DETAIL_QUERY_FOR_HASH: String = {
107 format!(
108 "SELECT {BLOCK_COLUMNS}
109 FROM header AS h
110 JOIN payload AS p ON (h.payload_hash, h.ns_table) = (p.hash, p.ns_table)
111 WHERE h.hash = $1
112 ORDER BY h.height DESC
113 LIMIT 1"
114 )
115 };
116
117
118 static ref GET_BLOCKS_CONTAINING_TRANSACTIONS_NO_FILTER_QUERY: String = {
119 format!(
120 "SELECT {BLOCK_COLUMNS}
121 FROM header AS h
122 JOIN payload AS p ON (h.payload_hash, h.ns_table) = (p.hash, p.ns_table)
123 WHERE h.height IN (
124 SELECT t.block_height
125 FROM transactions AS t
126 WHERE (t.block_height, t.ns_id, t.position) <= ($1, $2, $3)
127 ORDER BY t.block_height DESC, t.ns_id DESC, t.position DESC
128 LIMIT $4
129 )
130 ORDER BY h.height DESC"
131 )
132 };
133
134 static ref GET_BLOCKS_CONTAINING_TRANSACTIONS_IN_NAMESPACE_QUERY: String = {
135 format!(
136 "SELECT {BLOCK_COLUMNS}
137 FROM header AS h
138 JOIN payload AS p ON (h.payload_hash, h.ns_table) = (p.hash, p.ns_table)
139 WHERE h.height IN (
140 SELECT t.block_height
141 FROM transactions AS t
142 WHERE (t.block_height, t.ns_id, t.position) <= ($1, $2, $3)
143 AND t.ns_id = $5
144 ORDER BY t.block_height DESC, t.ns_id DESC, t.position DESC
145 LIMIT $4
146 )
147 ORDER BY h.height DESC"
148 )
149 };
150
151 static ref GET_TRANSACTION_SUMMARIES_QUERY_FOR_BLOCK: String = {
152 format!(
153 "SELECT {BLOCK_COLUMNS}
154 FROM header AS h
155 JOIN payload AS p ON (h.payload_hash, h.ns_table) = (p.hash, p.ns_table)
156 WHERE h.height = $1
157 ORDER BY h.height DESC"
158 )
159 };
160
161 static ref GET_TRANSACTION_DETAIL_QUERY_FOR_LATEST: String = {
162 format!(
163 "SELECT {BLOCK_COLUMNS}
164 FROM header AS h
165 JOIN payload AS p ON (h.payload_hash, h.ns_table) = (p.hash, p.ns_table)
166 WHERE h.height = (
167 SELECT MAX(t1.block_height)
168 FROM transactions AS t1
169 )
170 ORDER BY h.height DESC"
171 )
172 };
173
174 static ref GET_TRANSACTION_DETAIL_QUERY_FOR_HEIGHT_AND_OFFSET: String = {
175 format!(
176 "SELECT {BLOCK_COLUMNS}
177 FROM header AS h
178 JOIN payload AS p ON (h.payload_hash, h.ns_table) = (p.hash, p.ns_table)
179 WHERE h.height = (
180 SELECT t1.block_height
181 FROM transactions AS t1
182 WHERE t1.block_height = $1
183 ORDER BY t1.block_height, t1.ns_id, t1.position
184 LIMIT 1
185 OFFSET $2
186
187 )
188 ORDER BY h.height DESC",
189 )
190 };
191
192 static ref GET_TRANSACTION_DETAIL_QUERY_FOR_HASH: String = {
193 format!(
194 "SELECT {BLOCK_COLUMNS}
195 FROM header AS h
196 JOIN payload AS p ON (h.payload_hash, h.ns_table) = (p.hash, p.ns_table)
197 WHERE h.height = (
198 SELECT t1.block_height
199 FROM transactions AS t1
200 WHERE t1.hash = $1
201 ORDER BY t1.block_height DESC, t1.ns_id DESC, t1.position DESC
202 LIMIT 1
203 )
204 ORDER BY h.height DESC"
205 )
206 };
207}
208
209const EXPLORER_SUMMARY_HISTOGRAM_NUM_ENTRIES: usize = 50;
212
213const EXPLORER_SUMMARY_NUM_BLOCKS: usize = 10;
216
217const EXPLORER_SUMMARY_NUM_TRANSACTIONS: usize = 10;
220
221const MILLIS_PER_UNIT: f64 = 1_000.0;
224
225#[async_trait]
226impl<Mode, Types> ExplorerStorage<Types> for Transaction<Mode>
227where
228 Mode: TransactionMode,
229 Types: NodeType,
230 Payload<Types>: QueryablePayload<Types>,
231 Header<Types>: QueryableHeader<Types> + ExplorerHeader<Types>,
232 crate::Transaction<Types>: explorer::traits::ExplorerTransaction<Types>,
233 BalanceAmount<Types>: Into<explorer::monetary_value::MonetaryValue>,
234{
235 async fn get_block_summaries(
236 &mut self,
237 request: GetBlockSummariesRequest<Types>,
238 ) -> Result<Vec<BlockSummary<Types>>, GetBlockSummariesError> {
239 let request = &request.0;
240
241 let query_stmt = match request.target {
242 BlockIdentifier::Latest => {
243 query(&GET_BLOCK_SUMMARIES_QUERY_FOR_LATEST).bind(request.num_blocks.get() as i64)
244 },
245 BlockIdentifier::Height(height) => query(&GET_BLOCK_SUMMARIES_QUERY_FOR_HEIGHT)
246 .bind(height as i64)
247 .bind(request.num_blocks.get() as i64),
248 BlockIdentifier::Hash(hash) => query(&GET_BLOCK_SUMMARIES_QUERY_FOR_HASH)
249 .bind(hash.to_string())
250 .bind(request.num_blocks.get() as i64),
251 };
252
253 let row_stream = query_stmt.fetch(self.as_mut());
254 let result = row_stream.map(|row| BlockSummary::from_row(&row?));
255
256 Ok(result.try_collect().await?)
257 }
258
259 async fn get_block_detail(
260 &mut self,
261 request: BlockIdentifier<Types>,
262 ) -> Result<BlockDetail<Types>, GetBlockDetailError> {
263 let query_stmt = match request {
264 BlockIdentifier::Latest => query(&GET_BLOCK_DETAIL_QUERY_FOR_LATEST),
265 BlockIdentifier::Height(height) => {
266 query(&GET_BLOCK_DETAIL_QUERY_FOR_HEIGHT).bind(height as i64)
267 },
268 BlockIdentifier::Hash(hash) => {
269 query(&GET_BLOCK_DETAIL_QUERY_FOR_HASH).bind(hash.to_string())
270 },
271 };
272
273 let query_result = query_stmt.fetch_one(self.as_mut()).await?;
274 let block = BlockDetail::from_row(&query_result)?;
275
276 Ok(block)
277 }
278
279 async fn get_transaction_summaries(
280 &mut self,
281 request: GetTransactionSummariesRequest<Types>,
282 ) -> Result<Vec<TransactionSummary<Types>>, GetTransactionSummariesError> {
283 let range = &request.range;
284 let target = &range.target;
285 let filter = &request.filter;
286
287 let transaction_target_query = match target {
290 TransactionIdentifier::Latest => query(
291 "SELECT block_height AS height, ns_id, position FROM transactions ORDER BY \
292 block_height DESC, ns_id DESC, position DESC LIMIT 1",
293 ),
294 TransactionIdentifier::HeightAndOffset(height, _) => query(
295 "SELECT block_height AS height, ns_id, position FROM transactions WHERE \
296 block_height = $1 ORDER BY ns_id DESC, position DESC LIMIT 1",
297 )
298 .bind(*height as i64),
299 TransactionIdentifier::Hash(hash) => query(
300 "SELECT block_height AS height, ns_id, position FROM transactions WHERE hash = $1 \
301 ORDER BY block_height DESC, ns_id DESC, position DESC LIMIT 1",
302 )
303 .bind(hash.to_string()),
304 };
305 let Some(transaction_target) = transaction_target_query
306 .fetch_optional(self.as_mut())
307 .await?
308 else {
309 return Ok(vec![]);
312 };
313
314 let block_height = transaction_target.get::<i64, _>("height") as usize;
315 let namespace = transaction_target.get::<i64, _>("ns_id");
316 let position = transaction_target.get::<i64, _>("position");
317 let offset = if let TransactionIdentifier::HeightAndOffset(_, offset) = target {
318 *offset
319 } else {
320 0
321 };
322
323 let query_stmt = match filter {
331 TransactionSummaryFilter::RollUp(ns) => {
332 query(&GET_BLOCKS_CONTAINING_TRANSACTIONS_IN_NAMESPACE_QUERY)
333 .bind(block_height as i64)
334 .bind(namespace)
335 .bind(position)
336 .bind((range.num_transactions.get() + offset) as i64)
337 .bind((*ns).into())
338 },
339 TransactionSummaryFilter::None => {
340 query(&GET_BLOCKS_CONTAINING_TRANSACTIONS_NO_FILTER_QUERY)
341 .bind(block_height as i64)
342 .bind(namespace)
343 .bind(position)
344 .bind((range.num_transactions.get() + offset) as i64)
345 },
346
347 TransactionSummaryFilter::Block(block) => {
348 query(&GET_TRANSACTION_SUMMARIES_QUERY_FOR_BLOCK).bind(*block as i64)
349 },
350 };
351
352 let block_stream = query_stmt
353 .fetch(self.as_mut())
354 .map(|row| BlockQueryData::from_row(&row?));
355
356 let transaction_summary_stream = block_stream.flat_map(|row| match row {
357 Ok(block) => {
358 tracing::info!(height = block.height(), "selected block");
359 stream::iter(
360 block
361 .enumerate()
362 .filter(|(ix, _)| {
363 if let TransactionSummaryFilter::RollUp(ns) = filter {
364 let tx_ns = QueryableHeader::<Types>::namespace_id(
365 block.header(),
366 &ix.ns_index,
367 );
368 tx_ns.as_ref() == Some(ns)
369 } else {
370 true
371 }
372 })
373 .enumerate()
374 .map(|(index, (_, txn))| {
375 TransactionSummary::try_from((&block, index, txn)).map_err(|err| {
376 QueryError::Error {
377 message: err.to_string(),
378 }
379 })
380 })
381 .collect::<Vec<QueryResult<TransactionSummary<Types>>>>()
382 .into_iter()
383 .rev()
384 .collect::<Vec<QueryResult<TransactionSummary<Types>>>>(),
385 )
386 },
387 Err(err) => stream::iter(vec![Err(err.into())]),
388 });
389
390 let transaction_summary_vec = transaction_summary_stream
391 .try_collect::<Vec<TransactionSummary<Types>>>()
392 .await?;
393
394 Ok(transaction_summary_vec
395 .into_iter()
396 .skip(offset)
397 .skip_while(|txn| {
398 if let TransactionIdentifier::Hash(hash) = target {
399 txn.hash != *hash
400 } else {
401 false
402 }
403 })
404 .take(range.num_transactions.get())
405 .collect::<Vec<TransactionSummary<Types>>>())
406 }
407
408 async fn get_transaction_detail(
409 &mut self,
410 request: TransactionIdentifier<Types>,
411 ) -> Result<TransactionDetailResponse<Types>, GetTransactionDetailError> {
412 let target = request;
413
414 let query_stmt = match target {
415 TransactionIdentifier::Latest => query(&GET_TRANSACTION_DETAIL_QUERY_FOR_LATEST),
416 TransactionIdentifier::HeightAndOffset(height, offset) => {
417 query(&GET_TRANSACTION_DETAIL_QUERY_FOR_HEIGHT_AND_OFFSET)
418 .bind(height as i64)
419 .bind(offset as i64)
420 },
421 TransactionIdentifier::Hash(hash) => {
422 query(&GET_TRANSACTION_DETAIL_QUERY_FOR_HASH).bind(hash.to_string())
423 },
424 };
425
426 let query_row = query_stmt.fetch_one(self.as_mut()).await?;
427 let block = BlockQueryData::<Types>::from_row(&query_row)?;
428
429 let txns = block.enumerate().map(|(_, txn)| txn).collect::<Vec<_>>();
430
431 let (offset, txn) = match target {
432 TransactionIdentifier::Latest => txns.into_iter().enumerate().next_back().ok_or(
433 GetTransactionDetailError::TransactionNotFound(NotFound {
434 key: "Latest".to_string(),
435 }),
436 ),
437 TransactionIdentifier::HeightAndOffset(height, offset) => {
438 txns.into_iter().enumerate().nth(offset).ok_or(
439 GetTransactionDetailError::TransactionNotFound(NotFound {
440 key: format!("at {height} and {offset}"),
441 }),
442 )
443 },
444 TransactionIdentifier::Hash(hash) => txns
445 .into_iter()
446 .enumerate()
447 .find(|(_, txn)| txn.commit() == hash)
448 .ok_or(GetTransactionDetailError::TransactionNotFound(NotFound {
449 key: format!("hash {hash}"),
450 })),
451 }?;
452
453 Ok(TransactionDetailResponse::try_from((&block, offset, txn))?)
454 }
455
456 async fn get_explorer_summary(
457 &mut self,
458 ) -> Result<ExplorerSummary<Types>, GetExplorerSummaryError> {
459 let histograms = {
460 let histogram_query_result = query(
461 "SELECT
462 h.height AS height,
463 h.timestamp AS timestamp,
464 COALESCE(
465 CAST(h.data -> 'fields' ->> 'timestamp_millis' AS BIGINT),
466 CAST(h.data -> 'fields' ->> 'timestamp' AS BIGINT) * 1000
467 ) - LEAD(COALESCE(
468 CAST(h.data -> 'fields' ->> 'timestamp_millis' AS BIGINT),
469 CAST(h.data -> 'fields' ->> 'timestamp' AS BIGINT) * 1000
470 )) OVER (ORDER BY h.height DESC) as time,
471 p.size AS size,
472 p.num_transactions AS transactions
473 FROM header AS h
474 JOIN payload AS p ON (h.payload_hash, h.ns_table) = (p.hash, p.ns_table)
475 WHERE
476 h.height IN (SELECT height FROM header ORDER BY height DESC LIMIT $1)
477 ORDER BY h.height
478 ",
479 )
480 .bind((EXPLORER_SUMMARY_HISTOGRAM_NUM_ENTRIES + 1) as i64)
481 .fetch(self.as_mut());
482
483 let mut histograms: ExplorerHistograms = histogram_query_result
484 .map(|row_stream| {
485 row_stream.map(|row| {
486 let height: i64 = row.try_get("height")?;
487 let timestamp: i64 = row.try_get("timestamp")?;
488 let time: Option<i64> = row.try_get("time")?;
489 let size: Option<i32> = row.try_get("size")?;
490 let num_transactions: i32 = row.try_get("transactions")?;
491
492 Ok((height, timestamp, time, size, num_transactions))
493 })
494 })
495 .try_fold(
496 ExplorerHistograms {
497 block_time: VecDeque::with_capacity(EXPLORER_SUMMARY_HISTOGRAM_NUM_ENTRIES),
498 block_size: VecDeque::with_capacity(EXPLORER_SUMMARY_HISTOGRAM_NUM_ENTRIES),
499 block_transactions: VecDeque::with_capacity(EXPLORER_SUMMARY_HISTOGRAM_NUM_ENTRIES),
500 block_heights: VecDeque::with_capacity(EXPLORER_SUMMARY_HISTOGRAM_NUM_ENTRIES),
501 },
502 |mut histograms: ExplorerHistograms,
503 row: sqlx::Result<(i64, i64, Option<i64>, Option<i32>, i32)>| async {
504 let (height, _timestamp, time, size, num_transactions) = row?;
505
506 histograms.block_time.push_back(time.map(|i| i as f64 / MILLIS_PER_UNIT));
507 histograms.block_size.push_back(size.map(|i| i as u64));
508 histograms.block_transactions.push_back(num_transactions as u64);
509 histograms.block_heights.push_back(height as u64);
510 Ok(histograms)
511 },
512 )
513 .await?;
514
515 while histograms.block_time.len() > EXPLORER_SUMMARY_HISTOGRAM_NUM_ENTRIES {
516 histograms.block_time.pop_front();
517 histograms.block_size.pop_front();
518 histograms.block_transactions.pop_front();
519 histograms.block_heights.pop_front();
520 }
521
522 histograms
523 };
524
525 let genesis_overview = {
526 let blocks = NodeStorage::<Types>::block_height(self).await? as u64;
527 let transactions =
528 NodeStorage::<Types>::count_transactions_in_range(self, .., None).await? as u64;
529 GenesisOverview {
530 rollups: 0,
531 transactions,
532 blocks,
533 }
534 };
535
536 let latest_block: BlockDetail<Types> =
537 self.get_block_detail(BlockIdentifier::Latest).await?;
538
539 let latest_blocks: Vec<BlockSummary<Types>> = self
540 .get_block_summaries(GetBlockSummariesRequest(BlockRange {
541 target: BlockIdentifier::Latest,
542 num_blocks: NonZeroUsize::new(EXPLORER_SUMMARY_NUM_BLOCKS).unwrap(),
543 }))
544 .await?;
545
546 let latest_transactions: Vec<TransactionSummary<Types>> = self
547 .get_transaction_summaries(GetTransactionSummariesRequest {
548 range: TransactionRange {
549 target: TransactionIdentifier::Latest,
550 num_transactions: NonZeroUsize::new(EXPLORER_SUMMARY_NUM_TRANSACTIONS).unwrap(),
551 },
552 filter: TransactionSummaryFilter::None,
553 })
554 .await?;
555
556 Ok(ExplorerSummary {
557 genesis_overview,
558 latest_block,
559 latest_transactions,
560 latest_blocks,
561 histograms,
562 })
563 }
564
565 async fn get_search_results(
566 &mut self,
567 search_query: TaggedBase64,
568 ) -> Result<SearchResult<Types>, GetSearchResultsError> {
569 let search_tag = search_query.tag();
570 let header_tag = Commitment::<Header<Types>>::tag();
571 let tx_tag = Commitment::<HotshotTransaction<Types>>::tag();
572
573 if search_tag != header_tag && search_tag != tx_tag {
574 return Err(GetSearchResultsError::InvalidQuery(errors::BadQuery {}));
575 }
576
577 let search_query_string = search_query.to_string();
578 if search_tag == header_tag {
579 let block_query = format!(
580 "SELECT {BLOCK_COLUMNS}
581 FROM header AS h
582 JOIN payload AS p ON (h.payload_hash, h.ns_table) = (p.hash, p.ns_table)
583 WHERE h.hash = $1
584 ORDER BY h.height DESC
585 LIMIT 1"
586 );
587 let row = query(block_query.as_str())
588 .bind(&search_query_string)
589 .fetch_one(self.as_mut())
590 .await?;
591
592 let block = BlockSummary::from_row(&row)?;
593
594 Ok(SearchResult {
595 blocks: vec![block],
596 transactions: Vec::new(),
597 })
598 } else {
599 let transactions_query = format!(
600 "SELECT {BLOCK_COLUMNS}
601 FROM header AS h
602 JOIN payload AS p ON (h.payload_hash, h.ns_table) = (p.hash, p.ns_table)
603 JOIN transactions AS t ON h.height = t.block_height
604 WHERE t.hash = $1
605 ORDER BY h.height DESC
606 LIMIT 5"
607 );
608 let transactions_query_rows = query(transactions_query.as_str())
609 .bind(&search_query_string)
610 .fetch(self.as_mut());
611 let transactions_query_result: Vec<TransactionSummary<Types>> = transactions_query_rows
612 .map(|row| -> Result<Vec<TransactionSummary<Types>>, QueryError>{
613 let block = BlockQueryData::<Types>::from_row(&row?)?;
614 let transactions = block
615 .enumerate()
616 .enumerate()
617 .filter(|(_, (_, txn))| txn.commit().to_string() == search_query_string)
618 .map(|(offset, (_, txn))| {
619 Ok(TransactionSummary::try_from((
620 &block, offset, txn,
621 ))?)
622 })
623 .try_collect::<TransactionSummary<Types>, Vec<TransactionSummary<Types>>, QueryError>()?;
624 Ok(transactions)
625 })
626 .try_collect::<Vec<Vec<TransactionSummary<Types>>>>()
627 .await?
628 .into_iter()
629 .flatten()
630 .collect();
631
632 Ok(SearchResult {
633 blocks: Vec::new(),
634 transactions: transactions_query_result,
635 })
636 }
637 }
638}