1use anyhow::Context;
2use clap::Parser;
3use espresso_types::traits::{NullEventConsumer, SequencerPersistence};
4use futures::future::FutureExt;
5use hotshot_types::traits::metrics::NoMetrics;
6
7use super::{
8 Genesis, L1Params, NetworkParams,
9 api::{self, data_source::DataSourceOptions},
10 context::SequencerContext,
11 init_node, network,
12 options::{Modules, Options},
13 persistence,
14};
15use crate::keyset::KeySet;
16
17pub async fn main() -> anyhow::Result<()> {
18 let opt = Options::parse();
19 opt.logging.init();
20
21 let mut modules = opt.modules();
22 tracing::warn!(?modules, "sequencer starting up");
23
24 let genesis = Genesis::from_file(&opt.genesis_file)?;
25 tracing::warn!(?genesis, "genesis");
26
27 if let Some(storage) = modules.storage_fs.take() {
28 run_with_storage(genesis, modules, opt, storage).await
29 } else if let Some(storage) = modules.storage_sql.take() {
30 run_with_storage(genesis, modules, opt, storage).await
31 } else {
32 run_with_storage(genesis, modules, opt, persistence::fs::Options::default()).await
34 }
35}
36
37async fn run_with_storage<S>(
38 genesis: Genesis,
39 modules: Modules,
40 opt: Options,
41 storage_opt: S,
42) -> anyhow::Result<()>
43where
44 S: DataSourceOptions,
45{
46 let ctx = init_with_storage(genesis, modules, opt, storage_opt).await?;
47
48 ctx.start_consensus().await;
50 ctx.join().await;
51
52 Ok(())
53}
54
55pub async fn init_with_storage<S>(
56 genesis: Genesis,
57 modules: Modules,
58 opt: Options,
59 mut storage_opt: S,
60) -> anyhow::Result<SequencerContext<network::Production, S::Persistence>>
61where
62 S: DataSourceOptions,
63{
64 let KeySet {
65 staking,
66 state,
67 x25519,
68 } = opt.key_set.try_into()?;
69 let l1_params = L1Params {
70 urls: opt.l1_provider_url,
71 options: opt.l1_options,
72 };
73
74 let network_params = NetworkParams {
75 cdn_endpoint: opt.cdn_endpoint,
76 cliquenet_bind_addr: opt.cliquenet_bind_address,
77 x25519_secret_key: x25519,
78 libp2p_advertise_address: opt.libp2p_advertise_address,
79 libp2p_bind_address: opt.libp2p_bind_address,
80 libp2p_bootstrap_nodes: opt.libp2p_bootstrap_nodes,
81 orchestrator_url: opt.orchestrator_url,
82 builder_urls: opt.builder_urls,
83 state_relay_server_url: opt.state_relay_server_url,
84 public_api_url: opt.public_api_url,
85 private_staking_key: staking,
86 private_state_key: state,
87 state_peers: opt.state_peers,
88 config_peers: opt.config_peers,
89 catchup_backoff: opt.catchup_backoff,
90 catchup_base_timeout: opt.catchup_base_timeout,
91 local_catchup_timeout: opt.local_catchup_timeout,
92 libp2p_history_gossip: opt.libp2p_history_gossip,
93 libp2p_history_length: opt.libp2p_history_length,
94 libp2p_max_ihave_length: opt.libp2p_max_ihave_length,
95 libp2p_max_ihave_messages: opt.libp2p_max_ihave_messages,
96 libp2p_max_gossip_transmit_size: opt.libp2p_max_gossip_transmit_size,
97 libp2p_max_direct_transmit_size: opt.libp2p_max_direct_transmit_size,
98 libp2p_mesh_outbound_min: opt.libp2p_mesh_outbound_min,
99 libp2p_mesh_n: opt.libp2p_mesh_n,
100 libp2p_mesh_n_high: opt.libp2p_mesh_n_high,
101 libp2p_heartbeat_interval: opt.libp2p_heartbeat_interval,
102 libp2p_mesh_n_low: opt.libp2p_mesh_n_low,
103 libp2p_published_message_ids_cache_time: opt.libp2p_published_message_ids_cache_time,
104 libp2p_iwant_followup_time: opt.libp2p_iwant_followup_time,
105 libp2p_max_messages_per_rpc: opt.libp2p_max_messages_per_rpc,
106 libp2p_gossip_retransmission: opt.libp2p_gossip_retransmission,
107 libp2p_flood_publish: opt.libp2p_flood_publish,
108 libp2p_duplicate_cache_time: opt.libp2p_duplicate_cache_time,
109 libp2p_fanout_ttl: opt.libp2p_fanout_ttl,
110 libp2p_heartbeat_initial_delay: opt.libp2p_heartbeat_initial_delay,
111 libp2p_gossip_factor: opt.libp2p_gossip_factor,
112 libp2p_gossip_lazy: opt.libp2p_gossip_lazy,
113 };
114
115 let proposal_fetcher_config = opt.proposal_fetcher_config;
116
117 let persistence = storage_opt.create().await?;
118 persistence
119 .migrate_storage()
120 .await
121 .context("failed to migrate consensus data")?;
122
123 let ctx = match modules.http {
127 Some(http_opt) => {
128 let mut http_opt = api::Options::from(http_opt);
130 if let Some(query) = modules.query {
131 http_opt = storage_opt.enable_query_module(http_opt, query);
132 }
133 if let Some(submit) = modules.submit {
134 http_opt = http_opt.submit(submit);
135 }
136 if let Some(status) = modules.status {
137 http_opt = http_opt.status(status);
138 }
139
140 if let Some(catchup) = modules.catchup {
141 http_opt = http_opt.catchup(catchup);
142 }
143 if let Some(hotshot_events) = modules.hotshot_events {
144 http_opt = http_opt.hotshot_events(hotshot_events);
145 }
146 if let Some(explorer) = modules.explorer {
147 http_opt = http_opt.explorer(explorer);
148 }
149 if let Some(light_client) = modules.light_client {
150 http_opt = http_opt.light_client(light_client);
151 }
152 if let Some(config) = modules.config {
153 http_opt = http_opt.config(config);
154 }
155
156 http_opt
157 .serve(move |metrics, consumer, storage| {
158 async move {
159 init_node(
160 genesis,
161 network_params,
162 metrics,
163 persistence,
164 l1_params,
165 storage,
166 consumer,
167 opt.is_da,
168 opt.identity,
169 proposal_fetcher_config,
170 )
171 .await
172 }
173 .boxed()
174 })
175 .await?
176 },
177 None => {
178 init_node(
179 genesis,
180 network_params,
181 Box::new(NoMetrics),
182 persistence,
183 l1_params,
184 None,
185 NullEventConsumer,
186 opt.is_da,
187 opt.identity,
188 proposal_fetcher_config,
189 )
190 .await?
191 },
192 };
193
194 Ok(ctx)
195}
196
197#[cfg(test)]
198mod test {
199 use std::time::Duration;
200
201 use espresso_types::PubKey;
202 use hotshot_types::{light_client::StateKeyPair, traits::signature_key::SignatureKey, x25519};
203 use surf_disco::{Client, Url, error::ClientError};
204 use tagged_base64::TaggedBase64;
205 use tempfile::TempDir;
206 use test_utils::reserve_tcp_port;
207 use tokio::spawn;
208 use vbs::version::Version;
209
210 use super::*;
211 use crate::{
212 SequencerApiVersion,
213 api::options::Http,
214 genesis::{L1Finalized, StakeTableConfig},
215 persistence::fs,
216 };
217
218 #[test_log::test(tokio::test(flavor = "multi_thread"))]
219 async fn test_startup_before_orchestrator() {
220 let (pub_key, priv_key) = PubKey::generated_from_seed_indexed([0; 32], 0);
221 let state_key = StateKeyPair::generate_from_seed_indexed([0; 32], 0);
222 let x25519_kp = x25519::Keypair::generate().unwrap();
223
224 let port1 = reserve_tcp_port().expect("OS should have ephemeral ports available");
225 let port2 = reserve_tcp_port().expect("OS should have ephemeral ports available");
226 let tmp = TempDir::new().unwrap();
227
228 let genesis_file = tmp.path().join("genesis.toml");
229 let genesis = Genesis {
230 chain_config: Default::default(),
231 stake_table: StakeTableConfig { capacity: 10 },
232 accounts: Default::default(),
233 l1_finalized: L1Finalized::Number { number: 0 },
234 header: Default::default(),
235 upgrades: Default::default(),
236 base_version: Version { major: 0, minor: 1 },
237 upgrade_version: Version { major: 0, minor: 2 },
238 epoch_height: None,
239 drb_difficulty: None,
240 drb_upgrade_difficulty: None,
241 epoch_start_block: None,
242 stake_table_capacity: None,
243 genesis_version: Version { major: 0, minor: 1 },
244 da_committees: None,
245 };
246 genesis.to_file(&genesis_file).unwrap();
247
248 let modules = Modules {
249 http: Some(Http::with_port(port1)),
250 query: Some(Default::default()),
251 storage_fs: Some(fs::Options::new(tmp.path().into())),
252 ..Default::default()
253 };
254 let opt = Options::parse_from([
255 "sequencer",
256 "--private-staking-key",
257 &priv_key.to_tagged_base64().expect("valid key").to_string(),
258 "--private-state-key",
259 &state_key
260 .sign_key_ref()
261 .to_tagged_base64()
262 .expect("valid key")
263 .to_string(),
264 "--private-x25519-key",
265 &TaggedBase64::try_from(x25519_kp.secret_key())
266 .expect("valid key")
267 .to_string(),
268 "--cliquenet-bind-address",
269 &format!("127.0.0.1:{port2}"),
270 "--genesis-file",
271 &genesis_file.display().to_string(),
272 ]);
273
274 tracing::info!(port = %port1, "starting sequencer");
278 let task = spawn(async move {
279 if let Err(err) =
280 init_with_storage(genesis, modules, opt, fs::Options::new(tmp.path().into())).await
281 {
282 tracing::error!("failed to start sequencer: {err:#}");
283 }
284 });
285
286 tracing::info!("waiting for API to start");
289 let url: Url = format!("http://localhost:{port1}").parse().unwrap();
290 let client = Client::<ClientError, SequencerApiVersion>::new(url.clone());
291 assert!(client.connect(Some(Duration::from_secs(60))).await);
292 client.get::<()>("healthcheck").send().await.unwrap();
293
294 let res = reqwest::get(url.join("/status/metrics").unwrap())
297 .await
298 .unwrap();
299 assert!(res.status().is_success(), "{}", res.status());
300 let metrics = res.text().await.unwrap();
301 let lines = metrics.lines().collect::<Vec<_>>();
302 assert!(
303 lines.contains(&format!("consensus_node{{key=\"{pub_key}\"}} 1").as_str()),
304 "{lines:#?}"
305 );
306 assert!(
307 lines.contains(
308 &format!(
309 "consensus_version{{desc=\"{}\",rev=\"{}\",timestamp=\"{}\"}} 1",
310 espresso_utils::build_info::GIT_DESCRIBE,
311 espresso_utils::build_info::GIT_SHA,
312 espresso_utils::build_info::GIT_COMMIT_TIMESTAMP,
313 )
314 .as_str()
315 ),
316 "{lines:#?}"
317 );
318 let build_info_line = lines
319 .iter()
320 .find(|l| l.starts_with("consensus_build_info{"));
321 assert!(
322 build_info_line.is_some(),
323 "missing consensus_build_info metric: {lines:#?}"
324 );
325 let build_info_line = build_info_line.unwrap();
326 assert!(
327 build_info_line.contains("modified="),
328 "expected modified= in build_info: {lines:#?}"
329 );
330 assert!(
331 build_info_line.contains("features="),
332 "expected features= in build_info: {lines:#?}"
333 );
334 assert!(
335 build_info_line.contains("testing"),
336 "expected testing in features: {lines:#?}"
337 );
338
339 task.abort();
340 }
341}