// hotshot_orchestrator/lib.rs

1// Copyright (c) 2021-2024 Espresso Systems (espressosys.com)
2// This file is part of the HotShot repository.
3
4// You should have received a copy of the MIT License
5// along with the HotShot repository. If not, see <https://mit-license.org/>.
6
7//! Orchestrator for manipulating nodes and recording results during a run of `HotShot` tests
8
9/// The orchestrator's clients
10pub mod client;
11
12use std::{
13    collections::{HashMap, HashSet},
14    fs,
15    fs::OpenOptions,
16    io,
17    time::Duration,
18};
19
20use alloy::primitives::U256;
21use async_lock::RwLock;
22use client::{BenchResults, BenchResultsDownloadConfig};
23use csv::Writer;
24use futures::{FutureExt, StreamExt, stream::FuturesUnordered};
25use hotshot_types::{
26    PeerConfig,
27    network::{BuilderType, NetworkConfig, PublicKeysFile},
28    traits::{
29        node_implementation::NodeType,
30        signature_key::{SignatureKey, StakeTableEntryType},
31    },
32};
33use libp2p_identity::{
34    Keypair, PeerId,
35    ed25519::{Keypair as EdKeypair, SecretKey},
36};
37use multiaddr::Multiaddr;
38use surf_disco::Url;
39use tide_disco::{
40    Api, App, RequestError,
41    api::ApiError,
42    error::ServerError,
43    method::{ReadState, WriteState},
44};
45use vbs::{
46    BinarySerializer,
47    version::{StaticVersion, StaticVersionType},
48};
49
/// Orchestrator is not, strictly speaking, bound to the network; it can have its own versioning.
/// Orchestrator Version (major)
pub const ORCHESTRATOR_MAJOR_VERSION: u16 = 0;
/// Orchestrator Version (minor)
pub const ORCHESTRATOR_MINOR_VERSION: u16 = 1;
/// Orchestrator Version as a type, usable as a `StaticVersionType` bound
pub type OrchestratorVersion =
    StaticVersion<ORCHESTRATOR_MAJOR_VERSION, ORCHESTRATOR_MINOR_VERSION>;
/// Orchestrator Version as a type-binding instance
pub const ORCHESTRATOR_VERSION: OrchestratorVersion = StaticVersion {};
60
61/// Generate an keypair based on a `seed` and an `index`
62/// # Panics
63/// This panics if libp2p is unable to generate a secret key from the seed
64#[must_use]
65pub fn libp2p_generate_indexed_identity(seed: [u8; 32], index: u64) -> Keypair {
66    let mut hasher = blake3::Hasher::new();
67    hasher.update(&seed);
68    hasher.update(&index.to_le_bytes());
69    let new_seed = *hasher.finalize().as_bytes();
70    let sk_bytes = SecretKey::try_from_bytes(new_seed).unwrap();
71    <EdKeypair as From<SecretKey>>::from(sk_bytes).into()
72}
73
/// The state of the orchestrator
#[derive(Default, Clone)]
#[allow(clippy::struct_excessive_bools)]
struct OrchestratorState<TYPES: NodeType> {
    /// Tracks the latest node index we have generated a configuration for
    latest_index: u16,
    /// Tracks the latest temporary index we have generated for init validator's key pair
    tmp_latest_index: u16,
    /// The network configuration
    config: NetworkConfig<TYPES>,
    /// Whether the network configuration has been updated with all the peer's public keys/configs
    peer_pub_ready: bool,
    /// A map from serialized public keys to `(node_index, is_da)`, used to make
    /// repeated registrations of the same key idempotent
    pub_posted: HashMap<Vec<u8>, (u64, bool)>,
    /// Whether nodes should start their HotShot instances
    /// Will be set to true once all nodes post they are ready to start
    start: bool,
    /// The total nodes that have posted they are ready to start
    nodes_connected: HashSet<PeerConfig<TYPES>>,
    /// The results of the benchmarks
    bench_results: BenchResults,
    /// The number of nodes that have posted their results
    nodes_post_results: u64,
    /// Whether the orchestrator can be started manually
    manual_start_allowed: bool,
    /// Whether we are still accepting new keys for registration
    accepting_new_keys: bool,
    /// Builder address pool
    builders: Vec<Url>,
    /// whether we are using a fixed stake table, disabling public key registration
    fixed_stake_table: bool,
}
106
107impl<TYPES: NodeType> OrchestratorState<TYPES> {
108    /// create a new [`OrchestratorState`]
109    pub fn new(network_config: NetworkConfig<TYPES>) -> Self {
110        let mut peer_pub_ready = false;
111        let mut fixed_stake_table = false;
112
113        if network_config.config.known_nodes_with_stake.is_empty() {
114            println!(
115                "No nodes were loaded from the config file. Nodes will be allowed to register \
116                 dynamically."
117            );
118        } else {
119            println!("Initializing orchestrator with fixed stake table.");
120            peer_pub_ready = true;
121            fixed_stake_table = true;
122        }
123
124        let builders = if matches!(network_config.builder, BuilderType::External) {
125            network_config.config.builder_urls.clone().into()
126        } else {
127            vec![]
128        };
129
130        OrchestratorState {
131            latest_index: 0,
132            tmp_latest_index: 0,
133            config: network_config,
134            peer_pub_ready,
135            pub_posted: HashMap::new(),
136            nodes_connected: HashSet::new(),
137            start: false,
138            bench_results: BenchResults::default(),
139            nodes_post_results: 0,
140            manual_start_allowed: true,
141            accepting_new_keys: true,
142            builders,
143            fixed_stake_table,
144        }
145    }
146
147    /// Output the results to a csv file according to orchestrator state
148    pub fn output_to_csv(&self) {
149        let output_csv = BenchResultsDownloadConfig {
150            commit_sha: self.config.commit_sha.clone(),
151            total_nodes: self.config.config.num_nodes_with_stake.into(),
152            da_committee_size: self.config.config.da_staked_committee_size,
153            fixed_leader_for_gpuvid: self.config.config.fixed_leader_for_gpuvid,
154            transactions_per_round: self.config.transactions_per_round,
155            transaction_size: self.bench_results.transaction_size_in_bytes,
156            rounds: self.config.rounds,
157            partial_results: self.bench_results.partial_results.clone(),
158            avg_latency_in_sec: self.bench_results.avg_latency_in_sec,
159            minimum_latency_in_sec: self.bench_results.minimum_latency_in_sec,
160            maximum_latency_in_sec: self.bench_results.maximum_latency_in_sec,
161            throughput_bytes_per_sec: self.bench_results.throughput_bytes_per_sec,
162            total_transactions_committed: self.bench_results.total_transactions_committed,
163            total_time_elapsed_in_sec: self.bench_results.total_time_elapsed_in_sec,
164            total_num_views: self.bench_results.total_num_views,
165            failed_num_views: self.bench_results.failed_num_views,
166            committee_type: self.bench_results.committee_type.clone(),
167        };
168        // Open the CSV file in append mode
169        let results_csv_file = OpenOptions::new()
170            .create(true)
171            .append(true) // Open in append mode
172            .open("scripts/benchmarks_results/results.csv")
173            .unwrap();
174        // Open a file for writing
175        let mut wtr = Writer::from_writer(results_csv_file);
176        let _ = wtr.serialize(output_csv);
177        let _ = wtr.flush();
178        println!("Results successfully saved in scripts/benchmarks_results/results.csv");
179    }
180}
181
/// An api exposed by the orchestrator
pub trait OrchestratorApi<TYPES: NodeType> {
    /// Post an identity to the orchestrator. Takes in optional
    /// arguments so others can identify us on the Libp2p network.
    /// # Errors
    /// If we were unable to serve the request
    fn post_identity(
        &mut self,
        libp2p_address: Option<Multiaddr>,
        libp2p_public_key: Option<PeerId>,
    ) -> Result<u16, ServerError>;
    /// post endpoint for each node's config
    /// # Errors
    /// if unable to serve
    fn post_getconfig(&mut self, _node_index: u16) -> Result<NetworkConfig<TYPES>, ServerError>;
    /// endpoint for the next available temporary node index
    /// (registered as a POST route named `get_tmp_node_index`)
    /// # Errors
    /// if unable to serve
    fn get_tmp_node_index(&mut self) -> Result<u16, ServerError>;
    /// post endpoint for each node's public key
    /// # Errors
    /// if unable to serve
    fn register_public_key(
        &mut self,
        pubkey: &mut Vec<u8>,
        is_da: bool,
        libp2p_address: Option<Multiaddr>,
        libp2p_public_key: Option<PeerId>,
    ) -> Result<(u64, bool), ServerError>;
    /// get endpoint for whether or not all peers public keys are ready
    /// # Errors
    /// if unable to serve
    fn peer_pub_ready(&self) -> Result<bool, ServerError>;
    /// post endpoint for the network config after all peers public keys are collected
    /// # Errors
    /// if unable to serve
    fn post_config_after_peer_collected(&mut self) -> Result<NetworkConfig<TYPES>, ServerError>;
    /// get endpoint for whether or not the run has started
    /// # Errors
    /// if unable to serve
    fn get_start(&self) -> Result<bool, ServerError>;
    /// post endpoint for the results of the run
    /// # Errors
    /// if unable to serve
    fn post_run_results(&mut self, metrics: BenchResults) -> Result<(), ServerError>;
    /// A node POSTs its public key to let the orchestrator know that it is ready
    /// # Errors
    /// if unable to serve
    fn post_ready(&mut self, peer_config: &PeerConfig<TYPES>) -> Result<(), ServerError>;
    /// post endpoint for manually starting the orchestrator
    /// # Errors
    /// if unable to serve
    fn post_manual_start(&mut self, password_bytes: Vec<u8>) -> Result<(), ServerError>;
    /// post endpoint for registering a builder with the orchestrator
    /// # Errors
    /// if unable to serve
    fn post_builder(&mut self, builder: Url) -> Result<(), ServerError>;
    /// get endpoints for builders
    /// # Errors
    /// if not all builders are registered yet
    fn get_builders(&self) -> Result<Vec<Url>, ServerError>;
}
244
245impl<TYPES: NodeType> OrchestratorState<TYPES>
246where
247    TYPES::SignatureKey: serde::Serialize + Clone + SignatureKey + 'static,
248{
249    /// register a node with an unknown public key.
250    /// this method should be used when we don't have a fixed stake table
251    fn register_unknown(
252        &mut self,
253        pubkey: &mut Vec<u8>,
254        da_requested: bool,
255        libp2p_address: Option<Multiaddr>,
256        libp2p_public_key: Option<PeerId>,
257    ) -> Result<(u64, bool), ServerError> {
258        if let Some((node_index, is_da)) = self.pub_posted.get(pubkey) {
259            return Ok((*node_index, *is_da));
260        }
261
262        if !self.accepting_new_keys {
263            return Err(ServerError {
264                status: tide_disco::StatusCode::FORBIDDEN,
265                message: "Network has been started manually, and is no longer registering new \
266                          keys."
267                    .to_string(),
268            });
269        }
270
271        let node_index = self.pub_posted.len() as u64;
272
273        // Deserialize the public key
274        let staked_pubkey = PeerConfig::<TYPES>::from_bytes(pubkey).unwrap();
275
276        self.config
277            .config
278            .known_nodes_with_stake
279            .push(staked_pubkey.clone());
280
281        let mut added_to_da = false;
282
283        let da_full =
284            self.config.config.known_da_nodes.len() >= self.config.config.da_staked_committee_size;
285
286        #[allow(clippy::nonminimal_bool)]
287        // We add the node to the DA committee depending on either its node index or whether it requested membership.
288        //
289        // Since we issue `node_index` incrementally, if we are deciding DA membership by node_index
290        // we only need to check that the DA committee is not yet full.
291        //
292        // Note: this logically simplifies to (self.config.indexed_da || da_requested) && !da_full,
293        // but writing it that way makes it a little less clear to me.
294        if (self.config.indexed_da || (!self.config.indexed_da && da_requested)) && !da_full {
295            self.config.config.known_da_nodes.push(staked_pubkey);
296            added_to_da = true;
297        }
298
299        self.pub_posted
300            .insert(pubkey.clone(), (node_index, added_to_da));
301
302        // If the orchestrator is set up for libp2p and we have supplied the proper
303        // Libp2p data, add our node to the list of bootstrap nodes.
304        if self.config.libp2p_config.clone().is_some()
305            && let (Some(libp2p_public_key), Some(libp2p_address)) =
306                (libp2p_public_key, libp2p_address)
307        {
308            // Push to our bootstrap nodes
309            self.config
310                .libp2p_config
311                .as_mut()
312                .unwrap()
313                .bootstrap_nodes
314                .push((libp2p_public_key, libp2p_address));
315        }
316
317        tracing::error!("Posted public key for node_index {node_index}");
318
319        // node_index starts at 0, so once it matches `num_nodes_with_stake`
320        // we will have registered one node too many. hence, we want `node_index + 1`.
321        if node_index + 1 >= (self.config.config.num_nodes_with_stake.get() as u64) {
322            self.peer_pub_ready = true;
323            self.accepting_new_keys = false;
324        }
325        Ok((node_index, added_to_da))
326    }
327
328    /// register a node on the fixed stake table, which was loaded at startup
329    fn register_from_list(
330        &mut self,
331        pubkey: &mut Vec<u8>,
332        da_requested: bool,
333        libp2p_address: Option<Multiaddr>,
334        libp2p_public_key: Option<PeerId>,
335    ) -> Result<(u64, bool), ServerError> {
336        // if we've already registered this node before, we just retrieve its info from `pub_posted`
337        if let Some((node_index, is_da)) = self.pub_posted.get(pubkey) {
338            return Ok((*node_index, *is_da));
339        }
340
341        // Deserialize the public key
342        let staked_pubkey = PeerConfig::<TYPES>::from_bytes(pubkey).unwrap();
343
344        // Check if the node is allowed to connect, returning its index and config entry if so.
345        let Some((node_index, node_config)) =
346            self.config.public_keys.iter().enumerate().find(|keys| {
347                keys.1.stake_table_key == staked_pubkey.stake_table_entry.public_key()
348            })
349        else {
350            return Err(ServerError {
351                status: tide_disco::StatusCode::FORBIDDEN,
352                message: "You are unauthorized to register with the orchestrator".to_string(),
353            });
354        };
355
356        // Check that our recorded DA status for the node matches what the node actually requested
357        if node_config.da != da_requested {
358            return Err(ServerError {
359                status: tide_disco::StatusCode::BAD_REQUEST,
360                message: format!(
361                    "Mismatch in DA status in registration for node {}. DA requested: {}, \
362                     expected: {}",
363                    node_index, da_requested, node_config.da
364                ),
365            });
366        }
367
368        let added_to_da = node_config.da;
369
370        self.pub_posted
371            .insert(pubkey.clone(), (node_index as u64, added_to_da));
372
373        // If the orchestrator is set up for libp2p and we have supplied the proper
374        // Libp2p data, add our node to the list of bootstrap nodes.
375        if self.config.libp2p_config.clone().is_some()
376            && let (Some(libp2p_public_key), Some(libp2p_address)) =
377                (libp2p_public_key, libp2p_address)
378        {
379            // Push to our bootstrap nodes
380            self.config
381                .libp2p_config
382                .as_mut()
383                .unwrap()
384                .bootstrap_nodes
385                .push((libp2p_public_key, libp2p_address));
386        }
387
388        tracing::error!("Node {node_index} has registered.");
389
390        Ok((node_index as u64, added_to_da))
391    }
392}
393
394impl<TYPES: NodeType> OrchestratorApi<TYPES> for OrchestratorState<TYPES>
395where
396    TYPES::SignatureKey: serde::Serialize + Clone + SignatureKey + 'static,
397{
398    /// Post an identity to the orchestrator. Takes in optional
399    /// arguments so others can identify us on the Libp2p network.
400    /// # Errors
401    /// If we were unable to serve the request
402    fn post_identity(
403        &mut self,
404        libp2p_address: Option<Multiaddr>,
405        libp2p_public_key: Option<PeerId>,
406    ) -> Result<u16, ServerError> {
407        let node_index = self.latest_index;
408        self.latest_index += 1;
409
410        if usize::from(node_index) >= self.config.config.num_nodes_with_stake.get() {
411            return Err(ServerError {
412                status: tide_disco::StatusCode::BAD_REQUEST,
413                message: "Network has reached capacity".to_string(),
414            });
415        }
416
417        // If the orchestrator is set up for libp2p and we have supplied the proper
418        // Libp2p data, add our node to the list of bootstrap nodes.
419        if self.config.libp2p_config.clone().is_some()
420            && let (Some(libp2p_public_key), Some(libp2p_address)) =
421                (libp2p_public_key, libp2p_address)
422        {
423            // Push to our bootstrap nodes
424            self.config
425                .libp2p_config
426                .as_mut()
427                .unwrap()
428                .bootstrap_nodes
429                .push((libp2p_public_key, libp2p_address));
430        }
431        Ok(node_index)
432    }
433
434    // Assumes nodes will set their own index that they received from the
435    // 'identity' endpoint
436    fn post_getconfig(&mut self, _node_index: u16) -> Result<NetworkConfig<TYPES>, ServerError> {
437        Ok(self.config.clone())
438    }
439
440    // Assumes one node do not get twice
441    fn get_tmp_node_index(&mut self) -> Result<u16, ServerError> {
442        let tmp_node_index = self.tmp_latest_index;
443        self.tmp_latest_index += 1;
444
445        if usize::from(tmp_node_index) >= self.config.config.num_nodes_with_stake.get() {
446            return Err(ServerError {
447                status: tide_disco::StatusCode::BAD_REQUEST,
448                message: "Node index getter for key pair generation has reached capacity"
449                    .to_string(),
450            });
451        }
452        Ok(tmp_node_index)
453    }
454
455    fn register_public_key(
456        &mut self,
457        pubkey: &mut Vec<u8>,
458        da_requested: bool,
459        libp2p_address: Option<Multiaddr>,
460        libp2p_public_key: Option<PeerId>,
461    ) -> Result<(u64, bool), ServerError> {
462        if self.fixed_stake_table {
463            self.register_from_list(pubkey, da_requested, libp2p_address, libp2p_public_key)
464        } else {
465            self.register_unknown(pubkey, da_requested, libp2p_address, libp2p_public_key)
466        }
467    }
468
469    fn peer_pub_ready(&self) -> Result<bool, ServerError> {
470        if !self.peer_pub_ready {
471            return Err(ServerError {
472                status: tide_disco::StatusCode::BAD_REQUEST,
473                message: "Peer's public configs are not ready".to_string(),
474            });
475        }
476        Ok(self.peer_pub_ready)
477    }
478
479    fn post_config_after_peer_collected(&mut self) -> Result<NetworkConfig<TYPES>, ServerError> {
480        if !self.peer_pub_ready {
481            return Err(ServerError {
482                status: tide_disco::StatusCode::BAD_REQUEST,
483                message: "Peer's public configs are not ready".to_string(),
484            });
485        }
486
487        Ok(self.config.clone())
488    }
489
490    fn get_start(&self) -> Result<bool, ServerError> {
491        // println!("{}", self.start);
492        if !self.start {
493            return Err(ServerError {
494                status: tide_disco::StatusCode::BAD_REQUEST,
495                message: "Network is not ready to start".to_string(),
496            });
497        }
498        Ok(self.start)
499    }
500
501    // Assumes nodes do not post 'ready' twice
502    fn post_ready(&mut self, peer_config: &PeerConfig<TYPES>) -> Result<(), ServerError> {
503        // If we have not disabled registration verification.
504        // Is this node allowed to connect?
505        if !self
506            .config
507            .config
508            .known_nodes_with_stake
509            .contains(peer_config)
510        {
511            return Err(ServerError {
512                status: tide_disco::StatusCode::FORBIDDEN,
513                message: "You are unauthorized to register with the orchestrator".to_string(),
514            });
515        }
516
517        // `HashSet::insert()` returns whether the node was newly inserted (true) or not
518        if self.nodes_connected.insert(peer_config.clone()) {
519            tracing::error!(
520                "Node {peer_config} connected. Total nodes connected: {}",
521                self.nodes_connected.len()
522            );
523        }
524
525        // i.e. nodes_connected >= num_nodes_with_stake * (start_threshold.0 / start_threshold.1)
526        if self.nodes_connected.len() as u64 * self.config.config.start_threshold.1
527            >= (self.config.config.num_nodes_with_stake.get() as u64)
528                * self.config.config.start_threshold.0
529        {
530            self.accepting_new_keys = false;
531            self.manual_start_allowed = false;
532            self.start = true;
533        }
534
535        Ok(())
536    }
537
538    /// Manually start the network
539    fn post_manual_start(&mut self, password_bytes: Vec<u8>) -> Result<(), ServerError> {
540        if !self.manual_start_allowed {
541            return Err(ServerError {
542                status: tide_disco::StatusCode::FORBIDDEN,
543                message: "Configs have already been distributed to nodes, and the network can no \
544                          longer be started manually."
545                    .to_string(),
546            });
547        }
548
549        let password = String::from_utf8(password_bytes)
550            .expect("Failed to decode raw password as UTF-8 string.");
551
552        // Check that the password matches
553        if self.config.manual_start_password != Some(password) {
554            return Err(ServerError {
555                status: tide_disco::StatusCode::FORBIDDEN,
556                message: "Incorrect password.".to_string(),
557            });
558        }
559
560        let registered_nodes_with_stake = self.config.config.known_nodes_with_stake.len();
561        let registered_da_nodes = self.config.config.known_da_nodes.len();
562
563        if registered_da_nodes > 1 {
564            self.config.config.num_nodes_with_stake =
565                std::num::NonZeroUsize::new(registered_nodes_with_stake)
566                    .expect("Failed to convert to NonZeroUsize; this should be impossible.");
567
568            self.config.config.da_staked_committee_size = registered_da_nodes;
569        } else {
570            return Err(ServerError {
571                status: tide_disco::StatusCode::FORBIDDEN,
572                message: format!(
573                    "We cannot manually start the network, because we only have \
574                     {registered_nodes_with_stake} nodes with stake registered, with \
575                     {registered_da_nodes} DA nodes."
576                ),
577            });
578        }
579
580        self.accepting_new_keys = false;
581        self.manual_start_allowed = false;
582        self.peer_pub_ready = true;
583        self.start = true;
584
585        Ok(())
586    }
587
588    // Aggregates results of the run from all nodes
589    fn post_run_results(&mut self, metrics: BenchResults) -> Result<(), ServerError> {
590        if metrics.total_transactions_committed != 0 {
591            // Deal with the bench results
592            if self.bench_results.total_transactions_committed == 0 {
593                self.bench_results = metrics;
594            } else {
595                // Deal with the bench results from different nodes
596                let cur_metrics = self.bench_results.clone();
597                self.bench_results.avg_latency_in_sec = (metrics.avg_latency_in_sec
598                    * metrics.num_latency
599                    + cur_metrics.avg_latency_in_sec * cur_metrics.num_latency)
600                    / (metrics.num_latency + cur_metrics.num_latency);
601                self.bench_results.num_latency += metrics.num_latency;
602                self.bench_results.minimum_latency_in_sec = metrics
603                    .minimum_latency_in_sec
604                    .min(cur_metrics.minimum_latency_in_sec);
605                self.bench_results.maximum_latency_in_sec = metrics
606                    .maximum_latency_in_sec
607                    .max(cur_metrics.maximum_latency_in_sec);
608                self.bench_results.throughput_bytes_per_sec = metrics
609                    .throughput_bytes_per_sec
610                    .max(cur_metrics.throughput_bytes_per_sec);
611                self.bench_results.total_transactions_committed = metrics
612                    .total_transactions_committed
613                    .max(cur_metrics.total_transactions_committed);
614                self.bench_results.total_time_elapsed_in_sec = metrics
615                    .total_time_elapsed_in_sec
616                    .max(cur_metrics.total_time_elapsed_in_sec);
617                self.bench_results.total_num_views =
618                    metrics.total_num_views.min(cur_metrics.total_num_views);
619                self.bench_results.failed_num_views =
620                    metrics.failed_num_views.max(cur_metrics.failed_num_views);
621            }
622        }
623        self.nodes_post_results += 1;
624        if self.bench_results.partial_results == "Unset" {
625            self.bench_results.partial_results = "One".to_string();
626            self.bench_results.printout();
627            self.output_to_csv();
628        }
629        if self.bench_results.partial_results == "One"
630            && self.nodes_post_results >= (self.config.config.da_staked_committee_size as u64 / 2)
631        {
632            self.bench_results.partial_results = "HalfDA".to_string();
633            self.bench_results.printout();
634            self.output_to_csv();
635        }
636        if self.bench_results.partial_results == "HalfDA"
637            && self.nodes_post_results >= (self.config.config.num_nodes_with_stake.get() as u64 / 2)
638        {
639            self.bench_results.partial_results = "Half".to_string();
640            self.bench_results.printout();
641            self.output_to_csv();
642        }
643        if self.bench_results.partial_results != "Full"
644            && self.nodes_post_results >= (self.config.config.num_nodes_with_stake.get() as u64)
645        {
646            self.bench_results.partial_results = "Full".to_string();
647            self.bench_results.printout();
648            self.output_to_csv();
649        }
650        Ok(())
651    }
652
653    fn post_builder(&mut self, builder: Url) -> Result<(), ServerError> {
654        self.builders.push(builder);
655        Ok(())
656    }
657
658    fn get_builders(&self) -> Result<Vec<Url>, ServerError> {
659        if !matches!(self.config.builder, BuilderType::External)
660            && self.builders.len() != self.config.config.da_staked_committee_size
661        {
662            return Err(ServerError {
663                status: tide_disco::StatusCode::NOT_FOUND,
664                message: "Not all builders are registered yet".to_string(),
665            });
666        }
667        Ok(self.builders.clone())
668    }
669}
670
671/// Sets up all API routes
672#[allow(clippy::too_many_lines)]
673fn define_api<TYPES, State, VER>() -> Result<Api<State, ServerError, VER>, ApiError>
674where
675    TYPES: NodeType,
676    State: 'static + Send + Sync + ReadState + WriteState,
677    <State as ReadState>::State: Send + Sync + OrchestratorApi<TYPES>,
678    TYPES::SignatureKey: serde::Serialize,
679    VER: StaticVersionType + 'static,
680{
681    let api_toml = toml::from_str::<toml::Value>(include_str!(concat!(
682        env!("CARGO_MANIFEST_DIR"),
683        "/api.toml"
684    )))
685    .expect("API file is not valid toml");
686    let mut api = Api::<State, ServerError, VER>::new(api_toml)?;
687    api.post("post_identity", |req, state| {
688        async move {
689            // Read the bytes from the body
690            let mut body_bytes = req.body_bytes();
691            body_bytes.drain(..12);
692
693            // Decode the libp2p data so we can add to our bootstrap nodes (if supplied)
694            let Ok((libp2p_address, libp2p_public_key)) =
695                vbs::Serializer::<OrchestratorVersion>::deserialize(&body_bytes)
696            else {
697                return Err(ServerError {
698                    status: tide_disco::StatusCode::BAD_REQUEST,
699                    message: "Malformed body".to_string(),
700                });
701            };
702
703            // Call our state function to process the request
704            state.post_identity(libp2p_address, libp2p_public_key)
705        }
706        .boxed()
707    })?
708    .post("post_getconfig", |req, state| {
709        async move {
710            let node_index = req.integer_param("node_index")?;
711            state.post_getconfig(node_index)
712        }
713        .boxed()
714    })?
715    .post("get_tmp_node_index", |_req, state| {
716        async move { state.get_tmp_node_index() }.boxed()
717    })?
718    .post("post_pubkey", |req, state| {
719        async move {
720            let is_da = req.boolean_param("is_da")?;
721            // Read the bytes from the body
722            let mut body_bytes = req.body_bytes();
723            body_bytes.drain(..12);
724
725            // Decode the libp2p data so we can add to our bootstrap nodes (if supplied)
726            let Ok((mut pubkey, libp2p_address, libp2p_public_key)) =
727                vbs::Serializer::<OrchestratorVersion>::deserialize(&body_bytes)
728            else {
729                return Err(ServerError {
730                    status: tide_disco::StatusCode::BAD_REQUEST,
731                    message: "Malformed body".to_string(),
732                });
733            };
734
735            state.register_public_key(&mut pubkey, is_da, libp2p_address, libp2p_public_key)
736        }
737        .boxed()
738    })?
739    .get("peer_pubconfig_ready", |_req, state| {
740        async move { state.peer_pub_ready() }.boxed()
741    })?
742    .post("post_config_after_peer_collected", |_req, state| {
743        async move { state.post_config_after_peer_collected() }.boxed()
744    })?
745    .post(
746        "post_ready",
747        |req, state: &mut <State as ReadState>::State| {
748            async move {
749                let mut body_bytes = req.body_bytes();
750                body_bytes.drain(..12);
751                // Decode the payload-supplied pubkey
752                let Some(pubkey) = PeerConfig::<TYPES>::from_bytes(&body_bytes) else {
753                    return Err(ServerError {
754                        status: tide_disco::StatusCode::BAD_REQUEST,
755                        message: "Malformed body".to_string(),
756                    });
757                };
758                state.post_ready(&pubkey)
759            }
760            .boxed()
761        },
762    )?
763    .post(
764        "post_manual_start",
765        |req, state: &mut <State as ReadState>::State| {
766            async move {
767                let password = req.body_bytes();
768                state.post_manual_start(password)
769            }
770            .boxed()
771        },
772    )?
773    .get("get_start", |_req, state| {
774        async move { state.get_start() }.boxed()
775    })?
776    .post("post_results", |req, state| {
777        async move {
778            let metrics: Result<BenchResults, RequestError> = req.body_json();
779            state.post_run_results(metrics.unwrap())
780        }
781        .boxed()
782    })?
783    .post("post_builder", |req, state| {
784        async move {
785            // Read the bytes from the body
786            let mut body_bytes = req.body_bytes();
787            body_bytes.drain(..12);
788
789            let Ok(urls) =
790                vbs::Serializer::<OrchestratorVersion>::deserialize::<Vec<Url>>(&body_bytes)
791            else {
792                return Err(ServerError {
793                    status: tide_disco::StatusCode::BAD_REQUEST,
794                    message: "Malformed body".to_string(),
795                });
796            };
797
798            let mut futures = urls
799                .into_iter()
800                .map(|url| async {
801                    let client: surf_disco::Client<ServerError, OrchestratorVersion> =
802                        surf_disco::client::Client::builder(url.clone()).build();
803                    if client.connect(Some(Duration::from_secs(2))).await {
804                        Some(url)
805                    } else {
806                        None
807                    }
808                })
809                .collect::<FuturesUnordered<_>>()
810                .filter_map(futures::future::ready);
811
812            if let Some(url) = futures.next().await {
813                state.post_builder(url)
814            } else {
815                Err(ServerError {
816                    status: tide_disco::StatusCode::BAD_REQUEST,
817                    message: "No reachable addresses".to_string(),
818                })
819            }
820        }
821        .boxed()
822    })?
823    .get("get_builders", |_req, state| {
824        async move { state.get_builders() }.boxed()
825    })?;
826    Ok(api)
827}
828
829/// Runs the orchestrator
830/// # Errors
831/// This errors if tide disco runs into an issue during serving
832/// # Panics
833/// This panics if unable to register the api with tide disco
834pub async fn run_orchestrator<TYPES: NodeType>(
835    mut network_config: NetworkConfig<TYPES>,
836    url: Url,
837) -> io::Result<()>
838where
839    TYPES::SignatureKey: 'static + serde::Serialize,
840{
841    let env_password = std::env::var("ORCHESTRATOR_MANUAL_START_PASSWORD");
842
843    if env_password.is_ok() {
844        tracing::warn!(
845            "Took orchestrator manual start password from the environment variable: \
846             ORCHESTRATOR_MANUAL_START_PASSWORD={:?}",
847            env_password
848        );
849        network_config.manual_start_password = env_password.ok();
850    }
851
852    // Try to overwrite the network_config public keys
853    // from the file the env var points to, or panic.
854    {
855        let env_public_keys = std::env::var("ORCHESTRATOR_PUBLIC_KEYS");
856
857        if let Ok(filepath) = env_public_keys {
858            #[allow(clippy::panic)]
859            let config_file_as_string: String = fs::read_to_string(filepath.clone())
860                .unwrap_or_else(|_| panic!("Could not read config file located at {filepath}"));
861
862            let file: PublicKeysFile<TYPES> =
863                toml::from_str::<PublicKeysFile<TYPES>>(&config_file_as_string)
864                    .expect("Unable to convert config file to TOML");
865
866            network_config.public_keys = file.public_keys;
867        }
868    }
869
870    network_config.config.known_nodes_with_stake = network_config
871        .public_keys
872        .iter()
873        .map(|keys| PeerConfig {
874            stake_table_entry: keys
875                .stake_table_key
876                .stake_table_entry(U256::from(keys.stake)),
877            state_ver_key: keys.state_ver_key.clone(),
878            connect_info: keys.connect_info.clone(),
879        })
880        .collect();
881
882    network_config.config.known_da_nodes = network_config
883        .public_keys
884        .iter()
885        .filter(|keys| keys.da)
886        .map(|keys| PeerConfig {
887            stake_table_entry: keys
888                .stake_table_key
889                .stake_table_entry(U256::from(keys.stake)),
890            state_ver_key: keys.state_ver_key.clone(),
891            connect_info: keys.connect_info.clone(),
892        })
893        .collect();
894
895    let web_api = define_api().map_err(|_e| io::Error::other("Failed to define api"));
896
897    let state: RwLock<OrchestratorState<TYPES>> =
898        RwLock::new(OrchestratorState::new(network_config));
899
900    let mut app = App::<RwLock<OrchestratorState<TYPES>>, ServerError>::with_state(state);
901    app.register_module::<ServerError, OrchestratorVersion>("api", web_api.unwrap())
902        .expect("Error registering api");
903    tracing::error!("listening on {url:?}");
904    app.serve(url, ORCHESTRATOR_VERSION).await
905}