Skip to main content

hotshot_query_service/
data_source.rs

1// Copyright (c) 2022 Espresso Systems (espressosys.com)
2// This file is part of the HotShot Query Service library.
3//
4// This program is free software: you can redistribute it and/or modify it under the terms of the GNU
5// General Public License as published by the Free Software Foundation, either version 3 of the
6// License, or (at your option) any later version.
7// This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without
8// even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
9// General Public License for more details.
10// You should have received a copy of the GNU General Public License along with this program. If not,
11// see <https://www.gnu.org/licenses/>.
12
13//! Persistent storage and sources of data consumed by APIs.
14//!
15//! The APIs provided by this query service are generic over the implementation which actually
16//! retrieves data in answer to queries. We call this implementation a _data source_. This module
17//! defines a data source and provides several pre-built implementations:
18//! * [`FileSystemDataSource`]
19//! * [`SqlDataSource`]
20//! * [`FetchingDataSource`], a generalization of the above
21//! * [`MetricsDataSource`]
22//!
23//! The user can choose which data source to use when initializing the query service.
24//!
25//! We also provide combinators for modularly adding functionality to existing data sources:
26//! * [`ExtensibleDataSource`]
27//!
28
29mod extension;
30pub mod fetching;
31pub mod fs;
32mod metrics;
33mod notifier;
34pub mod sql;
35pub mod storage;
36mod update;
37
38pub use extension::ExtensibleDataSource;
39pub use fetching::{AvailabilityProvider, FetchingDataSource};
40#[cfg(feature = "file-system-data-source")]
41pub use fs::FileSystemDataSource;
42#[cfg(feature = "metrics-data-source")]
43pub use metrics::MetricsDataSource;
44#[cfg(feature = "sql-data-source")]
45pub use sql::SqlDataSource;
46pub use update::{Transaction, UpdateDataSource, VersionedDataSource};
47
48#[cfg(any(test, feature = "testing"))]
49mod test_helpers {
50    use std::ops::{Bound, RangeBounds};
51
52    use futures::{
53        future,
54        stream::{BoxStream, StreamExt},
55    };
56
57    use crate::{
58        availability::{BlockQueryData, Fetch, LeafQueryData},
59        node::NodeDataSource,
60        testing::{consensus::TestableDataSource, mocks::MockTypes},
61    };
62
63    /// Apply an upper bound to a range based on the currently available block height.
64    async fn bound_range<R, D>(ds: &D, range: R) -> impl RangeBounds<usize> + use<R, D>
65    where
66        D: TestableDataSource,
67        R: RangeBounds<usize>,
68    {
69        let start = range.start_bound().cloned();
70        let mut end = range.end_bound().cloned();
71        if end == Bound::Unbounded {
72            end = Bound::Excluded(NodeDataSource::block_height(ds).await.unwrap());
73        }
74        (start, end)
75    }
76
77    /// Get a stream of blocks, implicitly terminating at the current block height.
78    pub async fn block_range<R, D>(
79        ds: &D,
80        range: R,
81    ) -> BoxStream<'static, BlockQueryData<MockTypes>>
82    where
83        D: TestableDataSource,
84        R: RangeBounds<usize> + Send + 'static,
85    {
86        ds.get_block_range(bound_range(ds, range).await)
87            .await
88            .then(Fetch::resolve)
89            .boxed()
90    }
91
92    /// Get a stream of leaves, implicitly terminating at the current block height.
93    pub async fn leaf_range<R, D>(ds: &D, range: R) -> BoxStream<'static, LeafQueryData<MockTypes>>
94    where
95        D: TestableDataSource,
96        R: RangeBounds<usize> + Send + 'static,
97    {
98        ds.get_leaf_range(bound_range(ds, range).await)
99            .await
100            .then(Fetch::resolve)
101            .boxed()
102    }
103
104    pub async fn get_non_empty_blocks<D>(
105        ds: &D,
106    ) -> Vec<(LeafQueryData<MockTypes>, BlockQueryData<MockTypes>)>
107    where
108        D: TestableDataSource,
109    {
110        // Ignore the genesis block (start from height 1).
111        leaf_range(ds, 1..)
112            .await
113            .zip(block_range(ds, 1..).await)
114            .filter(|(_, block)| future::ready(!block.is_empty()))
115            .collect()
116            .await
117    }
118}
119
120/// Generic tests we can instantiate for all the availability data sources.
121#[cfg(any(test, feature = "testing"))]
122#[espresso_macros::generic_tests]
123pub mod availability_tests {
124    use std::{
125        collections::HashMap,
126        fmt::Debug,
127        ops::{Bound, RangeBounds},
128    };
129
130    use committable::Committable;
131    use futures::stream::StreamExt;
132    use hotshot_example_types::node_types::TEST_VERSIONS;
133    use hotshot_types::{data::Leaf2, vote::HasViewNumber};
134
135    use super::test_helpers::*;
136    use crate::{
137        availability::{BlockId, BlockQueryData, LeafQueryData, VidCommonQueryData, payload_size},
138        data_source::{
139            Transaction,
140            storage::{AvailabilityStorage, NodeStorage, UpdateAvailabilityStorage},
141        },
142        node::NodeDataSource,
143        testing::{
144            consensus::{MockNetwork, TestableDataSource},
145            mocks::{MockTypes, mock_transaction},
146        },
147        types::HeightIndexed,
148    };
149
150    async fn validate<D: TestableDataSource>(ds: &D)
151    where
152        for<'a> D::ReadOnly<'a>: AvailabilityStorage<MockTypes> + NodeStorage<MockTypes>,
153    {
154        // Check the consistency of every block/leaf pair. Keep track of payloads and transactions
155        // we've seen so we can detect duplicates.
156        let mut seen_payloads = HashMap::new();
157        let mut seen_transactions = HashMap::new();
158        let mut leaves = leaf_range(ds, ..).await.enumerate();
159        while let Some((i, leaf)) = leaves.next().await {
160            assert_eq!(leaf.height(), i as u64);
161            assert_eq!(
162                leaf.hash(),
163                <Leaf2<MockTypes> as Committable>::commit(&leaf.leaf)
164            );
165
166            // Check indices.
167            tracing::info!("looking up leaf {i} various ways");
168            assert_eq!(leaf, ds.get_leaf(i).await.await);
169            assert_eq!(leaf, ds.get_leaf(leaf.hash()).await.await);
170
171            tracing::info!("looking up block {i} various ways");
172            let block = ds.get_block(i).await.await;
173            assert_eq!(leaf.block_hash(), block.hash());
174            assert_eq!(block.height(), i as u64);
175            assert_eq!(block.hash(), block.header().commit());
176            assert_eq!(block.size(), payload_size::<MockTypes>(block.payload()));
177
178            // Check indices.
179            assert_eq!(block, ds.get_block(i).await.await);
180            assert_eq!(ds.get_block(block.hash()).await.await.height(), i as u64);
181            // We should be able to look up the block by payload hash unless its payload is a
182            // duplicate. For duplicate payloads, this function returns the index of the first
183            // duplicate.
184            //
185            // Note: this ordering is not a strict requirement. It should hold for payloads in local
186            // storage, but we don't have a good way of enforcing it if the payload is missing, in
187            // which case we will return the first matching payload we see, which could happen in
188            // any order. We use `try_resolve` to skip this check if the object isn't available
189            // locally.
190            let ix = seen_payloads
191                .entry(block.payload_hash())
192                .or_insert(i as u64);
193            if let Ok(block) = ds
194                .get_block(BlockId::PayloadHash(block.payload_hash()))
195                .await
196                .try_resolve()
197            {
198                assert_eq!(block.height(), *ix);
199            } else {
200                tracing::warn!(
201                    "skipping block by payload index check for missing payload {:?}",
202                    block.header()
203                );
204                // At least check that _some_ block can be fetched.
205                ds.get_block(BlockId::PayloadHash(block.payload_hash()))
206                    .await
207                    .await;
208            }
209
210            // Check payload lookup.
211            tracing::info!("looking up payload {i} various ways");
212            let expected_payload = block.clone().into();
213            assert_eq!(ds.get_payload(i).await.await, expected_payload);
214            assert_eq!(ds.get_payload(block.hash()).await.await, expected_payload);
215            // Similar to the above, we can't guarantee which index we will get when passively
216            // fetching this payload, so only check the index if the payload is available locally.
217            if let Ok(payload) = ds
218                .get_payload(BlockId::PayloadHash(block.payload_hash()))
219                .await
220                .try_resolve()
221            {
222                if *ix == i as u64 {
223                    assert_eq!(payload, expected_payload);
224                }
225            } else {
226                tracing::warn!(
227                    "skipping payload index check for missing payload {:?}",
228                    block.header()
229                );
230                // At least check that _some_ payload can be fetched.
231                ds.get_payload(BlockId::PayloadHash(block.payload_hash()))
232                    .await
233                    .await;
234            }
235
236            // Look up the common VID data.
237            tracing::info!("looking up VID common {i} various ways");
238            let common = ds.get_vid_common(block.height() as usize).await.await;
239            assert_eq!(common, ds.get_vid_common(block.hash()).await.await);
240            // Similar to the above, we can't guarantee which index we will get when passively
241            // fetching this data, so only check the index if the data is available locally.
242            if let Ok(res) = ds
243                .get_vid_common(BlockId::PayloadHash(block.payload_hash()))
244                .await
245                .try_resolve()
246            {
247                if *ix == i as u64 {
248                    assert_eq!(res, common);
249                }
250            } else {
251                tracing::warn!(
252                    "skipping VID common index check for missing data {:?}",
253                    block.header()
254                );
255                // At least check that _some_ data can be fetched.
256                let res = ds
257                    .get_vid_common(BlockId::PayloadHash(block.payload_hash()))
258                    .await
259                    .await;
260                assert_eq!(res.payload_hash(), common.payload_hash());
261            }
262
263            for (j, txn) in block.enumerate() {
264                tracing::info!("looking up transaction {i},{j:?}");
265
266                // We should be able to look up the transaction by hash unless it is a duplicate.
267                // For duplicate transactions, this function returns the index of the first
268                // duplicate.
269                //
270                // Similar to the above, we can't guarantee which index we will get when passively
271                // fetching this transaction, so only check the index if the transaction is
272                // available locally.
273                let ix = seen_transactions
274                    .entry(txn.commit())
275                    .or_insert((i as u64, j.clone()));
276                if let Ok(tx_data) = ds
277                    .get_block_containing_transaction(txn.commit())
278                    .await
279                    .try_resolve()
280                {
281                    assert_eq!(tx_data.transaction.transaction(), &txn);
282                    assert_eq!(tx_data.transaction.block_height(), ix.0);
283                    assert_eq!(tx_data.transaction.index(), ix.1.position as u64);
284                    assert_eq!(tx_data.index, ix.1);
285                    assert_eq!(tx_data.block, block);
286                } else {
287                    tracing::warn!(
288                        "skipping transaction index check for missing transaction {j:?} {txn:?}"
289                    );
290                    // At least check that _some_ transaction can be fetched.
291                    ds.get_block_containing_transaction(txn.commit())
292                        .await
293                        .await;
294                }
295            }
296        }
297
298        // Validate consistency of latest QC chain (only available after epoch upgrade).
299        {
300            let mut tx = ds.read().await.unwrap();
301            let block_height = NodeStorage::block_height(&mut tx).await.unwrap();
302            let last_leaf = tx.get_leaf((block_height - 1).into()).await.unwrap();
303
304            if last_leaf.qc().data.epoch.is_some() {
305                tracing::info!(block_height, "checking QC chain");
306                let qc_chain = tx.latest_qc_chain().await.unwrap().unwrap();
307
308                assert_eq!(last_leaf.height(), (block_height - 1) as u64);
309                assert_eq!(qc_chain[0].view_number(), last_leaf.leaf().view_number());
310                assert_eq!(qc_chain[0].leaf_commit(), last_leaf.hash());
311                assert_eq!(qc_chain[1].view_number(), qc_chain[0].view_number() + 1);
312            }
313        }
314    }
315
316    #[test_log::test(tokio::test(flavor = "multi_thread"))]
317    pub async fn test_update<D: TestableDataSource>()
318    where
319        for<'a> D::ReadOnly<'a>: AvailabilityStorage<MockTypes> + NodeStorage<MockTypes>,
320    {
321        let mut network = MockNetwork::<D>::init().await;
322        let ds = network.data_source();
323
324        network.start().await;
325        assert_eq!(get_non_empty_blocks(&ds).await, vec![]);
326
327        // Submit a few blocks and make sure each one gets reflected in the query service and
328        // preserves the consistency of the data and indices.
329        let mut blocks = ds.subscribe_blocks(0).await.enumerate();
330        for nonce in 0..3 {
331            let txn = mock_transaction(vec![nonce]);
332            network.submit_transaction(txn).await;
333
334            // Wait for the transaction to be finalized.
335            let (i, block) = loop {
336                tracing::info!("waiting for tx {nonce}");
337                let (i, block) = blocks.next().await.unwrap();
338                if !block.is_empty() {
339                    break (i, block);
340                }
341                tracing::info!("block {i} is empty");
342            };
343
344            tracing::info!("got tx {nonce} in block {i}");
345            assert_eq!(ds.get_block(i).await.await, block);
346            validate(&ds).await;
347        }
348
349        // Check that all the updates have been committed to storage, not simply held in memory: we
350        // should be able to read the same data if we connect an entirely new data source to the
351        // underlying storage.
352        {
353            tracing::info!("checking persisted storage");
354            let storage = D::connect(network.storage()).await;
355
356            // Ensure we have the same data in both data sources (if data was missing from the
357            // original it is of course allowed to be missing from persistent storage and thus from
358            // the latter).
359            let block_height = NodeDataSource::block_height(&ds).await.unwrap();
360            assert_eq!(
361                ds.get_block_range(..block_height)
362                    .await
363                    .map(|fetch| fetch.try_resolve().ok())
364                    .collect::<Vec<_>>()
365                    .await,
366                storage
367                    .get_block_range(..block_height)
368                    .await
369                    .map(|fetch| fetch.try_resolve().ok())
370                    .collect::<Vec<_>>()
371                    .await
372            );
373            assert_eq!(
374                ds.get_leaf_range(..block_height)
375                    .await
376                    .map(|fetch| fetch.try_resolve().ok())
377                    .collect::<Vec<_>>()
378                    .await,
379                storage
380                    .get_leaf_range(..block_height)
381                    .await
382                    .map(|fetch| fetch.try_resolve().ok())
383                    .collect::<Vec<_>>()
384                    .await
385            );
386        }
387    }
388
389    #[test_log::test(tokio::test(flavor = "multi_thread"))]
390    pub async fn test_range<D: TestableDataSource>()
391    where
392        for<'a> D::ReadOnly<'a>: NodeStorage<MockTypes>,
393    {
394        let mut network = MockNetwork::<D>::init().await;
395        let ds = network.data_source();
396        network.start().await;
397
398        // Wait for there to be at least 3 blocks.
399        let block_height = loop {
400            let mut tx = ds.read().await.unwrap();
401            let block_height = tx.block_height().await.unwrap();
402            if block_height >= 3 {
403                break block_height as u64;
404            }
405        };
406
407        // Query for a variety of ranges testing all cases of included, excluded, and unbounded
408        // starting and ending bounds
409        do_range_test(&ds, 1..=2, 1..3).await; // (inclusive, inclusive)
410        do_range_test(&ds, 1..3, 1..3).await; // (inclusive, exclusive)
411        do_range_test(&ds, 1.., 1..block_height).await; // (inclusive, unbounded)
412        do_range_test(&ds, ..=2, 0..3).await; // (unbounded, inclusive)
413        do_range_test(&ds, ..3, 0..3).await; // (unbounded, exclusive)
414        do_range_test(&ds, .., 0..block_height).await; // (unbounded, unbounded)
415        do_range_test(&ds, ExRange(0..=2), 1..3).await; // (exclusive, inclusive)
416        do_range_test(&ds, ExRange(0..3), 1..3).await; // (exclusive, exclusive)
417        do_range_test(&ds, ExRange(0..), 1..block_height).await; // (exclusive, unbounded)
418    }
419
420    async fn do_range_test<D, R, I>(ds: &D, range: R, expected_indices: I)
421    where
422        D: TestableDataSource,
423        R: RangeBounds<usize> + Clone + Debug + Send + 'static,
424        I: IntoIterator<Item = u64>,
425    {
426        tracing::info!("testing range {range:?}");
427
428        let mut leaves = ds.get_leaf_range(range.clone()).await;
429        let mut blocks = ds.get_block_range(range.clone()).await;
430        let mut payloads = ds.get_payload_range(range.clone()).await;
431        let mut payloads_meta = ds.get_payload_metadata_range(range.clone()).await;
432        let mut vid_common = ds.get_vid_common_range(range.clone()).await;
433        let mut vid_common_meta = ds.get_vid_common_metadata_range(range.clone()).await;
434
435        for i in expected_indices {
436            tracing::info!(i, "check entries");
437            let leaf = leaves.next().await.unwrap().await;
438            let block = blocks.next().await.unwrap().await;
439            let payload = payloads.next().await.unwrap().await;
440            let payload_meta = payloads_meta.next().await.unwrap().await;
441            let common = vid_common.next().await.unwrap().await;
442            let common_meta = vid_common_meta.next().await.unwrap().await;
443            assert_eq!(leaf.height(), i);
444            assert_eq!(block.height(), i);
445            assert_eq!(payload, ds.get_payload(i as usize).await.await);
446            assert_eq!(payload_meta, block.into());
447            assert_eq!(common, ds.get_vid_common(i as usize).await.await);
448            assert_eq!(common_meta, common.into());
449        }
450
451        if range.end_bound() == Bound::Unbounded {
452            // If the range is unbounded, the stream should continue, eventually reaching a point at
453            // which further objects are not yet available, and yielding pending futures from there.
454            loop {
455                let fetch_leaf = leaves.next().await.unwrap();
456                let fetch_block = blocks.next().await.unwrap();
457                let fetch_payload = payloads.next().await.unwrap();
458                let fetch_payload_meta = payloads_meta.next().await.unwrap();
459                let fetch_common = vid_common.next().await.unwrap();
460                let fetch_common_meta = vid_common_meta.next().await.unwrap();
461
462                if fetch_leaf.try_resolve().is_ok()
463                    && fetch_block.try_resolve().is_ok()
464                    && fetch_payload.try_resolve().is_ok()
465                    && fetch_payload_meta.try_resolve().is_ok()
466                    && fetch_common.try_resolve().is_ok()
467                    && fetch_common_meta.try_resolve().is_ok()
468                {
469                    tracing::info!("searching for end of available objects");
470                } else {
471                    break;
472                }
473            }
474        } else {
475            // If the range is bounded, it should end where expected.
476            assert!(leaves.next().await.is_none());
477            assert!(blocks.next().await.is_none());
478            assert!(payloads.next().await.is_none());
479            assert!(payloads_meta.next().await.is_none());
480            assert!(vid_common.next().await.is_none());
481            assert!(vid_common_meta.next().await.is_none());
482        }
483    }
484
485    #[test_log::test(tokio::test(flavor = "multi_thread"))]
486    pub async fn test_range_rev<D: TestableDataSource>()
487    where
488        for<'a> D::ReadOnly<'a>: NodeStorage<MockTypes>,
489    {
490        let mut network = MockNetwork::<D>::init().await;
491        let ds = network.data_source();
492        network.start().await;
493
494        // Wait for there to be at least 5 blocks.
495        ds.subscribe_leaves(5).await.next().await.unwrap();
496
497        // Test inclusive, exclusive and unbounded lower bound.
498        do_range_rev_test(&ds, Bound::Included(1), 5, 1..=5).await;
499        do_range_rev_test(&ds, Bound::Excluded(1), 5, 2..=5).await;
500        do_range_rev_test(&ds, Bound::Unbounded, 5, 0..=5).await;
501    }
502
503    async fn do_range_rev_test<D>(
504        ds: &D,
505        start: Bound<usize>,
506        end: usize,
507        expected_indices: impl DoubleEndedIterator<Item = u64>,
508    ) where
509        D: TestableDataSource,
510    {
511        tracing::info!("testing range {start:?}-{end}");
512
513        let mut leaves = ds.get_leaf_range_rev(start, end).await;
514        let mut blocks = ds.get_block_range_rev(start, end).await;
515        let mut payloads = ds.get_payload_range_rev(start, end).await;
516        let mut payloads_meta = ds.get_payload_metadata_range_rev(start, end).await;
517        let mut vid_common = ds.get_vid_common_range_rev(start, end).await;
518        let mut vid_common_meta = ds.get_vid_common_metadata_range_rev(start, end).await;
519
520        for i in expected_indices.rev() {
521            tracing::info!(i, "check entries");
522            let leaf = leaves.next().await.unwrap().await;
523            let block = blocks.next().await.unwrap().await;
524            let payload = payloads.next().await.unwrap().await;
525            let payload_meta = payloads_meta.next().await.unwrap().await;
526            let common = vid_common.next().await.unwrap().await;
527            let common_meta = vid_common_meta.next().await.unwrap().await;
528            assert_eq!(leaf.height(), i);
529            assert_eq!(block.height(), i);
530            assert_eq!(payload.height(), i);
531            assert_eq!(payload_meta.height(), i);
532            assert_eq!(common, ds.get_vid_common(i as usize).await.await);
533            assert_eq!(
534                common_meta,
535                ds.get_vid_common_metadata(i as usize).await.await
536            );
537        }
538
539        // The range should end where expected.
540        assert!(leaves.next().await.is_none());
541        assert!(blocks.next().await.is_none());
542        assert!(payloads.next().await.is_none());
543        assert!(payloads_meta.next().await.is_none());
544        assert!(vid_common.next().await.is_none());
545        assert!(vid_common_meta.next().await.is_none());
546    }
547
548    // A wrapper around a range that turns the lower bound from inclusive to exclusive.
549    #[derive(Clone, Copy, Debug)]
550    struct ExRange<R>(R);
551
552    impl<R: RangeBounds<usize>> RangeBounds<usize> for ExRange<R> {
553        fn start_bound(&self) -> Bound<&usize> {
554            match self.0.start_bound() {
555                Bound::Included(x) => Bound::Excluded(x),
556                Bound::Excluded(x) => Bound::Excluded(x),
557                Bound::Unbounded => Bound::Excluded(&0),
558            }
559        }
560
561        fn end_bound(&self) -> Bound<&usize> {
562            self.0.end_bound()
563        }
564    }
565
566    /// Regression test for a SQL bug.
567    ///
568    /// In PostgreSQL, upserting with multiple conflicting rows in a single statement is not
569    /// allowed.
570    #[tokio::test]
571    #[test_log::test]
572    pub async fn test_insert_consecutive_identical_blocks<D: TestableDataSource>()
573    where
574        for<'a> D::Transaction<'a>: UpdateAvailabilityStorage<MockTypes>,
575    {
576        let storage = D::create(0).await;
577        let ds = D::connect(&storage).await;
578
579        let leaf = LeafQueryData::<MockTypes>::genesis(
580            &Default::default(),
581            &Default::default(),
582            TEST_VERSIONS.test,
583        )
584        .await;
585        let block = BlockQueryData::<MockTypes>::genesis(
586            &Default::default(),
587            &Default::default(),
588            TEST_VERSIONS.test.base,
589        )
590        .await;
591        let vid = VidCommonQueryData::<MockTypes>::genesis(
592            &Default::default(),
593            &Default::default(),
594            TEST_VERSIONS.test.base,
595        )
596        .await;
597
598        let mut leaf2 = leaf.clone();
599        leaf2.leaf.block_header_mut().block_number += 1;
600        let block2 =
601            BlockQueryData::<MockTypes>::new(leaf2.header().clone(), block.payload.clone());
602        let vid2 = VidCommonQueryData::<MockTypes>::new(leaf2.header().clone(), vid.common.clone());
603
604        {
605            let mut tx = ds.write().await.unwrap();
606            tx.insert_leaf_range([&leaf, &leaf2]).await.unwrap();
607            tx.insert_block_range([&block, &block2]).await.unwrap();
608            tx.insert_vid_range([(&vid, None), (&vid2, None)])
609                .await
610                .unwrap();
611            tx.commit().await.unwrap();
612        }
613
614        assert_eq!(ds.get_leaf(0).await.await, leaf);
615        assert_eq!(ds.get_leaf(1).await.await, leaf2);
616        assert_eq!(ds.get_block(0).await.await, block);
617        assert_eq!(ds.get_block(1).await.await, block2);
618        assert_eq!(ds.get_vid_common(0).await.await, vid);
619        assert_eq!(ds.get_vid_common(1).await.await, vid2);
620    }
621}
622
623/// Generic tests we can instantiate for any data source with reliable, versioned persistent storage.
624#[cfg(any(test, feature = "testing"))]
625#[espresso_macros::generic_tests]
626pub mod persistence_tests {
627    use committable::Committable;
628    use hotshot_example_types::{
629        node_types::TEST_VERSIONS,
630        state_types::{TestInstanceState, TestValidatedState},
631    };
632    use hotshot_types::simple_certificate::QuorumCertificate2;
633
634    use crate::{
635        Leaf2,
636        availability::{BlockQueryData, LeafQueryData},
637        data_source::{
638            Transaction,
639            storage::{AvailabilityStorage, NodeStorage, UpdateAvailabilityStorage},
640        },
641        node::NodeDataSource,
642        testing::{
643            consensus::TestableDataSource,
644            mocks::{MockPayload, MockTypes},
645        },
646        types::HeightIndexed,
647    };
648
649    #[test_log::test(tokio::test(flavor = "multi_thread"))]
650    pub async fn test_revert<D: TestableDataSource>()
651    where
652        for<'a> D::Transaction<'a>: UpdateAvailabilityStorage<MockTypes>
653            + AvailabilityStorage<MockTypes>
654            + NodeStorage<MockTypes>,
655    {
656        let storage = D::create(0).await;
657        let ds = D::connect(&storage).await;
658
659        // Mock up some consensus data.
660        let mut qc = QuorumCertificate2::<MockTypes>::genesis(
661            &TestValidatedState::default(),
662            &TestInstanceState::default(),
663            TEST_VERSIONS.test,
664        )
665        .await;
666        let mut leaf = Leaf2::<MockTypes>::genesis(
667            &TestValidatedState::default(),
668            &TestInstanceState::default(),
669            TEST_VERSIONS.test.base,
670        )
671        .await;
672        // Increment the block number, to distinguish this block from the genesis block, which
673        // already exists.
674        leaf.block_header_mut().block_number += 1;
675        qc.data.leaf_commit = <Leaf2<MockTypes> as Committable>::commit(&leaf);
676
677        let block = BlockQueryData::new(leaf.block_header().clone(), MockPayload::genesis());
678        let leaf = LeafQueryData::new(leaf, qc).unwrap();
679
680        // Insert, but do not commit, some data and check that we can read it back.
681        let mut tx = ds.write().await.unwrap();
682        tx.insert_leaf(&leaf).await.unwrap();
683        tx.insert_block(&block).await.unwrap();
684
685        assert_eq!(tx.block_height().await.unwrap(), 2);
686        assert_eq!(leaf, tx.get_leaf(1.into()).await.unwrap());
687        assert_eq!(block, tx.get_block(1.into()).await.unwrap());
688
689        // Revert the changes.
690        tx.revert().await;
691        assert_eq!(
692            NodeDataSource::<MockTypes>::block_height(&ds)
693                .await
694                .unwrap(),
695            0
696        );
697        ds.get_leaf(1).await.try_resolve().unwrap_err();
698        ds.get_block(1).await.try_resolve().unwrap_err();
699    }
700
701    #[test_log::test(tokio::test(flavor = "multi_thread"))]
702    pub async fn test_reset<D: TestableDataSource>()
703    where
704        for<'a> D::Transaction<'a>: UpdateAvailabilityStorage<MockTypes>,
705    {
706        let storage = D::create(0).await;
707        let ds = D::connect(&storage).await;
708
709        // Mock up some consensus data.
710        let mut qc = QuorumCertificate2::<MockTypes>::genesis(
711            &TestValidatedState::default(),
712            &TestInstanceState::default(),
713            TEST_VERSIONS.test,
714        )
715        .await;
716        let mut leaf = Leaf2::<MockTypes>::genesis(
717            &TestValidatedState::default(),
718            &TestInstanceState::default(),
719            TEST_VERSIONS.test.base,
720        )
721        .await;
722        // Increment the block number, to distinguish this block from the genesis block, which
723        // already exists.
724        leaf.block_header_mut().block_number += 1;
725        qc.data.leaf_commit = <Leaf2<MockTypes> as Committable>::commit(&leaf);
726
727        let block = BlockQueryData::new(leaf.block_header().clone(), MockPayload::genesis());
728        let leaf = LeafQueryData::new(leaf, qc).unwrap();
729
730        // Insert some data and check that we can read it back.
731        let mut tx = ds.write().await.unwrap();
732        tx.insert_leaf(&leaf).await.unwrap();
733        tx.insert_block(&block).await.unwrap();
734        tx.commit().await.unwrap();
735
736        assert_eq!(
737            NodeDataSource::<MockTypes>::block_height(&ds)
738                .await
739                .unwrap(),
740            2
741        );
742        assert_eq!(leaf, ds.get_leaf(1).await.await);
743        assert_eq!(block, ds.get_block(1).await.await);
744
745        drop(ds);
746
747        // Reset and check that the changes are gone.
748        let ds = D::reset(&storage).await;
749        assert_eq!(
750            NodeDataSource::<MockTypes>::block_height(&ds)
751                .await
752                .unwrap(),
753            0
754        );
755        ds.get_leaf(1).await.try_resolve().unwrap_err();
756        ds.get_block(1).await.try_resolve().unwrap_err();
757    }
758
759    #[test_log::test(tokio::test(flavor = "multi_thread"))]
760    pub async fn test_drop_tx<D: TestableDataSource>()
761    where
762        for<'a> D::Transaction<'a>: UpdateAvailabilityStorage<MockTypes>
763            + AvailabilityStorage<MockTypes>
764            + NodeStorage<MockTypes>,
765        for<'a> D::ReadOnly<'a>: NodeStorage<MockTypes>,
766    {
767        let storage = D::create(0).await;
768        let ds = D::connect(&storage).await;
769
770        // Mock up some consensus data.
771        let mut mock_qc = QuorumCertificate2::<MockTypes>::genesis(
772            &TestValidatedState::default(),
773            &TestInstanceState::default(),
774            TEST_VERSIONS.test,
775        )
776        .await;
777        let mut mock_leaf = Leaf2::<MockTypes>::genesis(
778            &TestValidatedState::default(),
779            &TestInstanceState::default(),
780            TEST_VERSIONS.test.base,
781        )
782        .await;
783        // Increment the block number, to distinguish this block from the genesis block, which
784        // already exists.
785        mock_leaf.block_header_mut().block_number += 1;
786        mock_qc.data.leaf_commit = <Leaf2<MockTypes> as Committable>::commit(&mock_leaf);
787
788        let block = BlockQueryData::new(mock_leaf.block_header().clone(), MockPayload::genesis());
789        let leaf = LeafQueryData::new(mock_leaf.clone(), mock_qc.clone()).unwrap();
790
791        // Insert, but do not commit, some data and check that we can read it back.
792        tracing::info!("write");
793        let mut tx = ds.write().await.unwrap();
794        tx.insert_leaf(&leaf).await.unwrap();
795        tx.insert_block(&block).await.unwrap();
796
797        assert_eq!(tx.block_height().await.unwrap(), 2);
798        assert_eq!(leaf, tx.get_leaf(1.into()).await.unwrap());
799        assert_eq!(block, tx.get_block(1.into()).await.unwrap());
800
801        // Drop the transaction, causing a revert.
802        drop(tx);
803
804        // Open a new transaction and check that the changes are reverted.
805        tracing::info!("read");
806        let mut tx = ds.read().await.unwrap();
807        assert_eq!(tx.block_height().await.unwrap(), 0);
808        drop(tx);
809
810        // Get a mutable transaction again, insert different data.
811        mock_leaf.block_header_mut().block_number += 1;
812        mock_qc.data.leaf_commit = <Leaf2<MockTypes> as Committable>::commit(&mock_leaf);
813        let block = BlockQueryData::new(mock_leaf.block_header().clone(), MockPayload::genesis());
814        let leaf = LeafQueryData::new(mock_leaf, mock_qc).unwrap();
815
816        tracing::info!("write again");
817        let mut tx = ds.write().await.unwrap();
818        tx.insert_leaf(&leaf).await.unwrap();
819        tx.insert_block(&block).await.unwrap();
820        tx.commit().await.unwrap();
821
822        // Read the data back. We should have _only_ the data that was written in the final
823        // transaction.
824        tracing::info!("read again");
825        let height = leaf.height() as usize;
826        assert_eq!(
827            NodeDataSource::<MockTypes>::block_height(&ds)
828                .await
829                .unwrap(),
830            height + 1
831        );
832        assert_eq!(leaf, ds.get_leaf(height).await.await);
833        assert_eq!(block, ds.get_block(height).await.await);
834        ds.get_leaf(height - 1).await.try_resolve().unwrap_err();
835        ds.get_block(height - 1).await.try_resolve().unwrap_err();
836    }
837}
838
839/// Generic tests we can instantiate for all the node data sources.
840#[cfg(any(test, feature = "testing"))]
841#[espresso_macros::generic_tests]
842pub mod node_tests {
843    use std::time::Duration;
844
845    use committable::Committable;
846    use futures::{future::join_all, stream::StreamExt};
847    use hotshot::traits::BlockPayload;
848    use hotshot_example_types::{
849        block_types::{TestBlockHeader, TestBlockPayload, TestMetadata},
850        node_types::{TEST_VERSIONS, TestTypes},
851        state_types::{TestInstanceState, TestValidatedState},
852    };
853    use hotshot_types::{
854        data::{VidCommitment, VidCommon, VidShare, ViewNumber, vid_commitment},
855        simple_certificate::{CertificatePair, QuorumCertificate2},
856        traits::block_contents::{BlockHeader, EncodeBytes},
857        vid::advz::{ADVZScheme, advz_scheme},
858    };
859    use jf_advz::VidScheme;
860    use pretty_assertions::assert_eq;
861
862    use crate::{
863        Header, Leaf2,
864        availability::{BlockInfo, BlockQueryData, LeafQueryData, VidCommonQueryData},
865        data_source::{
866            storage::{NodeStorage, UpdateAvailabilityStorage},
867            update::Transaction,
868        },
869        node::{
870            BlockId, NodeDataSource, ResourceSyncStatus, SyncStatus, SyncStatusQueryData,
871            SyncStatusRange, TimeWindowQueryData, WindowStart,
872        },
873        testing::{
874            consensus::{MockNetwork, TestableDataSource},
875            mocks::{MockPayload, MockTypes, mock_transaction},
876            sleep,
877        },
878        types::HeightIndexed,
879    };
880
881    fn block_header_timestamp(header: &Header<MockTypes>) -> u64 {
882        <TestBlockHeader as BlockHeader<MockTypes>>::timestamp(header)
883    }
884
885    #[test_log::test(tokio::test(flavor = "multi_thread"))]
886    pub async fn test_sync_status<D: TestableDataSource>()
887    where
888        for<'a> D::Transaction<'a>: UpdateAvailabilityStorage<MockTypes>,
889    {
890        let storage = D::create(0).await;
891        let ds = D::build(&storage, |builder| {
892            builder.with_sync_status_ttl(Duration::ZERO)
893        })
894        .await;
895
896        // Set up a mock VID scheme to use for generating test data.
897        let mut vid = advz_scheme(2);
898
899        // Generate some mock leaves and blocks to insert.
900        let mut leaves = vec![
901            LeafQueryData::<MockTypes>::genesis(
902                &TestValidatedState::default(),
903                &TestInstanceState::default(),
904                TEST_VERSIONS.test,
905            )
906            .await,
907        ];
908        let mut blocks = vec![
909            BlockQueryData::<MockTypes>::genesis(
910                &TestValidatedState::default(),
911                &TestInstanceState::default(),
912                TEST_VERSIONS.test.base,
913            )
914            .await,
915        ];
916        let dispersal = vid.disperse([]).unwrap();
917        let mut vid_commons = vec![VidCommonQueryData::new(
918            leaves[0].header().clone(),
919            VidCommon::V0(dispersal.common.clone()),
920        )];
921        for i in 0..2 {
922            // Generate a unique payload and VID data, so that missing data is actually missing
923            // (otherwise it could be borrowed from another block).
924            let (payload, metadata) = <MockPayload as BlockPayload<MockTypes>>::from_transactions(
925                vec![mock_transaction(vec![i as u8])],
926                &Default::default(),
927                &Default::default(),
928            )
929            .await
930            .unwrap();
931            let dispersal = vid.disperse(payload.encode()).unwrap();
932
933            let mut leaf = leaves[i].clone();
934            leaf.leaf.block_header_mut().block_number += 1;
935            leaf.leaf.block_header_mut().payload_commitment = VidCommitment::V0(dispersal.commit);
936            leaf.leaf.block_header_mut().metadata = metadata;
937            let block = BlockQueryData::new(leaf.header().clone(), payload);
938            let vid_common = VidCommonQueryData::new(
939                leaf.header().clone(),
940                VidCommon::V0(dispersal.common.clone()),
941            );
942
943            leaves.push(leaf);
944            blocks.push(block);
945            vid_commons.push(vid_common);
946        }
947
948        // At first, the node is fully synced.
949        assert!(ds.sync_status().await.unwrap().is_fully_synced());
950
951        // Insert a leaf without the corresponding block or VID info, make sure we detect that the
952        // block and VID info are missing.
953        ds.append(leaves[0].clone().into()).await.unwrap();
954        assert_eq!(
955            ds.sync_status().await.unwrap(),
956            SyncStatusQueryData {
957                blocks: ResourceSyncStatus {
958                    missing: 1,
959                    ranges: vec![SyncStatusRange {
960                        start: 0,
961                        end: 1,
962                        status: SyncStatus::Missing,
963                    }]
964                },
965                vid_common: ResourceSyncStatus {
966                    missing: 1,
967                    ranges: vec![SyncStatusRange {
968                        start: 0,
969                        end: 1,
970                        status: SyncStatus::Missing,
971                    }]
972                },
973                leaves: ResourceSyncStatus {
974                    missing: 0,
975                    ranges: vec![SyncStatusRange {
976                        start: 0,
977                        end: 1,
978                        status: SyncStatus::Present,
979                    }]
980                },
981                pruned_height: None,
982            }
983        );
984
985        // Insert a leaf whose height is not the successor of the previous leaf. We should now
986        // detect that the leaf in between is missing (along with all _three_ corresponding blocks).
987        ds.append(leaves[2].clone().into()).await.unwrap();
988        assert_eq!(
989            ds.sync_status().await.unwrap(),
990            SyncStatusQueryData {
991                blocks: ResourceSyncStatus {
992                    missing: 3,
993                    ranges: vec![SyncStatusRange {
994                        start: 0,
995                        end: 3,
996                        status: SyncStatus::Missing,
997                    }]
998                },
999                vid_common: ResourceSyncStatus {
1000                    missing: 3,
1001                    ranges: vec![SyncStatusRange {
1002                        start: 0,
1003                        end: 3,
1004                        status: SyncStatus::Missing,
1005                    }]
1006                },
1007                leaves: ResourceSyncStatus {
1008                    missing: 1,
1009                    ranges: vec![
1010                        SyncStatusRange {
1011                            start: 0,
1012                            end: 1,
1013                            status: SyncStatus::Present,
1014                        },
1015                        SyncStatusRange {
1016                            start: 1,
1017                            end: 2,
1018                            status: SyncStatus::Missing,
1019                        },
1020                        SyncStatusRange {
1021                            start: 2,
1022                            end: 3,
1023                            status: SyncStatus::Present,
1024                        }
1025                    ]
1026                },
1027                pruned_height: None,
1028            }
1029        );
1030
1031        // Insert VID common without a corresponding share.
1032        {
1033            let mut tx = ds.write().await.unwrap();
1034            tx.insert_vid(&vid_commons[0].clone(), None).await.unwrap();
1035            tx.commit().await.unwrap();
1036        }
1037        assert_eq!(
1038            ds.sync_status().await.unwrap(),
1039            SyncStatusQueryData {
1040                blocks: ResourceSyncStatus {
1041                    missing: 3,
1042                    ranges: vec![SyncStatusRange {
1043                        start: 0,
1044                        end: 3,
1045                        status: SyncStatus::Missing,
1046                    }]
1047                },
1048                vid_common: ResourceSyncStatus {
1049                    missing: 2,
1050                    ranges: vec![
1051                        SyncStatusRange {
1052                            start: 0,
1053                            end: 1,
1054                            status: SyncStatus::Present,
1055                        },
1056                        SyncStatusRange {
1057                            start: 1,
1058                            end: 3,
1059                            status: SyncStatus::Missing,
1060                        },
1061                    ]
1062                },
1063                leaves: ResourceSyncStatus {
1064                    missing: 1,
1065                    ranges: vec![
1066                        SyncStatusRange {
1067                            start: 0,
1068                            end: 1,
1069                            status: SyncStatus::Present,
1070                        },
1071                        SyncStatusRange {
1072                            start: 1,
1073                            end: 2,
1074                            status: SyncStatus::Missing,
1075                        },
1076                        SyncStatusRange {
1077                            start: 2,
1078                            end: 3,
1079                            status: SyncStatus::Present,
1080                        }
1081                    ]
1082                },
1083                pruned_height: None,
1084            }
1085        );
1086
1087        // Rectify the missing data.
1088        {
1089            let mut tx = ds.write().await.unwrap();
1090            tx.insert_block(&blocks[0]).await.unwrap();
1091            tx.insert_vid(&vid_commons[0], None).await.unwrap();
1092            tx.insert_leaf(&leaves[1]).await.unwrap();
1093            tx.insert_block(&blocks[1]).await.unwrap();
1094            tx.insert_vid(&vid_commons[1], None).await.unwrap();
1095            tx.insert_block(&blocks[2]).await.unwrap();
1096            tx.insert_vid(&vid_commons[2], None).await.unwrap();
1097            tx.commit().await.unwrap();
1098        }
1099
1100        // Some data sources (e.g. file system) don't support out-of-order insertion of missing
1101        // data. These would have just ignored the insertion of `leaves[1]`. Detect if this is the
1102        // case; then we allow 1 missing leaf.
1103        let leaves = if ds.get_leaf(1).await.try_resolve().is_err() {
1104            tracing::warn!(
1105                "data source does not support out-of-order filling, allowing one missing leaf"
1106            );
1107            ResourceSyncStatus {
1108                missing: 1,
1109                ranges: vec![
1110                    SyncStatusRange {
1111                        start: 0,
1112                        end: 1,
1113                        status: SyncStatus::Present,
1114                    },
1115                    SyncStatusRange {
1116                        start: 1,
1117                        end: 2,
1118                        status: SyncStatus::Missing,
1119                    },
1120                    SyncStatusRange {
1121                        start: 2,
1122                        end: 3,
1123                        status: SyncStatus::Present,
1124                    },
1125                ],
1126            }
1127        } else {
1128            ResourceSyncStatus {
1129                missing: 0,
1130                ranges: vec![SyncStatusRange {
1131                    start: 0,
1132                    end: 3,
1133                    status: SyncStatus::Present,
1134                }],
1135            }
1136        };
1137        let expected_sync_status = SyncStatusQueryData {
1138            leaves,
1139            blocks: ResourceSyncStatus {
1140                missing: 0,
1141                ranges: vec![SyncStatusRange {
1142                    start: 0,
1143                    end: 3,
1144                    status: SyncStatus::Present,
1145                }],
1146            },
1147            vid_common: ResourceSyncStatus {
1148                missing: 0,
1149                ranges: vec![SyncStatusRange {
1150                    start: 0,
1151                    end: 3,
1152                    status: SyncStatus::Present,
1153                }],
1154            },
1155            pruned_height: None,
1156        };
1157        assert_eq!(ds.sync_status().await.unwrap(), expected_sync_status);
1158    }
1159
1160    #[test_log::test(tokio::test(flavor = "multi_thread"))]
1161    pub async fn test_counters<D: TestableDataSource>() {
1162        let storage = D::create(0).await;
1163        let ds = D::connect(&storage).await;
1164
1165        assert_eq!(ds.count_transactions().await.unwrap(), 0);
1166        assert_eq!(ds.payload_size().await.unwrap(), 0);
1167
1168        // Insert some transactions.
1169        let mut total_transactions = 0;
1170        let mut total_size = 0;
1171        'outer: for i in [0, 1, 2] {
1172            // Using `i % 2` as the transaction data ensures we insert a duplicate transaction
1173            // (since we insert more than 2 transactions total). The query service should still
1174            // count these as separate transactions and should include both duplicates when
1175            // computing the total size.
1176            let (payload, metadata) =
1177                <TestBlockPayload as BlockPayload<TestTypes>>::from_transactions(
1178                    [mock_transaction(vec![i as u8 % 2])],
1179                    &TestValidatedState::default(),
1180                    &TestInstanceState::default(),
1181                )
1182                .await
1183                .unwrap();
1184            let encoded = payload.encode();
1185            let payload_commitment =
1186                vid_commitment(&encoded, &metadata.encode(), 1, TEST_VERSIONS.test.base);
1187            let header = TestBlockHeader {
1188                block_number: i,
1189                payload_commitment,
1190                timestamp: i,
1191                timestamp_millis: i * 1_000,
1192                builder_commitment:
1193                    <TestBlockPayload as BlockPayload<TestTypes>>::builder_commitment(
1194                        &payload, &metadata,
1195                    ),
1196                metadata: TestMetadata {
1197                    num_transactions: 7, // arbitrary
1198                },
1199                random: 1, // arbitrary
1200                version: TEST_VERSIONS.test.base,
1201            };
1202
1203            let mut leaf = LeafQueryData::<MockTypes>::genesis(
1204                &TestValidatedState::default(),
1205                &TestInstanceState::default(),
1206                TEST_VERSIONS.test,
1207            )
1208            .await;
1209            *leaf.leaf.block_header_mut() = header.clone();
1210            let block = BlockQueryData::new(header, payload);
1211            ds.append(BlockInfo::new(leaf, Some(block.clone()), None, None))
1212                .await
1213                .unwrap();
1214            assert_eq!(
1215                NodeDataSource::<MockTypes>::block_height(&ds)
1216                    .await
1217                    .unwrap(),
1218                (i + 1) as usize,
1219            );
1220
1221            total_transactions += 1;
1222            total_size += encoded.len();
1223
1224            // Allow some time for the aggregator to update.
1225            for retry in 0..5 {
1226                let ds_transactions = ds.count_transactions().await.unwrap();
1227                let ds_payload_size = ds.payload_size().await.unwrap();
1228                if ds_transactions != total_transactions || ds_payload_size != total_size {
1229                    tracing::info!(
1230                        i,
1231                        retry,
1232                        total_transactions,
1233                        ds_transactions,
1234                        total_size,
1235                        ds_payload_size,
1236                        "waiting for statistics to update"
1237                    );
1238                    sleep(Duration::from_secs(1)).await;
1239                } else {
1240                    continue 'outer;
1241                }
1242            }
1243            panic!("counters did not update in time");
1244        }
1245    }
1246
1247    #[test_log::test(tokio::test(flavor = "multi_thread"))]
1248    pub async fn test_vid_shares<D: TestableDataSource>()
1249    where
1250        for<'a> D::ReadOnly<'a>: NodeStorage<MockTypes>,
1251    {
1252        let mut network = MockNetwork::<D>::init().await;
1253        let ds = network.data_source();
1254
1255        network.start().await;
1256
1257        // Check VID shares for a few blocks.
1258        let mut leaves = ds.subscribe_leaves(0).await.take(3);
1259        while let Some(leaf) = leaves.next().await {
1260            tracing::info!("got leaf {}", leaf.height());
1261            let mut tx = ds.read().await.unwrap();
1262            let share = tx.vid_share(leaf.height() as usize).await.unwrap();
1263            assert_eq!(share, tx.vid_share(leaf.block_hash()).await.unwrap());
1264            assert_eq!(
1265                share,
1266                tx.vid_share(BlockId::PayloadHash(leaf.payload_hash()))
1267                    .await
1268                    .unwrap()
1269            );
1270        }
1271    }
1272
1273    #[test_log::test(tokio::test(flavor = "multi_thread"))]
1274    pub async fn test_vid_monotonicity<D: TestableDataSource>()
1275    where
1276        for<'a> D::Transaction<'a>: UpdateAvailabilityStorage<MockTypes>,
1277        for<'a> D::ReadOnly<'a>: NodeStorage<MockTypes>,
1278    {
1279        let storage = D::create(0).await;
1280        let ds = D::connect(&storage).await;
1281
1282        // Generate some test VID data.
1283        let mut vid = advz_scheme(2);
1284        let disperse = vid.disperse([]).unwrap();
1285
1286        // Insert test data with VID common and a share.
1287        let leaf = LeafQueryData::<MockTypes>::genesis(
1288            &TestValidatedState::default(),
1289            &TestInstanceState::default(),
1290            TEST_VERSIONS.test,
1291        )
1292        .await;
1293        let common = VidCommonQueryData::new(leaf.header().clone(), VidCommon::V0(disperse.common));
1294        ds.append(BlockInfo::new(
1295            leaf,
1296            None,
1297            Some(common.clone()),
1298            Some(VidShare::V0(disperse.shares[0].clone())),
1299        ))
1300        .await
1301        .unwrap();
1302
1303        {
1304            assert_eq!(ds.get_vid_common(0).await.await, common);
1305            assert_eq!(
1306                ds.vid_share(0).await.unwrap(),
1307                VidShare::V0(disperse.shares[0].clone())
1308            );
1309        }
1310
1311        // Re-insert the common data, without a share. This should not overwrite the share we
1312        // already have.
1313        {
1314            let mut tx = ds.write().await.unwrap();
1315            tx.insert_vid(&common, None).await.unwrap();
1316            tx.commit().await.unwrap();
1317        }
1318        {
1319            assert_eq!(ds.get_vid_common(0).await.await, common);
1320            assert_eq!(
1321                ds.vid_share(0).await.unwrap(),
1322                VidShare::V0(disperse.shares[0].clone())
1323            );
1324        }
1325    }
1326
1327    #[test_log::test(tokio::test(flavor = "multi_thread"))]
1328    pub async fn test_vid_recovery<D: TestableDataSource>()
1329    where
1330        for<'a> D::ReadOnly<'a>: NodeStorage<MockTypes>,
1331    {
1332        let mut network = MockNetwork::<D>::init().await;
1333        let ds = network.data_source();
1334
1335        network.start().await;
1336
1337        // Submit a transaction so we can try to recover a non-empty block.
1338        let mut blocks = ds.subscribe_blocks(0).await;
1339        let txn = mock_transaction(vec![1, 2, 3]);
1340        network.submit_transaction(txn.clone()).await;
1341
1342        // Wait for the transaction to be finalized.
1343        let block = loop {
1344            tracing::info!("waiting for transaction");
1345            let block = blocks.next().await.unwrap();
1346            if !block.is_empty() {
1347                tracing::info!(height = block.height(), "transaction sequenced");
1348                break block;
1349            }
1350            tracing::info!(height = block.height(), "empty block");
1351        };
1352        let height = block.height() as usize;
1353        let commit = if let VidCommitment::V0(commit) = block.payload_hash() {
1354            commit
1355        } else {
1356            panic!("expect ADVZ commitment")
1357        };
1358
1359        // Set up a test VID scheme.
1360        let vid = advz_scheme(network.num_nodes());
1361
1362        // Get VID common data and verify it.
1363        tracing::info!("fetching common data");
1364        let common = ds.get_vid_common(height).await.await;
1365        let VidCommon::V0(common) = &common.common() else {
1366            panic!("expect ADVZ common");
1367        };
1368        ADVZScheme::is_consistent(&commit, common).unwrap();
1369
1370        // Collect shares from each node.
1371        tracing::info!("fetching shares");
1372        let network = &network;
1373        let vid = &vid;
1374        let shares: Vec<_> = join_all((0..network.num_nodes()).map(|i| async move {
1375            let ds = network.data_source_index(i);
1376
1377            // Wait until the node has processed up to the desired block; since we have thus far
1378            // only interacted with node 0, it is possible other nodes are slightly behind.
1379            let mut leaves = ds.subscribe_leaves(height).await;
1380            let leaf = leaves.next().await.unwrap();
1381            assert_eq!(leaf.height(), height as u64);
1382            assert_eq!(leaf.payload_hash(), VidCommitment::V0(commit));
1383
1384            let share = if let VidShare::V0(share) = ds.vid_share(height).await.unwrap() {
1385                share
1386            } else {
1387                panic!("expect ADVZ share")
1388            };
1389            vid.verify_share(&share, common, &commit).unwrap().unwrap();
1390            share
1391        }))
1392        .await;
1393
1394        // Recover payload.
1395        tracing::info!("recovering payload");
1396        let bytes = vid.recover_payload(&shares, common).unwrap();
1397        let recovered = <MockPayload as BlockPayload<TestTypes>>::from_bytes(
1398            &bytes,
1399            &TestMetadata {
1400                num_transactions: 7, // arbitrary
1401            },
1402        );
1403        assert_eq!(recovered, *block.payload());
1404        assert_eq!(recovered.transactions, vec![txn]);
1405    }
1406
1407    #[test_log::test(tokio::test(flavor = "multi_thread"))]
1408    pub async fn test_timestamp_window<D: TestableDataSource>() {
1409        let mut network = MockNetwork::<D>::init().await;
1410        let ds = network.data_source();
1411
1412        network.start().await;
1413
1414        // Wait for blocks with at least three different timestamps to be sequenced. This lets us
1415        // test all the edge cases.
1416        let mut leaves = ds.subscribe_leaves(0).await;
1417        // `test_blocks` is a list of lists of headers with the same timestamp. The flattened list
1418        // of headers is contiguous.
1419        let mut test_blocks: Vec<Vec<Header<MockTypes>>> = vec![];
1420        while test_blocks.len() < 3 {
1421            // Wait for the next block to be sequenced.
1422            let leaf = leaves.next().await.unwrap();
1423            let header = leaf.header().clone();
1424            if let Some(last_timestamp) = test_blocks.last_mut() {
1425                if <TestBlockHeader as BlockHeader<MockTypes>>::timestamp(&last_timestamp[0])
1426                    == <TestBlockHeader as BlockHeader<MockTypes>>::timestamp(&header)
1427                {
1428                    last_timestamp.push(header);
1429                } else {
1430                    test_blocks.push(vec![header]);
1431                }
1432            } else {
1433                test_blocks.push(vec![header]);
1434            }
1435        }
1436        tracing::info!("blocks for testing: {test_blocks:#?}");
1437
1438        // Define invariants that every response should satisfy.
1439        let check_invariants =
1440            |res: &TimeWindowQueryData<Header<MockTypes>>, start, end, check_prev| {
1441                let mut prev = res.prev.as_ref();
1442                if let Some(prev) = prev {
1443                    if check_prev {
1444                        assert!(block_header_timestamp(prev) < start);
1445                    }
1446                } else {
1447                    // `prev` can only be `None` if the first block in the window is the genesis
1448                    // block.
1449                    assert_eq!(res.from().unwrap(), 0);
1450                };
1451                for header in &res.window {
1452                    assert!(start <= block_header_timestamp(header));
1453                    assert!(block_header_timestamp(header) < end);
1454                    if let Some(prev) = prev {
1455                        assert!(
1456                            <TestBlockHeader as BlockHeader<MockTypes>>::timestamp(prev)
1457                                <= <TestBlockHeader as BlockHeader<MockTypes>>::timestamp(header)
1458                        );
1459                    }
1460                    prev = Some(header);
1461                }
1462                if let Some(next) = &res.next {
1463                    assert!(<TestBlockHeader as BlockHeader<MockTypes>>::timestamp(next) >= end);
1464                    // If there is a `next`, there must be at least one previous block (either `prev`
1465                    // itself or the last block if the window is nonempty), so we can `unwrap` here.
1466                    assert!(block_header_timestamp(next) >= block_header_timestamp(prev.unwrap()));
1467                }
1468            };
1469
1470        let get_window = |start, end| {
1471            let ds = ds.clone();
1472            async move {
1473                let window = ds
1474                    .get_header_window(WindowStart::Time(start), end, i64::MAX as usize)
1475                    .await
1476                    .unwrap();
1477                tracing::info!("window for timestamp range {start}-{end}: {window:#?}");
1478                check_invariants(&window, start, end, true);
1479                window
1480            }
1481        };
1482
1483        // Case 0: happy path. All blocks are available, including prev and next.
1484        let start = <TestBlockHeader as BlockHeader<MockTypes>>::timestamp(&test_blocks[1][0]);
1485        let end = start + 1;
1486        let res = get_window(start, end).await;
1487        assert_eq!(res.prev.unwrap(), *test_blocks[0].last().unwrap());
1488        assert_eq!(res.window, test_blocks[1]);
1489        assert_eq!(res.next.unwrap(), test_blocks[2][0]);
1490
1491        // Case 1: no `prev`, start of window is before genesis.
1492        let start = 0;
1493        let end = <TestBlockHeader as BlockHeader<MockTypes>>::timestamp(&test_blocks[0][0]) + 1;
1494        let res = get_window(start, end).await;
1495        assert_eq!(res.prev, None);
1496        assert_eq!(res.window, test_blocks[0]);
1497        assert_eq!(res.next.unwrap(), test_blocks[1][0]);
1498
1499        // Case 2: no `next`, end of window is after the most recently sequenced block.
1500        let start = <TestBlockHeader as BlockHeader<MockTypes>>::timestamp(&test_blocks[2][0]);
1501        let end = i64::MAX as u64;
1502        let res = get_window(start, end).await;
1503        assert_eq!(res.prev.unwrap(), *test_blocks[1].last().unwrap());
1504        // There may have been more blocks sequenced since we grabbed `test_blocks`, so just check
1505        // that the prefix of the window is correct.
1506        assert_eq!(res.window[..test_blocks[2].len()], test_blocks[2]);
1507        assert_eq!(res.next, None);
1508        // Fetch more blocks using the `from` form of the endpoint. Start from the last block we had
1509        // previously (ie fetch a slightly overlapping window) to ensure there is at least one block
1510        // in the new window.
1511        let from = test_blocks.iter().flatten().count() - 1;
1512        let more = ds
1513            .get_header_window(WindowStart::Height(from as u64), end, i64::MAX as usize)
1514            .await
1515            .unwrap();
1516        check_invariants(&more, start, end, false);
1517        assert_eq!(
1518            more.prev.as_ref().unwrap(),
1519            test_blocks.iter().flatten().nth(from - 1).unwrap()
1520        );
1521        assert_eq!(
1522            more.window[..res.window.len() - test_blocks[2].len() + 1],
1523            res.window[test_blocks[2].len() - 1..]
1524        );
1525        assert_eq!(res.next, None);
1526        // We should get the same result whether we query by block height or hash.
1527        let more2 = ds
1528            .get_header_window(
1529                test_blocks[2].last().unwrap().commit(),
1530                end,
1531                i64::MAX as usize,
1532            )
1533            .await
1534            .unwrap();
1535        check_invariants(&more2, start, end, false);
1536        assert_eq!(more2.from().unwrap(), more.from().unwrap());
1537        assert_eq!(more2.prev, more.prev);
1538        assert_eq!(more2.next, more.next);
1539        assert_eq!(more2.window[..more.window.len()], more.window);
1540
1541        // Case 3: the window is empty.
1542        let start = <TestBlockHeader as BlockHeader<MockTypes>>::timestamp(&test_blocks[1][0]);
1543        let end = start;
1544        let res = get_window(start, end).await;
1545        assert_eq!(res.prev.unwrap(), *test_blocks[0].last().unwrap());
1546        assert_eq!(res.next.unwrap(), test_blocks[1][0]);
1547        assert_eq!(res.window, vec![]);
1548
1549        // Case 4: no relevant blocks are available yet.
1550        ds.get_header_window(
1551            WindowStart::Time((i64::MAX - 1) as u64),
1552            i64::MAX as u64,
1553            i64::MAX as usize,
1554        )
1555        .await
1556        .unwrap_err();
1557
1558        // Case 5: limits.
1559        let blocks = [test_blocks[0].clone(), test_blocks[1].clone()]
1560            .into_iter()
1561            .flatten()
1562            .collect::<Vec<_>>();
1563        // Make a query that would return everything, but gets limited.
1564        let start = block_header_timestamp(&blocks[0]);
1565        let end = block_header_timestamp(&test_blocks[2][0]);
1566        let res = ds
1567            .get_header_window(WindowStart::Time(start), end, 1)
1568            .await
1569            .unwrap();
1570        assert_eq!(res.prev, None);
1571        assert_eq!(res.window, [blocks[0].clone()]);
1572        assert_eq!(res.next, None);
1573        // Query the next page of results, get limited again.
1574        let res = ds
1575            .get_header_window(WindowStart::Height(blocks[0].height() + 1), end, 1)
1576            .await
1577            .unwrap();
1578        assert_eq!(res.window, [blocks[1].clone()]);
1579        assert_eq!(res.next, None);
1580        // Get the rest of the results.
1581        let res = ds
1582            .get_header_window(
1583                WindowStart::Height(blocks[1].height() + 1),
1584                end,
1585                blocks.len() - 1,
1586            )
1587            .await
1588            .unwrap();
1589        assert_eq!(res.window, blocks[2..].to_vec());
1590        assert_eq!(res.next, Some(test_blocks[2][0].clone()));
1591    }
1592
1593    #[test_log::test(tokio::test(flavor = "multi_thread"))]
1594    pub async fn test_latest_qc_chain<D: TestableDataSource>()
1595    where
1596        for<'a> D::ReadOnly<'a>: NodeStorage<MockTypes>,
1597        for<'a> D::Transaction<'a>: UpdateAvailabilityStorage<MockTypes>,
1598    {
1599        let storage = D::create(0).await;
1600        let ds = D::connect(&storage).await;
1601
1602        {
1603            let mut tx = ds.read().await.unwrap();
1604            assert_eq!(tx.latest_qc_chain().await.unwrap(), None);
1605        }
1606
1607        async fn leaf_with_qc_chain(
1608            number: u64,
1609        ) -> (LeafQueryData<MockTypes>, [CertificatePair<MockTypes>; 2]) {
1610            let mut leaf = Leaf2::<MockTypes>::genesis(
1611                &Default::default(),
1612                &Default::default(),
1613                TEST_VERSIONS.test.base,
1614            )
1615            .await;
1616            leaf.block_header_mut().block_number = number;
1617
1618            let mut qc1 = QuorumCertificate2::<MockTypes>::genesis(
1619                &Default::default(),
1620                &Default::default(),
1621                TEST_VERSIONS.test,
1622            )
1623            .await;
1624            qc1.view_number = ViewNumber::new(1);
1625            qc1.data.leaf_commit = Committable::commit(&leaf);
1626
1627            let mut qc2 = qc1.clone();
1628            qc2.view_number += 1;
1629
1630            let leaf = LeafQueryData::new(leaf, qc1.clone()).unwrap();
1631            (
1632                leaf,
1633                [
1634                    CertificatePair::non_epoch_change(qc1),
1635                    CertificatePair::non_epoch_change(qc2),
1636                ],
1637            )
1638        }
1639
1640        // Insert a leaf with QC chain.
1641        {
1642            let (leaf, qcs) = leaf_with_qc_chain(2).await;
1643            let mut tx = ds.write().await.unwrap();
1644            tx.insert_leaf_with_qc_chain(&leaf, Some(qcs.clone()))
1645                .await
1646                .unwrap();
1647            tx.commit().await.unwrap();
1648
1649            assert_eq!(
1650                ds.read().await.unwrap().latest_qc_chain().await.unwrap(),
1651                Some(qcs)
1652            );
1653        }
1654
1655        // Insert a later leaf without a QC chain. This should clear the previously saved QC chain,
1656        // which is no longer up to date.
1657        {
1658            let (leaf, _) = leaf_with_qc_chain(3).await;
1659            let mut tx = ds.write().await.unwrap();
1660            tx.insert_leaf_with_qc_chain(&leaf, None).await.unwrap();
1661            tx.commit().await.unwrap();
1662
1663            assert_eq!(
1664                ds.read().await.unwrap().latest_qc_chain().await.unwrap(),
1665                None
1666            );
1667        }
1668
1669        // Insert an earlier leaf with a QC chain. This should not be saved since it is not the
1670        // latest leaf.
1671        {
1672            let (leaf, qcs) = leaf_with_qc_chain(1).await;
1673            let mut tx = ds.write().await.unwrap();
1674            tx.insert_leaf_with_qc_chain(&leaf, Some(qcs))
1675                .await
1676                .unwrap();
1677            tx.commit().await.unwrap();
1678
1679            assert_eq!(
1680                ds.read().await.unwrap().latest_qc_chain().await.unwrap(),
1681                None
1682            );
1683        }
1684    }
1685}
1686
1687/// Generic tests we can instantiate for all the status data sources.
1688#[cfg(any(test, feature = "testing"))]
1689#[espresso_macros::generic_tests]
1690pub mod status_tests {
1691    use std::time::Duration;
1692
1693    use crate::{
1694        status::StatusDataSource,
1695        testing::{
1696            consensus::{DataSourceLifeCycle, MockNetwork},
1697            mocks::mock_transaction,
1698            sleep,
1699        },
1700    };
1701
1702    #[test_log::test(tokio::test(flavor = "multi_thread"))]
1703    pub async fn test_metrics<D: DataSourceLifeCycle + StatusDataSource>() {
1704        let mut network = MockNetwork::<D>::init().await;
1705        let ds = network.data_source();
1706
1707        {
1708            // Check that block height is initially zero.
1709            assert_eq!(ds.block_height().await.unwrap(), 0);
1710            // With consensus paused, check that the success rate returns NAN (since the block
1711            // height, the numerator, is 0, and the view number, the denominator, is 0).
1712            assert!(ds.success_rate().await.unwrap().is_nan());
1713            // Since there is no block produced, "last_decided_time" metric is 0.
1714            // Therefore, the elapsed time since the last block should be close to the time elapsed since the Unix epoch.
1715            assert!(
1716                (ds.elapsed_time_since_last_decide().await.unwrap() as i64
1717                    - chrono::Utc::now().timestamp())
1718                .abs()
1719                    <= 1,
1720                "time elapsed since last_decided_time is not within 1s"
1721            );
1722        }
1723
1724        // Submit a transaction
1725        let txn = mock_transaction(vec![1, 2, 3]);
1726        network.submit_transaction(txn.clone()).await;
1727
1728        // Start consensus and wait for the transaction to be finalized.
1729        network.start().await;
1730
1731        // Now wait for at least one non-genesis block to be finalized.
1732        loop {
1733            let height = ds.block_height().await.unwrap();
1734            if height > 1 {
1735                break;
1736            }
1737            tracing::info!(height, "waiting for a block to be finalized");
1738            sleep(Duration::from_secs(1)).await;
1739        }
1740
1741        {
1742            // Check that the success rate has been updated. Note that we can only check if success
1743            // rate is positive. We don't know exactly what it is because we can't know how many
1744            // views have elapsed without race conditions.
1745            let success_rate = ds.success_rate().await.unwrap();
1746            assert!(success_rate.is_finite(), "{success_rate}");
1747            assert!(success_rate > 0.0, "{success_rate}");
1748        }
1749
1750        {
1751            // Shutting down the consensus to halt block production
1752            // Introducing a delay of 3 seconds to ensure that elapsed time since last block is atleast 3seconds
1753            network.shut_down().await;
1754            sleep(Duration::from_secs(3)).await;
1755            // Asserting that the elapsed time since the last block is at least 3 seconds
1756            assert!(ds.elapsed_time_since_last_decide().await.unwrap() >= 3);
1757        }
1758    }
1759}
1760
1761#[macro_export]
1762macro_rules! instantiate_data_source_tests {
1763    ($t:ty) => {
1764        use $crate::data_source::{
1765            availability_tests, node_tests, persistence_tests, status_tests,
1766        };
1767
1768        instantiate_availability_tests!($t);
1769        instantiate_persistence_tests!($t);
1770        instantiate_node_tests!($t);
1771        instantiate_status_tests!($t);
1772    };
1773}