hotshot_query_service/testing/
consensus.rs1use std::{fmt::Display, num::NonZeroUsize, sync::Arc, time::Duration};
14
15use alloy::primitives::U256;
16use async_trait::async_trait;
17use futures::{
18 future::{Future, join_all},
19 stream::StreamExt,
20};
21use hotshot::{
22 HotShotInitializer, SystemContext,
23 traits::implementations::{MasterMap, MemoryNetwork},
24 types::{Event, SystemContextHandle},
25};
26use hotshot_example_types::{
27 membership::TestableMembership, state_types::TestInstanceState, storage_types::TestStorage,
28};
29use hotshot_testing::block_builder::{SimpleBuilderImplementation, TestBuilderImplementation};
30use hotshot_types::{
31 HotShotConfig, PeerConfig,
32 consensus::ConsensusMetricsValue,
33 data::EpochNumber,
34 drb::INITIAL_DRB_RESULT,
35 epoch_membership::EpochMembershipCoordinator,
36 light_client::StateKeyPair,
37 signature_key::BLSPubKey,
38 storage_metrics::StorageMetricsValue,
39 traits::{election::Membership, network::Topic, signature_key::SignatureKey as _},
40};
41use test_utils::reserve_tcp_port;
42use tokio::{
43 runtime::Handle,
44 task::{block_in_place, yield_now},
45};
46use tracing::{Instrument, info_span};
47use url::Url;
48use versions::{MIN_SUPPORTED_VERSION, Upgrade};
49
50use super::mocks::{MockMembership, MockNodeImpl, MockTransaction, MockTypes};
51use crate::{
52 SignatureKey,
53 availability::{AvailabilityDataSource, UpdateAvailabilityData},
54 data_source::{FileSystemDataSource, SqlDataSource, VersionedDataSource, fetching::Builder},
55 fetching::provider::NoFetching,
56 node::NodeDataSource,
57 status::{StatusDataSource, UpdateStatusData},
58 task::BackgroundTask,
59};
60
61struct MockNode<D: DataSourceLifeCycle> {
62 hotshot: SystemContextHandle<MockTypes, MockNodeImpl>,
63 data_source: D,
64 storage: D::Storage,
65}
66
67pub struct MockNetwork<D: DataSourceLifeCycle> {
68 tasks: Vec<BackgroundTask>,
69 nodes: Vec<MockNode<D>>,
70 pub_keys: Vec<BLSPubKey>,
71}
72
/// [`FileSystemDataSource`] specialised to [`MockTypes`] with the [`NoFetching`] provider.
pub type MockDataSource = FileSystemDataSource<MockTypes, NoFetching>;
/// [`SqlDataSource`] specialised to [`MockTypes`] with the [`NoFetching`] provider.
pub type MockSqlDataSource = SqlDataSource<MockTypes, NoFetching>;
77
/// Number of consensus nodes (and key pairs) created for a [`MockNetwork`].
pub const NUM_NODES: usize = 2;
/// Epoch height written into the HotShot config and reported by `MockNetwork::epoch_height`.
const EPOCH_HEIGHT: u64 = 10;
/// Value used for both `drb_difficulty` and `drb_upgrade_difficulty` in the HotShot config.
const DIFFICULTY_LEVEL: u64 = 10;
81
82impl<D: DataSourceLifeCycle + UpdateStatusData> MockNetwork<D> {
83 pub async fn init() -> Self {
84 Self::init_with_config(|_| {}, false).await
85 }
86
87 pub async fn init_with_leaf_ds() -> Self {
88 Self::init_with_config(|_| {}, true).await
89 }
90
91 pub async fn init_with_config(
92 update_config: impl FnOnce(&mut HotShotConfig<MockTypes>),
93 leaf_only: bool,
94 ) -> Self {
95 let (pub_keys, priv_keys): (Vec<_>, Vec<_>) = (0..NUM_NODES)
96 .map(|i| BLSPubKey::generated_from_seed_indexed([0; 32], i as u64))
97 .unzip();
98 let num_staked_nodes = NonZeroUsize::new(pub_keys.len()).unwrap();
99 let state_key_pairs = (0..num_staked_nodes.into())
100 .map(|i| StateKeyPair::generate_from_seed_indexed([0; 32], i as u64))
101 .collect::<Vec<_>>();
102 let master_map = MasterMap::new();
103 let stake = 1u64;
104 let known_nodes_with_stake = (0..num_staked_nodes.into())
105 .map(|id| PeerConfig {
106 stake_table_entry: pub_keys[id].stake_table_entry(U256::from(stake)),
107 state_ver_key: state_key_pairs[id].ver_key(),
108 connect_info: None,
109 })
110 .collect::<Vec<_>>();
111
112 let builder_port = reserve_tcp_port().expect("OS should have ephemeral ports available");
114
115 let builder_url =
117 Url::parse(&format!("http://0.0.0.0:{builder_port}")).expect("Failed to parse URL");
118
119 let builder_task =
121 <SimpleBuilderImplementation as TestBuilderImplementation<MockTypes>>::start(
122 NUM_NODES,
123 builder_url.clone(),
124 (),
125 Default::default(),
126 )
127 .await;
128
129 let mut config = HotShotConfig {
130 builder_urls: vec1::vec1![builder_url.clone()],
131 fixed_leader_for_gpuvid: 0,
132 num_nodes_with_stake: num_staked_nodes,
133 known_nodes_with_stake: known_nodes_with_stake.clone(),
134 next_view_timeout: 10000,
135 num_bootstrap: 0,
136 da_staked_committee_size: pub_keys.len(),
137 known_da_nodes: known_nodes_with_stake.clone(),
138 da_committees: Default::default(),
139 data_request_delay: Duration::from_millis(200),
140 view_sync_timeout: Duration::from_millis(250),
141 start_threshold: (
142 known_nodes_with_stake.len() as u64,
143 known_nodes_with_stake.len() as u64,
144 ),
145 builder_timeout: Duration::from_secs(1),
146 start_proposing_view: 0,
147 stop_proposing_view: 0,
148 start_voting_view: 0,
149 stop_voting_view: 0,
150 start_proposing_time: 0,
151 stop_proposing_time: 0,
152 start_voting_time: 0,
153 stop_voting_time: 0,
154 epoch_height: EPOCH_HEIGHT,
155 epoch_start_block: 0,
156 stake_table_capacity: hotshot_types::light_client::DEFAULT_STAKE_TABLE_CAPACITY,
157 drb_difficulty: DIFFICULTY_LEVEL,
158 drb_upgrade_difficulty: DIFFICULTY_LEVEL,
159 };
160 update_config(&mut config);
161 let upgrade = Upgrade::trivial(MIN_SUPPORTED_VERSION);
162
163 let nodes = join_all(
164 priv_keys
165 .into_iter()
166 .enumerate()
167 .map(|(node_id, priv_key)| {
168 let config = config.clone();
169
170 let pub_keys = pub_keys.clone();
171 let master_map = master_map.clone();
172 let state_priv_keys = state_key_pairs
173 .iter()
174 .map(|kp| kp.sign_key())
175 .collect::<Vec<_>>();
176
177 let span = info_span!("initialize node", node_id);
178 let known_nodes_with_stake_clone = known_nodes_with_stake.clone();
179 async move {
180 let storage = D::create(node_id).await;
181 let data_source = if leaf_only {
182 D::leaf_only_ds(&storage).await
183 } else {
184 D::connect(&storage).await
185 };
186
187 let network = Arc::new(MemoryNetwork::new(
188 &pub_keys[node_id],
189 &master_map.clone(),
190 &[Topic::Global, Topic::Da],
191 None,
192 ));
193 let hs_storage: TestStorage<MockTypes> = TestStorage::default();
194
195 let membership = MockMembership::new(
196 known_nodes_with_stake_clone.clone(),
197 known_nodes_with_stake_clone,
198 pub_keys[node_id],
199 config.epoch_height,
200 );
201
202 membership.set_first_epoch(EpochNumber::new(0), INITIAL_DRB_RESULT);
203 let memberships = EpochMembershipCoordinator::new(
204 membership,
205 config.epoch_height,
206 &hs_storage.clone(),
207 );
208
209 let init = HotShotInitializer::from_genesis(
210 TestInstanceState::default(),
211 0,
212 0,
213 vec![],
214 upgrade,
215 )
216 .await
217 .unwrap();
218
219 let hotshot = SystemContext::init(
220 pub_keys[node_id],
221 priv_key,
222 state_priv_keys[node_id].clone(),
223 node_id as u64,
224 config,
225 upgrade,
226 memberships,
227 network,
228 init,
229 ConsensusMetricsValue::new(&*data_source.populate_metrics()),
230 hs_storage,
231 StorageMetricsValue::new(&*data_source.populate_metrics()),
232 )
233 .await
234 .unwrap()
235 .0;
236
237 MockNode {
238 hotshot,
239 data_source,
240 storage,
241 }
242 }
243 .instrument(span)
244 }),
245 )
246 .await;
247
248 builder_task.start(Box::new(nodes[0].hotshot.event_stream()));
251
252 let mut network = Self {
253 nodes,
254 pub_keys,
255 tasks: Default::default(),
256 };
257 D::setup(&mut network).await;
258 network
259 }
260}
261
262impl<D: DataSourceLifeCycle> MockNetwork<D> {
263 pub fn handle(&self) -> &SystemContextHandle<MockTypes, MockNodeImpl> {
264 &self.nodes[0].hotshot
265 }
266
267 pub async fn submit_transaction(&self, tx: MockTransaction) {
268 self.handle().submit_transaction(tx).await.unwrap();
269 }
270
271 pub fn num_nodes(&self) -> usize {
272 self.pub_keys.len()
273 }
274
275 pub fn proposer(&self, i: usize) -> SignatureKey<MockTypes> {
276 self.pub_keys[i]
277 }
278
279 pub fn data_source_index(&self, i: usize) -> D {
280 self.nodes[i].data_source.clone()
281 }
282
283 pub fn data_source(&self) -> D {
284 self.data_source_index(0)
285 }
286
287 pub fn storage(&self) -> &D::Storage {
288 &self.nodes[0].storage
289 }
290
291 pub fn spawn(&mut self, name: impl Display, task: impl Future + Send + 'static) {
292 self.tasks.push(BackgroundTask::spawn(name, task));
293 }
294
295 pub async fn shut_down(mut self) {
296 self.shut_down_impl().await
297 }
298
299 async fn shut_down_impl(&mut self) {
300 for node in &mut self.nodes {
301 node.hotshot.shut_down().await;
302 }
303 }
304
305 pub fn epoch_height(&self) -> u64 {
306 EPOCH_HEIGHT
307 }
308}
309
310impl<D: DataSourceLifeCycle> MockNetwork<D> {
311 pub async fn start(&mut self) {
312 for (i, node) in self.nodes.iter_mut().enumerate() {
314 let ds = node.data_source.clone();
315 let mut events = node.hotshot.event_stream();
316 self.tasks.push(BackgroundTask::spawn(
317 format!("update node {i}"),
318 async move {
319 while let Some(event) = events.next().await {
320 tracing::info!(node = i, event = ?event.event, "EVENT");
321 {
322 ds.handle_event(&event).await;
323 }
324 yield_now().await;
325 }
326 },
327 ));
328 }
329
330 join_all(
331 self.nodes
332 .iter()
333 .map(|node| node.hotshot.hotshot.start_consensus()),
334 )
335 .await;
336 }
337}
338
339impl<D: DataSourceLifeCycle> Drop for MockNetwork<D> {
340 fn drop(&mut self) {
341 if let Ok(handle) = Handle::try_current() {
342 block_in_place(move || handle.block_on(self.shut_down_impl()));
343 }
344 }
345}
346
347#[async_trait]
348pub trait DataSourceLifeCycle: Clone + Send + Sync + Sized + 'static {
349 type Storage: Send + Sync;
354
355 type S;
357
358 type P;
360
361 async fn create(node_id: usize) -> Self::Storage;
362 async fn build(
363 storage: &Self::Storage,
364 opt: impl Send
365 + FnOnce(Builder<MockTypes, Self::S, Self::P>) -> Builder<MockTypes, Self::S, Self::P>,
366 ) -> Self;
367 async fn reset(storage: &Self::Storage) -> Self;
368 async fn handle_event(&self, event: &Event<MockTypes>);
369
370 async fn connect(storage: &Self::Storage) -> Self {
371 Self::build(storage, |builder| builder).await
372 }
373
374 async fn leaf_only_ds(storage: &Self::Storage) -> Self {
375 Self::build(storage, |builder| builder.leaf_only()).await
376 }
377
378 async fn setup(_network: &mut MockNetwork<Self>) {}
380}
381
382pub trait TestableDataSource:
383 DataSourceLifeCycle
384 + AvailabilityDataSource<MockTypes>
385 + UpdateAvailabilityData<MockTypes>
386 + NodeDataSource<MockTypes>
387 + StatusDataSource
388 + VersionedDataSource
389{
390}
391
392impl<T> TestableDataSource for T where
393 T: DataSourceLifeCycle
394 + AvailabilityDataSource<MockTypes>
395 + UpdateAvailabilityData<MockTypes>
396 + NodeDataSource<MockTypes>
397 + StatusDataSource
398 + VersionedDataSource
399{
400}