1mod extension;
30pub mod fetching;
31pub mod fs;
32mod metrics;
33mod notifier;
34pub mod sql;
35pub mod storage;
36mod update;
37
38pub use extension::ExtensibleDataSource;
39pub use fetching::{AvailabilityProvider, FetchingDataSource};
40#[cfg(feature = "file-system-data-source")]
41pub use fs::FileSystemDataSource;
42#[cfg(feature = "metrics-data-source")]
43pub use metrics::MetricsDataSource;
44#[cfg(feature = "sql-data-source")]
45pub use sql::SqlDataSource;
46pub use update::{Transaction, UpdateDataSource, VersionedDataSource};
47
48#[cfg(any(test, feature = "testing"))]
49mod test_helpers {
50 use std::ops::{Bound, RangeBounds};
51
52 use futures::{
53 future,
54 stream::{BoxStream, StreamExt},
55 };
56
57 use crate::{
58 availability::{BlockQueryData, Fetch, LeafQueryData},
59 node::NodeDataSource,
60 testing::{consensus::TestableDataSource, mocks::MockTypes},
61 };
62
63 async fn bound_range<R, D>(ds: &D, range: R) -> impl RangeBounds<usize> + use<R, D>
65 where
66 D: TestableDataSource,
67 R: RangeBounds<usize>,
68 {
69 let start = range.start_bound().cloned();
70 let mut end = range.end_bound().cloned();
71 if end == Bound::Unbounded {
72 end = Bound::Excluded(NodeDataSource::block_height(ds).await.unwrap());
73 }
74 (start, end)
75 }
76
77 pub async fn block_range<R, D>(
79 ds: &D,
80 range: R,
81 ) -> BoxStream<'static, BlockQueryData<MockTypes>>
82 where
83 D: TestableDataSource,
84 R: RangeBounds<usize> + Send + 'static,
85 {
86 ds.get_block_range(bound_range(ds, range).await)
87 .await
88 .then(Fetch::resolve)
89 .boxed()
90 }
91
92 pub async fn leaf_range<R, D>(ds: &D, range: R) -> BoxStream<'static, LeafQueryData<MockTypes>>
94 where
95 D: TestableDataSource,
96 R: RangeBounds<usize> + Send + 'static,
97 {
98 ds.get_leaf_range(bound_range(ds, range).await)
99 .await
100 .then(Fetch::resolve)
101 .boxed()
102 }
103
104 pub async fn get_non_empty_blocks<D>(
105 ds: &D,
106 ) -> Vec<(LeafQueryData<MockTypes>, BlockQueryData<MockTypes>)>
107 where
108 D: TestableDataSource,
109 {
110 leaf_range(ds, 1..)
112 .await
113 .zip(block_range(ds, 1..).await)
114 .filter(|(_, block)| future::ready(!block.is_empty()))
115 .collect()
116 .await
117 }
118}
119
120#[cfg(any(test, feature = "testing"))]
122#[espresso_macros::generic_tests]
123pub mod availability_tests {
124 use std::{
125 collections::HashMap,
126 fmt::Debug,
127 ops::{Bound, RangeBounds},
128 };
129
130 use committable::Committable;
131 use futures::stream::StreamExt;
132 use hotshot_types::{data::Leaf2, vote::HasViewNumber};
133
134 use super::test_helpers::*;
135 use crate::{
136 availability::{BlockId, payload_size},
137 data_source::storage::{AvailabilityStorage, NodeStorage},
138 node::NodeDataSource,
139 testing::{
140 consensus::{MockNetwork, TestableDataSource},
141 mocks::{MockTypes, mock_transaction},
142 },
143 types::HeightIndexed,
144 };
145
    /// Checks that everything `ds` currently serves is internally consistent.
    ///
    /// For each leaf yielded by `leaf_range(ds, ..)`, the leaf, its block, payload, VID common
    /// data and every transaction in the block are looked up through the available identifiers
    /// (height, hash, payload hash, transaction commitment) and the answers are asserted to
    /// agree. Afterwards, if the QC of the last stored leaf carries an epoch, the latest QC chain
    /// read from storage is checked against that leaf.
    ///
    /// # Panics
    ///
    /// Panics on the first failed assertion, or if a storage read returns an error.
    async fn validate<D: TestableDataSource>(ds: &D)
    where
        for<'a> D::ReadOnly<'a>: AvailabilityStorage<MockTypes> + NodeStorage<MockTypes>,
    {
        // Remember the first height at which each payload hash / transaction commitment was seen,
        // so that lookups of duplicates can be compared against the first occurrence.
        let mut seen_payloads = HashMap::new();
        let mut seen_transactions = HashMap::new();
        let mut leaves = leaf_range(ds, ..).await.enumerate();
        while let Some((i, leaf)) = leaves.next().await {
            // The position in the stream must match the leaf height, and the stored hash must
            // match the commitment of the leaf itself.
            assert_eq!(leaf.height(), i as u64);
            assert_eq!(
                leaf.hash(),
                <Leaf2<MockTypes> as Committable>::commit(&leaf.leaf)
            );

            // The leaf must be reachable both by height and by hash.
            tracing::info!("looking up leaf {i} various ways");
            assert_eq!(leaf, ds.get_leaf(i).await.await);
            assert_eq!(leaf, ds.get_leaf(leaf.hash()).await.await);

            // The block at the same height must agree with the leaf and with its own header.
            tracing::info!("looking up block {i} various ways");
            let block = ds.get_block(i).await.await;
            assert_eq!(leaf.block_hash(), block.hash());
            assert_eq!(block.height(), i as u64);
            assert_eq!(block.hash(), block.header().commit());
            assert_eq!(block.size(), payload_size::<MockTypes>(block.payload()));

            // The block must be reachable by height and by hash.
            assert_eq!(block, ds.get_block(i).await.await);
            assert_eq!(ds.get_block(block.hash()).await.await.height(), i as u64);
            // `ix` is the height at which this payload hash was first seen. A lookup by payload
            // hash is expected to return that first block, but this is only asserted when the
            // fetch resolves immediately (`try_resolve` succeeds).
            let ix = seen_payloads
                .entry(block.payload_hash())
                .or_insert(i as u64);
            if let Ok(block) = ds
                .get_block(BlockId::PayloadHash(block.payload_hash()))
                .await
                .try_resolve()
            {
                assert_eq!(block.height(), *ix);
            } else {
                tracing::warn!(
                    "skipping block by payload index check for missing payload {:?}",
                    block.header()
                );
                // Without an immediate answer, only check that the fetch completes at all.
                ds.get_block(BlockId::PayloadHash(block.payload_hash()))
                    .await
                    .await;
            }

            // The payload must be reachable by height and by block hash.
            tracing::info!("looking up payload {i} various ways");
            let expected_payload = block.clone().into();
            assert_eq!(ds.get_payload(i).await.await, expected_payload);
            assert_eq!(ds.get_payload(block.hash()).await.await, expected_payload);
            // By payload hash, the result is only compared when it resolves immediately and this
            // block is the first occurrence of the payload.
            if let Ok(payload) = ds
                .get_payload(BlockId::PayloadHash(block.payload_hash()))
                .await
                .try_resolve()
            {
                if *ix == i as u64 {
                    assert_eq!(payload, expected_payload);
                }
            } else {
                tracing::warn!(
                    "skipping payload index check for missing payload {:?}",
                    block.header()
                );
                // Without an immediate answer, only check that the fetch completes at all.
                ds.get_payload(BlockId::PayloadHash(block.payload_hash()))
                    .await
                    .await;
            }

            // The VID common data must be reachable by height and by block hash.
            tracing::info!("looking up VID common {i} various ways");
            let common = ds.get_vid_common(block.height() as usize).await.await;
            assert_eq!(common, ds.get_vid_common(block.hash()).await.await);
            // Same rule as for payloads: compare by payload hash only for an immediately
            // available result belonging to the first occurrence.
            if let Ok(res) = ds
                .get_vid_common(BlockId::PayloadHash(block.payload_hash()))
                .await
                .try_resolve()
            {
                if *ix == i as u64 {
                    assert_eq!(res, common);
                }
            } else {
                tracing::warn!(
                    "skipping VID common index check for missing data {:?}",
                    block.header()
                );
                // Wait for the fetch and at least check that it refers to the same payload.
                let res = ds
                    .get_vid_common(BlockId::PayloadHash(block.payload_hash()))
                    .await
                    .await;
                assert_eq!(res.payload_hash(), common.payload_hash());
            }

            for (j, txn) in block.enumerate() {
                tracing::info!("looking up transaction {i},{j:?}");

                // `ix` is the (height, index) at which this transaction commitment was first
                // seen; an immediately available lookup by commitment must point there.
                let ix = seen_transactions
                    .entry(txn.commit())
                    .or_insert((i as u64, j.clone()));
                if let Ok(tx_data) = ds
                    .get_block_containing_transaction(txn.commit())
                    .await
                    .try_resolve()
                {
                    assert_eq!(tx_data.transaction.transaction(), &txn);
                    assert_eq!(tx_data.transaction.block_height(), ix.0);
                    assert_eq!(tx_data.transaction.index(), ix.1.position as u64);
                    assert_eq!(tx_data.index, ix.1);
                    assert_eq!(tx_data.block, block);
                } else {
                    tracing::warn!(
                        "skipping transaction index check for missing transaction {j:?} {txn:?}"
                    );
                    // Without an immediate answer, only check that the fetch completes at all.
                    ds.get_block_containing_transaction(txn.commit())
                        .await
                        .await;
                }
            }
        }

        // Check the latest QC chain against the last stored leaf.
        {
            let mut tx = ds.read().await.unwrap();
            let block_height = NodeStorage::block_height(&mut tx).await.unwrap();
            let last_leaf = tx.get_leaf((block_height - 1).into()).await.unwrap();

            // The chain is only checked when the last leaf's QC has an epoch set.
            if last_leaf.qc().data.epoch.is_some() {
                tracing::info!(block_height, "checking QC chain");
                let qc_chain = tx.latest_qc_chain().await.unwrap().unwrap();

                // The first QC must certify the last leaf; the second must be for the next view.
                assert_eq!(last_leaf.height(), (block_height - 1) as u64);
                assert_eq!(qc_chain[0].view_number(), last_leaf.leaf().view_number());
                assert_eq!(qc_chain[0].leaf_commit(), last_leaf.hash());
                assert_eq!(qc_chain[1].view_number(), qc_chain[0].view_number() + 1);
            }
        }
    }
311
312 #[test_log::test(tokio::test(flavor = "multi_thread"))]
313 pub async fn test_update<D: TestableDataSource>()
314 where
315 for<'a> D::ReadOnly<'a>: AvailabilityStorage<MockTypes> + NodeStorage<MockTypes>,
316 {
317 let mut network = MockNetwork::<D>::init().await;
318 let ds = network.data_source();
319
320 network.start().await;
321 assert_eq!(get_non_empty_blocks(&ds).await, vec![]);
322
323 let mut blocks = ds.subscribe_blocks(0).await.enumerate();
326 for nonce in 0..3 {
327 let txn = mock_transaction(vec![nonce]);
328 network.submit_transaction(txn).await;
329
330 let (i, block) = loop {
332 tracing::info!("waiting for tx {nonce}");
333 let (i, block) = blocks.next().await.unwrap();
334 if !block.is_empty() {
335 break (i, block);
336 }
337 tracing::info!("block {i} is empty");
338 };
339
340 tracing::info!("got tx {nonce} in block {i}");
341 assert_eq!(ds.get_block(i).await.await, block);
342 validate(&ds).await;
343 }
344
345 {
349 tracing::info!("checking persisted storage");
350 let storage = D::connect(network.storage()).await;
351
352 let block_height = NodeDataSource::block_height(&ds).await.unwrap();
356 assert_eq!(
357 ds.get_block_range(..block_height)
358 .await
359 .map(|fetch| fetch.try_resolve().ok())
360 .collect::<Vec<_>>()
361 .await,
362 storage
363 .get_block_range(..block_height)
364 .await
365 .map(|fetch| fetch.try_resolve().ok())
366 .collect::<Vec<_>>()
367 .await
368 );
369 assert_eq!(
370 ds.get_leaf_range(..block_height)
371 .await
372 .map(|fetch| fetch.try_resolve().ok())
373 .collect::<Vec<_>>()
374 .await,
375 storage
376 .get_leaf_range(..block_height)
377 .await
378 .map(|fetch| fetch.try_resolve().ok())
379 .collect::<Vec<_>>()
380 .await
381 );
382 }
383 }
384
385 #[test_log::test(tokio::test(flavor = "multi_thread"))]
386 pub async fn test_range<D: TestableDataSource>()
387 where
388 for<'a> D::ReadOnly<'a>: NodeStorage<MockTypes>,
389 {
390 let mut network = MockNetwork::<D>::init().await;
391 let ds = network.data_source();
392 network.start().await;
393
394 let block_height = loop {
396 let mut tx = ds.read().await.unwrap();
397 let block_height = tx.block_height().await.unwrap();
398 if block_height >= 3 {
399 break block_height as u64;
400 }
401 };
402
403 do_range_test(&ds, 1..=2, 1..3).await; do_range_test(&ds, 1..3, 1..3).await; do_range_test(&ds, 1.., 1..block_height).await; do_range_test(&ds, ..=2, 0..3).await; do_range_test(&ds, ..3, 0..3).await; do_range_test(&ds, .., 0..block_height).await; do_range_test(&ds, ExRange(0..=2), 1..3).await; do_range_test(&ds, ExRange(0..3), 1..3).await; do_range_test(&ds, ExRange(0..), 1..block_height).await; }
415
416 async fn do_range_test<D, R, I>(ds: &D, range: R, expected_indices: I)
417 where
418 D: TestableDataSource,
419 R: RangeBounds<usize> + Clone + Debug + Send + 'static,
420 I: IntoIterator<Item = u64>,
421 {
422 tracing::info!("testing range {range:?}");
423
424 let mut leaves = ds.get_leaf_range(range.clone()).await;
425 let mut blocks = ds.get_block_range(range.clone()).await;
426 let mut payloads = ds.get_payload_range(range.clone()).await;
427 let mut payloads_meta = ds.get_payload_metadata_range(range.clone()).await;
428 let mut vid_common = ds.get_vid_common_range(range.clone()).await;
429 let mut vid_common_meta = ds.get_vid_common_metadata_range(range.clone()).await;
430
431 for i in expected_indices {
432 tracing::info!(i, "check entries");
433 let leaf = leaves.next().await.unwrap().await;
434 let block = blocks.next().await.unwrap().await;
435 let payload = payloads.next().await.unwrap().await;
436 let payload_meta = payloads_meta.next().await.unwrap().await;
437 let common = vid_common.next().await.unwrap().await;
438 let common_meta = vid_common_meta.next().await.unwrap().await;
439 assert_eq!(leaf.height(), i);
440 assert_eq!(block.height(), i);
441 assert_eq!(payload, ds.get_payload(i as usize).await.await);
442 assert_eq!(payload_meta, block.into());
443 assert_eq!(common, ds.get_vid_common(i as usize).await.await);
444 assert_eq!(common_meta, common.into());
445 }
446
447 if range.end_bound() == Bound::Unbounded {
448 loop {
451 let fetch_leaf = leaves.next().await.unwrap();
452 let fetch_block = blocks.next().await.unwrap();
453 let fetch_payload = payloads.next().await.unwrap();
454 let fetch_payload_meta = payloads_meta.next().await.unwrap();
455 let fetch_common = vid_common.next().await.unwrap();
456 let fetch_common_meta = vid_common_meta.next().await.unwrap();
457
458 if fetch_leaf.try_resolve().is_ok()
459 && fetch_block.try_resolve().is_ok()
460 && fetch_payload.try_resolve().is_ok()
461 && fetch_payload_meta.try_resolve().is_ok()
462 && fetch_common.try_resolve().is_ok()
463 && fetch_common_meta.try_resolve().is_ok()
464 {
465 tracing::info!("searching for end of available objects");
466 } else {
467 break;
468 }
469 }
470 } else {
471 assert!(leaves.next().await.is_none());
473 assert!(blocks.next().await.is_none());
474 assert!(payloads.next().await.is_none());
475 assert!(payloads_meta.next().await.is_none());
476 assert!(vid_common.next().await.is_none());
477 assert!(vid_common_meta.next().await.is_none());
478 }
479 }
480
481 #[test_log::test(tokio::test(flavor = "multi_thread"))]
482 pub async fn test_range_rev<D: TestableDataSource>()
483 where
484 for<'a> D::ReadOnly<'a>: NodeStorage<MockTypes>,
485 {
486 let mut network = MockNetwork::<D>::init().await;
487 let ds = network.data_source();
488 network.start().await;
489
490 ds.subscribe_leaves(5).await.next().await.unwrap();
492
493 do_range_rev_test(&ds, Bound::Included(1), 5, 1..=5).await;
495 do_range_rev_test(&ds, Bound::Excluded(1), 5, 2..=5).await;
496 do_range_rev_test(&ds, Bound::Unbounded, 5, 0..=5).await;
497 }
498
499 async fn do_range_rev_test<D>(
500 ds: &D,
501 start: Bound<usize>,
502 end: usize,
503 expected_indices: impl DoubleEndedIterator<Item = u64>,
504 ) where
505 D: TestableDataSource,
506 {
507 tracing::info!("testing range {start:?}-{end}");
508
509 let mut leaves = ds.get_leaf_range_rev(start, end).await;
510 let mut blocks = ds.get_block_range_rev(start, end).await;
511 let mut payloads = ds.get_payload_range_rev(start, end).await;
512 let mut payloads_meta = ds.get_payload_metadata_range_rev(start, end).await;
513 let mut vid_common = ds.get_vid_common_range_rev(start, end).await;
514 let mut vid_common_meta = ds.get_vid_common_metadata_range_rev(start, end).await;
515
516 for i in expected_indices.rev() {
517 tracing::info!(i, "check entries");
518 let leaf = leaves.next().await.unwrap().await;
519 let block = blocks.next().await.unwrap().await;
520 let payload = payloads.next().await.unwrap().await;
521 let payload_meta = payloads_meta.next().await.unwrap().await;
522 let common = vid_common.next().await.unwrap().await;
523 let common_meta = vid_common_meta.next().await.unwrap().await;
524 assert_eq!(leaf.height(), i);
525 assert_eq!(block.height(), i);
526 assert_eq!(payload.height(), i);
527 assert_eq!(payload_meta.height(), i);
528 assert_eq!(common, ds.get_vid_common(i as usize).await.await);
529 assert_eq!(
530 common_meta,
531 ds.get_vid_common_metadata(i as usize).await.await
532 );
533 }
534
535 assert!(leaves.next().await.is_none());
537 assert!(blocks.next().await.is_none());
538 assert!(payloads.next().await.is_none());
539 assert!(payloads_meta.next().await.is_none());
540 assert!(vid_common.next().await.is_none());
541 assert!(vid_common_meta.next().await.is_none());
542 }
543
544 #[derive(Clone, Copy, Debug)]
546 struct ExRange<R>(R);
547
548 impl<R: RangeBounds<usize>> RangeBounds<usize> for ExRange<R> {
549 fn start_bound(&self) -> Bound<&usize> {
550 match self.0.start_bound() {
551 Bound::Included(x) => Bound::Excluded(x),
552 Bound::Excluded(x) => Bound::Excluded(x),
553 Bound::Unbounded => Bound::Excluded(&0),
554 }
555 }
556
557 fn end_bound(&self) -> Bound<&usize> {
558 self.0.end_bound()
559 }
560 }
561}
562
563#[cfg(any(test, feature = "testing"))]
565#[espresso_macros::generic_tests]
566pub mod persistence_tests {
567 use committable::Committable;
568 use hotshot_example_types::{
569 node_types::TEST_VERSIONS,
570 state_types::{TestInstanceState, TestValidatedState},
571 };
572 use hotshot_types::simple_certificate::QuorumCertificate2;
573
574 use crate::{
575 Leaf2,
576 availability::{BlockQueryData, LeafQueryData},
577 data_source::{
578 Transaction,
579 storage::{AvailabilityStorage, NodeStorage, UpdateAvailabilityStorage},
580 },
581 node::NodeDataSource,
582 testing::{
583 consensus::TestableDataSource,
584 mocks::{MockPayload, MockTypes},
585 },
586 types::HeightIndexed,
587 };
588
589 #[test_log::test(tokio::test(flavor = "multi_thread"))]
590 pub async fn test_revert<D: TestableDataSource>()
591 where
592 for<'a> D::Transaction<'a>: UpdateAvailabilityStorage<MockTypes>
593 + AvailabilityStorage<MockTypes>
594 + NodeStorage<MockTypes>,
595 {
596 let storage = D::create(0).await;
597 let ds = D::connect(&storage).await;
598
599 let mut qc = QuorumCertificate2::<MockTypes>::genesis(
601 &TestValidatedState::default(),
602 &TestInstanceState::default(),
603 TEST_VERSIONS.test,
604 )
605 .await;
606 let mut leaf = Leaf2::<MockTypes>::genesis(
607 &TestValidatedState::default(),
608 &TestInstanceState::default(),
609 TEST_VERSIONS.test.base,
610 )
611 .await;
612 leaf.block_header_mut().block_number += 1;
615 qc.data.leaf_commit = <Leaf2<MockTypes> as Committable>::commit(&leaf);
616
617 let block = BlockQueryData::new(leaf.block_header().clone(), MockPayload::genesis());
618 let leaf = LeafQueryData::new(leaf, qc).unwrap();
619
620 let mut tx = ds.write().await.unwrap();
622 tx.insert_leaf(&leaf).await.unwrap();
623 tx.insert_block(&block).await.unwrap();
624
625 assert_eq!(tx.block_height().await.unwrap(), 2);
626 assert_eq!(leaf, tx.get_leaf(1.into()).await.unwrap());
627 assert_eq!(block, tx.get_block(1.into()).await.unwrap());
628
629 tx.revert().await;
631 assert_eq!(
632 NodeDataSource::<MockTypes>::block_height(&ds)
633 .await
634 .unwrap(),
635 0
636 );
637 ds.get_leaf(1).await.try_resolve().unwrap_err();
638 ds.get_block(1).await.try_resolve().unwrap_err();
639 }
640
641 #[test_log::test(tokio::test(flavor = "multi_thread"))]
642 pub async fn test_reset<D: TestableDataSource>()
643 where
644 for<'a> D::Transaction<'a>: UpdateAvailabilityStorage<MockTypes>,
645 {
646 let storage = D::create(0).await;
647 let ds = D::connect(&storage).await;
648
649 let mut qc = QuorumCertificate2::<MockTypes>::genesis(
651 &TestValidatedState::default(),
652 &TestInstanceState::default(),
653 TEST_VERSIONS.test,
654 )
655 .await;
656 let mut leaf = Leaf2::<MockTypes>::genesis(
657 &TestValidatedState::default(),
658 &TestInstanceState::default(),
659 TEST_VERSIONS.test.base,
660 )
661 .await;
662 leaf.block_header_mut().block_number += 1;
665 qc.data.leaf_commit = <Leaf2<MockTypes> as Committable>::commit(&leaf);
666
667 let block = BlockQueryData::new(leaf.block_header().clone(), MockPayload::genesis());
668 let leaf = LeafQueryData::new(leaf, qc).unwrap();
669
670 let mut tx = ds.write().await.unwrap();
672 tx.insert_leaf(&leaf).await.unwrap();
673 tx.insert_block(&block).await.unwrap();
674 tx.commit().await.unwrap();
675
676 assert_eq!(
677 NodeDataSource::<MockTypes>::block_height(&ds)
678 .await
679 .unwrap(),
680 2
681 );
682 assert_eq!(leaf, ds.get_leaf(1).await.await);
683 assert_eq!(block, ds.get_block(1).await.await);
684
685 drop(ds);
686
687 let ds = D::reset(&storage).await;
689 assert_eq!(
690 NodeDataSource::<MockTypes>::block_height(&ds)
691 .await
692 .unwrap(),
693 0
694 );
695 ds.get_leaf(1).await.try_resolve().unwrap_err();
696 ds.get_block(1).await.try_resolve().unwrap_err();
697 }
698
699 #[test_log::test(tokio::test(flavor = "multi_thread"))]
700 pub async fn test_drop_tx<D: TestableDataSource>()
701 where
702 for<'a> D::Transaction<'a>: UpdateAvailabilityStorage<MockTypes>
703 + AvailabilityStorage<MockTypes>
704 + NodeStorage<MockTypes>,
705 for<'a> D::ReadOnly<'a>: NodeStorage<MockTypes>,
706 {
707 let storage = D::create(0).await;
708 let ds = D::connect(&storage).await;
709
710 let mut mock_qc = QuorumCertificate2::<MockTypes>::genesis(
712 &TestValidatedState::default(),
713 &TestInstanceState::default(),
714 TEST_VERSIONS.test,
715 )
716 .await;
717 let mut mock_leaf = Leaf2::<MockTypes>::genesis(
718 &TestValidatedState::default(),
719 &TestInstanceState::default(),
720 TEST_VERSIONS.test.base,
721 )
722 .await;
723 mock_leaf.block_header_mut().block_number += 1;
726 mock_qc.data.leaf_commit = <Leaf2<MockTypes> as Committable>::commit(&mock_leaf);
727
728 let block = BlockQueryData::new(mock_leaf.block_header().clone(), MockPayload::genesis());
729 let leaf = LeafQueryData::new(mock_leaf.clone(), mock_qc.clone()).unwrap();
730
731 tracing::info!("write");
733 let mut tx = ds.write().await.unwrap();
734 tx.insert_leaf(&leaf).await.unwrap();
735 tx.insert_block(&block).await.unwrap();
736
737 assert_eq!(tx.block_height().await.unwrap(), 2);
738 assert_eq!(leaf, tx.get_leaf(1.into()).await.unwrap());
739 assert_eq!(block, tx.get_block(1.into()).await.unwrap());
740
741 drop(tx);
743
744 tracing::info!("read");
746 let mut tx = ds.read().await.unwrap();
747 assert_eq!(tx.block_height().await.unwrap(), 0);
748 drop(tx);
749
750 mock_leaf.block_header_mut().block_number += 1;
752 mock_qc.data.leaf_commit = <Leaf2<MockTypes> as Committable>::commit(&mock_leaf);
753 let block = BlockQueryData::new(mock_leaf.block_header().clone(), MockPayload::genesis());
754 let leaf = LeafQueryData::new(mock_leaf, mock_qc).unwrap();
755
756 tracing::info!("write again");
757 let mut tx = ds.write().await.unwrap();
758 tx.insert_leaf(&leaf).await.unwrap();
759 tx.insert_block(&block).await.unwrap();
760 tx.commit().await.unwrap();
761
762 tracing::info!("read again");
765 let height = leaf.height() as usize;
766 assert_eq!(
767 NodeDataSource::<MockTypes>::block_height(&ds)
768 .await
769 .unwrap(),
770 height + 1
771 );
772 assert_eq!(leaf, ds.get_leaf(height).await.await);
773 assert_eq!(block, ds.get_block(height).await.await);
774 ds.get_leaf(height - 1).await.try_resolve().unwrap_err();
775 ds.get_block(height - 1).await.try_resolve().unwrap_err();
776 }
777}
778
779#[cfg(any(test, feature = "testing"))]
781#[espresso_macros::generic_tests]
782pub mod node_tests {
783 use std::time::Duration;
784
785 use committable::Committable;
786 use futures::{future::join_all, stream::StreamExt};
787 use hotshot::traits::BlockPayload;
788 use hotshot_example_types::{
789 block_types::{TestBlockHeader, TestBlockPayload, TestMetadata},
790 node_types::{TEST_VERSIONS, TestTypes},
791 state_types::{TestInstanceState, TestValidatedState},
792 };
793 use hotshot_types::{
794 data::{VidCommitment, VidCommon, VidShare, ViewNumber, vid_commitment},
795 simple_certificate::{CertificatePair, QuorumCertificate2},
796 traits::block_contents::{BlockHeader, EncodeBytes},
797 vid::advz::{ADVZScheme, advz_scheme},
798 };
799 use jf_advz::VidScheme;
800 use pretty_assertions::assert_eq;
801
802 use crate::{
803 Header, Leaf2,
804 availability::{BlockInfo, BlockQueryData, LeafQueryData, VidCommonQueryData},
805 data_source::{
806 storage::{NodeStorage, UpdateAvailabilityStorage},
807 update::Transaction,
808 },
809 node::{
810 BlockId, NodeDataSource, ResourceSyncStatus, SyncStatus, SyncStatusQueryData,
811 SyncStatusRange, TimeWindowQueryData, WindowStart,
812 },
813 testing::{
814 consensus::{MockNetwork, TestableDataSource},
815 mocks::{MockPayload, MockTypes, mock_transaction},
816 sleep,
817 },
818 types::HeightIndexed,
819 };
820
    /// Returns the timestamp of `header`.
    ///
    /// The call names the `BlockHeader<MockTypes>` implementation of `TestBlockHeader`
    /// explicitly, so the result is whatever that trait method returns for `header`.
    fn block_header_timestamp(header: &Header<MockTypes>) -> u64 {
        <TestBlockHeader as BlockHeader<MockTypes>>::timestamp(header)
    }
824
825 #[test_log::test(tokio::test(flavor = "multi_thread"))]
826 pub async fn test_sync_status<D: TestableDataSource>()
827 where
828 for<'a> D::Transaction<'a>: UpdateAvailabilityStorage<MockTypes>,
829 {
830 let storage = D::create(0).await;
831 let ds = D::build(&storage, |builder| {
832 builder.with_sync_status_ttl(Duration::ZERO)
833 })
834 .await;
835
836 let mut vid = advz_scheme(2);
838
839 let mut leaves = vec![
841 LeafQueryData::<MockTypes>::genesis(
842 &TestValidatedState::default(),
843 &TestInstanceState::default(),
844 TEST_VERSIONS.test,
845 )
846 .await,
847 ];
848 let mut blocks = vec![
849 BlockQueryData::<MockTypes>::genesis(
850 &TestValidatedState::default(),
851 &TestInstanceState::default(),
852 TEST_VERSIONS.test.base,
853 )
854 .await,
855 ];
856 let dispersal = vid.disperse([]).unwrap();
857 let mut vid_commons = vec![VidCommonQueryData::new(
858 leaves[0].header().clone(),
859 VidCommon::V0(dispersal.common.clone()),
860 )];
861 for i in 0..2 {
862 let (payload, metadata) = <MockPayload as BlockPayload<MockTypes>>::from_transactions(
865 vec![mock_transaction(vec![i as u8])],
866 &Default::default(),
867 &Default::default(),
868 )
869 .await
870 .unwrap();
871 let dispersal = vid.disperse(payload.encode()).unwrap();
872
873 let mut leaf = leaves[i].clone();
874 leaf.leaf.block_header_mut().block_number += 1;
875 leaf.leaf.block_header_mut().payload_commitment = VidCommitment::V0(dispersal.commit);
876 leaf.leaf.block_header_mut().metadata = metadata;
877 let block = BlockQueryData::new(leaf.header().clone(), payload);
878 let vid_common = VidCommonQueryData::new(
879 leaf.header().clone(),
880 VidCommon::V0(dispersal.common.clone()),
881 );
882
883 leaves.push(leaf);
884 blocks.push(block);
885 vid_commons.push(vid_common);
886 }
887
888 assert!(ds.sync_status().await.unwrap().is_fully_synced());
890
891 ds.append(leaves[0].clone().into()).await.unwrap();
894 assert_eq!(
895 ds.sync_status().await.unwrap(),
896 SyncStatusQueryData {
897 blocks: ResourceSyncStatus {
898 missing: 1,
899 ranges: vec![SyncStatusRange {
900 start: 0,
901 end: 1,
902 status: SyncStatus::Missing,
903 }]
904 },
905 vid_common: ResourceSyncStatus {
906 missing: 1,
907 ranges: vec![SyncStatusRange {
908 start: 0,
909 end: 1,
910 status: SyncStatus::Missing,
911 }]
912 },
913 leaves: ResourceSyncStatus {
914 missing: 0,
915 ranges: vec![SyncStatusRange {
916 start: 0,
917 end: 1,
918 status: SyncStatus::Present,
919 }]
920 },
921 pruned_height: None,
922 }
923 );
924
925 ds.append(leaves[2].clone().into()).await.unwrap();
928 assert_eq!(
929 ds.sync_status().await.unwrap(),
930 SyncStatusQueryData {
931 blocks: ResourceSyncStatus {
932 missing: 3,
933 ranges: vec![SyncStatusRange {
934 start: 0,
935 end: 3,
936 status: SyncStatus::Missing,
937 }]
938 },
939 vid_common: ResourceSyncStatus {
940 missing: 3,
941 ranges: vec![SyncStatusRange {
942 start: 0,
943 end: 3,
944 status: SyncStatus::Missing,
945 }]
946 },
947 leaves: ResourceSyncStatus {
948 missing: 1,
949 ranges: vec![
950 SyncStatusRange {
951 start: 0,
952 end: 1,
953 status: SyncStatus::Present,
954 },
955 SyncStatusRange {
956 start: 1,
957 end: 2,
958 status: SyncStatus::Missing,
959 },
960 SyncStatusRange {
961 start: 2,
962 end: 3,
963 status: SyncStatus::Present,
964 }
965 ]
966 },
967 pruned_height: None,
968 }
969 );
970
971 {
973 let mut tx = ds.write().await.unwrap();
974 tx.insert_vid(&vid_commons[0].clone(), None).await.unwrap();
975 tx.commit().await.unwrap();
976 }
977 assert_eq!(
978 ds.sync_status().await.unwrap(),
979 SyncStatusQueryData {
980 blocks: ResourceSyncStatus {
981 missing: 3,
982 ranges: vec![SyncStatusRange {
983 start: 0,
984 end: 3,
985 status: SyncStatus::Missing,
986 }]
987 },
988 vid_common: ResourceSyncStatus {
989 missing: 2,
990 ranges: vec![
991 SyncStatusRange {
992 start: 0,
993 end: 1,
994 status: SyncStatus::Present,
995 },
996 SyncStatusRange {
997 start: 1,
998 end: 3,
999 status: SyncStatus::Missing,
1000 },
1001 ]
1002 },
1003 leaves: ResourceSyncStatus {
1004 missing: 1,
1005 ranges: vec![
1006 SyncStatusRange {
1007 start: 0,
1008 end: 1,
1009 status: SyncStatus::Present,
1010 },
1011 SyncStatusRange {
1012 start: 1,
1013 end: 2,
1014 status: SyncStatus::Missing,
1015 },
1016 SyncStatusRange {
1017 start: 2,
1018 end: 3,
1019 status: SyncStatus::Present,
1020 }
1021 ]
1022 },
1023 pruned_height: None,
1024 }
1025 );
1026
1027 {
1029 let mut tx = ds.write().await.unwrap();
1030 tx.insert_block(&blocks[0]).await.unwrap();
1031 tx.insert_vid(&vid_commons[0], None).await.unwrap();
1032 tx.insert_leaf(&leaves[1]).await.unwrap();
1033 tx.insert_block(&blocks[1]).await.unwrap();
1034 tx.insert_vid(&vid_commons[1], None).await.unwrap();
1035 tx.insert_block(&blocks[2]).await.unwrap();
1036 tx.insert_vid(&vid_commons[2], None).await.unwrap();
1037 tx.commit().await.unwrap();
1038 }
1039
1040 let leaves = if ds.get_leaf(1).await.try_resolve().is_err() {
1044 tracing::warn!(
1045 "data source does not support out-of-order filling, allowing one missing leaf"
1046 );
1047 ResourceSyncStatus {
1048 missing: 1,
1049 ranges: vec![
1050 SyncStatusRange {
1051 start: 0,
1052 end: 1,
1053 status: SyncStatus::Present,
1054 },
1055 SyncStatusRange {
1056 start: 1,
1057 end: 2,
1058 status: SyncStatus::Missing,
1059 },
1060 SyncStatusRange {
1061 start: 2,
1062 end: 3,
1063 status: SyncStatus::Present,
1064 },
1065 ],
1066 }
1067 } else {
1068 ResourceSyncStatus {
1069 missing: 0,
1070 ranges: vec![SyncStatusRange {
1071 start: 0,
1072 end: 3,
1073 status: SyncStatus::Present,
1074 }],
1075 }
1076 };
1077 let expected_sync_status = SyncStatusQueryData {
1078 leaves,
1079 blocks: ResourceSyncStatus {
1080 missing: 0,
1081 ranges: vec![SyncStatusRange {
1082 start: 0,
1083 end: 3,
1084 status: SyncStatus::Present,
1085 }],
1086 },
1087 vid_common: ResourceSyncStatus {
1088 missing: 0,
1089 ranges: vec![SyncStatusRange {
1090 start: 0,
1091 end: 3,
1092 status: SyncStatus::Present,
1093 }],
1094 },
1095 pruned_height: None,
1096 };
1097 assert_eq!(ds.sync_status().await.unwrap(), expected_sync_status);
1098 }
1099
1100 #[test_log::test(tokio::test(flavor = "multi_thread"))]
1101 pub async fn test_counters<D: TestableDataSource>() {
1102 let storage = D::create(0).await;
1103 let ds = D::connect(&storage).await;
1104
1105 assert_eq!(ds.count_transactions().await.unwrap(), 0);
1106 assert_eq!(ds.payload_size().await.unwrap(), 0);
1107
1108 let mut total_transactions = 0;
1110 let mut total_size = 0;
1111 'outer: for i in [0, 1, 2] {
1112 let (payload, metadata) =
1117 <TestBlockPayload as BlockPayload<TestTypes>>::from_transactions(
1118 [mock_transaction(vec![i as u8 % 2])],
1119 &TestValidatedState::default(),
1120 &TestInstanceState::default(),
1121 )
1122 .await
1123 .unwrap();
1124 let encoded = payload.encode();
1125 let payload_commitment =
1126 vid_commitment(&encoded, &metadata.encode(), 1, TEST_VERSIONS.test.base);
1127 let header = TestBlockHeader {
1128 block_number: i,
1129 payload_commitment,
1130 timestamp: i,
1131 timestamp_millis: i * 1_000,
1132 builder_commitment:
1133 <TestBlockPayload as BlockPayload<TestTypes>>::builder_commitment(
1134 &payload, &metadata,
1135 ),
1136 metadata: TestMetadata {
1137 num_transactions: 7, },
1139 random: 1, version: TEST_VERSIONS.test.base,
1141 };
1142
1143 let mut leaf = LeafQueryData::<MockTypes>::genesis(
1144 &TestValidatedState::default(),
1145 &TestInstanceState::default(),
1146 TEST_VERSIONS.test,
1147 )
1148 .await;
1149 *leaf.leaf.block_header_mut() = header.clone();
1150 let block = BlockQueryData::new(header, payload);
1151 ds.append(BlockInfo::new(leaf, Some(block.clone()), None, None))
1152 .await
1153 .unwrap();
1154 assert_eq!(
1155 NodeDataSource::<MockTypes>::block_height(&ds)
1156 .await
1157 .unwrap(),
1158 (i + 1) as usize,
1159 );
1160
1161 total_transactions += 1;
1162 total_size += encoded.len();
1163
1164 for retry in 0..5 {
1166 let ds_transactions = ds.count_transactions().await.unwrap();
1167 let ds_payload_size = ds.payload_size().await.unwrap();
1168 if ds_transactions != total_transactions || ds_payload_size != total_size {
1169 tracing::info!(
1170 i,
1171 retry,
1172 total_transactions,
1173 ds_transactions,
1174 total_size,
1175 ds_payload_size,
1176 "waiting for statistics to update"
1177 );
1178 sleep(Duration::from_secs(1)).await;
1179 } else {
1180 continue 'outer;
1181 }
1182 }
1183 panic!("counters did not update in time");
1184 }
1185 }
1186
1187 #[test_log::test(tokio::test(flavor = "multi_thread"))]
1188 pub async fn test_vid_shares<D: TestableDataSource>()
1189 where
1190 for<'a> D::ReadOnly<'a>: NodeStorage<MockTypes>,
1191 {
1192 let mut network = MockNetwork::<D>::init().await;
1193 let ds = network.data_source();
1194
1195 network.start().await;
1196
1197 let mut leaves = ds.subscribe_leaves(0).await.take(3);
1199 while let Some(leaf) = leaves.next().await {
1200 tracing::info!("got leaf {}", leaf.height());
1201 let mut tx = ds.read().await.unwrap();
1202 let share = tx.vid_share(leaf.height() as usize).await.unwrap();
1203 assert_eq!(share, tx.vid_share(leaf.block_hash()).await.unwrap());
1204 assert_eq!(
1205 share,
1206 tx.vid_share(BlockId::PayloadHash(leaf.payload_hash()))
1207 .await
1208 .unwrap()
1209 );
1210 }
1211 }
1212
1213 #[test_log::test(tokio::test(flavor = "multi_thread"))]
1214 pub async fn test_vid_monotonicity<D: TestableDataSource>()
1215 where
1216 for<'a> D::Transaction<'a>: UpdateAvailabilityStorage<MockTypes>,
1217 for<'a> D::ReadOnly<'a>: NodeStorage<MockTypes>,
1218 {
1219 let storage = D::create(0).await;
1220 let ds = D::connect(&storage).await;
1221
1222 let mut vid = advz_scheme(2);
1224 let disperse = vid.disperse([]).unwrap();
1225
1226 let leaf = LeafQueryData::<MockTypes>::genesis(
1228 &TestValidatedState::default(),
1229 &TestInstanceState::default(),
1230 TEST_VERSIONS.test,
1231 )
1232 .await;
1233 let common = VidCommonQueryData::new(leaf.header().clone(), VidCommon::V0(disperse.common));
1234 ds.append(BlockInfo::new(
1235 leaf,
1236 None,
1237 Some(common.clone()),
1238 Some(VidShare::V0(disperse.shares[0].clone())),
1239 ))
1240 .await
1241 .unwrap();
1242
1243 {
1244 assert_eq!(ds.get_vid_common(0).await.await, common);
1245 assert_eq!(
1246 ds.vid_share(0).await.unwrap(),
1247 VidShare::V0(disperse.shares[0].clone())
1248 );
1249 }
1250
1251 {
1254 let mut tx = ds.write().await.unwrap();
1255 tx.insert_vid(&common, None).await.unwrap();
1256 tx.commit().await.unwrap();
1257 }
1258 {
1259 assert_eq!(ds.get_vid_common(0).await.await, common);
1260 assert_eq!(
1261 ds.vid_share(0).await.unwrap(),
1262 VidShare::V0(disperse.shares[0].clone())
1263 );
1264 }
1265 }
1266
1267 #[test_log::test(tokio::test(flavor = "multi_thread"))]
1268 pub async fn test_vid_recovery<D: TestableDataSource>()
1269 where
1270 for<'a> D::ReadOnly<'a>: NodeStorage<MockTypes>,
1271 {
1272 let mut network = MockNetwork::<D>::init().await;
1273 let ds = network.data_source();
1274
1275 network.start().await;
1276
1277 let mut blocks = ds.subscribe_blocks(0).await;
1279 let txn = mock_transaction(vec![1, 2, 3]);
1280 network.submit_transaction(txn.clone()).await;
1281
1282 let block = loop {
1284 tracing::info!("waiting for transaction");
1285 let block = blocks.next().await.unwrap();
1286 if !block.is_empty() {
1287 tracing::info!(height = block.height(), "transaction sequenced");
1288 break block;
1289 }
1290 tracing::info!(height = block.height(), "empty block");
1291 };
1292 let height = block.height() as usize;
1293 let commit = if let VidCommitment::V0(commit) = block.payload_hash() {
1294 commit
1295 } else {
1296 panic!("expect ADVZ commitment")
1297 };
1298
1299 let vid = advz_scheme(network.num_nodes());
1301
1302 tracing::info!("fetching common data");
1304 let common = ds.get_vid_common(height).await.await;
1305 let VidCommon::V0(common) = &common.common() else {
1306 panic!("expect ADVZ common");
1307 };
1308 ADVZScheme::is_consistent(&commit, common).unwrap();
1309
1310 tracing::info!("fetching shares");
1312 let network = &network;
1313 let vid = &vid;
1314 let shares: Vec<_> = join_all((0..network.num_nodes()).map(|i| async move {
1315 let ds = network.data_source_index(i);
1316
1317 let mut leaves = ds.subscribe_leaves(height).await;
1320 let leaf = leaves.next().await.unwrap();
1321 assert_eq!(leaf.height(), height as u64);
1322 assert_eq!(leaf.payload_hash(), VidCommitment::V0(commit));
1323
1324 let share = if let VidShare::V0(share) = ds.vid_share(height).await.unwrap() {
1325 share
1326 } else {
1327 panic!("expect ADVZ share")
1328 };
1329 vid.verify_share(&share, common, &commit).unwrap().unwrap();
1330 share
1331 }))
1332 .await;
1333
1334 tracing::info!("recovering payload");
1336 let bytes = vid.recover_payload(&shares, common).unwrap();
1337 let recovered = <MockPayload as BlockPayload<TestTypes>>::from_bytes(
1338 &bytes,
1339 &TestMetadata {
1340 num_transactions: 7, },
1342 );
1343 assert_eq!(recovered, *block.payload());
1344 assert_eq!(recovered.transactions, vec![txn]);
1345 }
1346
1347 #[test_log::test(tokio::test(flavor = "multi_thread"))]
1348 pub async fn test_timestamp_window<D: TestableDataSource>() {
1349 let mut network = MockNetwork::<D>::init().await;
1350 let ds = network.data_source();
1351
1352 network.start().await;
1353
1354 let mut leaves = ds.subscribe_leaves(0).await;
1357 let mut test_blocks: Vec<Vec<Header<MockTypes>>> = vec![];
1360 while test_blocks.len() < 3 {
1361 let leaf = leaves.next().await.unwrap();
1363 let header = leaf.header().clone();
1364 if let Some(last_timestamp) = test_blocks.last_mut() {
1365 if <TestBlockHeader as BlockHeader<MockTypes>>::timestamp(&last_timestamp[0])
1366 == <TestBlockHeader as BlockHeader<MockTypes>>::timestamp(&header)
1367 {
1368 last_timestamp.push(header);
1369 } else {
1370 test_blocks.push(vec![header]);
1371 }
1372 } else {
1373 test_blocks.push(vec![header]);
1374 }
1375 }
1376 tracing::info!("blocks for testing: {test_blocks:#?}");
1377
1378 let check_invariants =
1380 |res: &TimeWindowQueryData<Header<MockTypes>>, start, end, check_prev| {
1381 let mut prev = res.prev.as_ref();
1382 if let Some(prev) = prev {
1383 if check_prev {
1384 assert!(block_header_timestamp(prev) < start);
1385 }
1386 } else {
1387 assert_eq!(res.from().unwrap(), 0);
1390 };
1391 for header in &res.window {
1392 assert!(start <= block_header_timestamp(header));
1393 assert!(block_header_timestamp(header) < end);
1394 if let Some(prev) = prev {
1395 assert!(
1396 <TestBlockHeader as BlockHeader<MockTypes>>::timestamp(prev)
1397 <= <TestBlockHeader as BlockHeader<MockTypes>>::timestamp(header)
1398 );
1399 }
1400 prev = Some(header);
1401 }
1402 if let Some(next) = &res.next {
1403 assert!(<TestBlockHeader as BlockHeader<MockTypes>>::timestamp(next) >= end);
1404 assert!(block_header_timestamp(next) >= block_header_timestamp(prev.unwrap()));
1407 }
1408 };
1409
1410 let get_window = |start, end| {
1411 let ds = ds.clone();
1412 async move {
1413 let window = ds
1414 .get_header_window(WindowStart::Time(start), end, i64::MAX as usize)
1415 .await
1416 .unwrap();
1417 tracing::info!("window for timestamp range {start}-{end}: {window:#?}");
1418 check_invariants(&window, start, end, true);
1419 window
1420 }
1421 };
1422
1423 let start = <TestBlockHeader as BlockHeader<MockTypes>>::timestamp(&test_blocks[1][0]);
1425 let end = start + 1;
1426 let res = get_window(start, end).await;
1427 assert_eq!(res.prev.unwrap(), *test_blocks[0].last().unwrap());
1428 assert_eq!(res.window, test_blocks[1]);
1429 assert_eq!(res.next.unwrap(), test_blocks[2][0]);
1430
1431 let start = 0;
1433 let end = <TestBlockHeader as BlockHeader<MockTypes>>::timestamp(&test_blocks[0][0]) + 1;
1434 let res = get_window(start, end).await;
1435 assert_eq!(res.prev, None);
1436 assert_eq!(res.window, test_blocks[0]);
1437 assert_eq!(res.next.unwrap(), test_blocks[1][0]);
1438
1439 let start = <TestBlockHeader as BlockHeader<MockTypes>>::timestamp(&test_blocks[2][0]);
1441 let end = i64::MAX as u64;
1442 let res = get_window(start, end).await;
1443 assert_eq!(res.prev.unwrap(), *test_blocks[1].last().unwrap());
1444 assert_eq!(res.window[..test_blocks[2].len()], test_blocks[2]);
1447 assert_eq!(res.next, None);
1448 let from = test_blocks.iter().flatten().count() - 1;
1452 let more = ds
1453 .get_header_window(WindowStart::Height(from as u64), end, i64::MAX as usize)
1454 .await
1455 .unwrap();
1456 check_invariants(&more, start, end, false);
1457 assert_eq!(
1458 more.prev.as_ref().unwrap(),
1459 test_blocks.iter().flatten().nth(from - 1).unwrap()
1460 );
1461 assert_eq!(
1462 more.window[..res.window.len() - test_blocks[2].len() + 1],
1463 res.window[test_blocks[2].len() - 1..]
1464 );
1465 assert_eq!(res.next, None);
1466 let more2 = ds
1468 .get_header_window(
1469 test_blocks[2].last().unwrap().commit(),
1470 end,
1471 i64::MAX as usize,
1472 )
1473 .await
1474 .unwrap();
1475 check_invariants(&more2, start, end, false);
1476 assert_eq!(more2.from().unwrap(), more.from().unwrap());
1477 assert_eq!(more2.prev, more.prev);
1478 assert_eq!(more2.next, more.next);
1479 assert_eq!(more2.window[..more.window.len()], more.window);
1480
1481 let start = <TestBlockHeader as BlockHeader<MockTypes>>::timestamp(&test_blocks[1][0]);
1483 let end = start;
1484 let res = get_window(start, end).await;
1485 assert_eq!(res.prev.unwrap(), *test_blocks[0].last().unwrap());
1486 assert_eq!(res.next.unwrap(), test_blocks[1][0]);
1487 assert_eq!(res.window, vec![]);
1488
1489 ds.get_header_window(
1491 WindowStart::Time((i64::MAX - 1) as u64),
1492 i64::MAX as u64,
1493 i64::MAX as usize,
1494 )
1495 .await
1496 .unwrap_err();
1497
1498 let blocks = [test_blocks[0].clone(), test_blocks[1].clone()]
1500 .into_iter()
1501 .flatten()
1502 .collect::<Vec<_>>();
1503 let start = block_header_timestamp(&blocks[0]);
1505 let end = block_header_timestamp(&test_blocks[2][0]);
1506 let res = ds
1507 .get_header_window(WindowStart::Time(start), end, 1)
1508 .await
1509 .unwrap();
1510 assert_eq!(res.prev, None);
1511 assert_eq!(res.window, [blocks[0].clone()]);
1512 assert_eq!(res.next, None);
1513 let res = ds
1515 .get_header_window(WindowStart::Height(blocks[0].height() + 1), end, 1)
1516 .await
1517 .unwrap();
1518 assert_eq!(res.window, [blocks[1].clone()]);
1519 assert_eq!(res.next, None);
1520 let res = ds
1522 .get_header_window(
1523 WindowStart::Height(blocks[1].height() + 1),
1524 end,
1525 blocks.len() - 1,
1526 )
1527 .await
1528 .unwrap();
1529 assert_eq!(res.window, blocks[2..].to_vec());
1530 assert_eq!(res.next, Some(test_blocks[2][0].clone()));
1531 }
1532
1533 #[test_log::test(tokio::test(flavor = "multi_thread"))]
1534 pub async fn test_latest_qc_chain<D: TestableDataSource>()
1535 where
1536 for<'a> D::ReadOnly<'a>: NodeStorage<MockTypes>,
1537 for<'a> D::Transaction<'a>: UpdateAvailabilityStorage<MockTypes>,
1538 {
1539 let storage = D::create(0).await;
1540 let ds = D::connect(&storage).await;
1541
1542 {
1543 let mut tx = ds.read().await.unwrap();
1544 assert_eq!(tx.latest_qc_chain().await.unwrap(), None);
1545 }
1546
1547 async fn leaf_with_qc_chain(
1548 number: u64,
1549 ) -> (LeafQueryData<MockTypes>, [CertificatePair<MockTypes>; 2]) {
1550 let mut leaf = Leaf2::<MockTypes>::genesis(
1551 &Default::default(),
1552 &Default::default(),
1553 TEST_VERSIONS.test.base,
1554 )
1555 .await;
1556 leaf.block_header_mut().block_number = number;
1557
1558 let mut qc1 = QuorumCertificate2::<MockTypes>::genesis(
1559 &Default::default(),
1560 &Default::default(),
1561 TEST_VERSIONS.test,
1562 )
1563 .await;
1564 qc1.view_number = ViewNumber::new(1);
1565 qc1.data.leaf_commit = Committable::commit(&leaf);
1566
1567 let mut qc2 = qc1.clone();
1568 qc2.view_number += 1;
1569
1570 let leaf = LeafQueryData::new(leaf, qc1.clone()).unwrap();
1571 (
1572 leaf,
1573 [
1574 CertificatePair::non_epoch_change(qc1),
1575 CertificatePair::non_epoch_change(qc2),
1576 ],
1577 )
1578 }
1579
1580 {
1582 let (leaf, qcs) = leaf_with_qc_chain(2).await;
1583 let mut tx = ds.write().await.unwrap();
1584 tx.insert_leaf_with_qc_chain(&leaf, Some(qcs.clone()))
1585 .await
1586 .unwrap();
1587 tx.commit().await.unwrap();
1588
1589 assert_eq!(
1590 ds.read().await.unwrap().latest_qc_chain().await.unwrap(),
1591 Some(qcs)
1592 );
1593 }
1594
1595 {
1598 let (leaf, _) = leaf_with_qc_chain(3).await;
1599 let mut tx = ds.write().await.unwrap();
1600 tx.insert_leaf_with_qc_chain(&leaf, None).await.unwrap();
1601 tx.commit().await.unwrap();
1602
1603 assert_eq!(
1604 ds.read().await.unwrap().latest_qc_chain().await.unwrap(),
1605 None
1606 );
1607 }
1608
1609 {
1612 let (leaf, qcs) = leaf_with_qc_chain(1).await;
1613 let mut tx = ds.write().await.unwrap();
1614 tx.insert_leaf_with_qc_chain(&leaf, Some(qcs))
1615 .await
1616 .unwrap();
1617 tx.commit().await.unwrap();
1618
1619 assert_eq!(
1620 ds.read().await.unwrap().latest_qc_chain().await.unwrap(),
1621 None
1622 );
1623 }
1624 }
1625}
1626
// Generic tests for the status API of a data source.
#[cfg(any(test, feature = "testing"))]
#[espresso_macros::generic_tests]
pub mod status_tests {
    use std::time::Duration;

    use crate::{
        status::StatusDataSource,
        testing::{
            consensus::{DataSourceLifeCycle, MockNetwork},
            mocks::mock_transaction,
            sleep,
        },
    };

    // Checks the status metrics before the network starts, after at least two blocks have been
    // finalized, and after the network has been shut down.
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    pub async fn test_metrics<D: DataSourceLifeCycle + StatusDataSource>() {
        let mut network = MockNetwork::<D>::init().await;
        let ds = network.data_source();

        // Before the network is started: height 0 and a NaN success rate.
        assert_eq!(ds.block_height().await.unwrap(), 0);
        assert!(ds.success_rate().await.unwrap().is_nan());
        // The reported elapsed time must be within one second of the current Unix timestamp
        // (presumably because the last-decide time is still zero at this point — confirm).
        let elapsed = ds.elapsed_time_since_last_decide().await.unwrap() as i64;
        let now = chrono::Utc::now().timestamp();
        assert!(
            (elapsed - now).abs() <= 1,
            "time elapsed since last_decided_time is not within 1s"
        );

        // Submit a transaction, then start the network.
        let txn = mock_transaction(vec![1, 2, 3]);
        network.submit_transaction(txn.clone()).await;

        network.start().await;

        // Poll once per second until the block height exceeds 1.
        let mut height = ds.block_height().await.unwrap();
        while height <= 1 {
            tracing::info!(height, "waiting for a block to be finalized");
            sleep(Duration::from_secs(1)).await;
            height = ds.block_height().await.unwrap();
        }

        // With blocks finalized, the success rate must be a finite, positive number.
        let success_rate = ds.success_rate().await.unwrap();
        assert!(success_rate.is_finite(), "{success_rate}");
        assert!(success_rate > 0.0, "{success_rate}");

        // After shutting down and waiting three seconds, at least three seconds must be
        // reported since the last decide.
        network.shut_down().await;
        sleep(Duration::from_secs(3)).await;
        let since_last_decide = ds.elapsed_time_since_last_decide().await.unwrap();
        assert!(since_last_decide >= 3);
    }
}
1700
/// Instantiates the availability, persistence, node and status test suites for the type `$t`.
///
/// The expansion first imports the four test modules from `$crate::data_source`, then invokes
/// the per-suite `instantiate_*_tests!` macros with `$t`, in that order.
#[macro_export]
macro_rules! instantiate_data_source_tests {
    ($t:ty) => {
        use $crate::data_source::availability_tests;
        use $crate::data_source::node_tests;
        use $crate::data_source::persistence_tests;
        use $crate::data_source::status_tests;

        instantiate_availability_tests!($t);
        instantiate_persistence_tests!($t);
        instantiate_node_tests!($t);
        instantiate_status_tests!($t);
    };
}