1pub mod client;
11
12use std::{
13 collections::{HashMap, HashSet},
14 fs,
15 fs::OpenOptions,
16 io,
17 time::Duration,
18};
19
20use alloy::primitives::U256;
21use async_lock::RwLock;
22use client::{BenchResults, BenchResultsDownloadConfig};
23use csv::Writer;
24use futures::{FutureExt, StreamExt, stream::FuturesUnordered};
25use hotshot_types::{
26 PeerConfig,
27 network::{BuilderType, NetworkConfig, PublicKeysFile},
28 traits::{
29 node_implementation::NodeType,
30 signature_key::{SignatureKey, StakeTableEntryType},
31 },
32};
33use libp2p_identity::{
34 Keypair, PeerId,
35 ed25519::{Keypair as EdKeypair, SecretKey},
36};
37use multiaddr::Multiaddr;
38use surf_disco::Url;
39use tide_disco::{
40 Api, App, RequestError,
41 api::ApiError,
42 error::ServerError,
43 method::{ReadState, WriteState},
44};
45use vbs::{
46 BinarySerializer,
47 version::{StaticVersion, StaticVersionType},
48};
49
50pub const ORCHESTRATOR_MAJOR_VERSION: u16 = 0;
53pub const ORCHESTRATOR_MINOR_VERSION: u16 = 1;
55pub type OrchestratorVersion =
57 StaticVersion<ORCHESTRATOR_MAJOR_VERSION, ORCHESTRATOR_MINOR_VERSION>;
58pub const ORCHESTRATOR_VERSION: OrchestratorVersion = StaticVersion {};
60
61#[must_use]
65pub fn libp2p_generate_indexed_identity(seed: [u8; 32], index: u64) -> Keypair {
66 let mut hasher = blake3::Hasher::new();
67 hasher.update(&seed);
68 hasher.update(&index.to_le_bytes());
69 let new_seed = *hasher.finalize().as_bytes();
70 let sk_bytes = SecretKey::try_from_bytes(new_seed).unwrap();
71 <EdKeypair as From<SecretKey>>::from(sk_bytes).into()
72}
73
74#[derive(Default, Clone)]
76#[allow(clippy::struct_excessive_bools)]
77struct OrchestratorState<TYPES: NodeType> {
78 latest_index: u16,
80 tmp_latest_index: u16,
82 config: NetworkConfig<TYPES>,
84 peer_pub_ready: bool,
86 pub_posted: HashMap<Vec<u8>, (u64, bool)>,
88 start: bool,
91 nodes_connected: HashSet<PeerConfig<TYPES>>,
93 bench_results: BenchResults,
95 nodes_post_results: u64,
97 manual_start_allowed: bool,
99 accepting_new_keys: bool,
101 builders: Vec<Url>,
103 fixed_stake_table: bool,
105}
106
107impl<TYPES: NodeType> OrchestratorState<TYPES> {
108 pub fn new(network_config: NetworkConfig<TYPES>) -> Self {
110 let mut peer_pub_ready = false;
111 let mut fixed_stake_table = false;
112
113 if network_config.config.known_nodes_with_stake.is_empty() {
114 println!(
115 "No nodes were loaded from the config file. Nodes will be allowed to register \
116 dynamically."
117 );
118 } else {
119 println!("Initializing orchestrator with fixed stake table.");
120 peer_pub_ready = true;
121 fixed_stake_table = true;
122 }
123
124 let builders = if matches!(network_config.builder, BuilderType::External) {
125 network_config.config.builder_urls.clone().into()
126 } else {
127 vec![]
128 };
129
130 OrchestratorState {
131 latest_index: 0,
132 tmp_latest_index: 0,
133 config: network_config,
134 peer_pub_ready,
135 pub_posted: HashMap::new(),
136 nodes_connected: HashSet::new(),
137 start: false,
138 bench_results: BenchResults::default(),
139 nodes_post_results: 0,
140 manual_start_allowed: true,
141 accepting_new_keys: true,
142 builders,
143 fixed_stake_table,
144 }
145 }
146
147 pub fn output_to_csv(&self) {
149 let output_csv = BenchResultsDownloadConfig {
150 commit_sha: self.config.commit_sha.clone(),
151 total_nodes: self.config.config.num_nodes_with_stake.into(),
152 da_committee_size: self.config.config.da_staked_committee_size,
153 fixed_leader_for_gpuvid: self.config.config.fixed_leader_for_gpuvid,
154 transactions_per_round: self.config.transactions_per_round,
155 transaction_size: self.bench_results.transaction_size_in_bytes,
156 rounds: self.config.rounds,
157 partial_results: self.bench_results.partial_results.clone(),
158 avg_latency_in_sec: self.bench_results.avg_latency_in_sec,
159 minimum_latency_in_sec: self.bench_results.minimum_latency_in_sec,
160 maximum_latency_in_sec: self.bench_results.maximum_latency_in_sec,
161 throughput_bytes_per_sec: self.bench_results.throughput_bytes_per_sec,
162 total_transactions_committed: self.bench_results.total_transactions_committed,
163 total_time_elapsed_in_sec: self.bench_results.total_time_elapsed_in_sec,
164 total_num_views: self.bench_results.total_num_views,
165 failed_num_views: self.bench_results.failed_num_views,
166 committee_type: self.bench_results.committee_type.clone(),
167 };
168 let results_csv_file = OpenOptions::new()
170 .create(true)
171 .append(true) .open("scripts/benchmarks_results/results.csv")
173 .unwrap();
174 let mut wtr = Writer::from_writer(results_csv_file);
176 let _ = wtr.serialize(output_csv);
177 let _ = wtr.flush();
178 println!("Results successfully saved in scripts/benchmarks_results/results.csv");
179 }
180}
181
182pub trait OrchestratorApi<TYPES: NodeType> {
184 fn post_identity(
189 &mut self,
190 libp2p_address: Option<Multiaddr>,
191 libp2p_public_key: Option<PeerId>,
192 ) -> Result<u16, ServerError>;
193 fn post_getconfig(&mut self, _node_index: u16) -> Result<NetworkConfig<TYPES>, ServerError>;
197 fn get_tmp_node_index(&mut self) -> Result<u16, ServerError>;
201 fn register_public_key(
205 &mut self,
206 pubkey: &mut Vec<u8>,
207 is_da: bool,
208 libp2p_address: Option<Multiaddr>,
209 libp2p_public_key: Option<PeerId>,
210 ) -> Result<(u64, bool), ServerError>;
211 fn peer_pub_ready(&self) -> Result<bool, ServerError>;
215 fn post_config_after_peer_collected(&mut self) -> Result<NetworkConfig<TYPES>, ServerError>;
219 fn get_start(&self) -> Result<bool, ServerError>;
223 fn post_run_results(&mut self, metrics: BenchResults) -> Result<(), ServerError>;
227 fn post_ready(&mut self, peer_config: &PeerConfig<TYPES>) -> Result<(), ServerError>;
231 fn post_manual_start(&mut self, password_bytes: Vec<u8>) -> Result<(), ServerError>;
235 fn post_builder(&mut self, builder: Url) -> Result<(), ServerError>;
239 fn get_builders(&self) -> Result<Vec<Url>, ServerError>;
243}
244
245impl<TYPES: NodeType> OrchestratorState<TYPES>
246where
247 TYPES::SignatureKey: serde::Serialize + Clone + SignatureKey + 'static,
248{
249 fn register_unknown(
252 &mut self,
253 pubkey: &mut Vec<u8>,
254 da_requested: bool,
255 libp2p_address: Option<Multiaddr>,
256 libp2p_public_key: Option<PeerId>,
257 ) -> Result<(u64, bool), ServerError> {
258 if let Some((node_index, is_da)) = self.pub_posted.get(pubkey) {
259 return Ok((*node_index, *is_da));
260 }
261
262 if !self.accepting_new_keys {
263 return Err(ServerError {
264 status: tide_disco::StatusCode::FORBIDDEN,
265 message: "Network has been started manually, and is no longer registering new \
266 keys."
267 .to_string(),
268 });
269 }
270
271 let node_index = self.pub_posted.len() as u64;
272
273 let staked_pubkey = PeerConfig::<TYPES>::from_bytes(pubkey).unwrap();
275
276 self.config
277 .config
278 .known_nodes_with_stake
279 .push(staked_pubkey.clone());
280
281 let mut added_to_da = false;
282
283 let da_full =
284 self.config.config.known_da_nodes.len() >= self.config.config.da_staked_committee_size;
285
286 #[allow(clippy::nonminimal_bool)]
287 if (self.config.indexed_da || (!self.config.indexed_da && da_requested)) && !da_full {
295 self.config.config.known_da_nodes.push(staked_pubkey);
296 added_to_da = true;
297 }
298
299 self.pub_posted
300 .insert(pubkey.clone(), (node_index, added_to_da));
301
302 if self.config.libp2p_config.clone().is_some()
305 && let (Some(libp2p_public_key), Some(libp2p_address)) =
306 (libp2p_public_key, libp2p_address)
307 {
308 self.config
310 .libp2p_config
311 .as_mut()
312 .unwrap()
313 .bootstrap_nodes
314 .push((libp2p_public_key, libp2p_address));
315 }
316
317 tracing::error!("Posted public key for node_index {node_index}");
318
319 if node_index + 1 >= (self.config.config.num_nodes_with_stake.get() as u64) {
322 self.peer_pub_ready = true;
323 self.accepting_new_keys = false;
324 }
325 Ok((node_index, added_to_da))
326 }
327
328 fn register_from_list(
330 &mut self,
331 pubkey: &mut Vec<u8>,
332 da_requested: bool,
333 libp2p_address: Option<Multiaddr>,
334 libp2p_public_key: Option<PeerId>,
335 ) -> Result<(u64, bool), ServerError> {
336 if let Some((node_index, is_da)) = self.pub_posted.get(pubkey) {
338 return Ok((*node_index, *is_da));
339 }
340
341 let staked_pubkey = PeerConfig::<TYPES>::from_bytes(pubkey).unwrap();
343
344 let Some((node_index, node_config)) =
346 self.config.public_keys.iter().enumerate().find(|keys| {
347 keys.1.stake_table_key == staked_pubkey.stake_table_entry.public_key()
348 })
349 else {
350 return Err(ServerError {
351 status: tide_disco::StatusCode::FORBIDDEN,
352 message: "You are unauthorized to register with the orchestrator".to_string(),
353 });
354 };
355
356 if node_config.da != da_requested {
358 return Err(ServerError {
359 status: tide_disco::StatusCode::BAD_REQUEST,
360 message: format!(
361 "Mismatch in DA status in registration for node {}. DA requested: {}, \
362 expected: {}",
363 node_index, da_requested, node_config.da
364 ),
365 });
366 }
367
368 let added_to_da = node_config.da;
369
370 self.pub_posted
371 .insert(pubkey.clone(), (node_index as u64, added_to_da));
372
373 if self.config.libp2p_config.clone().is_some()
376 && let (Some(libp2p_public_key), Some(libp2p_address)) =
377 (libp2p_public_key, libp2p_address)
378 {
379 self.config
381 .libp2p_config
382 .as_mut()
383 .unwrap()
384 .bootstrap_nodes
385 .push((libp2p_public_key, libp2p_address));
386 }
387
388 tracing::error!("Node {node_index} has registered.");
389
390 Ok((node_index as u64, added_to_da))
391 }
392}
393
394impl<TYPES: NodeType> OrchestratorApi<TYPES> for OrchestratorState<TYPES>
395where
396 TYPES::SignatureKey: serde::Serialize + Clone + SignatureKey + 'static,
397{
398 fn post_identity(
403 &mut self,
404 libp2p_address: Option<Multiaddr>,
405 libp2p_public_key: Option<PeerId>,
406 ) -> Result<u16, ServerError> {
407 let node_index = self.latest_index;
408 self.latest_index += 1;
409
410 if usize::from(node_index) >= self.config.config.num_nodes_with_stake.get() {
411 return Err(ServerError {
412 status: tide_disco::StatusCode::BAD_REQUEST,
413 message: "Network has reached capacity".to_string(),
414 });
415 }
416
417 if self.config.libp2p_config.clone().is_some()
420 && let (Some(libp2p_public_key), Some(libp2p_address)) =
421 (libp2p_public_key, libp2p_address)
422 {
423 self.config
425 .libp2p_config
426 .as_mut()
427 .unwrap()
428 .bootstrap_nodes
429 .push((libp2p_public_key, libp2p_address));
430 }
431 Ok(node_index)
432 }
433
434 fn post_getconfig(&mut self, _node_index: u16) -> Result<NetworkConfig<TYPES>, ServerError> {
437 Ok(self.config.clone())
438 }
439
440 fn get_tmp_node_index(&mut self) -> Result<u16, ServerError> {
442 let tmp_node_index = self.tmp_latest_index;
443 self.tmp_latest_index += 1;
444
445 if usize::from(tmp_node_index) >= self.config.config.num_nodes_with_stake.get() {
446 return Err(ServerError {
447 status: tide_disco::StatusCode::BAD_REQUEST,
448 message: "Node index getter for key pair generation has reached capacity"
449 .to_string(),
450 });
451 }
452 Ok(tmp_node_index)
453 }
454
455 fn register_public_key(
456 &mut self,
457 pubkey: &mut Vec<u8>,
458 da_requested: bool,
459 libp2p_address: Option<Multiaddr>,
460 libp2p_public_key: Option<PeerId>,
461 ) -> Result<(u64, bool), ServerError> {
462 if self.fixed_stake_table {
463 self.register_from_list(pubkey, da_requested, libp2p_address, libp2p_public_key)
464 } else {
465 self.register_unknown(pubkey, da_requested, libp2p_address, libp2p_public_key)
466 }
467 }
468
469 fn peer_pub_ready(&self) -> Result<bool, ServerError> {
470 if !self.peer_pub_ready {
471 return Err(ServerError {
472 status: tide_disco::StatusCode::BAD_REQUEST,
473 message: "Peer's public configs are not ready".to_string(),
474 });
475 }
476 Ok(self.peer_pub_ready)
477 }
478
479 fn post_config_after_peer_collected(&mut self) -> Result<NetworkConfig<TYPES>, ServerError> {
480 if !self.peer_pub_ready {
481 return Err(ServerError {
482 status: tide_disco::StatusCode::BAD_REQUEST,
483 message: "Peer's public configs are not ready".to_string(),
484 });
485 }
486
487 Ok(self.config.clone())
488 }
489
490 fn get_start(&self) -> Result<bool, ServerError> {
491 if !self.start {
493 return Err(ServerError {
494 status: tide_disco::StatusCode::BAD_REQUEST,
495 message: "Network is not ready to start".to_string(),
496 });
497 }
498 Ok(self.start)
499 }
500
501 fn post_ready(&mut self, peer_config: &PeerConfig<TYPES>) -> Result<(), ServerError> {
503 if !self
506 .config
507 .config
508 .known_nodes_with_stake
509 .contains(peer_config)
510 {
511 return Err(ServerError {
512 status: tide_disco::StatusCode::FORBIDDEN,
513 message: "You are unauthorized to register with the orchestrator".to_string(),
514 });
515 }
516
517 if self.nodes_connected.insert(peer_config.clone()) {
519 tracing::error!(
520 "Node {peer_config} connected. Total nodes connected: {}",
521 self.nodes_connected.len()
522 );
523 }
524
525 if self.nodes_connected.len() as u64 * self.config.config.start_threshold.1
527 >= (self.config.config.num_nodes_with_stake.get() as u64)
528 * self.config.config.start_threshold.0
529 {
530 self.accepting_new_keys = false;
531 self.manual_start_allowed = false;
532 self.start = true;
533 }
534
535 Ok(())
536 }
537
538 fn post_manual_start(&mut self, password_bytes: Vec<u8>) -> Result<(), ServerError> {
540 if !self.manual_start_allowed {
541 return Err(ServerError {
542 status: tide_disco::StatusCode::FORBIDDEN,
543 message: "Configs have already been distributed to nodes, and the network can no \
544 longer be started manually."
545 .to_string(),
546 });
547 }
548
549 let password = String::from_utf8(password_bytes)
550 .expect("Failed to decode raw password as UTF-8 string.");
551
552 if self.config.manual_start_password != Some(password) {
554 return Err(ServerError {
555 status: tide_disco::StatusCode::FORBIDDEN,
556 message: "Incorrect password.".to_string(),
557 });
558 }
559
560 let registered_nodes_with_stake = self.config.config.known_nodes_with_stake.len();
561 let registered_da_nodes = self.config.config.known_da_nodes.len();
562
563 if registered_da_nodes > 1 {
564 self.config.config.num_nodes_with_stake =
565 std::num::NonZeroUsize::new(registered_nodes_with_stake)
566 .expect("Failed to convert to NonZeroUsize; this should be impossible.");
567
568 self.config.config.da_staked_committee_size = registered_da_nodes;
569 } else {
570 return Err(ServerError {
571 status: tide_disco::StatusCode::FORBIDDEN,
572 message: format!(
573 "We cannot manually start the network, because we only have \
574 {registered_nodes_with_stake} nodes with stake registered, with \
575 {registered_da_nodes} DA nodes."
576 ),
577 });
578 }
579
580 self.accepting_new_keys = false;
581 self.manual_start_allowed = false;
582 self.peer_pub_ready = true;
583 self.start = true;
584
585 Ok(())
586 }
587
588 fn post_run_results(&mut self, metrics: BenchResults) -> Result<(), ServerError> {
590 if metrics.total_transactions_committed != 0 {
591 if self.bench_results.total_transactions_committed == 0 {
593 self.bench_results = metrics;
594 } else {
595 let cur_metrics = self.bench_results.clone();
597 self.bench_results.avg_latency_in_sec = (metrics.avg_latency_in_sec
598 * metrics.num_latency
599 + cur_metrics.avg_latency_in_sec * cur_metrics.num_latency)
600 / (metrics.num_latency + cur_metrics.num_latency);
601 self.bench_results.num_latency += metrics.num_latency;
602 self.bench_results.minimum_latency_in_sec = metrics
603 .minimum_latency_in_sec
604 .min(cur_metrics.minimum_latency_in_sec);
605 self.bench_results.maximum_latency_in_sec = metrics
606 .maximum_latency_in_sec
607 .max(cur_metrics.maximum_latency_in_sec);
608 self.bench_results.throughput_bytes_per_sec = metrics
609 .throughput_bytes_per_sec
610 .max(cur_metrics.throughput_bytes_per_sec);
611 self.bench_results.total_transactions_committed = metrics
612 .total_transactions_committed
613 .max(cur_metrics.total_transactions_committed);
614 self.bench_results.total_time_elapsed_in_sec = metrics
615 .total_time_elapsed_in_sec
616 .max(cur_metrics.total_time_elapsed_in_sec);
617 self.bench_results.total_num_views =
618 metrics.total_num_views.min(cur_metrics.total_num_views);
619 self.bench_results.failed_num_views =
620 metrics.failed_num_views.max(cur_metrics.failed_num_views);
621 }
622 }
623 self.nodes_post_results += 1;
624 if self.bench_results.partial_results == "Unset" {
625 self.bench_results.partial_results = "One".to_string();
626 self.bench_results.printout();
627 self.output_to_csv();
628 }
629 if self.bench_results.partial_results == "One"
630 && self.nodes_post_results >= (self.config.config.da_staked_committee_size as u64 / 2)
631 {
632 self.bench_results.partial_results = "HalfDA".to_string();
633 self.bench_results.printout();
634 self.output_to_csv();
635 }
636 if self.bench_results.partial_results == "HalfDA"
637 && self.nodes_post_results >= (self.config.config.num_nodes_with_stake.get() as u64 / 2)
638 {
639 self.bench_results.partial_results = "Half".to_string();
640 self.bench_results.printout();
641 self.output_to_csv();
642 }
643 if self.bench_results.partial_results != "Full"
644 && self.nodes_post_results >= (self.config.config.num_nodes_with_stake.get() as u64)
645 {
646 self.bench_results.partial_results = "Full".to_string();
647 self.bench_results.printout();
648 self.output_to_csv();
649 }
650 Ok(())
651 }
652
653 fn post_builder(&mut self, builder: Url) -> Result<(), ServerError> {
654 self.builders.push(builder);
655 Ok(())
656 }
657
658 fn get_builders(&self) -> Result<Vec<Url>, ServerError> {
659 if !matches!(self.config.builder, BuilderType::External)
660 && self.builders.len() != self.config.config.da_staked_committee_size
661 {
662 return Err(ServerError {
663 status: tide_disco::StatusCode::NOT_FOUND,
664 message: "Not all builders are registered yet".to_string(),
665 });
666 }
667 Ok(self.builders.clone())
668 }
669}
670
671#[allow(clippy::too_many_lines)]
673fn define_api<TYPES, State, VER>() -> Result<Api<State, ServerError, VER>, ApiError>
674where
675 TYPES: NodeType,
676 State: 'static + Send + Sync + ReadState + WriteState,
677 <State as ReadState>::State: Send + Sync + OrchestratorApi<TYPES>,
678 TYPES::SignatureKey: serde::Serialize,
679 VER: StaticVersionType + 'static,
680{
681 let api_toml = toml::from_str::<toml::Value>(include_str!(concat!(
682 env!("CARGO_MANIFEST_DIR"),
683 "/api.toml"
684 )))
685 .expect("API file is not valid toml");
686 let mut api = Api::<State, ServerError, VER>::new(api_toml)?;
687 api.post("post_identity", |req, state| {
688 async move {
689 let mut body_bytes = req.body_bytes();
691 body_bytes.drain(..12);
692
693 let Ok((libp2p_address, libp2p_public_key)) =
695 vbs::Serializer::<OrchestratorVersion>::deserialize(&body_bytes)
696 else {
697 return Err(ServerError {
698 status: tide_disco::StatusCode::BAD_REQUEST,
699 message: "Malformed body".to_string(),
700 });
701 };
702
703 state.post_identity(libp2p_address, libp2p_public_key)
705 }
706 .boxed()
707 })?
708 .post("post_getconfig", |req, state| {
709 async move {
710 let node_index = req.integer_param("node_index")?;
711 state.post_getconfig(node_index)
712 }
713 .boxed()
714 })?
715 .post("get_tmp_node_index", |_req, state| {
716 async move { state.get_tmp_node_index() }.boxed()
717 })?
718 .post("post_pubkey", |req, state| {
719 async move {
720 let is_da = req.boolean_param("is_da")?;
721 let mut body_bytes = req.body_bytes();
723 body_bytes.drain(..12);
724
725 let Ok((mut pubkey, libp2p_address, libp2p_public_key)) =
727 vbs::Serializer::<OrchestratorVersion>::deserialize(&body_bytes)
728 else {
729 return Err(ServerError {
730 status: tide_disco::StatusCode::BAD_REQUEST,
731 message: "Malformed body".to_string(),
732 });
733 };
734
735 state.register_public_key(&mut pubkey, is_da, libp2p_address, libp2p_public_key)
736 }
737 .boxed()
738 })?
739 .get("peer_pubconfig_ready", |_req, state| {
740 async move { state.peer_pub_ready() }.boxed()
741 })?
742 .post("post_config_after_peer_collected", |_req, state| {
743 async move { state.post_config_after_peer_collected() }.boxed()
744 })?
745 .post(
746 "post_ready",
747 |req, state: &mut <State as ReadState>::State| {
748 async move {
749 let mut body_bytes = req.body_bytes();
750 body_bytes.drain(..12);
751 let Some(pubkey) = PeerConfig::<TYPES>::from_bytes(&body_bytes) else {
753 return Err(ServerError {
754 status: tide_disco::StatusCode::BAD_REQUEST,
755 message: "Malformed body".to_string(),
756 });
757 };
758 state.post_ready(&pubkey)
759 }
760 .boxed()
761 },
762 )?
763 .post(
764 "post_manual_start",
765 |req, state: &mut <State as ReadState>::State| {
766 async move {
767 let password = req.body_bytes();
768 state.post_manual_start(password)
769 }
770 .boxed()
771 },
772 )?
773 .get("get_start", |_req, state| {
774 async move { state.get_start() }.boxed()
775 })?
776 .post("post_results", |req, state| {
777 async move {
778 let metrics: Result<BenchResults, RequestError> = req.body_json();
779 state.post_run_results(metrics.unwrap())
780 }
781 .boxed()
782 })?
783 .post("post_builder", |req, state| {
784 async move {
785 let mut body_bytes = req.body_bytes();
787 body_bytes.drain(..12);
788
789 let Ok(urls) =
790 vbs::Serializer::<OrchestratorVersion>::deserialize::<Vec<Url>>(&body_bytes)
791 else {
792 return Err(ServerError {
793 status: tide_disco::StatusCode::BAD_REQUEST,
794 message: "Malformed body".to_string(),
795 });
796 };
797
798 let mut futures = urls
799 .into_iter()
800 .map(|url| async {
801 let client: surf_disco::Client<ServerError, OrchestratorVersion> =
802 surf_disco::client::Client::builder(url.clone()).build();
803 if client.connect(Some(Duration::from_secs(2))).await {
804 Some(url)
805 } else {
806 None
807 }
808 })
809 .collect::<FuturesUnordered<_>>()
810 .filter_map(futures::future::ready);
811
812 if let Some(url) = futures.next().await {
813 state.post_builder(url)
814 } else {
815 Err(ServerError {
816 status: tide_disco::StatusCode::BAD_REQUEST,
817 message: "No reachable addresses".to_string(),
818 })
819 }
820 }
821 .boxed()
822 })?
823 .get("get_builders", |_req, state| {
824 async move { state.get_builders() }.boxed()
825 })?;
826 Ok(api)
827}
828
829pub async fn run_orchestrator<TYPES: NodeType>(
835 mut network_config: NetworkConfig<TYPES>,
836 url: Url,
837) -> io::Result<()>
838where
839 TYPES::SignatureKey: 'static + serde::Serialize,
840{
841 let env_password = std::env::var("ORCHESTRATOR_MANUAL_START_PASSWORD");
842
843 if env_password.is_ok() {
844 tracing::warn!(
845 "Took orchestrator manual start password from the environment variable: \
846 ORCHESTRATOR_MANUAL_START_PASSWORD={:?}",
847 env_password
848 );
849 network_config.manual_start_password = env_password.ok();
850 }
851
852 {
855 let env_public_keys = std::env::var("ORCHESTRATOR_PUBLIC_KEYS");
856
857 if let Ok(filepath) = env_public_keys {
858 #[allow(clippy::panic)]
859 let config_file_as_string: String = fs::read_to_string(filepath.clone())
860 .unwrap_or_else(|_| panic!("Could not read config file located at {filepath}"));
861
862 let file: PublicKeysFile<TYPES> =
863 toml::from_str::<PublicKeysFile<TYPES>>(&config_file_as_string)
864 .expect("Unable to convert config file to TOML");
865
866 network_config.public_keys = file.public_keys;
867 }
868 }
869
870 network_config.config.known_nodes_with_stake = network_config
871 .public_keys
872 .iter()
873 .map(|keys| PeerConfig {
874 stake_table_entry: keys
875 .stake_table_key
876 .stake_table_entry(U256::from(keys.stake)),
877 state_ver_key: keys.state_ver_key.clone(),
878 connect_info: keys.connect_info.clone(),
879 })
880 .collect();
881
882 network_config.config.known_da_nodes = network_config
883 .public_keys
884 .iter()
885 .filter(|keys| keys.da)
886 .map(|keys| PeerConfig {
887 stake_table_entry: keys
888 .stake_table_key
889 .stake_table_entry(U256::from(keys.stake)),
890 state_ver_key: keys.state_ver_key.clone(),
891 connect_info: keys.connect_info.clone(),
892 })
893 .collect();
894
895 let web_api = define_api().map_err(|_e| io::Error::other("Failed to define api"));
896
897 let state: RwLock<OrchestratorState<TYPES>> =
898 RwLock::new(OrchestratorState::new(network_config));
899
900 let mut app = App::<RwLock<OrchestratorState<TYPES>>, ServerError>::with_state(state);
901 app.register_module::<ServerError, OrchestratorVersion>("api", web_api.unwrap())
902 .expect("Error registering api");
903 tracing::error!("listening on {url:?}");
904 app.serve(url, ORCHESTRATOR_VERSION).await
905}