Skip to main content

hotshot_query_service/testing/
consensus.rs

1// Copyright (c) 2022 Espresso Systems (espressosys.com)
2// This file is part of the HotShot Query Service library.
3//
4// This program is free software: you can redistribute it and/or modify it under the terms of the GNU
5// General Public License as published by the Free Software Foundation, either version 3 of the
6// License, or (at your option) any later version.
7// This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without
8// even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
9// General Public License for more details.
10// You should have received a copy of the GNU General Public License along with this program. If not,
11// see <https://www.gnu.org/licenses/>.
12
13use std::{fmt::Display, num::NonZeroUsize, sync::Arc, time::Duration};
14
15use alloy::primitives::U256;
16use async_trait::async_trait;
17use futures::{
18    future::{Future, join_all},
19    stream::StreamExt,
20};
21use hotshot::{
22    HotShotInitializer, SystemContext,
23    traits::implementations::{MasterMap, MemoryNetwork},
24    types::{Event, SystemContextHandle},
25};
26use hotshot_example_types::{
27    membership::TestableMembership, state_types::TestInstanceState, storage_types::TestStorage,
28};
29use hotshot_testing::block_builder::{SimpleBuilderImplementation, TestBuilderImplementation};
30use hotshot_types::{
31    HotShotConfig, PeerConfig,
32    consensus::ConsensusMetricsValue,
33    data::EpochNumber,
34    drb::INITIAL_DRB_RESULT,
35    epoch_membership::EpochMembershipCoordinator,
36    light_client::StateKeyPair,
37    signature_key::BLSPubKey,
38    storage_metrics::StorageMetricsValue,
39    traits::{election::Membership, network::Topic, signature_key::SignatureKey as _},
40};
41use test_utils::reserve_tcp_port;
42use tokio::{
43    runtime::Handle,
44    task::{block_in_place, yield_now},
45};
46use tracing::{Instrument, info_span};
47use url::Url;
48use versions::{MIN_SUPPORTED_VERSION, Upgrade};
49
50use super::mocks::{MockMembership, MockNodeImpl, MockTransaction, MockTypes};
51use crate::{
52    SignatureKey,
53    availability::{AvailabilityDataSource, UpdateAvailabilityData},
54    data_source::{FileSystemDataSource, SqlDataSource, VersionedDataSource, fetching::Builder},
55    fetching::provider::NoFetching,
56    node::NodeDataSource,
57    status::{StatusDataSource, UpdateStatusData},
58    task::BackgroundTask,
59};
60
61struct MockNode<D: DataSourceLifeCycle> {
62    hotshot: SystemContextHandle<MockTypes, MockNodeImpl>,
63    data_source: D,
64    storage: D::Storage,
65}
66
67pub struct MockNetwork<D: DataSourceLifeCycle> {
68    tasks: Vec<BackgroundTask>,
69    nodes: Vec<MockNode<D>>,
70    pub_keys: Vec<BLSPubKey>,
71}
72
73// MockNetwork can be used with any DataSourceLifeCycle, but it's nice to have a default with a
74// convenient type alias.
75pub type MockDataSource = FileSystemDataSource<MockTypes, NoFetching>;
76pub type MockSqlDataSource = SqlDataSource<MockTypes, NoFetching>;
77
78pub const NUM_NODES: usize = 2;
79const EPOCH_HEIGHT: u64 = 10;
80const DIFFICULTY_LEVEL: u64 = 10;
81
82impl<D: DataSourceLifeCycle + UpdateStatusData> MockNetwork<D> {
83    pub async fn init() -> Self {
84        Self::init_with_config(|_| {}, false).await
85    }
86
87    pub async fn init_with_leaf_ds() -> Self {
88        Self::init_with_config(|_| {}, true).await
89    }
90
91    pub async fn init_with_config(
92        update_config: impl FnOnce(&mut HotShotConfig<MockTypes>),
93        leaf_only: bool,
94    ) -> Self {
95        let (pub_keys, priv_keys): (Vec<_>, Vec<_>) = (0..NUM_NODES)
96            .map(|i| BLSPubKey::generated_from_seed_indexed([0; 32], i as u64))
97            .unzip();
98        let num_staked_nodes = NonZeroUsize::new(pub_keys.len()).unwrap();
99        let state_key_pairs = (0..num_staked_nodes.into())
100            .map(|i| StateKeyPair::generate_from_seed_indexed([0; 32], i as u64))
101            .collect::<Vec<_>>();
102        let master_map = MasterMap::new();
103        let stake = 1u64;
104        let known_nodes_with_stake = (0..num_staked_nodes.into())
105            .map(|id| PeerConfig {
106                stake_table_entry: pub_keys[id].stake_table_entry(U256::from(stake)),
107                state_ver_key: state_key_pairs[id].ver_key(),
108                connect_info: None,
109            })
110            .collect::<Vec<_>>();
111
112        // Pick a random, unused port for the builder server
113        let builder_port = reserve_tcp_port().expect("OS should have ephemeral ports available");
114
115        // Create the bind URL from the random port
116        let builder_url =
117            Url::parse(&format!("http://0.0.0.0:{builder_port}")).expect("Failed to parse URL");
118
119        // Start the builder server
120        let builder_task =
121            <SimpleBuilderImplementation as TestBuilderImplementation<MockTypes>>::start(
122                NUM_NODES,
123                builder_url.clone(),
124                (),
125                Default::default(),
126            )
127            .await;
128
129        let mut config = HotShotConfig {
130            builder_urls: vec1::vec1![builder_url.clone()],
131            fixed_leader_for_gpuvid: 0,
132            num_nodes_with_stake: num_staked_nodes,
133            known_nodes_with_stake: known_nodes_with_stake.clone(),
134            next_view_timeout: 10000,
135            num_bootstrap: 0,
136            da_staked_committee_size: pub_keys.len(),
137            known_da_nodes: known_nodes_with_stake.clone(),
138            da_committees: Default::default(),
139            data_request_delay: Duration::from_millis(200),
140            view_sync_timeout: Duration::from_millis(250),
141            start_threshold: (
142                known_nodes_with_stake.len() as u64,
143                known_nodes_with_stake.len() as u64,
144            ),
145            builder_timeout: Duration::from_secs(1),
146            start_proposing_view: 0,
147            stop_proposing_view: 0,
148            start_voting_view: 0,
149            stop_voting_view: 0,
150            start_proposing_time: 0,
151            stop_proposing_time: 0,
152            start_voting_time: 0,
153            stop_voting_time: 0,
154            epoch_height: EPOCH_HEIGHT,
155            epoch_start_block: 0,
156            stake_table_capacity: hotshot_types::light_client::DEFAULT_STAKE_TABLE_CAPACITY,
157            drb_difficulty: DIFFICULTY_LEVEL,
158            drb_upgrade_difficulty: DIFFICULTY_LEVEL,
159        };
160        update_config(&mut config);
161        let upgrade = Upgrade::trivial(MIN_SUPPORTED_VERSION);
162
163        let nodes = join_all(
164            priv_keys
165                .into_iter()
166                .enumerate()
167                .map(|(node_id, priv_key)| {
168                    let config = config.clone();
169
170                    let pub_keys = pub_keys.clone();
171                    let master_map = master_map.clone();
172                    let state_priv_keys = state_key_pairs
173                        .iter()
174                        .map(|kp| kp.sign_key())
175                        .collect::<Vec<_>>();
176
177                    let span = info_span!("initialize node", node_id);
178                    let known_nodes_with_stake_clone = known_nodes_with_stake.clone();
179                    async move {
180                        let storage = D::create(node_id).await;
181                        let data_source = if leaf_only {
182                            D::leaf_only_ds(&storage).await
183                        } else {
184                            D::connect(&storage).await
185                        };
186
187                        let network = Arc::new(MemoryNetwork::new(
188                            &pub_keys[node_id],
189                            &master_map.clone(),
190                            &[Topic::Global, Topic::Da],
191                            None,
192                        ));
193                        let hs_storage: TestStorage<MockTypes> = TestStorage::default();
194
195                        let membership = MockMembership::new(
196                            known_nodes_with_stake_clone.clone(),
197                            known_nodes_with_stake_clone,
198                            pub_keys[node_id],
199                            config.epoch_height,
200                        );
201
202                        membership.set_first_epoch(EpochNumber::new(0), INITIAL_DRB_RESULT);
203                        let memberships = EpochMembershipCoordinator::new(
204                            membership,
205                            config.epoch_height,
206                            &hs_storage.clone(),
207                        );
208
209                        let init = HotShotInitializer::from_genesis(
210                            TestInstanceState::default(),
211                            0,
212                            0,
213                            vec![],
214                            upgrade,
215                        )
216                        .await
217                        .unwrap();
218
219                        let hotshot = SystemContext::init(
220                            pub_keys[node_id],
221                            priv_key,
222                            state_priv_keys[node_id].clone(),
223                            node_id as u64,
224                            config,
225                            upgrade,
226                            memberships,
227                            network,
228                            init,
229                            ConsensusMetricsValue::new(&*data_source.populate_metrics()),
230                            hs_storage,
231                            StorageMetricsValue::new(&*data_source.populate_metrics()),
232                        )
233                        .await
234                        .unwrap()
235                        .0;
236
237                        MockNode {
238                            hotshot,
239                            data_source,
240                            storage,
241                        }
242                    }
243                    .instrument(span)
244                }),
245        )
246        .await;
247
248        // Hook the builder up to the event stream from the first node
249
250        builder_task.start(Box::new(nodes[0].hotshot.event_stream()));
251
252        let mut network = Self {
253            nodes,
254            pub_keys,
255            tasks: Default::default(),
256        };
257        D::setup(&mut network).await;
258        network
259    }
260}
261
262impl<D: DataSourceLifeCycle> MockNetwork<D> {
263    pub fn handle(&self) -> &SystemContextHandle<MockTypes, MockNodeImpl> {
264        &self.nodes[0].hotshot
265    }
266
267    pub async fn submit_transaction(&self, tx: MockTransaction) {
268        self.handle().submit_transaction(tx).await.unwrap();
269    }
270
271    pub fn num_nodes(&self) -> usize {
272        self.pub_keys.len()
273    }
274
275    pub fn proposer(&self, i: usize) -> SignatureKey<MockTypes> {
276        self.pub_keys[i]
277    }
278
279    pub fn data_source_index(&self, i: usize) -> D {
280        self.nodes[i].data_source.clone()
281    }
282
283    pub fn data_source(&self) -> D {
284        self.data_source_index(0)
285    }
286
287    pub fn storage(&self) -> &D::Storage {
288        &self.nodes[0].storage
289    }
290
291    pub fn spawn(&mut self, name: impl Display, task: impl Future + Send + 'static) {
292        self.tasks.push(BackgroundTask::spawn(name, task));
293    }
294
295    pub async fn shut_down(mut self) {
296        self.shut_down_impl().await
297    }
298
299    async fn shut_down_impl(&mut self) {
300        for node in &mut self.nodes {
301            node.hotshot.shut_down().await;
302        }
303    }
304
305    pub fn epoch_height(&self) -> u64 {
306        EPOCH_HEIGHT
307    }
308}
309
310impl<D: DataSourceLifeCycle> MockNetwork<D> {
311    pub async fn start(&mut self) {
312        // Spawn the update tasks.
313        for (i, node) in self.nodes.iter_mut().enumerate() {
314            let ds = node.data_source.clone();
315            let mut events = node.hotshot.event_stream();
316            self.tasks.push(BackgroundTask::spawn(
317                format!("update node {i}"),
318                async move {
319                    while let Some(event) = events.next().await {
320                        tracing::info!(node = i, event = ?event.event, "EVENT");
321                        {
322                            ds.handle_event(&event).await;
323                        }
324                        yield_now().await;
325                    }
326                },
327            ));
328        }
329
330        join_all(
331            self.nodes
332                .iter()
333                .map(|node| node.hotshot.hotshot.start_consensus()),
334        )
335        .await;
336    }
337}
338
339impl<D: DataSourceLifeCycle> Drop for MockNetwork<D> {
340    fn drop(&mut self) {
341        if let Ok(handle) = Handle::try_current() {
342            block_in_place(move || handle.block_on(self.shut_down_impl()));
343        }
344    }
345}
346
347#[async_trait]
348pub trait DataSourceLifeCycle: Clone + Send + Sync + Sized + 'static {
349    /// Backing storage for the data source.
350    ///
351    /// This can be used to connect to data sources to the same underlying data. It must be kept
352    /// alive as long as the related data sources are open.
353    type Storage: Send + Sync;
354
355    /// Type parameter for builder.
356    type S;
357
358    /// Type parameter for builder.
359    type P;
360
361    async fn create(node_id: usize) -> Self::Storage;
362    async fn build(
363        storage: &Self::Storage,
364        opt: impl Send
365        + FnOnce(Builder<MockTypes, Self::S, Self::P>) -> Builder<MockTypes, Self::S, Self::P>,
366    ) -> Self;
367    async fn reset(storage: &Self::Storage) -> Self;
368    async fn handle_event(&self, event: &Event<MockTypes>);
369
370    async fn connect(storage: &Self::Storage) -> Self {
371        Self::build(storage, |builder| builder).await
372    }
373
374    async fn leaf_only_ds(storage: &Self::Storage) -> Self {
375        Self::build(storage, |builder| builder.leaf_only()).await
376    }
377
378    /// Setup runs after setting up the network but before starting a test.
379    async fn setup(_network: &mut MockNetwork<Self>) {}
380}
381
382pub trait TestableDataSource:
383    DataSourceLifeCycle
384    + AvailabilityDataSource<MockTypes>
385    + UpdateAvailabilityData<MockTypes>
386    + NodeDataSource<MockTypes>
387    + StatusDataSource
388    + VersionedDataSource
389{
390}
391
392impl<T> TestableDataSource for T where
393    T: DataSourceLifeCycle
394        + AvailabilityDataSource<MockTypes>
395        + UpdateAvailabilityData<MockTypes>
396        + NodeDataSource<MockTypes>
397        + StatusDataSource
398        + VersionedDataSource
399{
400}