1use std::ops::{Bound, RangeBounds};
16
17use alloy::primitives::map::HashMap;
18use anyhow::anyhow;
19use async_trait::async_trait;
20use futures::stream::{StreamExt, TryStreamExt};
21use hotshot_types::{
22 data::VidShare,
23 simple_certificate::CertificatePair,
24 traits::{block_contents::BlockHeader, node_implementation::NodeType},
25};
26use snafu::OptionExt;
27use tracing::instrument;
28
29use super::{
30 super::transaction::{Transaction, TransactionMode, Write, query, query_as},
31 DecodeError, HEADER_COLUMNS, QueryBuilder, parse_header,
32};
33use crate::{
34 Header, MissingSnafu, QueryError, QueryResult,
35 availability::{NamespaceId, QueryableHeader},
36 data_source::storage::{
37 Aggregate, AggregatesStorage, NodeStorage, PayloadMetadata, UpdateAggregatesStorage,
38 },
39 node::{
40 BlockId, ResourceSyncStatus, SyncStatus, SyncStatusQueryData, SyncStatusRange,
41 TimeWindowQueryData, WindowStart,
42 },
43 types::HeightIndexed,
44};
45
#[async_trait]
impl<Mode, Types> NodeStorage<Types> for Transaction<Mode>
where
    Mode: TransactionMode,
    Types: NodeType,
    Header<Types>: QueryableHeader<Types>,
{
    /// The height of the chain: one past the maximum stored header height, or
    /// 0 if no headers have been stored yet.
    async fn block_height(&mut self) -> QueryResult<usize> {
        match query_as::<(Option<i64>,)>("SELECT max(height) FROM header")
            .fetch_one(self.as_mut())
            .await?
        {
            (Some(height),) => {
                // Heights are 0-based, so the chain height (number of blocks)
                // is one more than the maximum stored height.
                Ok(height as usize + 1)
            },
            (None,) => {
                // `max` returns NULL on an empty table: no blocks stored yet.
                Ok(0)
            },
        }
    }

    /// Counts transactions over a range of block heights, optionally
    /// restricted to a single namespace (`None` counts across all
    /// namespaces).
    ///
    /// Reads precomputed running totals from the `aggregate` table: the count
    /// over `[from, to]` is `total(to) - total(from - 1)`.
    async fn count_transactions_in_range(
        &mut self,
        range: impl RangeBounds<usize> + Send,
        namespace: Option<NamespaceId<Types>>,
    ) -> QueryResult<usize> {
        // Namespace -1 is the sentinel row holding the global aggregate.
        let namespace: i64 = namespace.map(|ns| ns.into()).unwrap_or(-1);
        let Some((from, to)) = aggregate_range_bounds::<Types>(self, range).await? else {
            // Empty range: nothing to count.
            return Ok(0);
        };
        // Running total up to and including `to`.
        let (count,) = query_as::<(i64,)>(
            "SELECT num_transactions FROM aggregate WHERE height = $1 AND namespace = $2",
        )
        .bind(to as i64)
        .bind(namespace)
        .fetch_one(self.as_mut())
        .await?;
        let mut count = count as usize;

        if from > 0 {
            // Subtract the running total just before the start of the range.
            let (prev_count,) = query_as::<(i64,)>(
                "SELECT num_transactions FROM aggregate WHERE height = $1 AND namespace = $2",
            )
            .bind((from - 1) as i64)
            .bind(namespace)
            .fetch_one(self.as_mut())
            .await?;
            count = count.saturating_sub(prev_count as usize);
        }

        Ok(count)
    }

    /// Sums payload sizes (in bytes) over a range of block heights,
    /// optionally restricted to a single namespace. Uses the same
    /// running-total scheme as [`Self::count_transactions_in_range`].
    async fn payload_size_in_range(
        &mut self,
        range: impl RangeBounds<usize> + Send,
        namespace: Option<NamespaceId<Types>>,
    ) -> QueryResult<usize> {
        // Namespace -1 is the sentinel row holding the global aggregate.
        let namespace: i64 = namespace.map(|ns| ns.into()).unwrap_or(-1);
        let Some((from, to)) = aggregate_range_bounds::<Types>(self, range).await? else {
            // Empty range: nothing to sum.
            return Ok(0);
        };
        // Running total up to and including `to`.
        let (size,) = query_as::<(i64,)>(
            "SELECT payload_size FROM aggregate WHERE height = $1 AND namespace = $2",
        )
        .bind(to as i64)
        .bind(namespace)
        .fetch_one(self.as_mut())
        .await?;
        let mut size = size as usize;

        if from > 0 {
            // Subtract the running total just before the start of the range.
            let (prev_size,) = query_as::<(i64,)>(
                "SELECT payload_size FROM aggregate WHERE height = $1 AND namespace = $2",
            )
            .bind((from - 1) as i64)
            .bind(namespace)
            .fetch_one(self.as_mut())
            .await?;
            size = size.saturating_sub(prev_size as usize);
        }

        Ok(size)
    }

    /// Loads this node's VID share for the identified block.
    ///
    /// # Errors
    ///
    /// Returns a missing-data error if the header exists but no share has
    /// been stored for it, and a decode error if the stored bytes are
    /// malformed.
    async fn vid_share<ID>(&mut self, id: ID) -> QueryResult<VidShare>
    where
        ID: Into<BlockId<Types>> + Send + Sync,
    {
        let mut query = QueryBuilder::default();
        let where_clause = query.header_where_clause(id.into())?;
        // ORDER BY + LIMIT 1 guarantee a single, deterministic row even if
        // the where clause could match more than one header.
        let sql = format!(
            "SELECT vid_share FROM header AS h
             WHERE {where_clause}
             ORDER BY h.height
             LIMIT 1"
        );
        let (share_data,) = query
            .query_as::<(Option<Vec<u8>>,)>(&sql)
            .fetch_one(self.as_mut())
            .await?;
        // The column is nullable: a header can be stored before its share.
        let share_data = share_data.context(MissingSnafu)?;
        let share = bincode::deserialize(&share_data).decode_error("malformed VID share")?;
        Ok(share)
    }

    /// Reports which heights in `[from, to)` are present/missing for each
    /// resource type (leaves, blocks, VID common data).
    async fn sync_status_for_range(
        &mut self,
        from: usize,
        to: usize,
    ) -> QueryResult<SyncStatusQueryData> {
        // A block is present only if both its header and its payload (matched
        // on payload hash AND namespace table) are stored; the join drops
        // heights whose payload is missing.
        let blocks = self
            .sync_status_ranges(
                "header AS h JOIN payload AS p ON (h.payload_hash, h.ns_table) = (p.hash, \
                 p.ns_table)",
                "height",
                from,
                to,
            )
            .await?;

        let leaves = if blocks.is_fully_synced() {
            // Shortcut: skip the leaf query when every block is present.
            // NOTE(review): assumes a block row can only exist alongside its
            // leaf — confirm against the insertion path.
            blocks.clone()
        } else {
            self.sync_status_ranges("leaf2", "height", from, to).await?
        };

        // VID common data is keyed by payload hash, shared between blocks
        // with identical payloads.
        let vid_common = self
            .sync_status_ranges(
                "header AS h JOIN vid_common AS v ON h.payload_hash = v.hash",
                "height",
                from,
                to,
            )
            .await?;

        Ok(SyncStatusQueryData {
            leaves,
            blocks,
            vid_common,
            // Pruning status is not tracked here; callers fill this in if
            // they have it.
            pruned_height: None,
        })
    }

    /// Fetches a window of headers starting at `start` (a time, height, or
    /// block hash) and ending at timestamp `end` (exclusive), returning at
    /// most `limit` headers plus the nearest headers on either side of the
    /// window (`prev` and `next`).
    async fn get_header_window(
        &mut self,
        start: impl Into<WindowStart<Types>> + Send + Sync,
        end: u64,
        limit: usize,
    ) -> QueryResult<TimeWindowQueryData<Header<Types>>> {
        let first_block = match start.into() {
            WindowStart::Time(t) => {
                // Purely time-based windows are handled by a dedicated helper.
                return self.time_window::<Types>(t, end, limit).await;
            },
            WindowStart::Height(h) => h,
            WindowStart::Hash(h) => self.load_header::<Types>(h).await?.block_number(),
        };

        // Headers from the starting height until the end timestamp, capped at
        // `limit`.
        let sql = format!(
            "SELECT {HEADER_COLUMNS}
             FROM header AS h
             WHERE h.height >= $1 AND h.timestamp < $2
             ORDER BY h.height
             LIMIT $3"
        );
        let rows = query(&sql)
            .bind(first_block as i64)
            .bind(end as i64)
            .bind(limit as i64)
            .fetch(self.as_mut());
        let window = rows
            .map(|row| parse_header::<Types>(row?))
            .try_collect::<Vec<_>>()
            .await?;

        // The header immediately before the window, if the window does not
        // start at genesis.
        let prev = if first_block > 0 {
            Some(self.load_header::<Types>(first_block as usize - 1).await?)
        } else {
            None
        };

        let next = if window.len() < limit {
            // The window was not truncated by the limit, so the first header
            // at or past the end timestamp is the true successor.
            let sql = format!(
                "SELECT {HEADER_COLUMNS}
             FROM header AS h
             WHERE h.timestamp >= $1
             ORDER BY h.timestamp, h.height
             LIMIT 1"
            );
            query(&sql)
                .bind(end as i64)
                .fetch_optional(self.as_mut())
                .await?
                .map(parse_header::<Types>)
                .transpose()?
        } else {
            // Truncated by the limit: the successor of the truncated window
            // would be misleading, so report none.
            tracing::debug!(limit, "cutting off header window request due to limit");
            None
        };

        Ok(TimeWindowQueryData { window, prev, next })
    }

    /// Loads the latest stored QC chain (a pair of consecutive certificates),
    /// if any.
    async fn latest_qc_chain(&mut self) -> QueryResult<Option<[CertificatePair<Types>; 2]>> {
        let Some((json,)) = query_as("SELECT qcs FROM latest_qc_chain LIMIT 1")
            .fetch_optional(self.as_mut())
            .await?
        else {
            return Ok(None);
        };
        let qcs = serde_json::from_value(json).decode_error("malformed QC")?;
        Ok(qcs)
    }
}
295
impl<Mode> Transaction<Mode>
where
    Mode: TransactionMode,
{
    /// Scans `[start, end)` for contiguous runs of heights that are present
    /// in (or missing from) `table`, where "present" means `indicator_column`
    /// is non-NULL for that height.
    ///
    /// Returns the alternating missing/present runs in order, along with the
    /// total number of missing heights.
    #[instrument(skip(self))]
    async fn sync_status_ranges(
        &mut self,
        table: &str,
        indicator_column: &str,
        start: usize,
        end: usize,
    ) -> QueryResult<ResourceSyncStatus> {
        let mut ranges = vec![];
        tracing::debug!("searching for missing ranges");

        // Find the first height of every present run: a height whose
        // indicator is non-NULL while its predecessor's is NULL. The RIGHT
        // JOIN also catches the first height in the window, whose predecessor
        // row does not exist.
        let query = format!(
            "WITH range AS (SELECT height, {indicator_column} AS indicator FROM {table}
             WHERE height >= $1 AND height < $2)
             SELECT successor.height FROM range AS predecessor
             RIGHT JOIN range AS successor
             ON successor.height = predecessor.height + 1
             WHERE successor.indicator IS NOT NULL
             AND predecessor.indicator IS NULL
             ORDER BY successor.height"
        );
        let range_starts = query_as::<(i64,)>(&query)
            .bind(start as i64)
            .bind(end as i64)
            .fetch_all(self.as_mut())
            .await?;
        tracing::debug!(
            ?range_starts,
            "found {} starting heights for present ranges",
            range_starts.len()
        );

        // Find the matching end of each present run, with two strategies
        // depending on how many runs there are.
        let range_ends = if range_starts.len() <= 10 {
            // Few runs: one small max() query per run.
            let mut ends = vec![];
            for (i, &(start,)) in range_starts.iter().enumerate() {
                let query = format!(
                    "SELECT max(height) from {table}
                     WHERE height < $1 AND {indicator_column} IS NOT NULL"
                );
                // Each run ends strictly before the next run starts, or
                // before the end of the window for the last run.
                let upper_bound = if i + 1 < range_starts.len() {
                    range_starts[i + 1].0
                } else {
                    end as i64
                };
                let (end,) = query_as::<(i64,)>(&query)
                    .bind(upper_bound)
                    .fetch_one(self.as_mut())
                    .await?;
                tracing::debug!(start, end, "found end for present range");
                ends.push((end,));
            }
            ends
        } else {
            // Many runs: one mirrored query (LEFT JOIN instead of RIGHT)
            // finds every height whose successor's indicator is NULL, i.e.
            // the last height of each present run.
            let query = format!(
                "WITH range AS (SELECT height, {indicator_column} AS indicator FROM {table}
                 WHERE height >= $1 AND height < $2)
                 SELECT predecessor.height FROM range AS predecessor
                 LEFT JOIN range AS successor
                 ON successor.height = predecessor.height + 1
                 WHERE predecessor.indicator IS NOT NULL
                 AND successor.indicator IS NULL
                 ORDER BY predecessor.height"
            );
            let ends = query_as::<(i64,)>(&query)
                .bind(start as i64)
                .bind(end as i64)
                .fetch_all(self.as_mut())
                .await?;
            tracing::debug!(
                ?ends,
                "found {} ending heights for present ranges",
                ends.len()
            );
            ends
        };

        // Every present run must have exactly one start and one end; a
        // mismatch indicates a bug in the queries above or malformed data.
        if range_starts.len() != range_ends.len() {
            return Err(QueryError::Error {
                message: format!(
                    "number of present range starts ({}) does not match number of present range \
                     ends ({})",
                    range_starts.len(),
                    range_ends.len(),
                ),
            });
        }

        // Stitch the starts and ends into an alternating list of missing and
        // present ranges covering [start, end).
        let mut prev = start;
        for ((start,), (end,)) in range_starts.into_iter().zip(range_ends) {
            let start = start as usize;
            let end = end as usize;

            if start < prev {
                return Err(QueryError::Error {
                    message: format!(
                        "found present ranges out of order: range start {start} is before \
                         previous range end {prev}"
                    ),
                });
            }
            if end < start {
                return Err(QueryError::Error {
                    message: format!("malformed range: start={start}, end={end}"),
                });
            }

            // A gap before this present run is a missing range.
            if start != prev {
                tracing::debug!(start = prev, end = start, "found missing range");
                ranges.push(SyncStatusRange {
                    start: prev,
                    end: start,
                    status: SyncStatus::Missing,
                });
            }

            // Output ranges are half-open, so the present range's end is one
            // past its last present height.
            ranges.push(SyncStatusRange {
                start,
                end: end + 1, status: SyncStatus::Present,
            });
            prev = end + 1;
        }

        // Anything after the last present run up to the end of the window is
        // missing.
        if prev != end {
            tracing::debug!(start = prev, end, "found missing range");
            ranges.push(SyncStatusRange {
                start: prev,
                end,
                status: SyncStatus::Missing,
            });
        }

        // Total number of missing heights across all missing ranges.
        let missing = ranges
            .iter()
            .filter_map(|range| {
                if range.status == SyncStatus::Missing {
                    Some(range.end - range.start)
                } else {
                    None
                }
            })
            .sum();
        tracing::debug!(
            missing,
            "found missing objects in {} total ranges",
            ranges.len()
        );

        Ok(ResourceSyncStatus { missing, ranges })
    }
}
506
507impl<Types, Mode: TransactionMode> AggregatesStorage<Types> for Transaction<Mode>
508where
509 Types: NodeType,
510 Header<Types>: QueryableHeader<Types>,
511{
512 async fn aggregates_height(&mut self) -> anyhow::Result<usize> {
513 let (height,): (i64,) = query_as("SELECT coalesce(max(height) + 1, 0) FROM aggregate")
514 .fetch_one(self.as_mut())
515 .await?;
516 Ok(height as usize)
517 }
518
519 async fn load_prev_aggregate(&mut self) -> anyhow::Result<Option<Aggregate<Types>>> {
520 let res: (Option<i64>,) =
523 query_as("SELECT max(height) FROM aggregate WHERE namespace = -1")
524 .fetch_one(self.as_mut())
525 .await?;
526
527 let (Some(max_height),) = res else {
528 return Ok(None);
529 };
530
531 let rows: Vec<(i64, i64, i64)> = query_as(
532 r#"
533 SELECT namespace, num_transactions, payload_size from aggregate WHERE height = $1
534 "#,
535 )
536 .bind(max_height)
537 .fetch_all(self.as_mut())
538 .await?;
539
540 let mut num_transactions = HashMap::default();
541 let mut payload_size = HashMap::default();
542
543 for (namespace_id, num_tx, payload_sz) in rows {
544 let key = if namespace_id == -1 {
548 None
549 } else {
550 Some(namespace_id.into())
551 };
552 num_transactions.insert(key, num_tx as usize);
553 payload_size.insert(key, payload_sz as usize);
554 }
555
556 Ok(Some(Aggregate {
557 height: max_height,
558 num_transactions,
559 payload_size,
560 }))
561 }
562}
563
564impl<Types: NodeType> UpdateAggregatesStorage<Types> for Transaction<Write>
565where
566 Header<Types>: QueryableHeader<Types>,
567{
568 async fn update_aggregates(
569 &mut self,
570 prev: Aggregate<Types>,
571 blocks: &[PayloadMetadata<Types>],
572 ) -> anyhow::Result<Aggregate<Types>> {
573 let height = blocks[0].height();
574 let (prev_tx_count, prev_size) = (prev.num_transactions, prev.payload_size);
575
576 let mut rows = Vec::new();
577
578 let aggregates = blocks
580 .iter()
581 .scan(
582 (height, prev_tx_count, prev_size),
583 |(height, tx_count, size), block| {
584 if *height != block.height {
585 return Some(Err(anyhow!(
586 "blocks in update_aggregates are not sequential; expected {}, got {}",
587 *height,
588 block.height()
589 )));
590 }
591 *height += 1;
592
593 *tx_count.entry(None).or_insert(0) += block.num_transactions as usize;
598 *size.entry(None).or_insert(0) += block.size as usize;
599
600 rows.push((
603 block.height as i64,
604 -1,
605 tx_count[&None] as i64,
606 size[&None] as i64,
607 ));
608
609 for (&ns_id, info) in &block.namespaces {
611 let key = Some(ns_id);
612
613 *tx_count.entry(key).or_insert(0) += info.num_transactions as usize;
614 *size.entry(key).or_insert(0) += info.size as usize;
615 }
616
617 for ns_id in tx_count.keys().filter_map(|k| k.as_ref()) {
621 let key = Some(*ns_id);
622 rows.push((
623 block.height as i64,
624 (*ns_id).into(),
625 tx_count[&key] as i64,
626 size[&key] as i64,
627 ));
628 }
629
630 Some(Ok((block.height as i64, tx_count.clone(), size.clone())))
631 },
632 )
633 .collect::<anyhow::Result<Vec<_>>>()?;
634 let last_aggregate = aggregates.last().cloned();
635
636 let (height, num_transactions, payload_size) =
637 last_aggregate.ok_or_else(|| anyhow!("no row"))?;
638
639 self.upsert(
640 "aggregate",
641 ["height", "namespace", "num_transactions", "payload_size"],
642 ["height", "namespace"],
643 rows,
644 )
645 .await?;
646 Ok(Aggregate {
647 height,
648 num_transactions,
649 payload_size,
650 })
651 }
652}
653
654impl<Mode: TransactionMode> Transaction<Mode> {
655 async fn time_window<Types: NodeType>(
656 &mut self,
657 start: u64,
658 end: u64,
659 limit: usize,
660 ) -> QueryResult<TimeWindowQueryData<Header<Types>>> {
661 let sql = format!(
673 "SELECT {HEADER_COLUMNS}
674 FROM header AS h
675 WHERE h.timestamp >= $1 AND h.timestamp < $2
676 ORDER BY h.timestamp, h.height
677 LIMIT $3"
678 );
679 let rows = query(&sql)
680 .bind(start as i64)
681 .bind(end as i64)
682 .bind(limit as i64)
683 .fetch(self.as_mut());
684 let window: Vec<_> = rows
685 .map(|row| parse_header::<Types>(row?))
686 .try_collect()
687 .await?;
688
689 let next = if window.len() < limit {
690 let sql = format!(
692 "SELECT {HEADER_COLUMNS}
693 FROM header AS h
694 WHERE h.timestamp >= $1
695 ORDER BY h.timestamp, h.height
696 LIMIT 1"
697 );
698 query(&sql)
699 .bind(end as i64)
700 .fetch_optional(self.as_mut())
701 .await?
702 .map(parse_header::<Types>)
703 .transpose()?
704 } else {
705 tracing::debug!(limit, "cutting off header window request due to limit");
709 None
710 };
711
712 if window.is_empty() && next.is_none() {
720 return Err(QueryError::NotFound);
721 }
722
723 let sql = format!(
725 "SELECT {HEADER_COLUMNS}
726 FROM header AS h
727 WHERE h.timestamp < $1
728 ORDER BY h.timestamp DESC, h.height DESC
729 LIMIT 1"
730 );
731 let prev = query(&sql)
732 .bind(start as i64)
733 .fetch_optional(self.as_mut())
734 .await?
735 .map(parse_header::<Types>)
736 .transpose()?;
737
738 Ok(TimeWindowQueryData { window, prev, next })
739 }
740}
741
742async fn aggregate_range_bounds<Types>(
747 tx: &mut Transaction<impl TransactionMode>,
748 range: impl RangeBounds<usize>,
749) -> QueryResult<Option<(usize, usize)>>
750where
751 Types: NodeType,
752 Header<Types>: QueryableHeader<Types>,
753{
754 let from = match range.start_bound() {
755 Bound::Included(from) => *from,
756 Bound::Excluded(from) => *from + 1,
757 Bound::Unbounded => 0,
758 };
759 let to = match range.end_bound() {
760 Bound::Included(to) => *to,
761 Bound::Excluded(0) => return Ok(None),
762 Bound::Excluded(to) => *to - 1,
763 Bound::Unbounded => {
764 let height = AggregatesStorage::<Types>::aggregates_height(tx)
765 .await
766 .map_err(|err| QueryError::Error {
767 message: format!("{err:#}"),
768 })?;
769 if height == 0 {
770 return Ok(None);
771 }
772 if height < from {
773 return Ok(None);
774 }
775 height - 1
776 },
777 };
778 Ok(Some((from, to)))
779}
780
#[cfg(test)]
mod test {
    use hotshot_example_types::node_types::TEST_VERSIONS;
    use hotshot_types::vid::advz::advz_scheme;
    use itertools::Itertools;
    use jf_advz::VidScheme;
    use pretty_assertions::assert_eq;

    use super::*;
    use crate::{
        availability::{BlockQueryData, LeafQueryData, VidCommonQueryData},
        data_source::{
            Transaction as _, VersionedDataSource,
            sql::testing::TmpDb,
            storage::{SqlStorage, StorageConnectionType, UpdateAvailabilityStorage},
        },
        testing::mocks::MockTypes,
    };

    /// Exercises `sync_status_ranges` over the window `[start, end)` after
    /// inserting leaves only for the heights covered by `present_ranges`,
    /// then checks that the reported missing count and the exact sequence of
    /// present/missing ranges match expectations.
    async fn test_sync_status_ranges(start: usize, end: usize, present_ranges: &[(usize, usize)]) {
        let storage = TmpDb::init().await;
        let db = SqlStorage::connect(storage.config(), StorageConnectionType::Query)
            .await
            .unwrap();

        // Build a chain of leaves 0..end by cloning genesis and bumping the
        // block number.
        let mut leaves: Vec<LeafQueryData<MockTypes>> = vec![
            LeafQueryData::<MockTypes>::genesis(
                &Default::default(),
                &Default::default(),
                TEST_VERSIONS.test,
            )
            .await,
        ];
        for i in 1..end {
            let mut leaf = leaves[i - 1].clone();
            leaf.leaf.block_header_mut().block_number = i as u64;
            leaves.push(leaf);
        }

        {
            // Insert only the leaves inside the present ranges.
            let mut tx = db.write().await.unwrap();

            for &(start, end) in present_ranges {
                for leaf in &leaves[start..end] {
                    tx.insert_leaf(leaf).await.unwrap();
                }
            }

            tx.commit().await.unwrap();
        }

        let sync_status = db
            .read()
            .await
            .unwrap()
            .sync_status_ranges("leaf2", "height", start, end)
            .await
            .unwrap();

        // Everything in the window not covered by a present range must be
        // reported as missing.
        let present: usize = present_ranges.iter().map(|(start, end)| end - start).sum();
        let total = end - start;
        assert_eq!(sync_status.missing, total - present);

        // Walk the reported ranges, expecting an optional missing range
        // before each present range, in order.
        let mut ranges = sync_status.ranges.into_iter();
        let mut prev = start;
        for &(start, end) in present_ranges {
            if start != prev {
                let range = ranges.next().unwrap();
                assert_eq!(
                    range,
                    SyncStatusRange {
                        start: prev,
                        end: start,
                        status: SyncStatus::Missing,
                    }
                );
            }
            let range = ranges.next().unwrap();
            assert_eq!(
                range,
                SyncStatusRange {
                    start,
                    end,
                    status: SyncStatus::Present,
                }
            );
            prev = end;
        }

        // A trailing missing range, if the window extends past the last
        // present range.
        if prev != end {
            let range = ranges.next().unwrap();
            assert_eq!(
                range,
                SyncStatusRange {
                    start: prev,
                    end,
                    status: SyncStatus::Missing,
                }
            );
        }

        assert_eq!(ranges.next(), None);
    }

    // Present runs at both edges of the window.
    #[tokio::test]
    #[test_log::test]
    async fn test_sync_status_ranges_bookends_present() {
        test_sync_status_ranges(0, 6, &[(0, 2), (4, 6)]).await;
    }

    // Missing runs at both edges of the window.
    #[tokio::test]
    #[test_log::test]
    async fn test_sync_status_ranges_bookends_missing() {
        test_sync_status_ranges(0, 6, &[(2, 4)]).await;
    }

    // Window that does not start at height 0, present runs at its edges.
    #[tokio::test]
    #[test_log::test]
    async fn test_sync_status_ranges_start_offset_bookends_present() {
        test_sync_status_ranges(1, 8, &[(2, 4), (6, 8)]).await;
    }

    // Window that does not start at height 0, missing runs at its edges.
    #[tokio::test]
    #[test_log::test]
    async fn test_sync_status_ranges_start_offset_bookends_missing() {
        test_sync_status_ranges(1, 8, &[(4, 6)]).await;
    }

    // Present runs of length 1.
    #[tokio::test]
    #[test_log::test]
    async fn test_sync_status_ranges_singleton_ranges() {
        test_sync_status_ranges(0, 3, &[(0, 1), (2, 3)]).await;
    }

    // More than 10 present runs, exercising the single-query strategy for
    // finding range ends.
    #[tokio::test]
    #[test_log::test]
    async fn test_sync_status_ranges_many_ranges_bookends_present() {
        let ranges = (0..=100).map(|i| (2 * i, 2 * i + 1)).collect_vec();
        test_sync_status_ranges(0, 201, &ranges).await;
    }

    // Many runs with missing heights at both edges of the window.
    #[tokio::test]
    #[test_log::test]
    async fn test_sync_status_ranges_many_ranges_bookends_missing() {
        let ranges = (1..=100).map(|i| (2 * i, 2 * i + 1)).collect_vec();
        test_sync_status_ranges(0, 202, &ranges).await;
    }

    // Many runs with a window offset from height 0, present at the edges.
    #[tokio::test]
    #[test_log::test]
    async fn test_sync_status_ranges_many_ranges_start_offset_bookends_present() {
        let ranges = (1..=100).map(|i| (2 * i, 2 * i + 1)).collect_vec();
        test_sync_status_ranges(1, 201, &ranges).await;
    }

    // Many runs with a window offset from height 0, missing at the edges.
    #[tokio::test]
    #[test_log::test]
    async fn test_sync_status_ranges_many_ranges_start_offset_bookends_missing() {
        let ranges = (2..=100).map(|i| (2 * i, 2 * i + 1)).collect_vec();
        test_sync_status_ranges(1, 202, &ranges).await;
    }

    /// Payloads and VID data are keyed by payload hash, so two blocks with
    /// identical payloads share rows. Checks that sync status correctly
    /// reports both heights as present once the shared data is inserted.
    #[tokio::test]
    #[test_log::test]
    async fn test_sync_status_duplicate_payload() {
        let storage = TmpDb::init().await;
        let db = SqlStorage::connect(storage.config(), StorageConnectionType::Query)
            .await
            .unwrap();
        let mut vid = advz_scheme(2);

        let mut leaves = vec![
            LeafQueryData::<MockTypes>::genesis(
                &Default::default(),
                &Default::default(),
                TEST_VERSIONS.test,
            )
            .await,
        ];
        let mut blocks = vec![
            BlockQueryData::<MockTypes>::genesis(
                &Default::default(),
                &Default::default(),
                TEST_VERSIONS.test.base,
            )
            .await,
        ];
        let dispersal = vid.disperse([]).unwrap();

        // Create a second block at height 1 with the same payload as genesis.
        let mut leaf = leaves[0].clone();
        leaf.leaf.block_header_mut().block_number += 1;
        let block = BlockQueryData::new(leaf.header().clone(), blocks[0].payload().clone());
        leaves.push(leaf);
        blocks.push(block);

        {
            let mut tx = db.write().await.unwrap();
            tx.insert_leaf(&leaves[0]).await.unwrap();
            tx.commit().await.unwrap();
        }

        // With only the genesis leaf inserted, block and VID data for height
        // 0 are still missing.
        let missing = ResourceSyncStatus {
            missing: 1,
            ranges: vec![SyncStatusRange {
                status: SyncStatus::Missing,
                start: 0,
                end: 1,
            }],
        };
        assert_eq!(
            NodeStorage::<MockTypes>::sync_status_for_range(&mut db.read().await.unwrap(), 0, 1)
                .await
                .unwrap(),
            SyncStatusQueryData {
                leaves: ResourceSyncStatus {
                    missing: 0,
                    ranges: vec![SyncStatusRange {
                        status: SyncStatus::Present,
                        start: 0,
                        end: 1,
                    }]
                },
                blocks: missing.clone(),
                vid_common: missing.clone(),
                pruned_height: None,
            }
        );

        {
            let mut tx = db.write().await.unwrap();
            tx.insert_leaf(&leaves[1]).await.unwrap();
            tx.insert_block(&blocks[1]).await.unwrap();
            tx.insert_vid(
                &VidCommonQueryData::<MockTypes>::new(
                    leaves[1].header().clone(),
                    hotshot_types::data::VidCommon::V0(dispersal.common),
                ),
                Some(&VidShare::V0(dispersal.shares[0].clone())),
            )
            .await
            .unwrap();
            tx.commit().await.unwrap();
        }

        // Inserting block 1's data (payload shared with genesis) makes both
        // heights fully present.
        let present = ResourceSyncStatus {
            missing: 0,
            ranges: vec![SyncStatusRange {
                status: SyncStatus::Present,
                start: 0,
                end: 2,
            }],
        };
        assert_eq!(
            NodeStorage::<MockTypes>::sync_status_for_range(&mut db.read().await.unwrap(), 0, 2)
                .await
                .unwrap(),
            SyncStatusQueryData {
                leaves: present.clone(),
                blocks: present.clone(),
                vid_common: present,
                pruned_height: None,
            }
        );
    }

    /// Blocks are matched on (payload hash, ns table), unlike VID data which
    /// is matched on payload hash alone. Two blocks with the same payload but
    /// different ns tables must NOT share block presence, while VID presence
    /// is still shared.
    #[tokio::test]
    #[test_log::test]
    async fn test_sync_status_same_payload_different_ns_table() {
        let storage = TmpDb::init().await;
        let db = SqlStorage::connect(storage.config(), StorageConnectionType::Query)
            .await
            .unwrap();
        let mut vid = advz_scheme(2);

        let mut leaves = vec![
            LeafQueryData::<MockTypes>::genesis(
                &Default::default(),
                &Default::default(),
                TEST_VERSIONS.test,
            )
            .await,
        ];
        let mut blocks = vec![
            BlockQueryData::<MockTypes>::genesis(
                &Default::default(),
                &Default::default(),
                TEST_VERSIONS.test.base,
            )
            .await,
        ];
        let dispersal = vid.disperse([]).unwrap();

        // Create a second block at height 1 with the same payload but a
        // different ns table (changed via the header metadata).
        let mut leaf = leaves[0].clone();
        leaf.leaf.block_header_mut().block_number += 1;
        leaf.leaf.block_header_mut().metadata.num_transactions += 1;
        let block = BlockQueryData::new(leaf.header().clone(), blocks[0].payload().clone());
        leaves.push(leaf);
        blocks.push(block);

        {
            let mut tx = db.write().await.unwrap();
            tx.insert_leaf(&leaves[0]).await.unwrap();
            tx.commit().await.unwrap();
        }

        // With only the genesis leaf inserted, block and VID data for height
        // 0 are still missing.
        let missing = ResourceSyncStatus {
            missing: 1,
            ranges: vec![SyncStatusRange {
                status: SyncStatus::Missing,
                start: 0,
                end: 1,
            }],
        };
        assert_eq!(
            NodeStorage::<MockTypes>::sync_status_for_range(&mut db.read().await.unwrap(), 0, 1)
                .await
                .unwrap(),
            SyncStatusQueryData {
                leaves: ResourceSyncStatus {
                    missing: 0,
                    ranges: vec![SyncStatusRange {
                        status: SyncStatus::Present,
                        start: 0,
                        end: 1,
                    }]
                },
                blocks: missing.clone(),
                vid_common: missing.clone(),
                pruned_height: None,
            }
        );

        {
            let mut tx = db.write().await.unwrap();
            tx.insert_leaf(&leaves[1]).await.unwrap();
            tx.insert_block(&blocks[1]).await.unwrap();
            tx.insert_vid(
                &VidCommonQueryData::<MockTypes>::new(
                    leaves[1].header().clone(),
                    hotshot_types::data::VidCommon::V0(dispersal.common),
                ),
                Some(&VidShare::V0(dispersal.shares[0].clone())),
            )
            .await
            .unwrap();
            tx.commit().await.unwrap();
        }

        // Leaves and VID data for both heights are now present, but block 0
        // is still missing: its payload row (keyed by payload hash AND ns
        // table) does not match block 1's, so block presence is not shared.
        let present = ResourceSyncStatus {
            missing: 0,
            ranges: vec![SyncStatusRange {
                status: SyncStatus::Present,
                start: 0,
                end: 2,
            }],
        };
        let missing = ResourceSyncStatus {
            missing: 1,
            ranges: vec![
                SyncStatusRange {
                    status: SyncStatus::Missing,
                    start: 0,
                    end: 1,
                },
                SyncStatusRange {
                    status: SyncStatus::Present,
                    start: 1,
                    end: 2,
                },
            ],
        };
        assert_eq!(
            NodeStorage::<MockTypes>::sync_status_for_range(&mut db.read().await.unwrap(), 0, 2)
                .await
                .unwrap(),
            SyncStatusQueryData {
                leaves: present.clone(),
                blocks: missing.clone(),
                vid_common: present,
                pruned_height: None,
            }
        );
    }
}