1use std::ops::{Bound, RangeBounds};
16
17use alloy::primitives::map::HashMap;
18use anyhow::anyhow;
19use async_trait::async_trait;
20use futures::stream::{StreamExt, TryStreamExt};
21use hotshot_types::{
22 data::VidShare,
23 simple_certificate::CertificatePair,
24 traits::{block_contents::BlockHeader, node_implementation::NodeType},
25};
26use snafu::OptionExt;
27use tracing::instrument;
28
29use super::{
30 super::transaction::{Transaction, TransactionMode, Write, query, query_as},
31 DecodeError, HEADER_COLUMNS, QueryBuilder, parse_header,
32};
33use crate::{
34 Header, MissingSnafu, QueryError, QueryResult,
35 availability::{Certificate2, NamespaceId, QueryableHeader},
36 data_source::storage::{
37 Aggregate, AggregatesStorage, NodeStorage, PayloadMetadata, UpdateAggregatesStorage,
38 },
39 node::{
40 BlockId, ResourceSyncStatus, SyncStatus, SyncStatusQueryData, SyncStatusRange,
41 TimeWindowQueryData, WindowStart,
42 },
43 types::HeightIndexed,
44};
45
46#[async_trait]
47impl<Mode, Types> NodeStorage<Types> for Transaction<Mode>
48where
49 Mode: TransactionMode,
50 Types: NodeType,
51 Header<Types>: QueryableHeader<Types>,
52{
53 async fn block_height(&mut self) -> QueryResult<usize> {
54 match query_as::<(Option<i64>,)>("SELECT max(height) FROM header")
55 .fetch_one(self.as_mut())
56 .await?
57 {
58 (Some(height),) => {
59 Ok(height as usize + 1)
62 },
63 (None,) => {
64 Ok(0)
66 },
67 }
68 }
69
70 async fn count_transactions_in_range(
71 &mut self,
72 range: impl RangeBounds<usize> + Send,
73 namespace: Option<NamespaceId<Types>>,
74 ) -> QueryResult<usize> {
75 let namespace: i64 = namespace.map(|ns| ns.into()).unwrap_or(-1);
76 let Some((from, to)) = aggregate_range_bounds::<Types>(self, range).await? else {
77 return Ok(0);
78 };
79 let (count,) = query_as::<(i64,)>(
80 "SELECT num_transactions FROM aggregate WHERE height = $1 AND namespace = $2",
81 )
82 .bind(to as i64)
83 .bind(namespace)
84 .fetch_one(self.as_mut())
85 .await?;
86 let mut count = count as usize;
87
88 if from > 0 {
89 let (prev_count,) = query_as::<(i64,)>(
90 "SELECT num_transactions FROM aggregate WHERE height = $1 AND namespace = $2",
91 )
92 .bind((from - 1) as i64)
93 .bind(namespace)
94 .fetch_one(self.as_mut())
95 .await?;
96 count = count.saturating_sub(prev_count as usize);
97 }
98
99 Ok(count)
100 }
101
102 async fn payload_size_in_range(
103 &mut self,
104 range: impl RangeBounds<usize> + Send,
105 namespace: Option<NamespaceId<Types>>,
106 ) -> QueryResult<usize> {
107 let namespace: i64 = namespace.map(|ns| ns.into()).unwrap_or(-1);
108 let Some((from, to)) = aggregate_range_bounds::<Types>(self, range).await? else {
109 return Ok(0);
110 };
111 let (size,) = query_as::<(i64,)>(
112 "SELECT payload_size FROM aggregate WHERE height = $1 AND namespace = $2",
113 )
114 .bind(to as i64)
115 .bind(namespace)
116 .fetch_one(self.as_mut())
117 .await?;
118 let mut size = size as usize;
119
120 if from > 0 {
121 let (prev_size,) = query_as::<(i64,)>(
122 "SELECT payload_size FROM aggregate WHERE height = $1 AND namespace = $2",
123 )
124 .bind((from - 1) as i64)
125 .bind(namespace)
126 .fetch_one(self.as_mut())
127 .await?;
128 size = size.saturating_sub(prev_size as usize);
129 }
130
131 Ok(size)
132 }
133
134 async fn vid_share<ID>(&mut self, id: ID) -> QueryResult<VidShare>
135 where
136 ID: Into<BlockId<Types>> + Send + Sync,
137 {
138 let mut query = QueryBuilder::default();
139 let where_clause = query.header_where_clause(id.into())?;
140 let sql = format!(
143 "SELECT vid_share FROM header AS h
144 WHERE {where_clause}
145 ORDER BY h.height
146 LIMIT 1"
147 );
148 let (share_data,) = query
149 .query_as::<(Option<Vec<u8>>,)>(&sql)
150 .fetch_one(self.as_mut())
151 .await?;
152 let share_data = share_data.context(MissingSnafu)?;
153 let share = bincode::deserialize(&share_data).decode_error("malformed VID share")?;
154 Ok(share)
155 }
156
157 async fn sync_status_for_range(
158 &mut self,
159 from: usize,
160 to: usize,
161 ) -> QueryResult<SyncStatusQueryData> {
162 let blocks = self
165 .sync_status_ranges(
166 "header AS h JOIN payload AS p ON (h.payload_hash, h.ns_table) = (p.hash, \
167 p.ns_table)",
168 "height",
169 from,
170 to,
171 )
172 .await?;
173
174 let leaves = if blocks.is_fully_synced() {
175 blocks.clone()
180 } else {
181 self.sync_status_ranges("leaf2", "height", from, to).await?
185 };
186
187 let vid_common = self
189 .sync_status_ranges(
190 "header AS h JOIN vid_common AS v ON h.payload_hash = v.hash",
191 "height",
192 from,
193 to,
194 )
195 .await?;
196
197 Ok(SyncStatusQueryData {
198 leaves,
199 blocks,
200 vid_common,
201 pruned_height: None,
202 })
203 }
204
205 async fn get_header_window(
206 &mut self,
207 start: impl Into<WindowStart<Types>> + Send + Sync,
208 end: u64,
209 limit: usize,
210 ) -> QueryResult<TimeWindowQueryData<Header<Types>>> {
211 let first_block = match start.into() {
213 WindowStart::Time(t) => {
214 return self.time_window::<Types>(t, end, limit).await;
219 },
220 WindowStart::Height(h) => h,
221 WindowStart::Hash(h) => self.load_header::<Types>(h).await?.block_number(),
222 };
223
224 let sql = format!(
228 "SELECT {HEADER_COLUMNS}
229 FROM header AS h
230 WHERE h.height >= $1 AND h.timestamp < $2
231 ORDER BY h.height
232 LIMIT $3"
233 );
234 let rows = query(&sql)
235 .bind(first_block as i64)
236 .bind(end as i64)
237 .bind(limit as i64)
238 .fetch(self.as_mut());
239 let window = rows
240 .map(|row| parse_header::<Types, _>(row?))
241 .try_collect::<Vec<_>>()
242 .await?;
243
244 let prev = if first_block > 0 {
246 Some(self.load_header::<Types>(first_block as usize - 1).await?)
247 } else {
248 None
249 };
250
251 let next = if window.len() < limit {
252 let sql = format!(
261 "SELECT {HEADER_COLUMNS}
262 FROM header AS h
263 WHERE h.timestamp >= $1
264 ORDER BY h.timestamp, h.height
265 LIMIT 1"
266 );
267 query(&sql)
268 .bind(end as i64)
269 .fetch_optional(self.as_mut())
270 .await?
271 .map(parse_header::<Types, _>)
272 .transpose()?
273 } else {
274 tracing::debug!(limit, "cutting off header window request due to limit");
278 None
279 };
280
281 Ok(TimeWindowQueryData { window, prev, next })
282 }
283
284 async fn latest_qc_chain(&mut self) -> QueryResult<Option<[CertificatePair<Types>; 2]>> {
285 let Some((json,)) = query_as("SELECT qcs FROM latest_qc_chain LIMIT 1")
286 .fetch_optional(self.as_mut())
287 .await?
288 else {
289 return Ok(None);
290 };
291 let qcs = serde_json::from_value(json).decode_error("malformed QC")?;
292 Ok(qcs)
293 }
294
295 async fn load_cert2(&mut self, height: u64) -> QueryResult<Option<Certificate2<Types>>> {
296 let Some((json,)) = query_as("SELECT data FROM cert2 WHERE height = $1")
297 .bind(height as i64)
298 .fetch_optional(self.as_mut())
299 .await?
300 else {
301 return Ok(None);
302 };
303 let cert2 = serde_json::from_value(json).decode_error("malformed cert2")?;
304 Ok(cert2)
305 }
306
307 async fn load_earliest_cert2(
308 &mut self,
309 height: u64,
310 ) -> QueryResult<Option<Certificate2<Types>>> {
311 let Some((_h, json)): Option<(i64, serde_json::Value)> = query_as(
312 "SELECT height, data FROM cert2 WHERE height >= $1 ORDER BY height ASC LIMIT 1",
313 )
314 .bind(height as i64)
315 .fetch_optional(self.as_mut())
316 .await?
317 else {
318 return Ok(None);
319 };
320 let cert2 = serde_json::from_value(json).decode_error("malformed cert2")?;
321 Ok(Some(cert2))
322 }
323}
324
325impl<Mode> Transaction<Mode>
326where
327 Mode: TransactionMode,
328{
329 #[instrument(skip(self))]
347 async fn sync_status_ranges(
348 &mut self,
349 table: &str,
350 indicator_column: &str,
351 start: usize,
352 end: usize,
353 ) -> QueryResult<ResourceSyncStatus> {
354 let mut ranges = vec![];
355 tracing::debug!("searching for missing ranges");
356
357 let query = format!(
373 "WITH range AS (SELECT height, {indicator_column} AS indicator FROM {table}
374 WHERE height >= $1 AND height < $2)
375 SELECT successor.height FROM range AS predecessor
376 RIGHT JOIN range AS successor
377 ON successor.height = predecessor.height + 1
378 WHERE successor.indicator IS NOT NULL
379 AND predecessor.indicator IS NULL
380 ORDER BY successor.height"
381 );
382 let range_starts = query_as::<(i64,)>(&query)
383 .bind(start as i64)
384 .bind(end as i64)
385 .fetch_all(self.as_mut())
386 .await?;
387 tracing::debug!(
388 ?range_starts,
389 "found {} starting heights for present ranges",
390 range_starts.len()
391 );
392
393 let range_ends = if range_starts.len() <= 10 {
394 let mut ends = vec![];
400 for (i, &(start,)) in range_starts.iter().enumerate() {
401 let query = format!(
404 "SELECT height FROM {table}
405 WHERE height < $1 AND {indicator_column} IS NOT NULL
406 ORDER BY height DESC
407 LIMIT 1"
408 );
409 let upper_bound = if i + 1 < range_starts.len() {
410 range_starts[i + 1].0
411 } else {
412 end as i64
413 };
414 let (end,) = query_as::<(i64,)>(&query)
415 .bind(upper_bound)
416 .fetch_one(self.as_mut())
419 .await?;
420 tracing::debug!(start, end, "found end for present range");
421 ends.push((end,));
422 }
423 ends
424 } else {
425 let query = format!(
431 "WITH range AS (SELECT height, {indicator_column} AS indicator FROM {table}
432 WHERE height >= $1 AND height < $2)
433 SELECT predecessor.height FROM range AS predecessor
434 LEFT JOIN range AS successor
435 ON successor.height = predecessor.height + 1
436 WHERE predecessor.indicator IS NOT NULL
437 AND successor.indicator IS NULL
438 ORDER BY predecessor.height"
439 );
440 let ends = query_as::<(i64,)>(&query)
441 .bind(start as i64)
442 .bind(end as i64)
443 .fetch_all(self.as_mut())
444 .await?;
445 tracing::debug!(
446 ?ends,
447 "found {} ending heights for present ranges",
448 ends.len()
449 );
450 ends
451 };
452
453 if range_starts.len() != range_ends.len() {
455 return Err(QueryError::Error {
456 message: format!(
457 "number of present range starts ({}) does not match number of present range \
458 ends ({})",
459 range_starts.len(),
460 range_ends.len(),
461 ),
462 });
463 }
464
465 let mut prev = start;
469 for ((start,), (end,)) in range_starts.into_iter().zip(range_ends) {
470 let start = start as usize;
471 let end = end as usize;
472
473 if start < prev {
475 return Err(QueryError::Error {
476 message: format!(
477 "found present ranges out of order: range start {start} is before \
478 previous range end {prev}"
479 ),
480 });
481 }
482 if end < start {
483 return Err(QueryError::Error {
484 message: format!("malformed range: start={start}, end={end}"),
485 });
486 }
487
488 if start != prev {
489 tracing::debug!(start = prev, end = start, "found missing range");
492 ranges.push(SyncStatusRange {
493 start: prev,
494 end: start,
495 status: SyncStatus::Missing,
496 });
497 }
498
499 ranges.push(SyncStatusRange {
500 start,
501 end: end + 1, status: SyncStatus::Present,
503 });
504 prev = end + 1;
505 }
506
507 if prev != end {
510 tracing::debug!(start = prev, end, "found missing range");
511 ranges.push(SyncStatusRange {
512 start: prev,
513 end,
514 status: SyncStatus::Missing,
515 });
516 }
517
518 let missing = ranges
519 .iter()
520 .filter_map(|range| {
521 if range.status == SyncStatus::Missing {
522 Some(range.end - range.start)
523 } else {
524 None
525 }
526 })
527 .sum();
528 tracing::debug!(
529 missing,
530 "found missing objects in {} total ranges",
531 ranges.len()
532 );
533
534 Ok(ResourceSyncStatus { missing, ranges })
535 }
536}
537
538impl<Types, Mode: TransactionMode> AggregatesStorage<Types> for Transaction<Mode>
539where
540 Types: NodeType,
541 Header<Types>: QueryableHeader<Types>,
542{
543 async fn aggregates_height(&mut self) -> anyhow::Result<usize> {
544 let (height,): (i64,) = query_as("SELECT coalesce(max(height) + 1, 0) FROM aggregate")
545 .fetch_one(self.as_mut())
546 .await?;
547 Ok(height as usize)
548 }
549
550 async fn load_prev_aggregate(&mut self) -> anyhow::Result<Option<Aggregate<Types>>> {
551 let res: (Option<i64>,) =
554 query_as("SELECT max(height) FROM aggregate WHERE namespace = -1")
555 .fetch_one(self.as_mut())
556 .await?;
557
558 let (Some(max_height),) = res else {
559 return Ok(None);
560 };
561
562 let rows: Vec<(i64, i64, i64)> = query_as(
563 r#"
564 SELECT namespace, num_transactions, payload_size from aggregate WHERE height = $1
565 "#,
566 )
567 .bind(max_height)
568 .fetch_all(self.as_mut())
569 .await?;
570
571 let mut num_transactions = HashMap::default();
572 let mut payload_size = HashMap::default();
573
574 for (namespace_id, num_tx, payload_sz) in rows {
575 let key = if namespace_id == -1 {
579 None
580 } else {
581 Some(namespace_id.into())
582 };
583 num_transactions.insert(key, num_tx as usize);
584 payload_size.insert(key, payload_sz as usize);
585 }
586
587 Ok(Some(Aggregate {
588 height: max_height,
589 num_transactions,
590 payload_size,
591 }))
592 }
593}
594
595impl<Types: NodeType> UpdateAggregatesStorage<Types> for Transaction<Write>
596where
597 Header<Types>: QueryableHeader<Types>,
598{
599 async fn update_aggregates(
600 &mut self,
601 prev: Aggregate<Types>,
602 blocks: &[PayloadMetadata<Types>],
603 ) -> anyhow::Result<Aggregate<Types>> {
604 let height = blocks[0].height();
605 let (prev_tx_count, prev_size) = (prev.num_transactions, prev.payload_size);
606
607 let mut rows = Vec::new();
608
609 let aggregates = blocks
611 .iter()
612 .scan(
613 (height, prev_tx_count, prev_size),
614 |(height, tx_count, size), block| {
615 if *height != block.height {
616 return Some(Err(anyhow!(
617 "blocks in update_aggregates are not sequential; expected {}, got {}",
618 *height,
619 block.height()
620 )));
621 }
622 *height += 1;
623
624 *tx_count.entry(None).or_insert(0) += block.num_transactions as usize;
629 *size.entry(None).or_insert(0) += block.size as usize;
630
631 rows.push((
634 block.height as i64,
635 -1,
636 tx_count[&None] as i64,
637 size[&None] as i64,
638 ));
639
640 for (&ns_id, info) in &block.namespaces {
642 let key = Some(ns_id);
643
644 *tx_count.entry(key).or_insert(0) += info.num_transactions as usize;
645 *size.entry(key).or_insert(0) += info.size as usize;
646 }
647
648 for ns_id in tx_count.keys().filter_map(|k| k.as_ref()) {
652 let key = Some(*ns_id);
653 rows.push((
654 block.height as i64,
655 (*ns_id).into(),
656 tx_count[&key] as i64,
657 size[&key] as i64,
658 ));
659 }
660
661 Some(Ok((block.height as i64, tx_count.clone(), size.clone())))
662 },
663 )
664 .collect::<anyhow::Result<Vec<_>>>()?;
665 let last_aggregate = aggregates.last().cloned();
666
667 let (height, num_transactions, payload_size) =
668 last_aggregate.ok_or_else(|| anyhow!("no row"))?;
669
670 self.upsert(
671 "aggregate",
672 ["height", "namespace", "num_transactions", "payload_size"],
673 ["height", "namespace"],
674 rows,
675 )
676 .await?;
677 Ok(Aggregate {
678 height,
679 num_transactions,
680 payload_size,
681 })
682 }
683}
684
685impl<Mode: TransactionMode> Transaction<Mode> {
686 async fn time_window<Types: NodeType>(
687 &mut self,
688 start: u64,
689 end: u64,
690 limit: usize,
691 ) -> QueryResult<TimeWindowQueryData<Header<Types>>> {
692 let sql = format!(
704 "SELECT {HEADER_COLUMNS}
705 FROM header AS h
706 WHERE h.timestamp >= $1 AND h.timestamp < $2
707 ORDER BY h.timestamp, h.height
708 LIMIT $3"
709 );
710 let rows = query(&sql)
711 .bind(start as i64)
712 .bind(end as i64)
713 .bind(limit as i64)
714 .fetch(self.as_mut());
715 let window: Vec<_> = rows
716 .map(|row| parse_header::<Types, _>(row?))
717 .try_collect()
718 .await?;
719
720 let next = if window.len() < limit {
721 let sql = format!(
723 "SELECT {HEADER_COLUMNS}
724 FROM header AS h
725 WHERE h.timestamp >= $1
726 ORDER BY h.timestamp, h.height
727 LIMIT 1"
728 );
729 query(&sql)
730 .bind(end as i64)
731 .fetch_optional(self.as_mut())
732 .await?
733 .map(parse_header::<Types, _>)
734 .transpose()?
735 } else {
736 tracing::debug!(limit, "cutting off header window request due to limit");
740 None
741 };
742
743 if window.is_empty() && next.is_none() {
751 return Err(QueryError::NotFound);
752 }
753
754 let sql = format!(
756 "SELECT {HEADER_COLUMNS}
757 FROM header AS h
758 WHERE h.timestamp < $1
759 ORDER BY h.timestamp DESC, h.height DESC
760 LIMIT 1"
761 );
762 let prev = query(&sql)
763 .bind(start as i64)
764 .fetch_optional(self.as_mut())
765 .await?
766 .map(parse_header::<Types, _>)
767 .transpose()?;
768
769 Ok(TimeWindowQueryData { window, prev, next })
770 }
771}
772
773async fn aggregate_range_bounds<Types>(
778 tx: &mut Transaction<impl TransactionMode>,
779 range: impl RangeBounds<usize>,
780) -> QueryResult<Option<(usize, usize)>>
781where
782 Types: NodeType,
783 Header<Types>: QueryableHeader<Types>,
784{
785 let from = match range.start_bound() {
786 Bound::Included(from) => *from,
787 Bound::Excluded(from) => *from + 1,
788 Bound::Unbounded => 0,
789 };
790 let to = match range.end_bound() {
791 Bound::Included(to) => *to,
792 Bound::Excluded(0) => return Ok(None),
793 Bound::Excluded(to) => *to - 1,
794 Bound::Unbounded => {
795 let height = AggregatesStorage::<Types>::aggregates_height(tx)
796 .await
797 .map_err(|err| QueryError::Error {
798 message: format!("{err:#}"),
799 })?;
800 if height == 0 {
801 return Ok(None);
802 }
803 if height < from {
804 return Ok(None);
805 }
806 height - 1
807 },
808 };
809 Ok(Some((from, to)))
810}
811
812#[cfg(test)]
813mod test {
814 use hotshot_example_types::node_types::TEST_VERSIONS;
815 use hotshot_types::vid::advz::advz_scheme;
816 use itertools::Itertools;
817 use jf_advz::VidScheme;
818 use pretty_assertions::assert_eq;
819
820 use super::*;
821 use crate::{
822 availability::{BlockQueryData, LeafQueryData, VidCommonQueryData},
823 data_source::{
824 Transaction as _, VersionedDataSource,
825 sql::testing::TmpDb,
826 storage::{SqlStorage, StorageConnectionType, UpdateAvailabilityStorage},
827 },
828 testing::mocks::MockTypes,
829 };
830
831 async fn test_sync_status_ranges(start: usize, end: usize, present_ranges: &[(usize, usize)]) {
832 let storage = TmpDb::init().await;
833 let db = SqlStorage::connect(storage.config(), StorageConnectionType::Query)
834 .await
835 .unwrap();
836
837 let mut leaves: Vec<LeafQueryData<MockTypes>> = vec![
839 LeafQueryData::<MockTypes>::genesis(
840 &Default::default(),
841 &Default::default(),
842 TEST_VERSIONS.test,
843 )
844 .await,
845 ];
846 for i in 1..end {
847 let mut leaf = leaves[i - 1].clone();
848 leaf.leaf.block_header_mut().block_number = i as u64;
849 leaves.push(leaf);
850 }
851
852 {
854 let mut tx = db.write().await.unwrap();
855
856 for &(start, end) in present_ranges {
857 for leaf in &leaves[start..end] {
858 tx.insert_leaf(leaf).await.unwrap();
859 }
860 }
861
862 tx.commit().await.unwrap();
863 }
864
865 let sync_status = db
866 .read()
867 .await
868 .unwrap()
869 .sync_status_ranges("leaf2", "height", start, end)
870 .await
871 .unwrap();
872
873 let present: usize = present_ranges.iter().map(|(start, end)| end - start).sum();
875 let total = end - start;
876 assert_eq!(sync_status.missing, total - present);
877
878 let mut ranges = sync_status.ranges.into_iter();
880 let mut prev = start;
881 for &(start, end) in present_ranges {
882 if start != prev {
883 let range = ranges.next().unwrap();
884 assert_eq!(
885 range,
886 SyncStatusRange {
887 start: prev,
888 end: start,
889 status: SyncStatus::Missing,
890 }
891 );
892 }
893 let range = ranges.next().unwrap();
894 assert_eq!(
895 range,
896 SyncStatusRange {
897 start,
898 end,
899 status: SyncStatus::Present,
900 }
901 );
902 prev = end;
903 }
904
905 if prev != end {
906 let range = ranges.next().unwrap();
907 assert_eq!(
908 range,
909 SyncStatusRange {
910 start: prev,
911 end,
912 status: SyncStatus::Missing,
913 }
914 );
915 }
916
917 assert_eq!(ranges.next(), None);
918 }
919
920 #[tokio::test]
921 #[test_log::test]
922 async fn test_sync_status_ranges_bookends_present() {
923 test_sync_status_ranges(0, 6, &[(0, 2), (4, 6)]).await;
924 }
925
926 #[tokio::test]
927 #[test_log::test]
928 async fn test_sync_status_ranges_bookends_missing() {
929 test_sync_status_ranges(0, 6, &[(2, 4)]).await;
930 }
931
932 #[tokio::test]
933 #[test_log::test]
934 async fn test_sync_status_ranges_start_offset_bookends_present() {
935 test_sync_status_ranges(1, 8, &[(2, 4), (6, 8)]).await;
936 }
937
938 #[tokio::test]
939 #[test_log::test]
940 async fn test_sync_status_ranges_start_offset_bookends_missing() {
941 test_sync_status_ranges(1, 8, &[(4, 6)]).await;
942 }
943
944 #[tokio::test]
945 #[test_log::test]
946 async fn test_sync_status_ranges_singleton_ranges() {
947 test_sync_status_ranges(0, 3, &[(0, 1), (2, 3)]).await;
948 }
949
950 #[tokio::test]
951 #[test_log::test]
952 async fn test_sync_status_ranges_many_ranges_bookends_present() {
953 let ranges = (0..=100).map(|i| (2 * i, 2 * i + 1)).collect_vec();
954 test_sync_status_ranges(0, 201, &ranges).await;
955 }
956
957 #[tokio::test]
958 #[test_log::test]
959 async fn test_sync_status_ranges_many_ranges_bookends_missing() {
960 let ranges = (1..=100).map(|i| (2 * i, 2 * i + 1)).collect_vec();
961 test_sync_status_ranges(0, 202, &ranges).await;
962 }
963
964 #[tokio::test]
965 #[test_log::test]
966 async fn test_sync_status_ranges_many_ranges_start_offset_bookends_present() {
967 let ranges = (1..=100).map(|i| (2 * i, 2 * i + 1)).collect_vec();
968 test_sync_status_ranges(1, 201, &ranges).await;
969 }
970
971 #[tokio::test]
972 #[test_log::test]
973 async fn test_sync_status_ranges_many_ranges_start_offset_bookends_missing() {
974 let ranges = (2..=100).map(|i| (2 * i, 2 * i + 1)).collect_vec();
975 test_sync_status_ranges(1, 202, &ranges).await;
976 }
977
    /// Checks the sync status when two consecutive blocks share the same payload.
    ///
    /// Only the leaf at height 0 is stored at first, so blocks and VID common are reported
    /// missing. After the leaf, block and VID data for height 1 are stored (the block reuses the
    /// genesis payload), the test expects heights 0 and 1 to be reported present for leaves,
    /// blocks and VID common alike.
    #[tokio::test]
    #[test_log::test]
    async fn test_sync_status_duplicate_payload() {
        let storage = TmpDb::init().await;
        let db = SqlStorage::connect(storage.config(), StorageConnectionType::Query)
            .await
            .unwrap();
        let mut vid = advz_scheme(2);

        // Start from the genesis leaf and block.
        let mut leaves = vec![
            LeafQueryData::<MockTypes>::genesis(
                &Default::default(),
                &Default::default(),
                TEST_VERSIONS.test,
            )
            .await,
        ];
        let mut blocks = vec![
            BlockQueryData::<MockTypes>::genesis(
                &Default::default(),
                &Default::default(),
                TEST_VERSIONS.test.base,
            )
            .await,
        ];
        let dispersal = vid.disperse([]).unwrap();

        // Build a second leaf/block pair at the next height that reuses the genesis payload.
        let mut leaf = leaves[0].clone();
        leaf.leaf.block_header_mut().block_number += 1;
        let block = BlockQueryData::new(leaf.header().clone(), blocks[0].payload().clone());
        leaves.push(leaf);
        blocks.push(block);

        // Store only the first leaf: no block or VID data yet.
        {
            let mut tx = db.write().await.unwrap();
            tx.insert_leaf(&leaves[0]).await.unwrap();
            tx.commit().await.unwrap();
        }

        // With just the leaf stored, height 0 is present for leaves but missing for blocks and
        // VID common.
        let missing = ResourceSyncStatus {
            missing: 1,
            ranges: vec![SyncStatusRange {
                status: SyncStatus::Missing,
                start: 0,
                end: 1,
            }],
        };
        assert_eq!(
            NodeStorage::<MockTypes>::sync_status_for_range(&mut db.read().await.unwrap(), 0, 1)
                .await
                .unwrap(),
            SyncStatusQueryData {
                leaves: ResourceSyncStatus {
                    missing: 0,
                    ranges: vec![SyncStatusRange {
                        status: SyncStatus::Present,
                        start: 0,
                        end: 1,
                    }]
                },
                blocks: missing.clone(),
                vid_common: missing.clone(),
                pruned_height: None,
            }
        );

        // Store the second leaf together with its block and VID data.
        {
            let mut tx = db.write().await.unwrap();
            tx.insert_leaf(&leaves[1]).await.unwrap();
            tx.insert_block(&blocks[1]).await.unwrap();
            tx.insert_vid(
                &VidCommonQueryData::<MockTypes>::new(
                    leaves[1].header().clone(),
                    hotshot_types::data::VidCommon::V0(dispersal.common),
                ),
                Some(&VidShare::V0(dispersal.shares[0].clone())),
            )
            .await
            .unwrap();
            tx.commit().await.unwrap();
        }

        // Both heights are now expected to be present for every resource, even though block and
        // VID data were only inserted for height 1 (presumably because height 0 has the same
        // payload, so the stored data matches it too).
        let present = ResourceSyncStatus {
            missing: 0,
            ranges: vec![SyncStatusRange {
                status: SyncStatus::Present,
                start: 0,
                end: 2,
            }],
        };
        assert_eq!(
            NodeStorage::<MockTypes>::sync_status_for_range(&mut db.read().await.unwrap(), 0, 2)
                .await
                .unwrap(),
            SyncStatusQueryData {
                leaves: present.clone(),
                blocks: present.clone(),
                vid_common: present,
                pruned_height: None,
            }
        );
    }
1085
    /// Checks the sync status when two consecutive blocks share a payload but have different
    /// header metadata.
    ///
    /// The second header is a copy of the genesis header with its block number and
    /// `metadata.num_transactions` incremented. After only the second block's data is stored, the
    /// test expects the block at height 0 to still be reported missing, while VID common is
    /// reported present for both heights.
    #[tokio::test]
    #[test_log::test]
    async fn test_sync_status_same_payload_different_ns_table() {
        let storage = TmpDb::init().await;
        let db = SqlStorage::connect(storage.config(), StorageConnectionType::Query)
            .await
            .unwrap();
        let mut vid = advz_scheme(2);

        // Start from the genesis leaf and block.
        let mut leaves = vec![
            LeafQueryData::<MockTypes>::genesis(
                &Default::default(),
                &Default::default(),
                TEST_VERSIONS.test,
            )
            .await,
        ];
        let mut blocks = vec![
            BlockQueryData::<MockTypes>::genesis(
                &Default::default(),
                &Default::default(),
                TEST_VERSIONS.test.base,
            )
            .await,
        ];
        let dispersal = vid.disperse([]).unwrap();

        // Build a second leaf/block pair that reuses the genesis payload but whose header
        // metadata differs from the genesis header.
        let mut leaf = leaves[0].clone();
        leaf.leaf.block_header_mut().block_number += 1;
        leaf.leaf.block_header_mut().metadata.num_transactions += 1;
        let block = BlockQueryData::new(leaf.header().clone(), blocks[0].payload().clone());
        leaves.push(leaf);
        blocks.push(block);

        // Store only the first leaf: no block or VID data yet.
        {
            let mut tx = db.write().await.unwrap();
            tx.insert_leaf(&leaves[0]).await.unwrap();
            tx.commit().await.unwrap();
        }

        // With just the leaf stored, height 0 is present for leaves but missing for blocks and
        // VID common.
        let missing = ResourceSyncStatus {
            missing: 1,
            ranges: vec![SyncStatusRange {
                status: SyncStatus::Missing,
                start: 0,
                end: 1,
            }],
        };
        assert_eq!(
            NodeStorage::<MockTypes>::sync_status_for_range(&mut db.read().await.unwrap(), 0, 1)
                .await
                .unwrap(),
            SyncStatusQueryData {
                leaves: ResourceSyncStatus {
                    missing: 0,
                    ranges: vec![SyncStatusRange {
                        status: SyncStatus::Present,
                        start: 0,
                        end: 1,
                    }]
                },
                blocks: missing.clone(),
                vid_common: missing.clone(),
                pruned_height: None,
            }
        );

        // Store the second leaf together with its block and VID data.
        {
            let mut tx = db.write().await.unwrap();
            tx.insert_leaf(&leaves[1]).await.unwrap();
            tx.insert_block(&blocks[1]).await.unwrap();
            tx.insert_vid(
                &VidCommonQueryData::<MockTypes>::new(
                    leaves[1].header().clone(),
                    hotshot_types::data::VidCommon::V0(dispersal.common),
                ),
                Some(&VidShare::V0(dispersal.shares[0].clone())),
            )
            .await
            .unwrap();
            tx.commit().await.unwrap();
        }

        // Leaves and VID common are expected to be present for both heights. The block at
        // height 0 is expected to remain missing: only the block at height 1 was stored, and the
        // test asserts that it does not count for height 0.
        let present = ResourceSyncStatus {
            missing: 0,
            ranges: vec![SyncStatusRange {
                status: SyncStatus::Present,
                start: 0,
                end: 2,
            }],
        };
        let missing = ResourceSyncStatus {
            missing: 1,
            ranges: vec![
                SyncStatusRange {
                    status: SyncStatus::Missing,
                    start: 0,
                    end: 1,
                },
                SyncStatusRange {
                    status: SyncStatus::Present,
                    start: 1,
                    end: 2,
                },
            ],
        };
        assert_eq!(
            NodeStorage::<MockTypes>::sync_status_for_range(&mut db.read().await.unwrap(), 0, 2)
                .await
                .unwrap(),
            SyncStatusQueryData {
                leaves: present.clone(),
                blocks: missing.clone(),
                vid_common: present,
                pruned_height: None,
            }
        );
    }
1213}