Skip to main content

hotshot_testing/
test_runner.rs

// Copyright (c) 2021-2024 Espresso Systems (espressosys.com)
// This file is part of the HotShot repository.

// You should have received a copy of the MIT License
// along with the HotShot repository. If not, see <https://mit-license.org/>.
6
7#![allow(clippy::panic)]
8use std::{
9    collections::{BTreeMap, HashMap, HashSet},
10    marker::PhantomData,
11    sync::Arc,
12};
13
14use async_broadcast::{Receiver, Sender, broadcast};
15use async_lock::RwLock;
16use futures::future::join_all;
17use hotshot::{
18    HotShotInitializer, InitializerEpochInfo, SystemContext,
19    traits::TestableNodeImplementation,
20    types::{Event, SystemContextHandle},
21};
22use hotshot_example_types::{
23    block_types::TestBlockHeader,
24    membership::TestableMembership,
25    state_types::{TestInstanceState, TestValidatedState},
26    storage_types::TestStorage,
27};
28use hotshot_task_impls::events::HotShotEvent;
29use hotshot_types::{
30    HotShotConfig, ValidatorConfig,
31    consensus::ConsensusMetricsValue,
32    constants::{EVENT_CHANNEL_SIZE, EXTERNAL_EVENT_CHANNEL_SIZE},
33    data::{EpochNumber, Leaf2, ViewNumber},
34    drb::INITIAL_DRB_RESULT,
35    epoch_membership::EpochMembershipCoordinator,
36    simple_certificate::QuorumCertificate2,
37    storage_metrics::StorageMetricsValue,
38    traits::{
39        leaf_fetcher_network::ConnectedNetworkLeafFetcher,
40        network::ConnectedNetwork,
41        node_implementation::{NodeImplementation, NodeType},
42    },
43};
44use tide_disco::Url;
45#[allow(deprecated)]
46use tracing::info;
47
48use super::{
49    completion_task::CompletionTask, consistency_task::ConsistencyTask, txn_task::TxnTask,
50};
51use crate::{
52    block_builder::{BuilderTask, TestBuilderImplementation},
53    completion_task::CompletionTaskDescription,
54    spinning_task::{ChangeNode, NodeAction, SpinningTask},
55    test_builder::create_test_handle,
56    test_launcher::{Network, TestLauncher},
57    test_task::{TestResult, TestTask, spawn_timeout_task},
58    txn_task::TxnTaskDescription,
59    view_sync_task::ViewSyncTask,
60};
61
/// Marker trait bundling the bounds `std::error::Error + Sync + Send + 'static`.
///
/// It has no methods of its own; it only gives the combined bound a single name.
pub trait TaskErr: std::error::Error + Sync + Send + 'static {}

// Blanket implementation: every type that satisfies the bounds is a `TaskErr`.
impl<T> TaskErr for T where T: std::error::Error + Sync + Send + 'static {}
65
66impl<
67    TYPES: NodeType<
68            InstanceState = TestInstanceState,
69            ValidatedState = TestValidatedState,
70            BlockHeader = TestBlockHeader,
71        >,
72    I: TestableNodeImplementation<TYPES>,
73    N: ConnectedNetwork<TYPES::SignatureKey>,
74> TestRunner<TYPES, I, N>
75where
76    I: TestableNodeImplementation<TYPES>,
77    I: NodeImplementation<TYPES, Network = N, Storage = TestStorage<TYPES>>,
78    <TYPES as NodeType>::Membership: TestableMembership<TYPES>,
79{
80    /// execute test
81    ///
82    /// # Panics
83    /// if the test fails
84    #[allow(clippy::too_many_lines)]
85    pub async fn run_test<B: TestBuilderImplementation<TYPES>>(mut self) {
86        let (test_sender, test_receiver) = broadcast(EVENT_CHANNEL_SIZE);
87        let spinning_changes = self
88            .launcher
89            .metadata
90            .spinning_properties
91            .node_changes
92            .clone();
93
94        let mut late_start_nodes: HashSet<u64> = HashSet::new();
95        let mut restart_nodes: HashSet<u64> = HashSet::new();
96        for (_, changes) in &spinning_changes {
97            for change in changes {
98                if matches!(change.updown, NodeAction::Up) {
99                    late_start_nodes.insert(change.idx.try_into().unwrap());
100                }
101                if matches!(change.updown, NodeAction::RestartDown(_)) {
102                    restart_nodes.insert(change.idx.try_into().unwrap());
103                }
104            }
105        }
106
107        self.add_nodes::<B>(
108            self.launcher
109                .metadata
110                .test_config
111                .num_nodes_with_stake
112                .into(),
113            &late_start_nodes,
114            &restart_nodes,
115        )
116        .await;
117        let mut event_rxs = vec![];
118        let mut internal_event_rxs = vec![];
119
120        for node in &self.nodes {
121            let r = node.handle.event_stream_known_impl();
122            event_rxs.push(r);
123        }
124        for node in &self.nodes {
125            let r = node.handle.internal_event_stream_receiver_known_impl();
126            internal_event_rxs.push(r);
127        }
128
129        let TestRunner {
130            launcher,
131            nodes,
132            late_start,
133            next_node_id: _,
134            _pd: _,
135        } = self;
136
137        let mut task_futs = vec![];
138        let meta = launcher.metadata.clone();
139
140        let handles = Arc::new(RwLock::new(nodes));
141
142        let txn_task =
143            if let TxnTaskDescription::RoundRobinTimeBased(duration) = meta.txn_description {
144                let txn_task = TxnTask {
145                    handles: Arc::clone(&handles),
146                    next_node_idx: Some(0),
147                    duration,
148                    shutdown_chan: test_receiver.clone(),
149                };
150                Some(txn_task)
151            } else {
152                None
153            };
154
155        // add completion task
156        let CompletionTaskDescription::TimeBasedCompletionTaskBuilder(time_based) =
157            meta.completion_task_description;
158        let completion_task = CompletionTask {
159            tx: test_sender.clone(),
160            rx: test_receiver.clone(),
161            duration: time_based.duration,
162        };
163
164        // add spinning task
165        // map spinning to view
166        let mut changes: BTreeMap<ViewNumber, Vec<ChangeNode>> = BTreeMap::new();
167        for (view, mut change) in spinning_changes {
168            changes
169                .entry(ViewNumber::new(view))
170                .or_default()
171                .append(&mut change);
172        }
173
174        let spinning_task_state = SpinningTask {
175            epoch_height: launcher.metadata.test_config.epoch_height,
176            epoch_start_block: launcher.metadata.test_config.epoch_start_block,
177            start_epoch_info: Vec::new(),
178            handles: Arc::clone(&handles),
179            late_start,
180            latest_view: None,
181            changes,
182            last_decided_leaf: Leaf2::genesis(
183                &TestValidatedState::default(),
184                &TestInstanceState::default(),
185                launcher.metadata.upgrade.base,
186            )
187            .await,
188            high_qc: QuorumCertificate2::genesis(
189                &TestValidatedState::default(),
190                &TestInstanceState::default(),
191                launcher.metadata.upgrade,
192            )
193            .await,
194            next_epoch_high_qc: None,
195            async_delay_config: launcher.metadata.async_delay_config,
196            restart_contexts: HashMap::new(),
197            channel_generator: launcher.resource_generators.channel_generator,
198            state_cert: None,
199            node_stakes: launcher.metadata.node_stakes.clone(),
200            upgrade: launcher.metadata.upgrade,
201        };
202        let spinning_task = TestTask::<SpinningTask<TYPES, N, I>>::new(
203            spinning_task_state,
204            event_rxs.clone(),
205            test_receiver.clone(),
206        );
207
208        let consistency_task_state = ConsistencyTask {
209            consensus_leaves: BTreeMap::new(),
210            safety_properties: launcher.metadata.overall_safety_properties.clone(),
211            test_sender: test_sender.clone(),
212            errors: vec![],
213            ensure_upgrade: launcher.metadata.upgrade_view.is_some(),
214            validate_transactions: launcher.metadata.validate_transactions,
215            timeout_task: spawn_timeout_task(
216                test_sender.clone(),
217                launcher.metadata.overall_safety_properties.decide_timeout,
218            ),
219        };
220
221        let consistency_task = TestTask::<ConsistencyTask<TYPES>>::new(
222            consistency_task_state,
223            event_rxs.clone(),
224            test_receiver.clone(),
225        );
226
227        // add view sync task
228        let view_sync_task_state = ViewSyncTask {
229            hit_view_sync: HashSet::new(),
230            description: launcher.metadata.view_sync_properties,
231            _pd: PhantomData,
232        };
233
234        let view_sync_task = TestTask::<ViewSyncTask<TYPES, I>>::new(
235            view_sync_task_state,
236            internal_event_rxs,
237            test_receiver.clone(),
238        );
239
240        let nodes = handles.read().await;
241
242        // wait for networks to be ready
243        for node in &*nodes {
244            node.network.wait_for_ready().await;
245        }
246
247        // Start hotshot
248        for node in &*nodes {
249            if !late_start_nodes.contains(&node.node_id) {
250                node.handle.hotshot.start_consensus().await;
251            }
252        }
253
254        drop(nodes);
255
256        for seed in launcher.additional_test_tasks {
257            let task = TestTask::new(
258                seed.into_state(Arc::clone(&handles)).await,
259                event_rxs.clone(),
260                test_receiver.clone(),
261            );
262            task_futs.push(task.run());
263        }
264
265        task_futs.push(consistency_task.run());
266        task_futs.push(view_sync_task.run());
267        task_futs.push(spinning_task.run());
268
269        // `generator` tasks that do not process events.
270        let txn_handle = txn_task.map(|txn| txn.run());
271        let completion_handle = completion_task.run();
272
273        let mut error_list = vec![];
274
275        let results = join_all(task_futs).await;
276
277        for result in results {
278            match result {
279                Ok(res) => match res {
280                    TestResult::Pass => {
281                        info!("Task shut down successfully");
282                    },
283                    TestResult::Fail(e) => error_list.push(e),
284                },
285                Err(e) => {
286                    tracing::error!("Error Joining the test task {e:?}");
287                },
288            }
289        }
290
291        if let Some(handle) = txn_handle {
292            handle.abort();
293        }
294        // Shutdown all of the servers at the end
295
296        let mut nodes = handles.write().await;
297
298        for node in &mut *nodes {
299            node.handle.shut_down().await;
300        }
301        tracing::info!("Nodes shutdown");
302
303        completion_handle.abort();
304
305        assert!(
306            error_list.is_empty(),
307            "{}",
308            error_list
309                .iter()
310                .fold("TEST FAILED! Results:".to_string(), |acc, error| {
311                    format!("{acc}\n\n{error:?}")
312                })
313        );
314    }
315
316    pub async fn init_builders<B: TestBuilderImplementation<TYPES>>(
317        &self,
318    ) -> (Vec<Box<dyn BuilderTask<TYPES>>>, Vec<Url>) {
319        let mut builder_tasks = Vec::new();
320        let mut builder_urls = Vec::new();
321
322        let mut ports = Vec::new();
323        for _ in &self.launcher.metadata.builders {
324            let port =
325                test_utils::reserve_tcp_port().expect("OS should have ephemeral ports available");
326            ports.push(port);
327        }
328
329        for (metadata, builder_port) in self.launcher.metadata.builders.iter().zip(&ports) {
330            let builder_url =
331                Url::parse(&format!("http://localhost:{builder_port}")).expect("Invalid URL");
332            let builder_task = B::start(
333                0, // This field gets updated while the test is running, 0 is just to seed it
334                builder_url.clone(),
335                B::Config::default(),
336                metadata.changes.clone(),
337            )
338            .await;
339            builder_tasks.push(builder_task);
340            builder_urls.push(builder_url);
341        }
342
343        (builder_tasks, builder_urls)
344    }
345
346    /// Add nodes.
347    ///
348    /// # Panics
349    /// Panics if unable to create a [`HotShotInitializer`]
350    pub async fn add_nodes<B: TestBuilderImplementation<TYPES>>(
351        &mut self,
352        total: usize,
353        late_start: &HashSet<u64>,
354        restart: &HashSet<u64>,
355    ) -> Vec<u64> {
356        let mut results = vec![];
357        let config = self.launcher.metadata.test_config.clone();
358
359        // Num_nodes is updated on the fly now via claim_block_with_num_nodes. This stays around to seed num_nodes
360        // in the builders for tests which don't update that field.
361        let (mut builder_tasks, builder_urls) = self.init_builders::<B>().await;
362
363        // Collect uninitialized nodes because we need to wait for all networks to be ready before starting the tasks
364        let mut uninitialized_nodes = Vec::new();
365        let mut networks_ready = Vec::new();
366
367        for i in 0..total {
368            let mut config = config.clone();
369            if let Some(upgrade_view) = self.launcher.metadata.upgrade_view {
370                config.set_view_upgrade(upgrade_view);
371            }
372            let node_id = self.next_node_id;
373            self.next_node_id += 1;
374            tracing::debug!("launch node {i}");
375
376            config.builder_urls = builder_urls
377                .clone()
378                .try_into()
379                .expect("Non-empty by construction");
380
381            let storage = (self.launcher.resource_generators.storage)(node_id);
382
383            // See whether or not we should be DA
384            let is_da = node_id < config.da_staked_committee_size as u64;
385
386            // We assign node's public key and stake value rather than read from config file since it's a test
387            let validator_config = ValidatorConfig::<TYPES>::generated_from_seed_indexed(
388                [0u8; 32],
389                node_id,
390                self.launcher.metadata.node_stakes.get(node_id),
391                is_da,
392            );
393
394            let public_key = validator_config.public_key.clone();
395
396            let network = if late_start.contains(&node_id) && self.launcher.metadata.skip_late {
397                None
398            } else {
399                let net = (self.launcher.resource_generators.channel_generator)(node_id).await;
400                networks_ready.push({
401                    let net = net.clone();
402                    async move { net.wait_for_ready().await }
403                });
404                Some(net)
405            };
406
407            if late_start.contains(&node_id) {
408                if self.launcher.metadata.skip_late {
409                    self.late_start.insert(
410                        node_id,
411                        LateStartNode {
412                            network: None,
413                            context: LateNodeContext::UninitializedContext(
414                                LateNodeContextParameters {
415                                    storage: storage.clone(),
416                                    config,
417                                },
418                            ),
419                        },
420                    );
421                } else {
422                    let initializer = HotShotInitializer::<TYPES>::from_genesis(
423                        TestInstanceState::new(
424                            self.launcher
425                                .metadata
426                                .async_delay_config
427                                .get(&node_id)
428                                .cloned()
429                                .unwrap_or_default(),
430                        ),
431                        config.epoch_height,
432                        config.epoch_start_block,
433                        vec![InitializerEpochInfo::<TYPES> {
434                            epoch: EpochNumber::new(1),
435                            drb_result: INITIAL_DRB_RESULT,
436                            block_header: None,
437                        }],
438                        self.launcher.metadata.upgrade,
439                    )
440                    .await
441                    .unwrap();
442
443                    let network = network.clone().expect("!skip_late => network created");
444
445                    let hotshot = Self::add_node_with_config(
446                        node_id,
447                        network.clone(),
448                        <TYPES as NodeType>::Membership::new(
449                            config.known_nodes_with_stake.clone(),
450                            config.known_da_nodes.clone(),
451                            public_key.clone(),
452                            config.epoch_height,
453                        ),
454                        initializer,
455                        config,
456                        self.launcher.metadata.upgrade,
457                        validator_config,
458                        storage,
459                    )
460                    .await;
461                    self.late_start.insert(
462                        node_id,
463                        LateStartNode {
464                            network: Some(network),
465                            context: LateNodeContext::InitializedContext(hotshot),
466                        },
467                    );
468                }
469            } else {
470                let network = network.expect("!late_start => network created");
471                uninitialized_nodes.push((
472                    node_id,
473                    network.clone(),
474                    <TYPES as NodeType>::Membership::new(
475                        config.known_nodes_with_stake.clone(),
476                        config.known_da_nodes.clone(),
477                        public_key.clone(),
478                        config.epoch_height,
479                    ),
480                    config,
481                    storage,
482                ));
483            }
484
485            results.push(node_id);
486        }
487
488        // Add the restart nodes after the rest.  This must be done after all the original networks are
489        // created because this will reset the bootstrap info for the restarted nodes
490        for node_id in &results {
491            if restart.contains(node_id) {
492                self.late_start.insert(
493                    *node_id,
494                    LateStartNode {
495                        network: None,
496                        context: LateNodeContext::Restart,
497                    },
498                );
499            }
500        }
501
502        // Wait for all networks to be ready
503        join_all(networks_ready).await;
504
505        // Then start the necessary tasks
506        for (node_id, network, memberships, config, storage) in uninitialized_nodes {
507            let public_key = ValidatorConfig::<TYPES>::generated_from_seed_indexed(
508                [0u8; 32],
509                node_id,
510                self.launcher.metadata.node_stakes.get(node_id),
511                node_id < config.da_staked_committee_size as u64,
512            )
513            .public_key;
514            let memberships = Arc::new(memberships);
515            let handle = create_test_handle(
516                self.launcher.metadata.clone(),
517                node_id,
518                network.clone(),
519                memberships.clone(),
520                config.clone(),
521                storage.clone(),
522            )
523            .await;
524            // Install the test leaf fetcher so epoch-root catchup has a
525            // working network and a receiver wired into the same external
526            // channel the network task forwards `ExternalMessageReceived`
527            // events to. Done after `create_test_handle` returns but before
528            // `start_consensus` so no catchup events can be missed.
529            memberships.set_leaf_fetcher(
530                Arc::new(ConnectedNetworkLeafFetcher::<TYPES, _>::new(
531                    network.clone(),
532                )),
533                storage,
534                public_key,
535                handle.event_stream_known_impl(),
536            );
537
538            match node_id.cmp(&(config.da_staked_committee_size as u64 - 1)) {
539                std::cmp::Ordering::Less => {
540                    if let Some(task) = builder_tasks.pop() {
541                        task.start(Box::new(handle.event_stream()))
542                    }
543                },
544                std::cmp::Ordering::Equal => {
545                    // If we have more builder tasks than DA nodes, pin them all on the last node.
546                    while let Some(task) = builder_tasks.pop() {
547                        task.start(Box::new(handle.event_stream()))
548                    }
549                },
550                std::cmp::Ordering::Greater => {},
551            }
552
553            self.nodes.push(Node {
554                node_id,
555                network,
556                handle,
557            });
558        }
559
560        results
561    }
562
563    /// add a specific node with a config
564    /// # Panics
565    /// if unable to initialize the node's `SystemContext` based on the config
566    #[allow(clippy::too_many_arguments)]
567    pub async fn add_node_with_config(
568        node_id: u64,
569        network: Network<TYPES, I>,
570        memberships: TYPES::Membership,
571        initializer: HotShotInitializer<TYPES>,
572        config: HotShotConfig<TYPES>,
573        upgrade: versions::Upgrade,
574        validator_config: ValidatorConfig<TYPES>,
575        storage: I::Storage,
576    ) -> Arc<SystemContext<TYPES, I>> {
577        let internal_chan = async_broadcast::broadcast(EVENT_CHANNEL_SIZE);
578        let external_chan = async_broadcast::broadcast(EXTERNAL_EVENT_CHANNEL_SIZE);
579
580        // Install the test leaf fetcher before consensus starts so epoch
581        // catchup (`get_epoch_root` / `get_epoch_drb`) has a working network
582        // and a receiver wired into the same external channel the network
583        // task uses to forward `ExternalMessageReceived` events.
584        memberships.set_leaf_fetcher(
585            Arc::new(ConnectedNetworkLeafFetcher::<TYPES, _>::new(
586                network.clone(),
587            )),
588            storage.clone(),
589            validator_config.public_key.clone(),
590            external_chan.1.new_receiver(),
591        );
592
593        Self::add_node_with_config_and_channels(
594            node_id,
595            network,
596            memberships,
597            initializer,
598            config,
599            upgrade,
600            validator_config,
601            storage,
602            internal_chan,
603            external_chan,
604        )
605        .await
606    }
607
608    /// add a specific node with a config
609    /// # Panics
610    /// if unable to initialize the node's `SystemContext` based on the config
611    #[allow(clippy::too_many_arguments, clippy::type_complexity)]
612    pub async fn add_node_with_config_and_channels(
613        node_id: u64,
614        network: Network<TYPES, I>,
615        memberships: TYPES::Membership,
616        initializer: HotShotInitializer<TYPES>,
617        config: HotShotConfig<TYPES>,
618        upgrade: versions::Upgrade,
619        validator_config: ValidatorConfig<TYPES>,
620        storage: I::Storage,
621        internal_channel: (
622            Sender<Arc<HotShotEvent<TYPES>>>,
623            Receiver<Arc<HotShotEvent<TYPES>>>,
624        ),
625        external_channel: (Sender<Event<TYPES>>, Receiver<Event<TYPES>>),
626    ) -> Arc<SystemContext<TYPES, I>> {
627        // Get key pair for certificate aggregation
628        let private_key = validator_config.private_key.clone();
629        let public_key = validator_config.public_key.clone();
630        let state_private_key = validator_config.state_private_key.clone();
631        let epoch_height = config.epoch_height;
632
633        SystemContext::new_from_channels(
634            public_key,
635            private_key,
636            state_private_key,
637            node_id,
638            config,
639            upgrade,
640            EpochMembershipCoordinator::new(Arc::new(memberships), epoch_height, &storage.clone()),
641            network,
642            initializer,
643            ConsensusMetricsValue::default(),
644            storage,
645            StorageMetricsValue::default(),
646            internal_channel,
647            external_channel,
648        )
649        .await
650    }
651}
652
653/// a node participating in a test
654pub struct Node<TYPES: NodeType, I: TestableNodeImplementation<TYPES>> {
655    /// The node's unique identifier
656    pub node_id: u64,
657    /// The underlying network belonging to the node
658    pub network: Network<TYPES, I>,
659    /// The handle to the node's internals
660    pub handle: SystemContextHandle<TYPES, I>,
661}
662
663/// This type combines all of the parameters needed to build the context for a node that started
664/// late during a unit test or integration test.
665pub struct LateNodeContextParameters<TYPES: NodeType, I: TestableNodeImplementation<TYPES>> {
666    /// The storage trait for Sequencer persistence.
667    pub storage: I::Storage,
668
669    /// The config associated with this node.
670    pub config: HotShotConfig<TYPES>,
671}
672
673/// The late node context dictates how we're building a node that started late during the test.
674#[allow(clippy::large_enum_variant)]
675pub enum LateNodeContext<TYPES: NodeType, I: TestableNodeImplementation<TYPES>> {
676    /// The system context that we're passing directly to the node, this means the node is already
677    /// initialized successfully.
678    InitializedContext(Arc<SystemContext<TYPES, I>>),
679
680    /// The system context that we're passing to the node when it is not yet initialized, so we're
681    /// initializing it based on the received leaf and init parameters.
682    UninitializedContext(LateNodeContextParameters<TYPES, I>),
683    /// The node is to be restarted so we will build the context from the node that was already running.
684    Restart,
685}
686
687/// A yet-to-be-started node that participates in tests
688pub struct LateStartNode<TYPES: NodeType, I: TestableNodeImplementation<TYPES>> {
689    /// The underlying network belonging to the node
690    pub network: Option<Network<TYPES, I>>,
691    /// Either the context to which we will use to launch HotShot for initialized node when it's
692    /// time, or the parameters that will be used to initialize the node and launch HotShot.
693    pub context: LateNodeContext<TYPES, I>,
694}
695
696/// The runner of a test network
697/// spin up and down nodes, execute rounds
698pub struct TestRunner<
699    TYPES: NodeType,
700    I: TestableNodeImplementation<TYPES>,
701    N: ConnectedNetwork<TYPES::SignatureKey>,
702> {
703    /// test launcher, contains a bunch of useful metadata and closures
704    pub(crate) launcher: TestLauncher<TYPES, I>,
705    /// nodes in the test
706    pub(crate) nodes: Vec<Node<TYPES, I>>,
707    /// nodes with a late start
708    pub(crate) late_start: HashMap<u64, LateStartNode<TYPES, I>>,
709    /// the next node unique identifier
710    pub(crate) next_node_id: u64,
711    /// Phantom for N
712    pub(crate) _pd: PhantomData<N>,
713}