Skip to main content

hotshot_testing/
spinning_task.rs

1// Copyright (c) 2021-2024 Espresso Systems (espressosys.com)
2// This file is part of the HotShot repository.
3
4// You should have received a copy of the MIT License
5// along with the HotShot repository. If not, see <https://mit-license.org/>.
6
7use std::{
8    collections::{BTreeMap, HashMap},
9    sync::Arc,
10};
11
12use async_broadcast::broadcast;
13use async_lock::RwLock;
14use async_trait::async_trait;
15use futures::future::join_all;
16use hotshot::{
17    HotShotInitializer, InitializerEpochInfo, SystemContext, traits::TestableNodeImplementation,
18    types::EventType,
19};
20use hotshot_example_types::{
21    block_types::TestBlockHeader,
22    membership::TestableMembership,
23    state_types::{TestInstanceState, TestValidatedState},
24    storage_types::TestStorage,
25    testable_delay::DelayConfig,
26};
27use hotshot_types::{
28    ValidatorConfig,
29    constants::EVENT_CHANNEL_SIZE,
30    data::{Leaf2, ViewNumber},
31    event::Event,
32    message::convert_proposal,
33    simple_certificate::{
34        LightClientStateUpdateCertificateV2, NextEpochQuorumCertificate2, QuorumCertificate2,
35    },
36    traits::{
37        network::{AsyncGenerator, ConnectedNetwork},
38        node_implementation::{NodeImplementation, NodeType},
39    },
40    utils::genesis_epoch_from_version,
41    vote::HasViewNumber,
42};
43use hotshot_utils::anytrace::*;
44
45use crate::{
46    node_stake::TestNodeStakes,
47    test_launcher::Network,
48    test_runner::{LateNodeContext, LateNodeContextParameters, LateStartNode, Node, TestRunner},
49    test_task::{TestResult, TestTaskState},
50};
51
/// Convenience alias bundling a list of states (first element) with a list of
/// blocks (second element).
pub type StateAndBlock<State, Block> = (Vec<State>, Vec<Block>);
54
55/// Spinning task state
56pub struct SpinningTask<
57    TYPES: NodeType,
58    N: ConnectedNetwork<TYPES::SignatureKey>,
59    I: TestableNodeImplementation<TYPES>,
60> {
61    /// epoch height
62    pub epoch_height: u64,
63    /// Epoch start block
64    pub epoch_start_block: u64,
65    /// Saved epoch information. This must be sorted ascending by epoch.
66    pub start_epoch_info: Vec<InitializerEpochInfo<TYPES>>,
67    /// handle to the nodes
68    pub(crate) handles: Arc<RwLock<Vec<Node<TYPES, I>>>>,
69    /// late start nodes
70    pub(crate) late_start: HashMap<u64, LateStartNode<TYPES, I>>,
71    /// time based changes
72    pub(crate) changes: BTreeMap<ViewNumber, Vec<ChangeNode>>,
73    /// most recent view seen by spinning task
74    pub(crate) latest_view: Option<ViewNumber>,
75    /// Last decided leaf that can be used as the anchor leaf to initialize the node.
76    pub(crate) last_decided_leaf: Leaf2<TYPES>,
77    /// Highest qc seen in the test for restarting nodes
78    pub(crate) high_qc: QuorumCertificate2<TYPES>,
79    /// Next epoch highest qc seen in the test for restarting nodes
80    pub(crate) next_epoch_high_qc: Option<NextEpochQuorumCertificate2<TYPES>>,
81    /// Add specified delay to async calls
82    pub(crate) async_delay_config: HashMap<u64, DelayConfig>,
83    /// Context stored for nodes to be restarted with
84    pub(crate) restart_contexts: HashMap<usize, RestartContext<TYPES, N, I>>,
85    /// Generate network channel for restart nodes
86    pub(crate) channel_generator: AsyncGenerator<Network<TYPES, I>>,
87    /// The light client state update certificate
88    pub(crate) state_cert: Option<LightClientStateUpdateCertificateV2<TYPES>>,
89    /// Node stakes
90    pub(crate) node_stakes: TestNodeStakes,
91    /// Configured version upgrade
92    pub(crate) upgrade: versions::Upgrade,
93}
94
95#[async_trait]
96impl<
97    TYPES: NodeType<
98            InstanceState = TestInstanceState,
99            ValidatedState = TestValidatedState,
100            BlockHeader = TestBlockHeader,
101        >,
102    I: TestableNodeImplementation<TYPES>,
103    N: ConnectedNetwork<TYPES::SignatureKey>,
104> TestTaskState for SpinningTask<TYPES, N, I>
105where
106    I: TestableNodeImplementation<TYPES>,
107    I: NodeImplementation<TYPES, Network = N, Storage = TestStorage<TYPES>>,
108    <TYPES as NodeType>::Membership: TestableMembership<TYPES>,
109{
110    type Event = Event<TYPES>;
111    type Error = Error;
112
113    async fn handle_event(&mut self, (message, _id): (Self::Event, usize)) -> Result<()> {
114        let Event { view_number, event } = message;
115
116        if let EventType::Decide { leaf_chain, .. } = event {
117            let leaf = leaf_chain.first().unwrap().leaf.clone();
118            if leaf.view_number() > self.last_decided_leaf.view_number() {
119                self.last_decided_leaf = leaf;
120            }
121        } else if let EventType::QuorumProposal {
122            proposal,
123            sender: _,
124        } = event
125        {
126            if proposal.data.justify_qc().view_number() > self.high_qc.view_number() {
127                self.high_qc = proposal.data.justify_qc().clone();
128            }
129        } else if let EventType::ViewTimeout { view_number } = event {
130            tracing::error!("View timeout for view {view_number}");
131        }
132
133        let mut new_nodes = vec![];
134        let mut new_networks = vec![];
135        // if we have not seen this view before
136        if self.latest_view.is_none() || view_number > self.latest_view.unwrap() {
137            // perform operations on the nodes
138            if let Some(operations) = self.changes.remove(&view_number) {
139                for ChangeNode { idx, updown } in operations {
140                    match updown {
141                        NodeAction::Up => {
142                            let node_id = idx.try_into().unwrap();
143                            if let Some(node) = self.late_start.remove(&node_id) {
144                                tracing::error!("Node {idx} spinning up late");
145                                let network = if let Some(network) = node.network {
146                                    network
147                                } else {
148                                    let generated_network = (self.channel_generator)(node_id).await;
149                                    generated_network.wait_for_ready().await;
150                                    generated_network
151                                };
152                                let node_id = idx.try_into().unwrap();
153                                let context = match node.context {
154                                    LateNodeContext::InitializedContext(context) => context,
155                                    // Node not initialized. Initialize it
156                                    // based on the received leaf.
157                                    LateNodeContext::UninitializedContext(late_context_params) => {
158                                        let LateNodeContextParameters { storage, config } =
159                                            late_context_params;
160
161                                        // We assign node's public key and stake value rather than read from config file since it's a test
162                                        let validator_config: ValidatorConfig<TYPES> =
163                                            ValidatorConfig::generated_from_seed_indexed(
164                                                [0u8; 32],
165                                                node_id,
166                                                self.node_stakes.get(node_id),
167                                                // For tests, make the node DA based on its index
168                                                node_id < config.da_staked_committee_size as u64,
169                                            );
170
171                                        let memberships = <TYPES as NodeType>::Membership::new(
172                                            config.known_nodes_with_stake.clone(),
173                                            config.known_da_nodes.clone(),
174                                            validator_config.public_key.clone(),
175                                            config.epoch_height,
176                                        );
177
178                                        let initializer = HotShotInitializer::<TYPES>::load(
179                                            TestInstanceState::new(
180                                                self.async_delay_config
181                                                    .get(&node_id)
182                                                    .cloned()
183                                                    .unwrap_or_default(),
184                                            ),
185                                            self.epoch_height,
186                                            self.epoch_start_block,
187                                            self.start_epoch_info.clone(),
188                                            self.last_decided_leaf.clone(),
189                                            (
190                                                ViewNumber::genesis(),
191                                                genesis_epoch_from_version(self.upgrade.base),
192                                            ),
193                                            (self.high_qc.clone(), self.next_epoch_high_qc.clone()),
194                                            ViewNumber::genesis(),
195                                            BTreeMap::new(),
196                                            BTreeMap::new(),
197                                            None,
198                                            self.state_cert.clone(),
199                                        );
200
201                                        TestRunner::add_node_with_config(
202                                            node_id,
203                                            network.clone(),
204                                            memberships,
205                                            initializer,
206                                            config,
207                                            self.upgrade,
208                                            validator_config,
209                                            storage,
210                                        )
211                                        .await
212                                    },
213                                    LateNodeContext::Restart => {
214                                        panic!("Cannot spin up a node with Restart context")
215                                    },
216                                };
217
218                                let handle = context.run_tasks().await;
219
220                                // Create the node and add it to the state, so we can shut them
221                                // down properly later to avoid the overflow error in the overall
222                                // safety task.
223                                let node = Node {
224                                    node_id,
225                                    network,
226                                    handle,
227                                };
228                                node.handle.hotshot.start_consensus().await;
229
230                                self.handles.write().await.push(node);
231                            }
232                        },
233                        NodeAction::Down => {
234                            if let Some(node) = self.handles.write().await.get_mut(idx) {
235                                tracing::error!("Node {idx} shutting down in view {view_number}");
236                                node.handle.shut_down().await;
237                            }
238                        },
239                        NodeAction::RestartDown(delay_views) => {
240                            let node_id = idx.try_into().unwrap();
241                            if let Some(node) = self.handles.write().await.get_mut(idx) {
242                                tracing::error!("Node {idx} shutting down in view {view_number}");
243                                node.handle.shut_down().await;
244                                // For restarted nodes generate the network on correct view
245                                let generated_network = (self.channel_generator)(node_id).await;
246
247                                let Some(LateStartNode {
248                                    network: _,
249                                    context: LateNodeContext::Restart,
250                                }) = self.late_start.get(&node_id)
251                                else {
252                                    panic!("Restarted Nodes must have an uninitialized context");
253                                };
254
255                                let storage = node.handle.storage().clone();
256
257                                let membership = <TYPES as NodeType>::Membership::new(
258                                    node.handle.hotshot.config.known_nodes_with_stake.clone(),
259                                    node.handle.hotshot.config.known_da_nodes.clone(),
260                                    node.handle.public_key().clone(),
261                                    node.handle.hotshot.config.epoch_height,
262                                );
263
264                                let config = node.handle.hotshot.config.clone();
265
266                                let next_epoch_high_qc = storage.next_epoch_high_qc_cloned().await;
267                                let start_view = storage.restart_view().await;
268                                let last_actioned_view = storage.last_actioned_view().await;
269                                let start_epoch = storage.last_actioned_epoch().await;
270                                let high_qc = storage.high_qc_cloned().await.unwrap_or(
271                                    QuorumCertificate2::genesis(
272                                        &TestValidatedState::default(),
273                                        &TestInstanceState::default(),
274                                        self.upgrade,
275                                    )
276                                    .await,
277                                );
278                                let state_cert = storage.state_cert_cloned().await;
279                                let saved_proposals = storage.proposals_cloned().await;
280                                let mut vid_shares = BTreeMap::new();
281                                for (view, hash_map) in storage.vids_cloned().await {
282                                    let mut converted_hash_map = HashMap::new();
283                                    for (key, proposal) in hash_map {
284                                        converted_hash_map
285                                            .entry(key)
286                                            .or_insert_with(BTreeMap::new)
287                                            .insert(
288                                                proposal.data.target_epoch(),
289                                                convert_proposal(proposal),
290                                            );
291                                    }
292                                    vid_shares.insert(view, converted_hash_map);
293                                }
294                                let decided_upgrade_certificate =
295                                    storage.decided_upgrade_certificate().await;
296
297                                let initializer = HotShotInitializer::<TYPES>::load(
298                                    TestInstanceState::new(
299                                        self.async_delay_config
300                                            .get(&node_id)
301                                            .cloned()
302                                            .unwrap_or_default(),
303                                    ),
304                                    self.epoch_height,
305                                    self.epoch_start_block,
306                                    self.start_epoch_info.clone(),
307                                    self.last_decided_leaf.clone(),
308                                    (start_view, start_epoch),
309                                    (high_qc, next_epoch_high_qc),
310                                    last_actioned_view,
311                                    saved_proposals,
312                                    vid_shares,
313                                    decided_upgrade_certificate,
314                                    state_cert,
315                                );
316                                // We assign node's public key and stake value rather than read from config file since it's a test
317                                let validator_config: ValidatorConfig<TYPES> =
318                                    ValidatorConfig::generated_from_seed_indexed(
319                                        [0u8; 32],
320                                        node_id,
321                                        self.node_stakes.get(node_id),
322                                        // For tests, make the node DA based on its index
323                                        node_id < config.da_staked_committee_size as u64,
324                                    );
325                                let internal_chan = broadcast(EVENT_CHANNEL_SIZE);
326                                // Install the test leaf fetcher on the
327                                // restarted node's membership before the
328                                // SystemContext spins up, so epoch-root
329                                // catchup is wired against the same external
330                                // channel the network task forwards events to.
331                                membership.set_leaf_fetcher(
332                                    Arc::new(
333                                        hotshot_types::traits::leaf_fetcher_network::ConnectedNetworkLeafFetcher::<
334                                            TYPES,
335                                            _,
336                                        >::new(generated_network.clone()),
337                                    ),
338                                    storage.clone(),
339                                    validator_config.public_key.clone(),
340                                    node.handle.event_stream_known_impl(),
341                                );
342                                let context =
343                                    TestRunner::<TYPES, I, N>::add_node_with_config_and_channels(
344                                        node_id,
345                                        generated_network.clone(),
346                                        membership,
347                                        initializer,
348                                        config,
349                                        self.upgrade,
350                                        validator_config,
351                                        storage.clone(),
352                                        internal_chan,
353                                        (
354                                            node.handle.external_channel_sender(),
355                                            node.handle.event_stream_known_impl().new_receiver(),
356                                        ),
357                                    )
358                                    .await;
359                                tracing::info!(
360                                    "Node {} restarting in view {} with start view {}",
361                                    idx,
362                                    view_number + delay_views,
363                                    start_view
364                                );
365                                if delay_views == 0 {
366                                    new_nodes.push((context, idx));
367                                    new_networks.push(generated_network.clone());
368                                } else {
369                                    let up_view = view_number + delay_views;
370                                    let change = ChangeNode {
371                                        idx,
372                                        updown: NodeAction::RestartUp,
373                                    };
374                                    self.changes.entry(up_view).or_default().push(change);
375                                    let new_ctx = RestartContext {
376                                        context,
377                                        network: generated_network.clone(),
378                                    };
379                                    self.restart_contexts.insert(idx, new_ctx);
380                                }
381                            }
382                        },
383                        NodeAction::RestartUp => {
384                            if let Some(ctx) = self.restart_contexts.remove(&idx) {
385                                new_nodes.push((ctx.context, idx));
386                                new_networks.push(ctx.network.clone());
387                            }
388                        },
389                        NodeAction::NetworkUp => {
390                            if let Some(handle) = self.handles.write().await.get(idx) {
391                                tracing::error!("Node {idx} networks resuming");
392                                handle.network.resume();
393                            }
394                        },
395                        NodeAction::NetworkDown => {
396                            if let Some(handle) = self.handles.write().await.get(idx) {
397                                tracing::error!("Node {idx} networks pausing");
398                                handle.network.pause();
399                            }
400                        },
401                    }
402                }
403            }
404            let mut ready_futs = vec![];
405            while let Some(net) = new_networks.pop() {
406                ready_futs.push(async move {
407                    net.wait_for_ready().await;
408                });
409            }
410            join_all(ready_futs).await;
411
412            let mut start_futs = vec![];
413
414            while let Some((node, id)) = new_nodes.pop() {
415                let handles = self.handles.clone();
416                let fut = async move {
417                    tracing::info!("Starting node {id} back up");
418                    node.network.wait_for_ready().await;
419                    let handle = node.run_tasks().await;
420
421                    // Create the node and add it to the state, so we can shut them
422                    // down properly later to avoid the overflow error in the overall
423                    // safety task.
424                    let node = Node {
425                        node_id: id.try_into().unwrap(),
426                        network: node.network.clone(),
427                        handle,
428                    };
429                    node.handle.hotshot.start_consensus().await;
430
431                    handles.write().await[id] = node;
432                };
433                start_futs.push(fut);
434            }
435            if !start_futs.is_empty() {
436                join_all(start_futs).await;
437                tracing::info!("Nodes all started");
438            }
439
440            // update our latest view
441            self.latest_view = Some(view_number);
442        }
443
444        Ok(())
445    }
446
447    async fn check(&self) -> TestResult {
448        TestResult::Pass
449    }
450}
451
452#[derive(Clone)]
453pub(crate) struct RestartContext<
454    TYPES: NodeType,
455    N: ConnectedNetwork<TYPES::SignatureKey>,
456    I: TestableNodeImplementation<TYPES>,
457> {
458    context: Arc<SystemContext<TYPES, I>>,
459    network: Arc<N>,
460}
461
/// An action the spinning task applies to a node or to the node's network.
#[derive(Clone, Debug)]
pub enum NodeAction {
    /// Start a node registered as a late starter.
    Up,
    /// Shut the node down.
    Down,
    /// Resume the node's network.
    NetworkUp,
    /// Pause the node's network.
    NetworkDown,
    /// Shut the node down and bring it back after the given number of views
    /// (`0` restarts it while handling the same view).
    RestartDown(u64),
    /// Bring a node back up after it was shut down for a restart.
    /// Should only be scheduled as a consequence of a `RestartDown`.
    RestartUp,
}
479
480/// denotes a change in node state
481#[derive(Clone, Debug)]
482pub struct ChangeNode {
483    /// the index of the node
484    pub idx: usize,
485    /// spin the node or node's network up or down
486    pub updown: NodeAction,
487}
488
489/// description of the spinning task
490/// (used to build a spinning task)
491#[derive(Clone, Debug)]
492pub struct SpinningTaskDescription {
493    /// the changes in node status, time -> changes
494    pub node_changes: Vec<(u64, Vec<ChangeNode>)>,
495}