1#![allow(clippy::panic)]
8use std::{
9 collections::{BTreeMap, HashMap, HashSet},
10 marker::PhantomData,
11 sync::Arc,
12};
13
14use async_broadcast::{Receiver, Sender, broadcast};
15use async_lock::RwLock;
16use futures::future::join_all;
17use hotshot::{
18 HotShotInitializer, InitializerEpochInfo, SystemContext,
19 traits::TestableNodeImplementation,
20 types::{Event, SystemContextHandle},
21};
22use hotshot_example_types::{
23 block_types::TestBlockHeader,
24 membership::TestableMembership,
25 state_types::{TestInstanceState, TestValidatedState},
26 storage_types::TestStorage,
27};
28use hotshot_task_impls::events::HotShotEvent;
29use hotshot_types::{
30 HotShotConfig, ValidatorConfig,
31 consensus::ConsensusMetricsValue,
32 constants::{EVENT_CHANNEL_SIZE, EXTERNAL_EVENT_CHANNEL_SIZE},
33 data::{EpochNumber, Leaf2, ViewNumber},
34 drb::INITIAL_DRB_RESULT,
35 epoch_membership::EpochMembershipCoordinator,
36 simple_certificate::QuorumCertificate2,
37 storage_metrics::StorageMetricsValue,
38 traits::{
39 leaf_fetcher_network::ConnectedNetworkLeafFetcher,
40 network::ConnectedNetwork,
41 node_implementation::{NodeImplementation, NodeType},
42 },
43};
44use tide_disco::Url;
45#[allow(deprecated)]
46use tracing::info;
47
48use super::{
49 completion_task::CompletionTask, consistency_task::ConsistencyTask, txn_task::TxnTask,
50};
51use crate::{
52 block_builder::{BuilderTask, TestBuilderImplementation},
53 completion_task::CompletionTaskDescription,
54 spinning_task::{ChangeNode, NodeAction, SpinningTask},
55 test_builder::create_test_handle,
56 test_launcher::{Network, TestLauncher},
57 test_task::{TestResult, TestTask, spawn_timeout_task},
58 txn_task::TxnTaskDescription,
59 view_sync_task::ViewSyncTask,
60};
61
/// Marker trait bundling the bounds `std::error::Error + Send + Sync + 'static`
/// under a single name.
pub trait TaskErr: std::error::Error + Send + Sync + 'static {}

/// Blanket implementation: every type that satisfies the supertrait bounds is a `TaskErr`.
impl<E> TaskErr for E where E: std::error::Error + Send + Sync + 'static {}
65
66impl<
67 TYPES: NodeType<
68 InstanceState = TestInstanceState,
69 ValidatedState = TestValidatedState,
70 BlockHeader = TestBlockHeader,
71 >,
72 I: TestableNodeImplementation<TYPES>,
73 N: ConnectedNetwork<TYPES::SignatureKey>,
74> TestRunner<TYPES, I, N>
75where
76 I: TestableNodeImplementation<TYPES>,
77 I: NodeImplementation<TYPES, Network = N, Storage = TestStorage<TYPES>>,
78 <TYPES as NodeType>::Membership: TestableMembership<TYPES>,
79{
80 #[allow(clippy::too_many_lines)]
85 pub async fn run_test<B: TestBuilderImplementation<TYPES>>(mut self) {
86 let (test_sender, test_receiver) = broadcast(EVENT_CHANNEL_SIZE);
87 let spinning_changes = self
88 .launcher
89 .metadata
90 .spinning_properties
91 .node_changes
92 .clone();
93
94 let mut late_start_nodes: HashSet<u64> = HashSet::new();
95 let mut restart_nodes: HashSet<u64> = HashSet::new();
96 for (_, changes) in &spinning_changes {
97 for change in changes {
98 if matches!(change.updown, NodeAction::Up) {
99 late_start_nodes.insert(change.idx.try_into().unwrap());
100 }
101 if matches!(change.updown, NodeAction::RestartDown(_)) {
102 restart_nodes.insert(change.idx.try_into().unwrap());
103 }
104 }
105 }
106
107 self.add_nodes::<B>(
108 self.launcher
109 .metadata
110 .test_config
111 .num_nodes_with_stake
112 .into(),
113 &late_start_nodes,
114 &restart_nodes,
115 )
116 .await;
117 let mut event_rxs = vec![];
118 let mut internal_event_rxs = vec![];
119
120 for node in &self.nodes {
121 let r = node.handle.event_stream_known_impl();
122 event_rxs.push(r);
123 }
124 for node in &self.nodes {
125 let r = node.handle.internal_event_stream_receiver_known_impl();
126 internal_event_rxs.push(r);
127 }
128
129 let TestRunner {
130 launcher,
131 nodes,
132 late_start,
133 next_node_id: _,
134 _pd: _,
135 } = self;
136
137 let mut task_futs = vec![];
138 let meta = launcher.metadata.clone();
139
140 let handles = Arc::new(RwLock::new(nodes));
141
142 let txn_task =
143 if let TxnTaskDescription::RoundRobinTimeBased(duration) = meta.txn_description {
144 let txn_task = TxnTask {
145 handles: Arc::clone(&handles),
146 next_node_idx: Some(0),
147 duration,
148 shutdown_chan: test_receiver.clone(),
149 };
150 Some(txn_task)
151 } else {
152 None
153 };
154
155 let CompletionTaskDescription::TimeBasedCompletionTaskBuilder(time_based) =
157 meta.completion_task_description;
158 let completion_task = CompletionTask {
159 tx: test_sender.clone(),
160 rx: test_receiver.clone(),
161 duration: time_based.duration,
162 };
163
164 let mut changes: BTreeMap<ViewNumber, Vec<ChangeNode>> = BTreeMap::new();
167 for (view, mut change) in spinning_changes {
168 changes
169 .entry(ViewNumber::new(view))
170 .or_default()
171 .append(&mut change);
172 }
173
174 let spinning_task_state = SpinningTask {
175 epoch_height: launcher.metadata.test_config.epoch_height,
176 epoch_start_block: launcher.metadata.test_config.epoch_start_block,
177 start_epoch_info: Vec::new(),
178 handles: Arc::clone(&handles),
179 late_start,
180 latest_view: None,
181 changes,
182 last_decided_leaf: Leaf2::genesis(
183 &TestValidatedState::default(),
184 &TestInstanceState::default(),
185 launcher.metadata.upgrade.base,
186 )
187 .await,
188 high_qc: QuorumCertificate2::genesis(
189 &TestValidatedState::default(),
190 &TestInstanceState::default(),
191 launcher.metadata.upgrade,
192 )
193 .await,
194 next_epoch_high_qc: None,
195 async_delay_config: launcher.metadata.async_delay_config,
196 restart_contexts: HashMap::new(),
197 channel_generator: launcher.resource_generators.channel_generator,
198 state_cert: None,
199 node_stakes: launcher.metadata.node_stakes.clone(),
200 upgrade: launcher.metadata.upgrade,
201 };
202 let spinning_task = TestTask::<SpinningTask<TYPES, N, I>>::new(
203 spinning_task_state,
204 event_rxs.clone(),
205 test_receiver.clone(),
206 );
207
208 let consistency_task_state = ConsistencyTask {
209 consensus_leaves: BTreeMap::new(),
210 safety_properties: launcher.metadata.overall_safety_properties.clone(),
211 test_sender: test_sender.clone(),
212 errors: vec![],
213 ensure_upgrade: launcher.metadata.upgrade_view.is_some(),
214 validate_transactions: launcher.metadata.validate_transactions,
215 timeout_task: spawn_timeout_task(
216 test_sender.clone(),
217 launcher.metadata.overall_safety_properties.decide_timeout,
218 ),
219 };
220
221 let consistency_task = TestTask::<ConsistencyTask<TYPES>>::new(
222 consistency_task_state,
223 event_rxs.clone(),
224 test_receiver.clone(),
225 );
226
227 let view_sync_task_state = ViewSyncTask {
229 hit_view_sync: HashSet::new(),
230 description: launcher.metadata.view_sync_properties,
231 _pd: PhantomData,
232 };
233
234 let view_sync_task = TestTask::<ViewSyncTask<TYPES, I>>::new(
235 view_sync_task_state,
236 internal_event_rxs,
237 test_receiver.clone(),
238 );
239
240 let nodes = handles.read().await;
241
242 for node in &*nodes {
244 node.network.wait_for_ready().await;
245 }
246
247 for node in &*nodes {
249 if !late_start_nodes.contains(&node.node_id) {
250 node.handle.hotshot.start_consensus().await;
251 }
252 }
253
254 drop(nodes);
255
256 for seed in launcher.additional_test_tasks {
257 let task = TestTask::new(
258 seed.into_state(Arc::clone(&handles)).await,
259 event_rxs.clone(),
260 test_receiver.clone(),
261 );
262 task_futs.push(task.run());
263 }
264
265 task_futs.push(consistency_task.run());
266 task_futs.push(view_sync_task.run());
267 task_futs.push(spinning_task.run());
268
269 let txn_handle = txn_task.map(|txn| txn.run());
271 let completion_handle = completion_task.run();
272
273 let mut error_list = vec![];
274
275 let results = join_all(task_futs).await;
276
277 for result in results {
278 match result {
279 Ok(res) => match res {
280 TestResult::Pass => {
281 info!("Task shut down successfully");
282 },
283 TestResult::Fail(e) => error_list.push(e),
284 },
285 Err(e) => {
286 tracing::error!("Error Joining the test task {e:?}");
287 },
288 }
289 }
290
291 if let Some(handle) = txn_handle {
292 handle.abort();
293 }
294 let mut nodes = handles.write().await;
297
298 for node in &mut *nodes {
299 node.handle.shut_down().await;
300 }
301 tracing::info!("Nodes shutdown");
302
303 completion_handle.abort();
304
305 assert!(
306 error_list.is_empty(),
307 "{}",
308 error_list
309 .iter()
310 .fold("TEST FAILED! Results:".to_string(), |acc, error| {
311 format!("{acc}\n\n{error:?}")
312 })
313 );
314 }
315
316 pub async fn init_builders<B: TestBuilderImplementation<TYPES>>(
317 &self,
318 ) -> (Vec<Box<dyn BuilderTask<TYPES>>>, Vec<Url>) {
319 let mut builder_tasks = Vec::new();
320 let mut builder_urls = Vec::new();
321
322 let mut ports = Vec::new();
323 for _ in &self.launcher.metadata.builders {
324 let port =
325 test_utils::reserve_tcp_port().expect("OS should have ephemeral ports available");
326 ports.push(port);
327 }
328
329 for (metadata, builder_port) in self.launcher.metadata.builders.iter().zip(&ports) {
330 let builder_url =
331 Url::parse(&format!("http://localhost:{builder_port}")).expect("Invalid URL");
332 let builder_task = B::start(
333 0, builder_url.clone(),
335 B::Config::default(),
336 metadata.changes.clone(),
337 )
338 .await;
339 builder_tasks.push(builder_task);
340 builder_urls.push(builder_url);
341 }
342
343 (builder_tasks, builder_urls)
344 }
345
346 pub async fn add_nodes<B: TestBuilderImplementation<TYPES>>(
351 &mut self,
352 total: usize,
353 late_start: &HashSet<u64>,
354 restart: &HashSet<u64>,
355 ) -> Vec<u64> {
356 let mut results = vec![];
357 let config = self.launcher.metadata.test_config.clone();
358
359 let (mut builder_tasks, builder_urls) = self.init_builders::<B>().await;
362
363 let mut uninitialized_nodes = Vec::new();
365 let mut networks_ready = Vec::new();
366
367 for i in 0..total {
368 let mut config = config.clone();
369 if let Some(upgrade_view) = self.launcher.metadata.upgrade_view {
370 config.set_view_upgrade(upgrade_view);
371 }
372 let node_id = self.next_node_id;
373 self.next_node_id += 1;
374 tracing::debug!("launch node {i}");
375
376 config.builder_urls = builder_urls
377 .clone()
378 .try_into()
379 .expect("Non-empty by construction");
380
381 let storage = (self.launcher.resource_generators.storage)(node_id);
382
383 let is_da = node_id < config.da_staked_committee_size as u64;
385
386 let validator_config = ValidatorConfig::<TYPES>::generated_from_seed_indexed(
388 [0u8; 32],
389 node_id,
390 self.launcher.metadata.node_stakes.get(node_id),
391 is_da,
392 );
393
394 let public_key = validator_config.public_key.clone();
395
396 let network = if late_start.contains(&node_id) && self.launcher.metadata.skip_late {
397 None
398 } else {
399 let net = (self.launcher.resource_generators.channel_generator)(node_id).await;
400 networks_ready.push({
401 let net = net.clone();
402 async move { net.wait_for_ready().await }
403 });
404 Some(net)
405 };
406
407 if late_start.contains(&node_id) {
408 if self.launcher.metadata.skip_late {
409 self.late_start.insert(
410 node_id,
411 LateStartNode {
412 network: None,
413 context: LateNodeContext::UninitializedContext(
414 LateNodeContextParameters {
415 storage: storage.clone(),
416 config,
417 },
418 ),
419 },
420 );
421 } else {
422 let initializer = HotShotInitializer::<TYPES>::from_genesis(
423 TestInstanceState::new(
424 self.launcher
425 .metadata
426 .async_delay_config
427 .get(&node_id)
428 .cloned()
429 .unwrap_or_default(),
430 ),
431 config.epoch_height,
432 config.epoch_start_block,
433 vec![InitializerEpochInfo::<TYPES> {
434 epoch: EpochNumber::new(1),
435 drb_result: INITIAL_DRB_RESULT,
436 block_header: None,
437 }],
438 self.launcher.metadata.upgrade,
439 )
440 .await
441 .unwrap();
442
443 let network = network.clone().expect("!skip_late => network created");
444
445 let hotshot = Self::add_node_with_config(
446 node_id,
447 network.clone(),
448 <TYPES as NodeType>::Membership::new(
449 config.known_nodes_with_stake.clone(),
450 config.known_da_nodes.clone(),
451 public_key.clone(),
452 config.epoch_height,
453 ),
454 initializer,
455 config,
456 self.launcher.metadata.upgrade,
457 validator_config,
458 storage,
459 )
460 .await;
461 self.late_start.insert(
462 node_id,
463 LateStartNode {
464 network: Some(network),
465 context: LateNodeContext::InitializedContext(hotshot),
466 },
467 );
468 }
469 } else {
470 let network = network.expect("!late_start => network created");
471 uninitialized_nodes.push((
472 node_id,
473 network.clone(),
474 <TYPES as NodeType>::Membership::new(
475 config.known_nodes_with_stake.clone(),
476 config.known_da_nodes.clone(),
477 public_key.clone(),
478 config.epoch_height,
479 ),
480 config,
481 storage,
482 ));
483 }
484
485 results.push(node_id);
486 }
487
488 for node_id in &results {
491 if restart.contains(node_id) {
492 self.late_start.insert(
493 *node_id,
494 LateStartNode {
495 network: None,
496 context: LateNodeContext::Restart,
497 },
498 );
499 }
500 }
501
502 join_all(networks_ready).await;
504
505 for (node_id, network, memberships, config, storage) in uninitialized_nodes {
507 let public_key = ValidatorConfig::<TYPES>::generated_from_seed_indexed(
508 [0u8; 32],
509 node_id,
510 self.launcher.metadata.node_stakes.get(node_id),
511 node_id < config.da_staked_committee_size as u64,
512 )
513 .public_key;
514 let memberships = Arc::new(memberships);
515 let handle = create_test_handle(
516 self.launcher.metadata.clone(),
517 node_id,
518 network.clone(),
519 memberships.clone(),
520 config.clone(),
521 storage.clone(),
522 )
523 .await;
524 memberships.set_leaf_fetcher(
530 Arc::new(ConnectedNetworkLeafFetcher::<TYPES, _>::new(
531 network.clone(),
532 )),
533 storage,
534 public_key,
535 handle.event_stream_known_impl(),
536 );
537
538 match node_id.cmp(&(config.da_staked_committee_size as u64 - 1)) {
539 std::cmp::Ordering::Less => {
540 if let Some(task) = builder_tasks.pop() {
541 task.start(Box::new(handle.event_stream()))
542 }
543 },
544 std::cmp::Ordering::Equal => {
545 while let Some(task) = builder_tasks.pop() {
547 task.start(Box::new(handle.event_stream()))
548 }
549 },
550 std::cmp::Ordering::Greater => {},
551 }
552
553 self.nodes.push(Node {
554 node_id,
555 network,
556 handle,
557 });
558 }
559
560 results
561 }
562
563 #[allow(clippy::too_many_arguments)]
567 pub async fn add_node_with_config(
568 node_id: u64,
569 network: Network<TYPES, I>,
570 memberships: TYPES::Membership,
571 initializer: HotShotInitializer<TYPES>,
572 config: HotShotConfig<TYPES>,
573 upgrade: versions::Upgrade,
574 validator_config: ValidatorConfig<TYPES>,
575 storage: I::Storage,
576 ) -> Arc<SystemContext<TYPES, I>> {
577 let internal_chan = async_broadcast::broadcast(EVENT_CHANNEL_SIZE);
578 let external_chan = async_broadcast::broadcast(EXTERNAL_EVENT_CHANNEL_SIZE);
579
580 memberships.set_leaf_fetcher(
585 Arc::new(ConnectedNetworkLeafFetcher::<TYPES, _>::new(
586 network.clone(),
587 )),
588 storage.clone(),
589 validator_config.public_key.clone(),
590 external_chan.1.new_receiver(),
591 );
592
593 Self::add_node_with_config_and_channels(
594 node_id,
595 network,
596 memberships,
597 initializer,
598 config,
599 upgrade,
600 validator_config,
601 storage,
602 internal_chan,
603 external_chan,
604 )
605 .await
606 }
607
608 #[allow(clippy::too_many_arguments, clippy::type_complexity)]
612 pub async fn add_node_with_config_and_channels(
613 node_id: u64,
614 network: Network<TYPES, I>,
615 memberships: TYPES::Membership,
616 initializer: HotShotInitializer<TYPES>,
617 config: HotShotConfig<TYPES>,
618 upgrade: versions::Upgrade,
619 validator_config: ValidatorConfig<TYPES>,
620 storage: I::Storage,
621 internal_channel: (
622 Sender<Arc<HotShotEvent<TYPES>>>,
623 Receiver<Arc<HotShotEvent<TYPES>>>,
624 ),
625 external_channel: (Sender<Event<TYPES>>, Receiver<Event<TYPES>>),
626 ) -> Arc<SystemContext<TYPES, I>> {
627 let private_key = validator_config.private_key.clone();
629 let public_key = validator_config.public_key.clone();
630 let state_private_key = validator_config.state_private_key.clone();
631 let epoch_height = config.epoch_height;
632
633 SystemContext::new_from_channels(
634 public_key,
635 private_key,
636 state_private_key,
637 node_id,
638 config,
639 upgrade,
640 EpochMembershipCoordinator::new(Arc::new(memberships), epoch_height, &storage.clone()),
641 network,
642 initializer,
643 ConsensusMetricsValue::default(),
644 storage,
645 StorageMetricsValue::default(),
646 internal_channel,
647 external_channel,
648 )
649 .await
650 }
651}
652
653pub struct Node<TYPES: NodeType, I: TestableNodeImplementation<TYPES>> {
655 pub node_id: u64,
657 pub network: Network<TYPES, I>,
659 pub handle: SystemContextHandle<TYPES, I>,
661}
662
663pub struct LateNodeContextParameters<TYPES: NodeType, I: TestableNodeImplementation<TYPES>> {
666 pub storage: I::Storage,
668
669 pub config: HotShotConfig<TYPES>,
671}
672
673#[allow(clippy::large_enum_variant)]
675pub enum LateNodeContext<TYPES: NodeType, I: TestableNodeImplementation<TYPES>> {
676 InitializedContext(Arc<SystemContext<TYPES, I>>),
679
680 UninitializedContext(LateNodeContextParameters<TYPES, I>),
683 Restart,
685}
686
687pub struct LateStartNode<TYPES: NodeType, I: TestableNodeImplementation<TYPES>> {
689 pub network: Option<Network<TYPES, I>>,
691 pub context: LateNodeContext<TYPES, I>,
694}
695
696pub struct TestRunner<
699 TYPES: NodeType,
700 I: TestableNodeImplementation<TYPES>,
701 N: ConnectedNetwork<TYPES::SignatureKey>,
702> {
703 pub(crate) launcher: TestLauncher<TYPES, I>,
705 pub(crate) nodes: Vec<Node<TYPES, I>>,
707 pub(crate) late_start: HashMap<u64, LateStartNode<TYPES, I>>,
709 pub(crate) next_node_id: u64,
711 pub(crate) _pd: PhantomData<N>,
713}