1use anyhow::Context;
2use clap::Parser;
3use espresso_types::traits::{NullEventConsumer, SequencerPersistence};
4use futures::future::FutureExt;
5use hotshot_types::traits::metrics::NoMetrics;
6
7use super::{
8 Genesis, L1Params, NetworkParams,
9 api::{self, data_source::DataSourceOptions},
10 context::SequencerContext,
11 init_node, network,
12 options::{Modules, Options, PublicNodeConfig},
13 persistence,
14};
15use crate::keyset::KeySet;
16
17pub async fn main(migrated_envs: Vec<(&str, &str)>) -> anyhow::Result<()> {
18 let opt = Options::parse();
19 opt.logging.init();
20 espresso_utils::env_compat::log_migrated_env_vars(&migrated_envs);
21
22 let mut modules = opt.modules();
23 tracing::warn!(?modules, "sequencer starting up");
24
25 let public_node_config = PublicNodeConfig::new(&opt, &modules);
26
27 let genesis = Genesis::from_file(&opt.genesis_file)?;
28 tracing::warn!(?genesis, "genesis");
29
30 if let Some(storage) = modules.storage_fs.take() {
31 run_with_storage(genesis, modules, opt, storage, public_node_config).await
32 } else if let Some(storage) = modules.storage_sql.take() {
33 run_with_storage(genesis, modules, opt, storage, public_node_config).await
34 } else {
35 run_with_storage(
37 genesis,
38 modules,
39 opt,
40 persistence::fs::Options::default(),
41 public_node_config,
42 )
43 .await
44 }
45}
46
47async fn run_with_storage<S>(
48 genesis: Genesis,
49 modules: Modules,
50 opt: Options,
51 storage_opt: S,
52 public_node_config: PublicNodeConfig,
53) -> anyhow::Result<()>
54where
55 S: DataSourceOptions,
56{
57 let ctx = init_with_storage(genesis, modules, opt, storage_opt, public_node_config).await?;
58
59 ctx.start_consensus().await;
61 ctx.join().await;
62
63 Ok(())
64}
65
66pub async fn init_with_storage<S>(
67 genesis: Genesis,
68 modules: Modules,
69 opt: Options,
70 mut storage_opt: S,
71 public_node_config: PublicNodeConfig,
72) -> anyhow::Result<SequencerContext<network::Production, S::Persistence>>
73where
74 S: DataSourceOptions,
75{
76 let KeySet {
77 staking,
78 state,
79 x25519,
80 } = opt.key_set.try_into()?;
81 let l1_params = L1Params {
82 urls: opt.l1_provider_url,
83 options: opt.l1_options,
84 };
85
86 let network_params = NetworkParams {
87 cdn_endpoint: opt.cdn_endpoint,
88 cliquenet_bind_addr: opt.cliquenet_bind_address,
89 cliquenet_advertise_addr: opt.cliquenet_advertise_address,
90 x25519_secret_key: x25519,
91 libp2p_advertise_address: opt.libp2p_advertise_address,
92 libp2p_bind_address: opt.libp2p_bind_address,
93 libp2p_bootstrap_nodes: opt.libp2p_bootstrap_nodes,
94 orchestrator_url: opt.orchestrator_url,
95 builder_urls: opt.builder_urls,
96 state_relay_server_url: opt.state_relay_server_url,
97 public_api_url: opt.public_api_url,
98 private_staking_key: staking,
99 private_state_key: state,
100 state_peers: opt.state_peers,
101 config_peers: opt.config_peers,
102 catchup_backoff: opt.catchup_backoff,
103 catchup_base_timeout: opt.catchup_base_timeout,
104 local_catchup_timeout: opt.local_catchup_timeout,
105 bootstrap_epoch_catchup_timeout: opt.bootstrap_epoch_catchup_timeout,
106 new_protocol_consensus_gc_interval: opt.new_protocol_consensus_gc_interval,
107 libp2p_history_gossip: opt.libp2p_history_gossip,
108 libp2p_history_length: opt.libp2p_history_length,
109 libp2p_max_ihave_length: opt.libp2p_max_ihave_length,
110 libp2p_max_ihave_messages: opt.libp2p_max_ihave_messages,
111 libp2p_max_gossip_transmit_size: opt.libp2p_max_gossip_transmit_size,
112 libp2p_max_direct_transmit_size: opt.libp2p_max_direct_transmit_size,
113 libp2p_mesh_outbound_min: opt.libp2p_mesh_outbound_min,
114 libp2p_mesh_n: opt.libp2p_mesh_n,
115 libp2p_mesh_n_high: opt.libp2p_mesh_n_high,
116 libp2p_heartbeat_interval: opt.libp2p_heartbeat_interval,
117 libp2p_mesh_n_low: opt.libp2p_mesh_n_low,
118 libp2p_published_message_ids_cache_time: opt.libp2p_published_message_ids_cache_time,
119 libp2p_iwant_followup_time: opt.libp2p_iwant_followup_time,
120 libp2p_max_messages_per_rpc: opt.libp2p_max_messages_per_rpc,
121 libp2p_gossip_retransmission: opt.libp2p_gossip_retransmission,
122 libp2p_flood_publish: opt.libp2p_flood_publish,
123 libp2p_duplicate_cache_time: opt.libp2p_duplicate_cache_time,
124 libp2p_fanout_ttl: opt.libp2p_fanout_ttl,
125 libp2p_heartbeat_initial_delay: opt.libp2p_heartbeat_initial_delay,
126 libp2p_gossip_factor: opt.libp2p_gossip_factor,
127 libp2p_gossip_lazy: opt.libp2p_gossip_lazy,
128 };
129
130 let proposal_fetcher_config = opt.proposal_fetcher_config;
131
132 let persistence = storage_opt.create().await?;
133 persistence
134 .migrate_storage()
135 .await
136 .context("failed to migrate consensus data")?;
137
138 let ctx = match modules.http {
142 Some(http_opt) => {
143 let mut http_opt = api::Options::from(http_opt);
145 if let Some(query) = modules.query {
146 http_opt = storage_opt.enable_query_module(http_opt, query);
147 }
148 if let Some(submit) = modules.submit {
149 http_opt = http_opt.submit(submit);
150 }
151 if let Some(status) = modules.status {
152 http_opt = http_opt.status(status);
153 }
154
155 if let Some(catchup) = modules.catchup {
156 http_opt = http_opt.catchup(catchup);
157 }
158 if let Some(hotshot_events) = modules.hotshot_events {
159 http_opt = http_opt.hotshot_events(hotshot_events);
160 }
161 if let Some(explorer) = modules.explorer {
162 http_opt = http_opt.explorer(explorer);
163 }
164 if let Some(light_client) = modules.light_client {
165 http_opt = http_opt.light_client(light_client);
166 }
167 if let Some(config) = modules.config {
168 http_opt = http_opt
169 .config(config)
170 .public_node_config(public_node_config);
171 }
172
173 http_opt
174 .serve(move |metrics, consumer, storage| {
175 async move {
176 init_node(
177 genesis,
178 network_params,
179 metrics,
180 persistence,
181 l1_params,
182 storage,
183 consumer,
184 opt.is_da,
185 opt.identity,
186 proposal_fetcher_config,
187 )
188 .await
189 }
190 .boxed()
191 })
192 .await?
193 },
194 None => {
195 init_node(
196 genesis,
197 network_params,
198 Box::new(NoMetrics),
199 persistence,
200 l1_params,
201 None,
202 NullEventConsumer,
203 opt.is_da,
204 opt.identity,
205 proposal_fetcher_config,
206 )
207 .await?
208 },
209 };
210
211 Ok(ctx)
212}
213
214#[cfg(test)]
215mod test {
216 use std::time::Duration;
217
218 use espresso_types::PubKey;
219 use hotshot_types::{light_client::StateKeyPair, traits::signature_key::SignatureKey, x25519};
220 use surf_disco::{Client, Url, error::ClientError};
221 use tagged_base64::TaggedBase64;
222 use tempfile::TempDir;
223 use test_utils::reserve_tcp_port;
224 use tokio::spawn;
225 use vbs::version::Version;
226
227 use super::*;
228 use crate::{
229 SequencerApiVersion,
230 api::options::Http,
231 genesis::{L1Finalized, StakeTableConfig},
232 persistence::fs,
233 };
234
235 #[test_log::test(tokio::test(flavor = "multi_thread"))]
236 async fn test_startup_before_orchestrator() {
237 let (pub_key, priv_key) = PubKey::generated_from_seed_indexed([0; 32], 0);
238 let state_key = StateKeyPair::generate_from_seed_indexed([0; 32], 0);
239 let x25519_kp = x25519::Keypair::generate().unwrap();
240
241 let port1 = reserve_tcp_port().expect("OS should have ephemeral ports available");
242 let port2 = reserve_tcp_port().expect("OS should have ephemeral ports available");
243 let tmp = TempDir::new().unwrap();
244
245 let genesis_file = tmp.path().join("genesis.toml");
246 let genesis = Genesis {
247 chain_config: Default::default(),
248 stake_table: StakeTableConfig { capacity: 10 },
249 accounts: Default::default(),
250 l1_finalized: L1Finalized::Number { number: 0 },
251 header: Default::default(),
252 upgrades: Default::default(),
253 base_version: Version { major: 0, minor: 1 },
254 upgrade_version: Version { major: 0, minor: 2 },
255 epoch_height: None,
256 drb_difficulty: None,
257 drb_upgrade_difficulty: None,
258 epoch_start_block: None,
259 stake_table_capacity: None,
260 genesis_version: Version { major: 0, minor: 1 },
261 da_committees: None,
262 };
263 genesis.to_file(&genesis_file).unwrap();
264
265 let modules = Modules {
266 http: Some(Http::with_port(port1)),
267 query: Some(Default::default()),
268 storage_fs: Some(fs::Options::new(tmp.path().into())),
269 config: Some(Default::default()),
270 ..Default::default()
271 };
272 let opt = Options::parse_from([
273 "sequencer",
274 "--private-staking-key",
275 &priv_key.to_tagged_base64().expect("valid key").to_string(),
276 "--private-state-key",
277 &state_key
278 .sign_key_ref()
279 .to_tagged_base64()
280 .expect("valid key")
281 .to_string(),
282 "--private-x25519-key",
283 &TaggedBase64::try_from(x25519_kp.secret_key())
284 .expect("valid key")
285 .to_string(),
286 "--cliquenet-bind-address",
287 &format!("127.0.0.1:{port2}"),
288 "--libp2p-advertise-address",
292 "127.0.0.1:0",
293 "--genesis-file",
294 &genesis_file.display().to_string(),
295 ]);
296
297 tracing::info!(port = %port1, "starting sequencer");
301 let public_node_config = PublicNodeConfig::new(&opt, &modules);
302 let task = spawn(async move {
303 if let Err(err) = init_with_storage(
304 genesis,
305 modules,
306 opt,
307 fs::Options::new(tmp.path().into()),
308 public_node_config,
309 )
310 .await
311 {
312 tracing::error!("failed to start sequencer: {err:#}");
313 }
314 });
315
316 tracing::info!("waiting for API to start");
319 let url: Url = format!("http://localhost:{port1}").parse().unwrap();
320 let client = Client::<ClientError, SequencerApiVersion>::new(url.clone());
321 assert!(client.connect(Some(Duration::from_secs(60))).await);
322 client.get::<()>("healthcheck").send().await.unwrap();
323
324 let res = reqwest::get(url.join("/status/metrics").unwrap())
327 .await
328 .unwrap();
329 assert!(res.status().is_success(), "{}", res.status());
330 let metrics = res.text().await.unwrap();
331 let lines = metrics.lines().collect::<Vec<_>>();
332 assert!(
333 lines.contains(&format!("consensus_node{{key=\"{pub_key}\"}} 1").as_str()),
334 "{lines:#?}"
335 );
336 assert!(
337 lines.contains(
338 &format!(
339 "consensus_version{{desc=\"{}\",rev=\"{}\",timestamp=\"{}\"}} 1",
340 espresso_utils::build_info::GIT_DESCRIBE,
341 espresso_utils::build_info::GIT_SHA,
342 espresso_utils::build_info::GIT_COMMIT_TIMESTAMP,
343 )
344 .as_str()
345 ),
346 "{lines:#?}"
347 );
348 let build_info_line = lines
349 .iter()
350 .find(|l| l.starts_with("consensus_build_info{"));
351 assert!(
352 build_info_line.is_some(),
353 "missing consensus_build_info metric: {lines:#?}"
354 );
355 let build_info_line = build_info_line.unwrap();
356 assert!(
357 build_info_line.contains("modified="),
358 "expected modified= in build_info: {lines:#?}"
359 );
360 assert!(
361 build_info_line.contains("features="),
362 "expected features= in build_info: {lines:#?}"
363 );
364 assert!(
365 build_info_line.contains("testing"),
366 "expected testing in features: {lines:#?}"
367 );
368
369 let res = reqwest::Client::new()
373 .get(url.join("/config/runtime").unwrap())
374 .header(reqwest::header::ACCEPT, "application/json")
375 .send()
376 .await
377 .unwrap();
378 assert!(
379 res.status().is_success(),
380 "config/runtime status: {}",
381 res.status()
382 );
383 let node_cfg: serde_json::Value = res.json().await.expect("config/runtime returns JSON");
384 assert_eq!(
385 node_cfg["cliquenet_bind_address"],
386 serde_json::Value::String(format!("127.0.0.1:{port2}")),
387 "cliquenet_bind_address mismatch: {node_cfg}"
388 );
389 assert_eq!(
390 node_cfg["is_da"],
391 serde_json::Value::Bool(false),
392 "is_da mismatch: {node_cfg}"
393 );
394 let top_level = node_cfg
395 .as_object()
396 .expect("config/runtime returns a JSON object");
397 for key in top_level.keys() {
398 assert!(
399 !key.to_lowercase().contains("private"),
400 "top-level key '{key}' contains 'private': {node_cfg}"
401 );
402 }
403
404 task.abort();
405 }
406}