1mod extension;
30pub mod fetching;
31pub mod fs;
32mod metrics;
33mod notifier;
34pub mod sql;
35pub mod storage;
36mod update;
37
38pub use extension::ExtensibleDataSource;
39pub use fetching::{AvailabilityProvider, FetchingDataSource};
40#[cfg(feature = "file-system-data-source")]
41pub use fs::FileSystemDataSource;
42#[cfg(feature = "metrics-data-source")]
43pub use metrics::MetricsDataSource;
44#[cfg(feature = "sql-data-source")]
45pub use sql::SqlDataSource;
46pub use update::{Transaction, UpdateDataSource, VersionedDataSource};
47
48#[cfg(any(test, feature = "testing"))]
49mod test_helpers {
50 use std::ops::{Bound, RangeBounds};
51
52 use futures::{
53 future,
54 stream::{BoxStream, StreamExt},
55 };
56
57 use crate::{
58 availability::{BlockQueryData, Fetch, LeafQueryData},
59 node::NodeDataSource,
60 testing::{consensus::TestableDataSource, mocks::MockTypes},
61 };
62
63 async fn bound_range<R, D>(ds: &D, range: R) -> impl RangeBounds<usize> + use<R, D>
65 where
66 D: TestableDataSource,
67 R: RangeBounds<usize>,
68 {
69 let start = range.start_bound().cloned();
70 let mut end = range.end_bound().cloned();
71 if end == Bound::Unbounded {
72 end = Bound::Excluded(NodeDataSource::block_height(ds).await.unwrap());
73 }
74 (start, end)
75 }
76
77 pub async fn block_range<R, D>(
79 ds: &D,
80 range: R,
81 ) -> BoxStream<'static, BlockQueryData<MockTypes>>
82 where
83 D: TestableDataSource,
84 R: RangeBounds<usize> + Send + 'static,
85 {
86 ds.get_block_range(bound_range(ds, range).await)
87 .await
88 .then(Fetch::resolve)
89 .boxed()
90 }
91
92 pub async fn leaf_range<R, D>(ds: &D, range: R) -> BoxStream<'static, LeafQueryData<MockTypes>>
94 where
95 D: TestableDataSource,
96 R: RangeBounds<usize> + Send + 'static,
97 {
98 ds.get_leaf_range(bound_range(ds, range).await)
99 .await
100 .then(Fetch::resolve)
101 .boxed()
102 }
103
104 pub async fn get_non_empty_blocks<D>(
105 ds: &D,
106 ) -> Vec<(LeafQueryData<MockTypes>, BlockQueryData<MockTypes>)>
107 where
108 D: TestableDataSource,
109 {
110 leaf_range(ds, 1..)
112 .await
113 .zip(block_range(ds, 1..).await)
114 .filter(|(_, block)| future::ready(!block.is_empty()))
115 .collect()
116 .await
117 }
118}
119
120#[cfg(any(test, feature = "testing"))]
122#[espresso_macros::generic_tests]
123pub mod availability_tests {
124 use std::{
125 collections::HashMap,
126 fmt::Debug,
127 ops::{Bound, RangeBounds},
128 };
129
130 use committable::Committable;
131 use futures::stream::StreamExt;
132 use hotshot_example_types::node_types::TEST_VERSIONS;
133 use hotshot_types::{data::Leaf2, vote::HasViewNumber};
134
135 use super::test_helpers::*;
136 use crate::{
137 availability::{BlockId, BlockQueryData, LeafQueryData, VidCommonQueryData, payload_size},
138 data_source::{
139 Transaction,
140 storage::{AvailabilityStorage, NodeStorage, UpdateAvailabilityStorage},
141 },
142 node::NodeDataSource,
143 testing::{
144 consensus::{MockNetwork, TestableDataSource},
145 mocks::{MockTypes, mock_transaction},
146 },
147 types::HeightIndexed,
148 };
149
150 async fn validate<D: TestableDataSource>(ds: &D)
151 where
152 for<'a> D::ReadOnly<'a>: AvailabilityStorage<MockTypes> + NodeStorage<MockTypes>,
153 {
154 let mut seen_payloads = HashMap::new();
157 let mut seen_transactions = HashMap::new();
158 let mut leaves = leaf_range(ds, ..).await.enumerate();
159 while let Some((i, leaf)) = leaves.next().await {
160 assert_eq!(leaf.height(), i as u64);
161 assert_eq!(
162 leaf.hash(),
163 <Leaf2<MockTypes> as Committable>::commit(&leaf.leaf)
164 );
165
166 tracing::info!("looking up leaf {i} various ways");
168 assert_eq!(leaf, ds.get_leaf(i).await.await);
169 assert_eq!(leaf, ds.get_leaf(leaf.hash()).await.await);
170
171 tracing::info!("looking up block {i} various ways");
172 let block = ds.get_block(i).await.await;
173 assert_eq!(leaf.block_hash(), block.hash());
174 assert_eq!(block.height(), i as u64);
175 assert_eq!(block.hash(), block.header().commit());
176 assert_eq!(block.size(), payload_size::<MockTypes>(block.payload()));
177
178 assert_eq!(block, ds.get_block(i).await.await);
180 assert_eq!(ds.get_block(block.hash()).await.await.height(), i as u64);
181 let ix = seen_payloads
191 .entry(block.payload_hash())
192 .or_insert(i as u64);
193 if let Ok(block) = ds
194 .get_block(BlockId::PayloadHash(block.payload_hash()))
195 .await
196 .try_resolve()
197 {
198 assert_eq!(block.height(), *ix);
199 } else {
200 tracing::warn!(
201 "skipping block by payload index check for missing payload {:?}",
202 block.header()
203 );
204 ds.get_block(BlockId::PayloadHash(block.payload_hash()))
206 .await
207 .await;
208 }
209
210 tracing::info!("looking up payload {i} various ways");
212 let expected_payload = block.clone().into();
213 assert_eq!(ds.get_payload(i).await.await, expected_payload);
214 assert_eq!(ds.get_payload(block.hash()).await.await, expected_payload);
215 if let Ok(payload) = ds
218 .get_payload(BlockId::PayloadHash(block.payload_hash()))
219 .await
220 .try_resolve()
221 {
222 if *ix == i as u64 {
223 assert_eq!(payload, expected_payload);
224 }
225 } else {
226 tracing::warn!(
227 "skipping payload index check for missing payload {:?}",
228 block.header()
229 );
230 ds.get_payload(BlockId::PayloadHash(block.payload_hash()))
232 .await
233 .await;
234 }
235
236 tracing::info!("looking up VID common {i} various ways");
238 let common = ds.get_vid_common(block.height() as usize).await.await;
239 assert_eq!(common, ds.get_vid_common(block.hash()).await.await);
240 if let Ok(res) = ds
243 .get_vid_common(BlockId::PayloadHash(block.payload_hash()))
244 .await
245 .try_resolve()
246 {
247 if *ix == i as u64 {
248 assert_eq!(res, common);
249 }
250 } else {
251 tracing::warn!(
252 "skipping VID common index check for missing data {:?}",
253 block.header()
254 );
255 let res = ds
257 .get_vid_common(BlockId::PayloadHash(block.payload_hash()))
258 .await
259 .await;
260 assert_eq!(res.payload_hash(), common.payload_hash());
261 }
262
263 for (j, txn) in block.enumerate() {
264 tracing::info!("looking up transaction {i},{j:?}");
265
266 let ix = seen_transactions
274 .entry(txn.commit())
275 .or_insert((i as u64, j.clone()));
276 if let Ok(tx_data) = ds
277 .get_block_containing_transaction(txn.commit())
278 .await
279 .try_resolve()
280 {
281 assert_eq!(tx_data.transaction.transaction(), &txn);
282 assert_eq!(tx_data.transaction.block_height(), ix.0);
283 assert_eq!(tx_data.transaction.index(), ix.1.position as u64);
284 assert_eq!(tx_data.index, ix.1);
285 assert_eq!(tx_data.block, block);
286 } else {
287 tracing::warn!(
288 "skipping transaction index check for missing transaction {j:?} {txn:?}"
289 );
290 ds.get_block_containing_transaction(txn.commit())
292 .await
293 .await;
294 }
295 }
296 }
297
298 {
300 let mut tx = ds.read().await.unwrap();
301 let block_height = NodeStorage::block_height(&mut tx).await.unwrap();
302 let last_leaf = tx.get_leaf((block_height - 1).into()).await.unwrap();
303
304 if last_leaf.qc().data.epoch.is_some() {
305 tracing::info!(block_height, "checking QC chain");
306 let qc_chain = tx.latest_qc_chain().await.unwrap().unwrap();
307
308 assert_eq!(last_leaf.height(), (block_height - 1) as u64);
309 assert_eq!(qc_chain[0].view_number(), last_leaf.leaf().view_number());
310 assert_eq!(qc_chain[0].leaf_commit(), last_leaf.hash());
311 assert_eq!(qc_chain[1].view_number(), qc_chain[0].view_number() + 1);
312 }
313 }
314 }
315
316 #[test_log::test(tokio::test(flavor = "multi_thread"))]
317 pub async fn test_update<D: TestableDataSource>()
318 where
319 for<'a> D::ReadOnly<'a>: AvailabilityStorage<MockTypes> + NodeStorage<MockTypes>,
320 {
321 let mut network = MockNetwork::<D>::init().await;
322 let ds = network.data_source();
323
324 network.start().await;
325 assert_eq!(get_non_empty_blocks(&ds).await, vec![]);
326
327 let mut blocks = ds.subscribe_blocks(0).await.enumerate();
330 for nonce in 0..3 {
331 let txn = mock_transaction(vec![nonce]);
332 network.submit_transaction(txn).await;
333
334 let (i, block) = loop {
336 tracing::info!("waiting for tx {nonce}");
337 let (i, block) = blocks.next().await.unwrap();
338 if !block.is_empty() {
339 break (i, block);
340 }
341 tracing::info!("block {i} is empty");
342 };
343
344 tracing::info!("got tx {nonce} in block {i}");
345 assert_eq!(ds.get_block(i).await.await, block);
346 validate(&ds).await;
347 }
348
349 {
353 tracing::info!("checking persisted storage");
354 let storage = D::connect(network.storage()).await;
355
356 let block_height = NodeDataSource::block_height(&ds).await.unwrap();
360 assert_eq!(
361 ds.get_block_range(..block_height)
362 .await
363 .map(|fetch| fetch.try_resolve().ok())
364 .collect::<Vec<_>>()
365 .await,
366 storage
367 .get_block_range(..block_height)
368 .await
369 .map(|fetch| fetch.try_resolve().ok())
370 .collect::<Vec<_>>()
371 .await
372 );
373 assert_eq!(
374 ds.get_leaf_range(..block_height)
375 .await
376 .map(|fetch| fetch.try_resolve().ok())
377 .collect::<Vec<_>>()
378 .await,
379 storage
380 .get_leaf_range(..block_height)
381 .await
382 .map(|fetch| fetch.try_resolve().ok())
383 .collect::<Vec<_>>()
384 .await
385 );
386 }
387 }
388
389 #[test_log::test(tokio::test(flavor = "multi_thread"))]
390 pub async fn test_range<D: TestableDataSource>()
391 where
392 for<'a> D::ReadOnly<'a>: NodeStorage<MockTypes>,
393 {
394 let mut network = MockNetwork::<D>::init().await;
395 let ds = network.data_source();
396 network.start().await;
397
398 let block_height = loop {
400 let mut tx = ds.read().await.unwrap();
401 let block_height = tx.block_height().await.unwrap();
402 if block_height >= 3 {
403 break block_height as u64;
404 }
405 };
406
407 do_range_test(&ds, 1..=2, 1..3).await; do_range_test(&ds, 1..3, 1..3).await; do_range_test(&ds, 1.., 1..block_height).await; do_range_test(&ds, ..=2, 0..3).await; do_range_test(&ds, ..3, 0..3).await; do_range_test(&ds, .., 0..block_height).await; do_range_test(&ds, ExRange(0..=2), 1..3).await; do_range_test(&ds, ExRange(0..3), 1..3).await; do_range_test(&ds, ExRange(0..), 1..block_height).await; }
419
420 async fn do_range_test<D, R, I>(ds: &D, range: R, expected_indices: I)
421 where
422 D: TestableDataSource,
423 R: RangeBounds<usize> + Clone + Debug + Send + 'static,
424 I: IntoIterator<Item = u64>,
425 {
426 tracing::info!("testing range {range:?}");
427
428 let mut leaves = ds.get_leaf_range(range.clone()).await;
429 let mut blocks = ds.get_block_range(range.clone()).await;
430 let mut payloads = ds.get_payload_range(range.clone()).await;
431 let mut payloads_meta = ds.get_payload_metadata_range(range.clone()).await;
432 let mut vid_common = ds.get_vid_common_range(range.clone()).await;
433 let mut vid_common_meta = ds.get_vid_common_metadata_range(range.clone()).await;
434
435 for i in expected_indices {
436 tracing::info!(i, "check entries");
437 let leaf = leaves.next().await.unwrap().await;
438 let block = blocks.next().await.unwrap().await;
439 let payload = payloads.next().await.unwrap().await;
440 let payload_meta = payloads_meta.next().await.unwrap().await;
441 let common = vid_common.next().await.unwrap().await;
442 let common_meta = vid_common_meta.next().await.unwrap().await;
443 assert_eq!(leaf.height(), i);
444 assert_eq!(block.height(), i);
445 assert_eq!(payload, ds.get_payload(i as usize).await.await);
446 assert_eq!(payload_meta, block.into());
447 assert_eq!(common, ds.get_vid_common(i as usize).await.await);
448 assert_eq!(common_meta, common.into());
449 }
450
451 if range.end_bound() == Bound::Unbounded {
452 loop {
455 let fetch_leaf = leaves.next().await.unwrap();
456 let fetch_block = blocks.next().await.unwrap();
457 let fetch_payload = payloads.next().await.unwrap();
458 let fetch_payload_meta = payloads_meta.next().await.unwrap();
459 let fetch_common = vid_common.next().await.unwrap();
460 let fetch_common_meta = vid_common_meta.next().await.unwrap();
461
462 if fetch_leaf.try_resolve().is_ok()
463 && fetch_block.try_resolve().is_ok()
464 && fetch_payload.try_resolve().is_ok()
465 && fetch_payload_meta.try_resolve().is_ok()
466 && fetch_common.try_resolve().is_ok()
467 && fetch_common_meta.try_resolve().is_ok()
468 {
469 tracing::info!("searching for end of available objects");
470 } else {
471 break;
472 }
473 }
474 } else {
475 assert!(leaves.next().await.is_none());
477 assert!(blocks.next().await.is_none());
478 assert!(payloads.next().await.is_none());
479 assert!(payloads_meta.next().await.is_none());
480 assert!(vid_common.next().await.is_none());
481 assert!(vid_common_meta.next().await.is_none());
482 }
483 }
484
485 #[test_log::test(tokio::test(flavor = "multi_thread"))]
486 pub async fn test_range_rev<D: TestableDataSource>()
487 where
488 for<'a> D::ReadOnly<'a>: NodeStorage<MockTypes>,
489 {
490 let mut network = MockNetwork::<D>::init().await;
491 let ds = network.data_source();
492 network.start().await;
493
494 ds.subscribe_leaves(5).await.next().await.unwrap();
496
497 do_range_rev_test(&ds, Bound::Included(1), 5, 1..=5).await;
499 do_range_rev_test(&ds, Bound::Excluded(1), 5, 2..=5).await;
500 do_range_rev_test(&ds, Bound::Unbounded, 5, 0..=5).await;
501 }
502
503 async fn do_range_rev_test<D>(
504 ds: &D,
505 start: Bound<usize>,
506 end: usize,
507 expected_indices: impl DoubleEndedIterator<Item = u64>,
508 ) where
509 D: TestableDataSource,
510 {
511 tracing::info!("testing range {start:?}-{end}");
512
513 let mut leaves = ds.get_leaf_range_rev(start, end).await;
514 let mut blocks = ds.get_block_range_rev(start, end).await;
515 let mut payloads = ds.get_payload_range_rev(start, end).await;
516 let mut payloads_meta = ds.get_payload_metadata_range_rev(start, end).await;
517 let mut vid_common = ds.get_vid_common_range_rev(start, end).await;
518 let mut vid_common_meta = ds.get_vid_common_metadata_range_rev(start, end).await;
519
520 for i in expected_indices.rev() {
521 tracing::info!(i, "check entries");
522 let leaf = leaves.next().await.unwrap().await;
523 let block = blocks.next().await.unwrap().await;
524 let payload = payloads.next().await.unwrap().await;
525 let payload_meta = payloads_meta.next().await.unwrap().await;
526 let common = vid_common.next().await.unwrap().await;
527 let common_meta = vid_common_meta.next().await.unwrap().await;
528 assert_eq!(leaf.height(), i);
529 assert_eq!(block.height(), i);
530 assert_eq!(payload.height(), i);
531 assert_eq!(payload_meta.height(), i);
532 assert_eq!(common, ds.get_vid_common(i as usize).await.await);
533 assert_eq!(
534 common_meta,
535 ds.get_vid_common_metadata(i as usize).await.await
536 );
537 }
538
539 assert!(leaves.next().await.is_none());
541 assert!(blocks.next().await.is_none());
542 assert!(payloads.next().await.is_none());
543 assert!(payloads_meta.next().await.is_none());
544 assert!(vid_common.next().await.is_none());
545 assert!(vid_common_meta.next().await.is_none());
546 }
547
548 #[derive(Clone, Copy, Debug)]
550 struct ExRange<R>(R);
551
552 impl<R: RangeBounds<usize>> RangeBounds<usize> for ExRange<R> {
553 fn start_bound(&self) -> Bound<&usize> {
554 match self.0.start_bound() {
555 Bound::Included(x) => Bound::Excluded(x),
556 Bound::Excluded(x) => Bound::Excluded(x),
557 Bound::Unbounded => Bound::Excluded(&0),
558 }
559 }
560
561 fn end_bound(&self) -> Bound<&usize> {
562 self.0.end_bound()
563 }
564 }
565
566 #[tokio::test]
571 #[test_log::test]
572 pub async fn test_insert_consecutive_identical_blocks<D: TestableDataSource>()
573 where
574 for<'a> D::Transaction<'a>: UpdateAvailabilityStorage<MockTypes>,
575 {
576 let storage = D::create(0).await;
577 let ds = D::connect(&storage).await;
578
579 let leaf = LeafQueryData::<MockTypes>::genesis(
580 &Default::default(),
581 &Default::default(),
582 TEST_VERSIONS.test,
583 )
584 .await;
585 let block = BlockQueryData::<MockTypes>::genesis(
586 &Default::default(),
587 &Default::default(),
588 TEST_VERSIONS.test.base,
589 )
590 .await;
591 let vid = VidCommonQueryData::<MockTypes>::genesis(
592 &Default::default(),
593 &Default::default(),
594 TEST_VERSIONS.test.base,
595 )
596 .await;
597
598 let mut leaf2 = leaf.clone();
599 leaf2.leaf.block_header_mut().block_number += 1;
600 let block2 =
601 BlockQueryData::<MockTypes>::new(leaf2.header().clone(), block.payload.clone());
602 let vid2 = VidCommonQueryData::<MockTypes>::new(leaf2.header().clone(), vid.common.clone());
603
604 {
605 let mut tx = ds.write().await.unwrap();
606 tx.insert_leaf_range([&leaf, &leaf2]).await.unwrap();
607 tx.insert_block_range([&block, &block2]).await.unwrap();
608 tx.insert_vid_range([(&vid, None), (&vid2, None)])
609 .await
610 .unwrap();
611 tx.commit().await.unwrap();
612 }
613
614 assert_eq!(ds.get_leaf(0).await.await, leaf);
615 assert_eq!(ds.get_leaf(1).await.await, leaf2);
616 assert_eq!(ds.get_block(0).await.await, block);
617 assert_eq!(ds.get_block(1).await.await, block2);
618 assert_eq!(ds.get_vid_common(0).await.await, vid);
619 assert_eq!(ds.get_vid_common(1).await.await, vid2);
620 }
621}
622
623#[cfg(any(test, feature = "testing"))]
625#[espresso_macros::generic_tests]
626pub mod persistence_tests {
627 use committable::Committable;
628 use hotshot_example_types::{
629 node_types::TEST_VERSIONS,
630 state_types::{TestInstanceState, TestValidatedState},
631 };
632 use hotshot_types::simple_certificate::QuorumCertificate2;
633
634 use crate::{
635 Leaf2,
636 availability::{BlockQueryData, LeafQueryData},
637 data_source::{
638 Transaction,
639 storage::{AvailabilityStorage, NodeStorage, UpdateAvailabilityStorage},
640 },
641 node::NodeDataSource,
642 testing::{
643 consensus::TestableDataSource,
644 mocks::{MockPayload, MockTypes},
645 },
646 types::HeightIndexed,
647 };
648
649 #[test_log::test(tokio::test(flavor = "multi_thread"))]
650 pub async fn test_revert<D: TestableDataSource>()
651 where
652 for<'a> D::Transaction<'a>: UpdateAvailabilityStorage<MockTypes>
653 + AvailabilityStorage<MockTypes>
654 + NodeStorage<MockTypes>,
655 {
656 let storage = D::create(0).await;
657 let ds = D::connect(&storage).await;
658
659 let mut qc = QuorumCertificate2::<MockTypes>::genesis(
661 &TestValidatedState::default(),
662 &TestInstanceState::default(),
663 TEST_VERSIONS.test,
664 )
665 .await;
666 let mut leaf = Leaf2::<MockTypes>::genesis(
667 &TestValidatedState::default(),
668 &TestInstanceState::default(),
669 TEST_VERSIONS.test.base,
670 )
671 .await;
672 leaf.block_header_mut().block_number += 1;
675 qc.data.leaf_commit = <Leaf2<MockTypes> as Committable>::commit(&leaf);
676
677 let block = BlockQueryData::new(leaf.block_header().clone(), MockPayload::genesis());
678 let leaf = LeafQueryData::new(leaf, qc).unwrap();
679
680 let mut tx = ds.write().await.unwrap();
682 tx.insert_leaf(&leaf).await.unwrap();
683 tx.insert_block(&block).await.unwrap();
684
685 assert_eq!(tx.block_height().await.unwrap(), 2);
686 assert_eq!(leaf, tx.get_leaf(1.into()).await.unwrap());
687 assert_eq!(block, tx.get_block(1.into()).await.unwrap());
688
689 tx.revert().await;
691 assert_eq!(
692 NodeDataSource::<MockTypes>::block_height(&ds)
693 .await
694 .unwrap(),
695 0
696 );
697 ds.get_leaf(1).await.try_resolve().unwrap_err();
698 ds.get_block(1).await.try_resolve().unwrap_err();
699 }
700
701 #[test_log::test(tokio::test(flavor = "multi_thread"))]
702 pub async fn test_reset<D: TestableDataSource>()
703 where
704 for<'a> D::Transaction<'a>: UpdateAvailabilityStorage<MockTypes>,
705 {
706 let storage = D::create(0).await;
707 let ds = D::connect(&storage).await;
708
709 let mut qc = QuorumCertificate2::<MockTypes>::genesis(
711 &TestValidatedState::default(),
712 &TestInstanceState::default(),
713 TEST_VERSIONS.test,
714 )
715 .await;
716 let mut leaf = Leaf2::<MockTypes>::genesis(
717 &TestValidatedState::default(),
718 &TestInstanceState::default(),
719 TEST_VERSIONS.test.base,
720 )
721 .await;
722 leaf.block_header_mut().block_number += 1;
725 qc.data.leaf_commit = <Leaf2<MockTypes> as Committable>::commit(&leaf);
726
727 let block = BlockQueryData::new(leaf.block_header().clone(), MockPayload::genesis());
728 let leaf = LeafQueryData::new(leaf, qc).unwrap();
729
730 let mut tx = ds.write().await.unwrap();
732 tx.insert_leaf(&leaf).await.unwrap();
733 tx.insert_block(&block).await.unwrap();
734 tx.commit().await.unwrap();
735
736 assert_eq!(
737 NodeDataSource::<MockTypes>::block_height(&ds)
738 .await
739 .unwrap(),
740 2
741 );
742 assert_eq!(leaf, ds.get_leaf(1).await.await);
743 assert_eq!(block, ds.get_block(1).await.await);
744
745 drop(ds);
746
747 let ds = D::reset(&storage).await;
749 assert_eq!(
750 NodeDataSource::<MockTypes>::block_height(&ds)
751 .await
752 .unwrap(),
753 0
754 );
755 ds.get_leaf(1).await.try_resolve().unwrap_err();
756 ds.get_block(1).await.try_resolve().unwrap_err();
757 }
758
759 #[test_log::test(tokio::test(flavor = "multi_thread"))]
760 pub async fn test_drop_tx<D: TestableDataSource>()
761 where
762 for<'a> D::Transaction<'a>: UpdateAvailabilityStorage<MockTypes>
763 + AvailabilityStorage<MockTypes>
764 + NodeStorage<MockTypes>,
765 for<'a> D::ReadOnly<'a>: NodeStorage<MockTypes>,
766 {
767 let storage = D::create(0).await;
768 let ds = D::connect(&storage).await;
769
770 let mut mock_qc = QuorumCertificate2::<MockTypes>::genesis(
772 &TestValidatedState::default(),
773 &TestInstanceState::default(),
774 TEST_VERSIONS.test,
775 )
776 .await;
777 let mut mock_leaf = Leaf2::<MockTypes>::genesis(
778 &TestValidatedState::default(),
779 &TestInstanceState::default(),
780 TEST_VERSIONS.test.base,
781 )
782 .await;
783 mock_leaf.block_header_mut().block_number += 1;
786 mock_qc.data.leaf_commit = <Leaf2<MockTypes> as Committable>::commit(&mock_leaf);
787
788 let block = BlockQueryData::new(mock_leaf.block_header().clone(), MockPayload::genesis());
789 let leaf = LeafQueryData::new(mock_leaf.clone(), mock_qc.clone()).unwrap();
790
791 tracing::info!("write");
793 let mut tx = ds.write().await.unwrap();
794 tx.insert_leaf(&leaf).await.unwrap();
795 tx.insert_block(&block).await.unwrap();
796
797 assert_eq!(tx.block_height().await.unwrap(), 2);
798 assert_eq!(leaf, tx.get_leaf(1.into()).await.unwrap());
799 assert_eq!(block, tx.get_block(1.into()).await.unwrap());
800
801 drop(tx);
803
804 tracing::info!("read");
806 let mut tx = ds.read().await.unwrap();
807 assert_eq!(tx.block_height().await.unwrap(), 0);
808 drop(tx);
809
810 mock_leaf.block_header_mut().block_number += 1;
812 mock_qc.data.leaf_commit = <Leaf2<MockTypes> as Committable>::commit(&mock_leaf);
813 let block = BlockQueryData::new(mock_leaf.block_header().clone(), MockPayload::genesis());
814 let leaf = LeafQueryData::new(mock_leaf, mock_qc).unwrap();
815
816 tracing::info!("write again");
817 let mut tx = ds.write().await.unwrap();
818 tx.insert_leaf(&leaf).await.unwrap();
819 tx.insert_block(&block).await.unwrap();
820 tx.commit().await.unwrap();
821
822 tracing::info!("read again");
825 let height = leaf.height() as usize;
826 assert_eq!(
827 NodeDataSource::<MockTypes>::block_height(&ds)
828 .await
829 .unwrap(),
830 height + 1
831 );
832 assert_eq!(leaf, ds.get_leaf(height).await.await);
833 assert_eq!(block, ds.get_block(height).await.await);
834 ds.get_leaf(height - 1).await.try_resolve().unwrap_err();
835 ds.get_block(height - 1).await.try_resolve().unwrap_err();
836 }
837}
838
839#[cfg(any(test, feature = "testing"))]
841#[espresso_macros::generic_tests]
842pub mod node_tests {
843 use std::time::Duration;
844
845 use committable::Committable;
846 use futures::{future::join_all, stream::StreamExt};
847 use hotshot::traits::BlockPayload;
848 use hotshot_example_types::{
849 block_types::{TestBlockHeader, TestBlockPayload, TestMetadata},
850 node_types::{TEST_VERSIONS, TestTypes},
851 state_types::{TestInstanceState, TestValidatedState},
852 };
853 use hotshot_types::{
854 data::{VidCommitment, VidCommon, VidShare, ViewNumber, vid_commitment},
855 simple_certificate::{CertificatePair, QuorumCertificate2},
856 traits::block_contents::{BlockHeader, EncodeBytes},
857 vid::advz::{ADVZScheme, advz_scheme},
858 };
859 use jf_advz::VidScheme;
860 use pretty_assertions::assert_eq;
861
862 use crate::{
863 Header, Leaf2,
864 availability::{BlockInfo, BlockQueryData, LeafQueryData, VidCommonQueryData},
865 data_source::{
866 storage::{NodeStorage, UpdateAvailabilityStorage},
867 update::Transaction,
868 },
869 node::{
870 BlockId, NodeDataSource, ResourceSyncStatus, SyncStatus, SyncStatusQueryData,
871 SyncStatusRange, TimeWindowQueryData, WindowStart,
872 },
873 testing::{
874 consensus::{MockNetwork, TestableDataSource},
875 mocks::{MockPayload, MockTypes, mock_transaction},
876 sleep,
877 },
878 types::HeightIndexed,
879 };
880
881 fn block_header_timestamp(header: &Header<MockTypes>) -> u64 {
882 <TestBlockHeader as BlockHeader<MockTypes>>::timestamp(header)
883 }
884
885 #[test_log::test(tokio::test(flavor = "multi_thread"))]
886 pub async fn test_sync_status<D: TestableDataSource>()
887 where
888 for<'a> D::Transaction<'a>: UpdateAvailabilityStorage<MockTypes>,
889 {
890 let storage = D::create(0).await;
891 let ds = D::build(&storage, |builder| {
892 builder.with_sync_status_ttl(Duration::ZERO)
893 })
894 .await;
895
896 let mut vid = advz_scheme(2);
898
899 let mut leaves = vec![
901 LeafQueryData::<MockTypes>::genesis(
902 &TestValidatedState::default(),
903 &TestInstanceState::default(),
904 TEST_VERSIONS.test,
905 )
906 .await,
907 ];
908 let mut blocks = vec![
909 BlockQueryData::<MockTypes>::genesis(
910 &TestValidatedState::default(),
911 &TestInstanceState::default(),
912 TEST_VERSIONS.test.base,
913 )
914 .await,
915 ];
916 let dispersal = vid.disperse([]).unwrap();
917 let mut vid_commons = vec![VidCommonQueryData::new(
918 leaves[0].header().clone(),
919 VidCommon::V0(dispersal.common.clone()),
920 )];
921 for i in 0..2 {
922 let (payload, metadata) = <MockPayload as BlockPayload<MockTypes>>::from_transactions(
925 vec![mock_transaction(vec![i as u8])],
926 &Default::default(),
927 &Default::default(),
928 )
929 .await
930 .unwrap();
931 let dispersal = vid.disperse(payload.encode()).unwrap();
932
933 let mut leaf = leaves[i].clone();
934 leaf.leaf.block_header_mut().block_number += 1;
935 leaf.leaf.block_header_mut().payload_commitment = VidCommitment::V0(dispersal.commit);
936 leaf.leaf.block_header_mut().metadata = metadata;
937 let block = BlockQueryData::new(leaf.header().clone(), payload);
938 let vid_common = VidCommonQueryData::new(
939 leaf.header().clone(),
940 VidCommon::V0(dispersal.common.clone()),
941 );
942
943 leaves.push(leaf);
944 blocks.push(block);
945 vid_commons.push(vid_common);
946 }
947
948 assert!(ds.sync_status().await.unwrap().is_fully_synced());
950
951 ds.append(leaves[0].clone().into()).await.unwrap();
954 assert_eq!(
955 ds.sync_status().await.unwrap(),
956 SyncStatusQueryData {
957 blocks: ResourceSyncStatus {
958 missing: 1,
959 ranges: vec![SyncStatusRange {
960 start: 0,
961 end: 1,
962 status: SyncStatus::Missing,
963 }]
964 },
965 vid_common: ResourceSyncStatus {
966 missing: 1,
967 ranges: vec![SyncStatusRange {
968 start: 0,
969 end: 1,
970 status: SyncStatus::Missing,
971 }]
972 },
973 leaves: ResourceSyncStatus {
974 missing: 0,
975 ranges: vec![SyncStatusRange {
976 start: 0,
977 end: 1,
978 status: SyncStatus::Present,
979 }]
980 },
981 pruned_height: None,
982 }
983 );
984
985 ds.append(leaves[2].clone().into()).await.unwrap();
988 assert_eq!(
989 ds.sync_status().await.unwrap(),
990 SyncStatusQueryData {
991 blocks: ResourceSyncStatus {
992 missing: 3,
993 ranges: vec![SyncStatusRange {
994 start: 0,
995 end: 3,
996 status: SyncStatus::Missing,
997 }]
998 },
999 vid_common: ResourceSyncStatus {
1000 missing: 3,
1001 ranges: vec![SyncStatusRange {
1002 start: 0,
1003 end: 3,
1004 status: SyncStatus::Missing,
1005 }]
1006 },
1007 leaves: ResourceSyncStatus {
1008 missing: 1,
1009 ranges: vec![
1010 SyncStatusRange {
1011 start: 0,
1012 end: 1,
1013 status: SyncStatus::Present,
1014 },
1015 SyncStatusRange {
1016 start: 1,
1017 end: 2,
1018 status: SyncStatus::Missing,
1019 },
1020 SyncStatusRange {
1021 start: 2,
1022 end: 3,
1023 status: SyncStatus::Present,
1024 }
1025 ]
1026 },
1027 pruned_height: None,
1028 }
1029 );
1030
1031 {
1033 let mut tx = ds.write().await.unwrap();
1034 tx.insert_vid(&vid_commons[0].clone(), None).await.unwrap();
1035 tx.commit().await.unwrap();
1036 }
1037 assert_eq!(
1038 ds.sync_status().await.unwrap(),
1039 SyncStatusQueryData {
1040 blocks: ResourceSyncStatus {
1041 missing: 3,
1042 ranges: vec![SyncStatusRange {
1043 start: 0,
1044 end: 3,
1045 status: SyncStatus::Missing,
1046 }]
1047 },
1048 vid_common: ResourceSyncStatus {
1049 missing: 2,
1050 ranges: vec![
1051 SyncStatusRange {
1052 start: 0,
1053 end: 1,
1054 status: SyncStatus::Present,
1055 },
1056 SyncStatusRange {
1057 start: 1,
1058 end: 3,
1059 status: SyncStatus::Missing,
1060 },
1061 ]
1062 },
1063 leaves: ResourceSyncStatus {
1064 missing: 1,
1065 ranges: vec![
1066 SyncStatusRange {
1067 start: 0,
1068 end: 1,
1069 status: SyncStatus::Present,
1070 },
1071 SyncStatusRange {
1072 start: 1,
1073 end: 2,
1074 status: SyncStatus::Missing,
1075 },
1076 SyncStatusRange {
1077 start: 2,
1078 end: 3,
1079 status: SyncStatus::Present,
1080 }
1081 ]
1082 },
1083 pruned_height: None,
1084 }
1085 );
1086
1087 {
1089 let mut tx = ds.write().await.unwrap();
1090 tx.insert_block(&blocks[0]).await.unwrap();
1091 tx.insert_vid(&vid_commons[0], None).await.unwrap();
1092 tx.insert_leaf(&leaves[1]).await.unwrap();
1093 tx.insert_block(&blocks[1]).await.unwrap();
1094 tx.insert_vid(&vid_commons[1], None).await.unwrap();
1095 tx.insert_block(&blocks[2]).await.unwrap();
1096 tx.insert_vid(&vid_commons[2], None).await.unwrap();
1097 tx.commit().await.unwrap();
1098 }
1099
1100 let leaves = if ds.get_leaf(1).await.try_resolve().is_err() {
1104 tracing::warn!(
1105 "data source does not support out-of-order filling, allowing one missing leaf"
1106 );
1107 ResourceSyncStatus {
1108 missing: 1,
1109 ranges: vec![
1110 SyncStatusRange {
1111 start: 0,
1112 end: 1,
1113 status: SyncStatus::Present,
1114 },
1115 SyncStatusRange {
1116 start: 1,
1117 end: 2,
1118 status: SyncStatus::Missing,
1119 },
1120 SyncStatusRange {
1121 start: 2,
1122 end: 3,
1123 status: SyncStatus::Present,
1124 },
1125 ],
1126 }
1127 } else {
1128 ResourceSyncStatus {
1129 missing: 0,
1130 ranges: vec![SyncStatusRange {
1131 start: 0,
1132 end: 3,
1133 status: SyncStatus::Present,
1134 }],
1135 }
1136 };
1137 let expected_sync_status = SyncStatusQueryData {
1138 leaves,
1139 blocks: ResourceSyncStatus {
1140 missing: 0,
1141 ranges: vec![SyncStatusRange {
1142 start: 0,
1143 end: 3,
1144 status: SyncStatus::Present,
1145 }],
1146 },
1147 vid_common: ResourceSyncStatus {
1148 missing: 0,
1149 ranges: vec![SyncStatusRange {
1150 start: 0,
1151 end: 3,
1152 status: SyncStatus::Present,
1153 }],
1154 },
1155 pruned_height: None,
1156 };
1157 assert_eq!(ds.sync_status().await.unwrap(), expected_sync_status);
1158 }
1159
1160 #[test_log::test(tokio::test(flavor = "multi_thread"))]
1161 pub async fn test_counters<D: TestableDataSource>() {
1162 let storage = D::create(0).await;
1163 let ds = D::connect(&storage).await;
1164
1165 assert_eq!(ds.count_transactions().await.unwrap(), 0);
1166 assert_eq!(ds.payload_size().await.unwrap(), 0);
1167
1168 let mut total_transactions = 0;
1170 let mut total_size = 0;
1171 'outer: for i in [0, 1, 2] {
1172 let (payload, metadata) =
1177 <TestBlockPayload as BlockPayload<TestTypes>>::from_transactions(
1178 [mock_transaction(vec![i as u8 % 2])],
1179 &TestValidatedState::default(),
1180 &TestInstanceState::default(),
1181 )
1182 .await
1183 .unwrap();
1184 let encoded = payload.encode();
1185 let payload_commitment =
1186 vid_commitment(&encoded, &metadata.encode(), 1, TEST_VERSIONS.test.base);
1187 let header = TestBlockHeader {
1188 block_number: i,
1189 payload_commitment,
1190 timestamp: i,
1191 timestamp_millis: i * 1_000,
1192 builder_commitment:
1193 <TestBlockPayload as BlockPayload<TestTypes>>::builder_commitment(
1194 &payload, &metadata,
1195 ),
1196 metadata: TestMetadata {
1197 num_transactions: 7, },
1199 random: 1, version: TEST_VERSIONS.test.base,
1201 };
1202
1203 let mut leaf = LeafQueryData::<MockTypes>::genesis(
1204 &TestValidatedState::default(),
1205 &TestInstanceState::default(),
1206 TEST_VERSIONS.test,
1207 )
1208 .await;
1209 *leaf.leaf.block_header_mut() = header.clone();
1210 let block = BlockQueryData::new(header, payload);
1211 ds.append(BlockInfo::new(leaf, Some(block.clone()), None, None))
1212 .await
1213 .unwrap();
1214 assert_eq!(
1215 NodeDataSource::<MockTypes>::block_height(&ds)
1216 .await
1217 .unwrap(),
1218 (i + 1) as usize,
1219 );
1220
1221 total_transactions += 1;
1222 total_size += encoded.len();
1223
1224 for retry in 0..5 {
1226 let ds_transactions = ds.count_transactions().await.unwrap();
1227 let ds_payload_size = ds.payload_size().await.unwrap();
1228 if ds_transactions != total_transactions || ds_payload_size != total_size {
1229 tracing::info!(
1230 i,
1231 retry,
1232 total_transactions,
1233 ds_transactions,
1234 total_size,
1235 ds_payload_size,
1236 "waiting for statistics to update"
1237 );
1238 sleep(Duration::from_secs(1)).await;
1239 } else {
1240 continue 'outer;
1241 }
1242 }
1243 panic!("counters did not update in time");
1244 }
1245 }
1246
1247 #[test_log::test(tokio::test(flavor = "multi_thread"))]
1248 pub async fn test_vid_shares<D: TestableDataSource>()
1249 where
1250 for<'a> D::ReadOnly<'a>: NodeStorage<MockTypes>,
1251 {
1252 let mut network = MockNetwork::<D>::init().await;
1253 let ds = network.data_source();
1254
1255 network.start().await;
1256
1257 let mut leaves = ds.subscribe_leaves(0).await.take(3);
1259 while let Some(leaf) = leaves.next().await {
1260 tracing::info!("got leaf {}", leaf.height());
1261 let mut tx = ds.read().await.unwrap();
1262 let share = tx.vid_share(leaf.height() as usize).await.unwrap();
1263 assert_eq!(share, tx.vid_share(leaf.block_hash()).await.unwrap());
1264 assert_eq!(
1265 share,
1266 tx.vid_share(BlockId::PayloadHash(leaf.payload_hash()))
1267 .await
1268 .unwrap()
1269 );
1270 }
1271 }
1272
1273 #[test_log::test(tokio::test(flavor = "multi_thread"))]
1274 pub async fn test_vid_monotonicity<D: TestableDataSource>()
1275 where
1276 for<'a> D::Transaction<'a>: UpdateAvailabilityStorage<MockTypes>,
1277 for<'a> D::ReadOnly<'a>: NodeStorage<MockTypes>,
1278 {
1279 let storage = D::create(0).await;
1280 let ds = D::connect(&storage).await;
1281
1282 let mut vid = advz_scheme(2);
1284 let disperse = vid.disperse([]).unwrap();
1285
1286 let leaf = LeafQueryData::<MockTypes>::genesis(
1288 &TestValidatedState::default(),
1289 &TestInstanceState::default(),
1290 TEST_VERSIONS.test,
1291 )
1292 .await;
1293 let common = VidCommonQueryData::new(leaf.header().clone(), VidCommon::V0(disperse.common));
1294 ds.append(BlockInfo::new(
1295 leaf,
1296 None,
1297 Some(common.clone()),
1298 Some(VidShare::V0(disperse.shares[0].clone())),
1299 ))
1300 .await
1301 .unwrap();
1302
1303 {
1304 assert_eq!(ds.get_vid_common(0).await.await, common);
1305 assert_eq!(
1306 ds.vid_share(0).await.unwrap(),
1307 VidShare::V0(disperse.shares[0].clone())
1308 );
1309 }
1310
1311 {
1314 let mut tx = ds.write().await.unwrap();
1315 tx.insert_vid(&common, None).await.unwrap();
1316 tx.commit().await.unwrap();
1317 }
1318 {
1319 assert_eq!(ds.get_vid_common(0).await.await, common);
1320 assert_eq!(
1321 ds.vid_share(0).await.unwrap(),
1322 VidShare::V0(disperse.shares[0].clone())
1323 );
1324 }
1325 }
1326
1327 #[test_log::test(tokio::test(flavor = "multi_thread"))]
1328 pub async fn test_vid_recovery<D: TestableDataSource>()
1329 where
1330 for<'a> D::ReadOnly<'a>: NodeStorage<MockTypes>,
1331 {
1332 let mut network = MockNetwork::<D>::init().await;
1333 let ds = network.data_source();
1334
1335 network.start().await;
1336
1337 let mut blocks = ds.subscribe_blocks(0).await;
1339 let txn = mock_transaction(vec![1, 2, 3]);
1340 network.submit_transaction(txn.clone()).await;
1341
1342 let block = loop {
1344 tracing::info!("waiting for transaction");
1345 let block = blocks.next().await.unwrap();
1346 if !block.is_empty() {
1347 tracing::info!(height = block.height(), "transaction sequenced");
1348 break block;
1349 }
1350 tracing::info!(height = block.height(), "empty block");
1351 };
1352 let height = block.height() as usize;
1353 let commit = if let VidCommitment::V0(commit) = block.payload_hash() {
1354 commit
1355 } else {
1356 panic!("expect ADVZ commitment")
1357 };
1358
1359 let vid = advz_scheme(network.num_nodes());
1361
1362 tracing::info!("fetching common data");
1364 let common = ds.get_vid_common(height).await.await;
1365 let VidCommon::V0(common) = &common.common() else {
1366 panic!("expect ADVZ common");
1367 };
1368 ADVZScheme::is_consistent(&commit, common).unwrap();
1369
1370 tracing::info!("fetching shares");
1372 let network = &network;
1373 let vid = &vid;
1374 let shares: Vec<_> = join_all((0..network.num_nodes()).map(|i| async move {
1375 let ds = network.data_source_index(i);
1376
1377 let mut leaves = ds.subscribe_leaves(height).await;
1380 let leaf = leaves.next().await.unwrap();
1381 assert_eq!(leaf.height(), height as u64);
1382 assert_eq!(leaf.payload_hash(), VidCommitment::V0(commit));
1383
1384 let share = if let VidShare::V0(share) = ds.vid_share(height).await.unwrap() {
1385 share
1386 } else {
1387 panic!("expect ADVZ share")
1388 };
1389 vid.verify_share(&share, common, &commit).unwrap().unwrap();
1390 share
1391 }))
1392 .await;
1393
1394 tracing::info!("recovering payload");
1396 let bytes = vid.recover_payload(&shares, common).unwrap();
1397 let recovered = <MockPayload as BlockPayload<TestTypes>>::from_bytes(
1398 &bytes,
1399 &TestMetadata {
1400 num_transactions: 7, },
1402 );
1403 assert_eq!(recovered, *block.payload());
1404 assert_eq!(recovered.transactions, vec![txn]);
1405 }
1406
1407 #[test_log::test(tokio::test(flavor = "multi_thread"))]
1408 pub async fn test_timestamp_window<D: TestableDataSource>() {
1409 let mut network = MockNetwork::<D>::init().await;
1410 let ds = network.data_source();
1411
1412 network.start().await;
1413
1414 let mut leaves = ds.subscribe_leaves(0).await;
1417 let mut test_blocks: Vec<Vec<Header<MockTypes>>> = vec![];
1420 while test_blocks.len() < 3 {
1421 let leaf = leaves.next().await.unwrap();
1423 let header = leaf.header().clone();
1424 if let Some(last_timestamp) = test_blocks.last_mut() {
1425 if <TestBlockHeader as BlockHeader<MockTypes>>::timestamp(&last_timestamp[0])
1426 == <TestBlockHeader as BlockHeader<MockTypes>>::timestamp(&header)
1427 {
1428 last_timestamp.push(header);
1429 } else {
1430 test_blocks.push(vec![header]);
1431 }
1432 } else {
1433 test_blocks.push(vec![header]);
1434 }
1435 }
1436 tracing::info!("blocks for testing: {test_blocks:#?}");
1437
1438 let check_invariants =
1440 |res: &TimeWindowQueryData<Header<MockTypes>>, start, end, check_prev| {
1441 let mut prev = res.prev.as_ref();
1442 if let Some(prev) = prev {
1443 if check_prev {
1444 assert!(block_header_timestamp(prev) < start);
1445 }
1446 } else {
1447 assert_eq!(res.from().unwrap(), 0);
1450 };
1451 for header in &res.window {
1452 assert!(start <= block_header_timestamp(header));
1453 assert!(block_header_timestamp(header) < end);
1454 if let Some(prev) = prev {
1455 assert!(
1456 <TestBlockHeader as BlockHeader<MockTypes>>::timestamp(prev)
1457 <= <TestBlockHeader as BlockHeader<MockTypes>>::timestamp(header)
1458 );
1459 }
1460 prev = Some(header);
1461 }
1462 if let Some(next) = &res.next {
1463 assert!(<TestBlockHeader as BlockHeader<MockTypes>>::timestamp(next) >= end);
1464 assert!(block_header_timestamp(next) >= block_header_timestamp(prev.unwrap()));
1467 }
1468 };
1469
1470 let get_window = |start, end| {
1471 let ds = ds.clone();
1472 async move {
1473 let window = ds
1474 .get_header_window(WindowStart::Time(start), end, i64::MAX as usize)
1475 .await
1476 .unwrap();
1477 tracing::info!("window for timestamp range {start}-{end}: {window:#?}");
1478 check_invariants(&window, start, end, true);
1479 window
1480 }
1481 };
1482
1483 let start = <TestBlockHeader as BlockHeader<MockTypes>>::timestamp(&test_blocks[1][0]);
1485 let end = start + 1;
1486 let res = get_window(start, end).await;
1487 assert_eq!(res.prev.unwrap(), *test_blocks[0].last().unwrap());
1488 assert_eq!(res.window, test_blocks[1]);
1489 assert_eq!(res.next.unwrap(), test_blocks[2][0]);
1490
1491 let start = 0;
1493 let end = <TestBlockHeader as BlockHeader<MockTypes>>::timestamp(&test_blocks[0][0]) + 1;
1494 let res = get_window(start, end).await;
1495 assert_eq!(res.prev, None);
1496 assert_eq!(res.window, test_blocks[0]);
1497 assert_eq!(res.next.unwrap(), test_blocks[1][0]);
1498
1499 let start = <TestBlockHeader as BlockHeader<MockTypes>>::timestamp(&test_blocks[2][0]);
1501 let end = i64::MAX as u64;
1502 let res = get_window(start, end).await;
1503 assert_eq!(res.prev.unwrap(), *test_blocks[1].last().unwrap());
1504 assert_eq!(res.window[..test_blocks[2].len()], test_blocks[2]);
1507 assert_eq!(res.next, None);
1508 let from = test_blocks.iter().flatten().count() - 1;
1512 let more = ds
1513 .get_header_window(WindowStart::Height(from as u64), end, i64::MAX as usize)
1514 .await
1515 .unwrap();
1516 check_invariants(&more, start, end, false);
1517 assert_eq!(
1518 more.prev.as_ref().unwrap(),
1519 test_blocks.iter().flatten().nth(from - 1).unwrap()
1520 );
1521 assert_eq!(
1522 more.window[..res.window.len() - test_blocks[2].len() + 1],
1523 res.window[test_blocks[2].len() - 1..]
1524 );
1525 assert_eq!(res.next, None);
1526 let more2 = ds
1528 .get_header_window(
1529 test_blocks[2].last().unwrap().commit(),
1530 end,
1531 i64::MAX as usize,
1532 )
1533 .await
1534 .unwrap();
1535 check_invariants(&more2, start, end, false);
1536 assert_eq!(more2.from().unwrap(), more.from().unwrap());
1537 assert_eq!(more2.prev, more.prev);
1538 assert_eq!(more2.next, more.next);
1539 assert_eq!(more2.window[..more.window.len()], more.window);
1540
1541 let start = <TestBlockHeader as BlockHeader<MockTypes>>::timestamp(&test_blocks[1][0]);
1543 let end = start;
1544 let res = get_window(start, end).await;
1545 assert_eq!(res.prev.unwrap(), *test_blocks[0].last().unwrap());
1546 assert_eq!(res.next.unwrap(), test_blocks[1][0]);
1547 assert_eq!(res.window, vec![]);
1548
1549 ds.get_header_window(
1551 WindowStart::Time((i64::MAX - 1) as u64),
1552 i64::MAX as u64,
1553 i64::MAX as usize,
1554 )
1555 .await
1556 .unwrap_err();
1557
1558 let blocks = [test_blocks[0].clone(), test_blocks[1].clone()]
1560 .into_iter()
1561 .flatten()
1562 .collect::<Vec<_>>();
1563 let start = block_header_timestamp(&blocks[0]);
1565 let end = block_header_timestamp(&test_blocks[2][0]);
1566 let res = ds
1567 .get_header_window(WindowStart::Time(start), end, 1)
1568 .await
1569 .unwrap();
1570 assert_eq!(res.prev, None);
1571 assert_eq!(res.window, [blocks[0].clone()]);
1572 assert_eq!(res.next, None);
1573 let res = ds
1575 .get_header_window(WindowStart::Height(blocks[0].height() + 1), end, 1)
1576 .await
1577 .unwrap();
1578 assert_eq!(res.window, [blocks[1].clone()]);
1579 assert_eq!(res.next, None);
1580 let res = ds
1582 .get_header_window(
1583 WindowStart::Height(blocks[1].height() + 1),
1584 end,
1585 blocks.len() - 1,
1586 )
1587 .await
1588 .unwrap();
1589 assert_eq!(res.window, blocks[2..].to_vec());
1590 assert_eq!(res.next, Some(test_blocks[2][0].clone()));
1591 }
1592
1593 #[test_log::test(tokio::test(flavor = "multi_thread"))]
1594 pub async fn test_latest_qc_chain<D: TestableDataSource>()
1595 where
1596 for<'a> D::ReadOnly<'a>: NodeStorage<MockTypes>,
1597 for<'a> D::Transaction<'a>: UpdateAvailabilityStorage<MockTypes>,
1598 {
1599 let storage = D::create(0).await;
1600 let ds = D::connect(&storage).await;
1601
1602 {
1603 let mut tx = ds.read().await.unwrap();
1604 assert_eq!(tx.latest_qc_chain().await.unwrap(), None);
1605 }
1606
1607 async fn leaf_with_qc_chain(
1608 number: u64,
1609 ) -> (LeafQueryData<MockTypes>, [CertificatePair<MockTypes>; 2]) {
1610 let mut leaf = Leaf2::<MockTypes>::genesis(
1611 &Default::default(),
1612 &Default::default(),
1613 TEST_VERSIONS.test.base,
1614 )
1615 .await;
1616 leaf.block_header_mut().block_number = number;
1617
1618 let mut qc1 = QuorumCertificate2::<MockTypes>::genesis(
1619 &Default::default(),
1620 &Default::default(),
1621 TEST_VERSIONS.test,
1622 )
1623 .await;
1624 qc1.view_number = ViewNumber::new(1);
1625 qc1.data.leaf_commit = Committable::commit(&leaf);
1626
1627 let mut qc2 = qc1.clone();
1628 qc2.view_number += 1;
1629
1630 let leaf = LeafQueryData::new(leaf, qc1.clone()).unwrap();
1631 (
1632 leaf,
1633 [
1634 CertificatePair::non_epoch_change(qc1),
1635 CertificatePair::non_epoch_change(qc2),
1636 ],
1637 )
1638 }
1639
1640 {
1642 let (leaf, qcs) = leaf_with_qc_chain(2).await;
1643 let mut tx = ds.write().await.unwrap();
1644 tx.insert_leaf_with_qc_chain(&leaf, Some(qcs.clone()))
1645 .await
1646 .unwrap();
1647 tx.commit().await.unwrap();
1648
1649 assert_eq!(
1650 ds.read().await.unwrap().latest_qc_chain().await.unwrap(),
1651 Some(qcs)
1652 );
1653 }
1654
1655 {
1658 let (leaf, _) = leaf_with_qc_chain(3).await;
1659 let mut tx = ds.write().await.unwrap();
1660 tx.insert_leaf_with_qc_chain(&leaf, None).await.unwrap();
1661 tx.commit().await.unwrap();
1662
1663 assert_eq!(
1664 ds.read().await.unwrap().latest_qc_chain().await.unwrap(),
1665 None
1666 );
1667 }
1668
1669 {
1672 let (leaf, qcs) = leaf_with_qc_chain(1).await;
1673 let mut tx = ds.write().await.unwrap();
1674 tx.insert_leaf_with_qc_chain(&leaf, Some(qcs))
1675 .await
1676 .unwrap();
1677 tx.commit().await.unwrap();
1678
1679 assert_eq!(
1680 ds.read().await.unwrap().latest_qc_chain().await.unwrap(),
1681 None
1682 );
1683 }
1684 }
1685}
1686
1687#[cfg(any(test, feature = "testing"))]
1689#[espresso_macros::generic_tests]
1690pub mod status_tests {
1691 use std::time::Duration;
1692
1693 use crate::{
1694 status::StatusDataSource,
1695 testing::{
1696 consensus::{DataSourceLifeCycle, MockNetwork},
1697 mocks::mock_transaction,
1698 sleep,
1699 },
1700 };
1701
1702 #[test_log::test(tokio::test(flavor = "multi_thread"))]
1703 pub async fn test_metrics<D: DataSourceLifeCycle + StatusDataSource>() {
1704 let mut network = MockNetwork::<D>::init().await;
1705 let ds = network.data_source();
1706
1707 {
1708 assert_eq!(ds.block_height().await.unwrap(), 0);
1710 assert!(ds.success_rate().await.unwrap().is_nan());
1713 assert!(
1716 (ds.elapsed_time_since_last_decide().await.unwrap() as i64
1717 - chrono::Utc::now().timestamp())
1718 .abs()
1719 <= 1,
1720 "time elapsed since last_decided_time is not within 1s"
1721 );
1722 }
1723
1724 let txn = mock_transaction(vec![1, 2, 3]);
1726 network.submit_transaction(txn.clone()).await;
1727
1728 network.start().await;
1730
1731 loop {
1733 let height = ds.block_height().await.unwrap();
1734 if height > 1 {
1735 break;
1736 }
1737 tracing::info!(height, "waiting for a block to be finalized");
1738 sleep(Duration::from_secs(1)).await;
1739 }
1740
1741 {
1742 let success_rate = ds.success_rate().await.unwrap();
1746 assert!(success_rate.is_finite(), "{success_rate}");
1747 assert!(success_rate > 0.0, "{success_rate}");
1748 }
1749
1750 {
1751 network.shut_down().await;
1754 sleep(Duration::from_secs(3)).await;
1755 assert!(ds.elapsed_time_since_last_decide().await.unwrap() >= 3);
1757 }
1758 }
1759}
1760
1761#[macro_export]
1762macro_rules! instantiate_data_source_tests {
1763 ($t:ty) => {
1764 use $crate::data_source::{
1765 availability_tests, node_tests, persistence_tests, status_tests,
1766 };
1767
1768 instantiate_availability_tests!($t);
1769 instantiate_persistence_tests!($t);
1770 instantiate_node_tests!($t);
1771 instantiate_status_tests!($t);
1772 };
1773}