hotshot_testing/spinning_task.rs
1// Copyright (c) 2021-2024 Espresso Systems (espressosys.com)
2// This file is part of the HotShot repository.
3
4// You should have received a copy of the MIT License
5// along with the HotShot repository. If not, see <https://mit-license.org/>.
6
7use std::{
8 collections::{BTreeMap, HashMap},
9 sync::Arc,
10};
11
12use async_broadcast::broadcast;
13use async_lock::RwLock;
14use async_trait::async_trait;
15use futures::future::join_all;
16use hotshot::{
17 HotShotInitializer, InitializerEpochInfo, SystemContext, traits::TestableNodeImplementation,
18 types::EventType,
19};
20use hotshot_example_types::{
21 block_types::TestBlockHeader,
22 membership::TestableMembership,
23 state_types::{TestInstanceState, TestValidatedState},
24 storage_types::TestStorage,
25 testable_delay::DelayConfig,
26};
27use hotshot_types::{
28 ValidatorConfig,
29 constants::EVENT_CHANNEL_SIZE,
30 data::{Leaf2, ViewNumber},
31 event::Event,
32 message::convert_proposal,
33 simple_certificate::{
34 LightClientStateUpdateCertificateV2, NextEpochQuorumCertificate2, QuorumCertificate2,
35 },
36 traits::{
37 network::{AsyncGenerator, ConnectedNetwork},
38 node_implementation::{NodeImplementation, NodeType},
39 },
40 utils::genesis_epoch_from_version,
41 vote::HasViewNumber,
42};
43use hotshot_utils::anytrace::*;
44
45use crate::{
46 node_stake::TestNodeStakes,
47 test_launcher::Network,
48 test_runner::{LateNodeContext, LateNodeContextParameters, LateStartNode, Node, TestRunner},
49 test_task::{TestResult, TestTaskState},
50};
51
/// Convenience alias pairing a list of states with a list of blocks.
pub type StateAndBlock<States, Blocks> = (Vec<States>, Vec<Blocks>);
54
55/// Spinning task state
56pub struct SpinningTask<
57 TYPES: NodeType,
58 N: ConnectedNetwork<TYPES::SignatureKey>,
59 I: TestableNodeImplementation<TYPES>,
60> {
61 /// epoch height
62 pub epoch_height: u64,
63 /// Epoch start block
64 pub epoch_start_block: u64,
65 /// Saved epoch information. This must be sorted ascending by epoch.
66 pub start_epoch_info: Vec<InitializerEpochInfo<TYPES>>,
67 /// handle to the nodes
68 pub(crate) handles: Arc<RwLock<Vec<Node<TYPES, I>>>>,
69 /// late start nodes
70 pub(crate) late_start: HashMap<u64, LateStartNode<TYPES, I>>,
71 /// time based changes
72 pub(crate) changes: BTreeMap<ViewNumber, Vec<ChangeNode>>,
73 /// most recent view seen by spinning task
74 pub(crate) latest_view: Option<ViewNumber>,
75 /// Last decided leaf that can be used as the anchor leaf to initialize the node.
76 pub(crate) last_decided_leaf: Leaf2<TYPES>,
77 /// Highest qc seen in the test for restarting nodes
78 pub(crate) high_qc: QuorumCertificate2<TYPES>,
79 /// Next epoch highest qc seen in the test for restarting nodes
80 pub(crate) next_epoch_high_qc: Option<NextEpochQuorumCertificate2<TYPES>>,
81 /// Add specified delay to async calls
82 pub(crate) async_delay_config: HashMap<u64, DelayConfig>,
83 /// Context stored for nodes to be restarted with
84 pub(crate) restart_contexts: HashMap<usize, RestartContext<TYPES, N, I>>,
85 /// Generate network channel for restart nodes
86 pub(crate) channel_generator: AsyncGenerator<Network<TYPES, I>>,
87 /// The light client state update certificate
88 pub(crate) state_cert: Option<LightClientStateUpdateCertificateV2<TYPES>>,
89 /// Node stakes
90 pub(crate) node_stakes: TestNodeStakes,
91 /// Configured version upgrade
92 pub(crate) upgrade: versions::Upgrade,
93}
94
95#[async_trait]
96impl<
97 TYPES: NodeType<
98 InstanceState = TestInstanceState,
99 ValidatedState = TestValidatedState,
100 BlockHeader = TestBlockHeader,
101 >,
102 I: TestableNodeImplementation<TYPES>,
103 N: ConnectedNetwork<TYPES::SignatureKey>,
104> TestTaskState for SpinningTask<TYPES, N, I>
105where
106 I: TestableNodeImplementation<TYPES>,
107 I: NodeImplementation<TYPES, Network = N, Storage = TestStorage<TYPES>>,
108 <TYPES as NodeType>::Membership: TestableMembership<TYPES>,
109{
110 type Event = Event<TYPES>;
111 type Error = Error;
112
113 async fn handle_event(&mut self, (message, _id): (Self::Event, usize)) -> Result<()> {
114 let Event { view_number, event } = message;
115
116 if let EventType::Decide { leaf_chain, .. } = event {
117 let leaf = leaf_chain.first().unwrap().leaf.clone();
118 if leaf.view_number() > self.last_decided_leaf.view_number() {
119 self.last_decided_leaf = leaf;
120 }
121 } else if let EventType::QuorumProposal {
122 proposal,
123 sender: _,
124 } = event
125 {
126 if proposal.data.justify_qc().view_number() > self.high_qc.view_number() {
127 self.high_qc = proposal.data.justify_qc().clone();
128 }
129 } else if let EventType::ViewTimeout { view_number } = event {
130 tracing::error!("View timeout for view {view_number}");
131 }
132
133 let mut new_nodes = vec![];
134 let mut new_networks = vec![];
135 // if we have not seen this view before
136 if self.latest_view.is_none() || view_number > self.latest_view.unwrap() {
137 // perform operations on the nodes
138 if let Some(operations) = self.changes.remove(&view_number) {
139 for ChangeNode { idx, updown } in operations {
140 match updown {
141 NodeAction::Up => {
142 let node_id = idx.try_into().unwrap();
143 if let Some(node) = self.late_start.remove(&node_id) {
144 tracing::error!("Node {idx} spinning up late");
145 let network = if let Some(network) = node.network {
146 network
147 } else {
148 let generated_network = (self.channel_generator)(node_id).await;
149 generated_network.wait_for_ready().await;
150 generated_network
151 };
152 let node_id = idx.try_into().unwrap();
153 let context = match node.context {
154 LateNodeContext::InitializedContext(context) => context,
155 // Node not initialized. Initialize it
156 // based on the received leaf.
157 LateNodeContext::UninitializedContext(late_context_params) => {
158 let LateNodeContextParameters { storage, config } =
159 late_context_params;
160
161 // We assign node's public key and stake value rather than read from config file since it's a test
162 let validator_config: ValidatorConfig<TYPES> =
163 ValidatorConfig::generated_from_seed_indexed(
164 [0u8; 32],
165 node_id,
166 self.node_stakes.get(node_id),
167 // For tests, make the node DA based on its index
168 node_id < config.da_staked_committee_size as u64,
169 );
170
171 let memberships = <TYPES as NodeType>::Membership::new(
172 config.known_nodes_with_stake.clone(),
173 config.known_da_nodes.clone(),
174 validator_config.public_key.clone(),
175 config.epoch_height,
176 );
177
178 let initializer = HotShotInitializer::<TYPES>::load(
179 TestInstanceState::new(
180 self.async_delay_config
181 .get(&node_id)
182 .cloned()
183 .unwrap_or_default(),
184 ),
185 self.epoch_height,
186 self.epoch_start_block,
187 self.start_epoch_info.clone(),
188 self.last_decided_leaf.clone(),
189 (
190 ViewNumber::genesis(),
191 genesis_epoch_from_version(self.upgrade.base),
192 ),
193 (self.high_qc.clone(), self.next_epoch_high_qc.clone()),
194 ViewNumber::genesis(),
195 BTreeMap::new(),
196 BTreeMap::new(),
197 None,
198 self.state_cert.clone(),
199 );
200
201 TestRunner::add_node_with_config(
202 node_id,
203 network.clone(),
204 memberships,
205 initializer,
206 config,
207 self.upgrade,
208 validator_config,
209 storage,
210 )
211 .await
212 },
213 LateNodeContext::Restart => {
214 panic!("Cannot spin up a node with Restart context")
215 },
216 };
217
218 let handle = context.run_tasks().await;
219
220 // Create the node and add it to the state, so we can shut them
221 // down properly later to avoid the overflow error in the overall
222 // safety task.
223 let node = Node {
224 node_id,
225 network,
226 handle,
227 };
228 node.handle.hotshot.start_consensus().await;
229
230 self.handles.write().await.push(node);
231 }
232 },
233 NodeAction::Down => {
234 if let Some(node) = self.handles.write().await.get_mut(idx) {
235 tracing::error!("Node {idx} shutting down in view {view_number}");
236 node.handle.shut_down().await;
237 }
238 },
239 NodeAction::RestartDown(delay_views) => {
240 let node_id = idx.try_into().unwrap();
241 if let Some(node) = self.handles.write().await.get_mut(idx) {
242 tracing::error!("Node {idx} shutting down in view {view_number}");
243 node.handle.shut_down().await;
244 // For restarted nodes generate the network on correct view
245 let generated_network = (self.channel_generator)(node_id).await;
246
247 let Some(LateStartNode {
248 network: _,
249 context: LateNodeContext::Restart,
250 }) = self.late_start.get(&node_id)
251 else {
252 panic!("Restarted Nodes must have an uninitialized context");
253 };
254
255 let storage = node.handle.storage().clone();
256
257 let membership = <TYPES as NodeType>::Membership::new(
258 node.handle.hotshot.config.known_nodes_with_stake.clone(),
259 node.handle.hotshot.config.known_da_nodes.clone(),
260 node.handle.public_key().clone(),
261 node.handle.hotshot.config.epoch_height,
262 );
263
264 let config = node.handle.hotshot.config.clone();
265
266 let next_epoch_high_qc = storage.next_epoch_high_qc_cloned().await;
267 let start_view = storage.restart_view().await;
268 let last_actioned_view = storage.last_actioned_view().await;
269 let start_epoch = storage.last_actioned_epoch().await;
270 let high_qc = storage.high_qc_cloned().await.unwrap_or(
271 QuorumCertificate2::genesis(
272 &TestValidatedState::default(),
273 &TestInstanceState::default(),
274 self.upgrade,
275 )
276 .await,
277 );
278 let state_cert = storage.state_cert_cloned().await;
279 let saved_proposals = storage.proposals_cloned().await;
280 let mut vid_shares = BTreeMap::new();
281 for (view, hash_map) in storage.vids_cloned().await {
282 let mut converted_hash_map = HashMap::new();
283 for (key, proposal) in hash_map {
284 converted_hash_map
285 .entry(key)
286 .or_insert_with(BTreeMap::new)
287 .insert(
288 proposal.data.target_epoch(),
289 convert_proposal(proposal),
290 );
291 }
292 vid_shares.insert(view, converted_hash_map);
293 }
294 let decided_upgrade_certificate =
295 storage.decided_upgrade_certificate().await;
296
297 let initializer = HotShotInitializer::<TYPES>::load(
298 TestInstanceState::new(
299 self.async_delay_config
300 .get(&node_id)
301 .cloned()
302 .unwrap_or_default(),
303 ),
304 self.epoch_height,
305 self.epoch_start_block,
306 self.start_epoch_info.clone(),
307 self.last_decided_leaf.clone(),
308 (start_view, start_epoch),
309 (high_qc, next_epoch_high_qc),
310 last_actioned_view,
311 saved_proposals,
312 vid_shares,
313 decided_upgrade_certificate,
314 state_cert,
315 );
316 // We assign node's public key and stake value rather than read from config file since it's a test
317 let validator_config: ValidatorConfig<TYPES> =
318 ValidatorConfig::generated_from_seed_indexed(
319 [0u8; 32],
320 node_id,
321 self.node_stakes.get(node_id),
322 // For tests, make the node DA based on its index
323 node_id < config.da_staked_committee_size as u64,
324 );
325 let internal_chan = broadcast(EVENT_CHANNEL_SIZE);
326 // Install the test leaf fetcher on the
327 // restarted node's membership before the
328 // SystemContext spins up, so epoch-root
329 // catchup is wired against the same external
330 // channel the network task forwards events to.
331 membership.set_leaf_fetcher(
332 Arc::new(
333 hotshot_types::traits::leaf_fetcher_network::ConnectedNetworkLeafFetcher::<
334 TYPES,
335 _,
336 >::new(generated_network.clone()),
337 ),
338 storage.clone(),
339 validator_config.public_key.clone(),
340 node.handle.event_stream_known_impl(),
341 );
342 let context =
343 TestRunner::<TYPES, I, N>::add_node_with_config_and_channels(
344 node_id,
345 generated_network.clone(),
346 membership,
347 initializer,
348 config,
349 self.upgrade,
350 validator_config,
351 storage.clone(),
352 internal_chan,
353 (
354 node.handle.external_channel_sender(),
355 node.handle.event_stream_known_impl().new_receiver(),
356 ),
357 )
358 .await;
359 tracing::info!(
360 "Node {} restarting in view {} with start view {}",
361 idx,
362 view_number + delay_views,
363 start_view
364 );
365 if delay_views == 0 {
366 new_nodes.push((context, idx));
367 new_networks.push(generated_network.clone());
368 } else {
369 let up_view = view_number + delay_views;
370 let change = ChangeNode {
371 idx,
372 updown: NodeAction::RestartUp,
373 };
374 self.changes.entry(up_view).or_default().push(change);
375 let new_ctx = RestartContext {
376 context,
377 network: generated_network.clone(),
378 };
379 self.restart_contexts.insert(idx, new_ctx);
380 }
381 }
382 },
383 NodeAction::RestartUp => {
384 if let Some(ctx) = self.restart_contexts.remove(&idx) {
385 new_nodes.push((ctx.context, idx));
386 new_networks.push(ctx.network.clone());
387 }
388 },
389 NodeAction::NetworkUp => {
390 if let Some(handle) = self.handles.write().await.get(idx) {
391 tracing::error!("Node {idx} networks resuming");
392 handle.network.resume();
393 }
394 },
395 NodeAction::NetworkDown => {
396 if let Some(handle) = self.handles.write().await.get(idx) {
397 tracing::error!("Node {idx} networks pausing");
398 handle.network.pause();
399 }
400 },
401 }
402 }
403 }
404 let mut ready_futs = vec![];
405 while let Some(net) = new_networks.pop() {
406 ready_futs.push(async move {
407 net.wait_for_ready().await;
408 });
409 }
410 join_all(ready_futs).await;
411
412 let mut start_futs = vec![];
413
414 while let Some((node, id)) = new_nodes.pop() {
415 let handles = self.handles.clone();
416 let fut = async move {
417 tracing::info!("Starting node {id} back up");
418 node.network.wait_for_ready().await;
419 let handle = node.run_tasks().await;
420
421 // Create the node and add it to the state, so we can shut them
422 // down properly later to avoid the overflow error in the overall
423 // safety task.
424 let node = Node {
425 node_id: id.try_into().unwrap(),
426 network: node.network.clone(),
427 handle,
428 };
429 node.handle.hotshot.start_consensus().await;
430
431 handles.write().await[id] = node;
432 };
433 start_futs.push(fut);
434 }
435 if !start_futs.is_empty() {
436 join_all(start_futs).await;
437 tracing::info!("Nodes all started");
438 }
439
440 // update our latest view
441 self.latest_view = Some(view_number);
442 }
443
444 Ok(())
445 }
446
447 async fn check(&self) -> TestResult {
448 TestResult::Pass
449 }
450}
451
452#[derive(Clone)]
453pub(crate) struct RestartContext<
454 TYPES: NodeType,
455 N: ConnectedNetwork<TYPES::SignatureKey>,
456 I: TestableNodeImplementation<TYPES>,
457> {
458 context: Arc<SystemContext<TYPES, I>>,
459 network: Arc<N>,
460}
461
/// An action applied to a single node at a scheduled view.
#[derive(Debug, Clone)]
pub enum NodeAction {
    /// Start a node that was registered as a late-start node.
    Up,
    /// Shut the node down.
    Down,
    /// Resume the node's network.
    NetworkUp,
    /// Pause the node's network.
    NetworkDown,
    /// Shut the node down and rebuild it. The rebuilt node starts after the
    /// given number of views, or in the same view when the count is zero.
    RestartDown(u64),
    /// Start a node previously shut down by `RestartDown`. Only meant to be
    /// scheduled as the follow-up of a `RestartDown`.
    RestartUp,
}
479
480/// denotes a change in node state
481#[derive(Clone, Debug)]
482pub struct ChangeNode {
483 /// the index of the node
484 pub idx: usize,
485 /// spin the node or node's network up or down
486 pub updown: NodeAction,
487}
488
489/// description of the spinning task
490/// (used to build a spinning task)
491#[derive(Clone, Debug)]
492pub struct SpinningTaskDescription {
493 /// the changes in node status, time -> changes
494 pub node_changes: Vec<(u64, Vec<ChangeNode>)>,
495}