Skip to main content

espresso_node/
run.rs

1use anyhow::Context;
2use clap::Parser;
3use espresso_types::traits::{NullEventConsumer, SequencerPersistence};
4use futures::future::FutureExt;
5use hotshot_types::traits::metrics::NoMetrics;
6
7use super::{
8    Genesis, L1Params, NetworkParams,
9    api::{self, data_source::DataSourceOptions},
10    context::SequencerContext,
11    init_node, network,
12    options::{Modules, Options, PublicNodeConfig},
13    persistence,
14};
15use crate::keyset::KeySet;
16
17pub async fn main(migrated_envs: Vec<(&str, &str)>) -> anyhow::Result<()> {
18    let opt = Options::parse();
19    opt.logging.init();
20    espresso_utils::env_compat::log_migrated_env_vars(&migrated_envs);
21
22    let mut modules = opt.modules();
23    tracing::warn!(?modules, "sequencer starting up");
24
25    let public_node_config = PublicNodeConfig::new(&opt, &modules);
26
27    let genesis = Genesis::from_file(&opt.genesis_file)?;
28    tracing::warn!(?genesis, "genesis");
29
30    if let Some(storage) = modules.storage_fs.take() {
31        run_with_storage(genesis, modules, opt, storage, public_node_config).await
32    } else if let Some(storage) = modules.storage_sql.take() {
33        run_with_storage(genesis, modules, opt, storage, public_node_config).await
34    } else {
35        // Persistence is required. If none is provided, just use the local file system.
36        run_with_storage(
37            genesis,
38            modules,
39            opt,
40            persistence::fs::Options::default(),
41            public_node_config,
42        )
43        .await
44    }
45}
46
47async fn run_with_storage<S>(
48    genesis: Genesis,
49    modules: Modules,
50    opt: Options,
51    storage_opt: S,
52    public_node_config: PublicNodeConfig,
53) -> anyhow::Result<()>
54where
55    S: DataSourceOptions,
56{
57    let ctx = init_with_storage(genesis, modules, opt, storage_opt, public_node_config).await?;
58
59    // Start doing consensus.
60    ctx.start_consensus().await;
61    ctx.join().await;
62
63    Ok(())
64}
65
66pub async fn init_with_storage<S>(
67    genesis: Genesis,
68    modules: Modules,
69    opt: Options,
70    mut storage_opt: S,
71    public_node_config: PublicNodeConfig,
72) -> anyhow::Result<SequencerContext<network::Production, S::Persistence>>
73where
74    S: DataSourceOptions,
75{
76    let KeySet {
77        staking,
78        state,
79        x25519,
80    } = opt.key_set.try_into()?;
81    let l1_params = L1Params {
82        urls: opt.l1_provider_url,
83        options: opt.l1_options,
84    };
85
86    let network_params = NetworkParams {
87        cdn_endpoint: opt.cdn_endpoint,
88        cliquenet_bind_addr: opt.cliquenet_bind_address,
89        cliquenet_advertise_addr: opt.cliquenet_advertise_address,
90        x25519_secret_key: x25519,
91        libp2p_advertise_address: opt.libp2p_advertise_address,
92        libp2p_bind_address: opt.libp2p_bind_address,
93        libp2p_bootstrap_nodes: opt.libp2p_bootstrap_nodes,
94        orchestrator_url: opt.orchestrator_url,
95        builder_urls: opt.builder_urls,
96        state_relay_server_url: opt.state_relay_server_url,
97        public_api_url: opt.public_api_url,
98        private_staking_key: staking,
99        private_state_key: state,
100        state_peers: opt.state_peers,
101        config_peers: opt.config_peers,
102        catchup_backoff: opt.catchup_backoff,
103        catchup_base_timeout: opt.catchup_base_timeout,
104        local_catchup_timeout: opt.local_catchup_timeout,
105        bootstrap_epoch_catchup_timeout: opt.bootstrap_epoch_catchup_timeout,
106        new_protocol_consensus_gc_interval: opt.new_protocol_consensus_gc_interval,
107        libp2p_history_gossip: opt.libp2p_history_gossip,
108        libp2p_history_length: opt.libp2p_history_length,
109        libp2p_max_ihave_length: opt.libp2p_max_ihave_length,
110        libp2p_max_ihave_messages: opt.libp2p_max_ihave_messages,
111        libp2p_max_gossip_transmit_size: opt.libp2p_max_gossip_transmit_size,
112        libp2p_max_direct_transmit_size: opt.libp2p_max_direct_transmit_size,
113        libp2p_mesh_outbound_min: opt.libp2p_mesh_outbound_min,
114        libp2p_mesh_n: opt.libp2p_mesh_n,
115        libp2p_mesh_n_high: opt.libp2p_mesh_n_high,
116        libp2p_heartbeat_interval: opt.libp2p_heartbeat_interval,
117        libp2p_mesh_n_low: opt.libp2p_mesh_n_low,
118        libp2p_published_message_ids_cache_time: opt.libp2p_published_message_ids_cache_time,
119        libp2p_iwant_followup_time: opt.libp2p_iwant_followup_time,
120        libp2p_max_messages_per_rpc: opt.libp2p_max_messages_per_rpc,
121        libp2p_gossip_retransmission: opt.libp2p_gossip_retransmission,
122        libp2p_flood_publish: opt.libp2p_flood_publish,
123        libp2p_duplicate_cache_time: opt.libp2p_duplicate_cache_time,
124        libp2p_fanout_ttl: opt.libp2p_fanout_ttl,
125        libp2p_heartbeat_initial_delay: opt.libp2p_heartbeat_initial_delay,
126        libp2p_gossip_factor: opt.libp2p_gossip_factor,
127        libp2p_gossip_lazy: opt.libp2p_gossip_lazy,
128    };
129
130    let proposal_fetcher_config = opt.proposal_fetcher_config;
131
132    let persistence = storage_opt.create().await?;
133    persistence
134        .migrate_storage()
135        .await
136        .context("failed to migrate consensus data")?;
137
138    // Initialize HotShot. If the user requested the HTTP module, we must initialize the handle in
139    // a special way, in order to populate the API with consensus metrics. Otherwise, we initialize
140    // the handle directly, with no metrics.
141    let ctx = match modules.http {
142        Some(http_opt) => {
143            // Add optional API modules as requested.
144            let mut http_opt = api::Options::from(http_opt);
145            if let Some(query) = modules.query {
146                http_opt = storage_opt.enable_query_module(http_opt, query);
147            }
148            if let Some(submit) = modules.submit {
149                http_opt = http_opt.submit(submit);
150            }
151            if let Some(status) = modules.status {
152                http_opt = http_opt.status(status);
153            }
154
155            if let Some(catchup) = modules.catchup {
156                http_opt = http_opt.catchup(catchup);
157            }
158            if let Some(hotshot_events) = modules.hotshot_events {
159                http_opt = http_opt.hotshot_events(hotshot_events);
160            }
161            if let Some(explorer) = modules.explorer {
162                http_opt = http_opt.explorer(explorer);
163            }
164            if let Some(light_client) = modules.light_client {
165                http_opt = http_opt.light_client(light_client);
166            }
167            if let Some(config) = modules.config {
168                http_opt = http_opt
169                    .config(config)
170                    .public_node_config(public_node_config);
171            }
172
173            http_opt
174                .serve(move |metrics, consumer, storage| {
175                    async move {
176                        init_node(
177                            genesis,
178                            network_params,
179                            metrics,
180                            persistence,
181                            l1_params,
182                            storage,
183                            consumer,
184                            opt.is_da,
185                            opt.identity,
186                            proposal_fetcher_config,
187                        )
188                        .await
189                    }
190                    .boxed()
191                })
192                .await?
193        },
194        None => {
195            init_node(
196                genesis,
197                network_params,
198                Box::new(NoMetrics),
199                persistence,
200                l1_params,
201                None,
202                NullEventConsumer,
203                opt.is_da,
204                opt.identity,
205                proposal_fetcher_config,
206            )
207            .await?
208        },
209    };
210
211    Ok(ctx)
212}
213
#[cfg(test)]
mod test {
    use std::time::Duration;

    use espresso_types::PubKey;
    use hotshot_types::{light_client::StateKeyPair, traits::signature_key::SignatureKey, x25519};
    use surf_disco::{Client, Url, error::ClientError};
    use tagged_base64::TaggedBase64;
    use tempfile::TempDir;
    use test_utils::reserve_tcp_port;
    use tokio::spawn;
    use vbs::version::Version;

    use super::*;
    use crate::{
        SequencerApiVersion,
        api::options::Http,
        genesis::{L1Finalized, StakeTableConfig},
        persistence::fs,
    };

    /// Starts a node in the background while it is stuck waiting for the orchestrator.
    ///
    /// This test never starts an orchestrator. It checks that the HTTP API still comes up and
    /// serves the healthcheck, the node/version/build-info metrics and `/config/runtime`.
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_startup_before_orchestrator() {
        let (pub_key, priv_key) = PubKey::generated_from_seed_indexed([0; 32], 0);
        let state_key = StateKeyPair::generate_from_seed_indexed([0; 32], 0);
        let x25519_kp = x25519::Keypair::generate().unwrap();

        let port1 = reserve_tcp_port().expect("OS should have ephemeral ports available");
        let port2 = reserve_tcp_port().expect("OS should have ephemeral ports available");
        let tmp = TempDir::new().unwrap();

        let genesis_file = tmp.path().join("genesis.toml");
        let genesis = Genesis {
            chain_config: Default::default(),
            stake_table: StakeTableConfig { capacity: 10 },
            accounts: Default::default(),
            l1_finalized: L1Finalized::Number { number: 0 },
            header: Default::default(),
            upgrades: Default::default(),
            base_version: Version { major: 0, minor: 1 },
            upgrade_version: Version { major: 0, minor: 2 },
            epoch_height: None,
            drb_difficulty: None,
            drb_upgrade_difficulty: None,
            epoch_start_block: None,
            stake_table_capacity: None,
            genesis_version: Version { major: 0, minor: 1 },
            da_committees: None,
        };
        genesis.to_file(&genesis_file).unwrap();

        let modules = Modules {
            http: Some(Http::with_port(port1)),
            query: Some(Default::default()),
            storage_fs: Some(fs::Options::new(tmp.path().into())),
            config: Some(Default::default()),
            ..Default::default()
        };
        let opt = Options::parse_from([
            "sequencer",
            "--private-staking-key",
            &priv_key.to_tagged_base64().expect("valid key").to_string(),
            "--private-state-key",
            &state_key
                .sign_key_ref()
                .to_tagged_base64()
                .expect("valid key")
                .to_string(),
            "--private-x25519-key",
            &TaggedBase64::try_from(x25519_kp.secret_key())
                .expect("valid key")
                .to_string(),
            "--cliquenet-bind-address",
            &format!("127.0.0.1:{port2}"),
            // Never bound: this test blocks at orchestrator before libp2p starts. Port 0 is a
            // placeholder to satisfy the orchestrator-bootstrap requirement on the advertise
            // address.
            "--libp2p-advertise-address",
            "127.0.0.1:0",
            "--genesis-file",
            &genesis_file.display().to_string(),
        ]);

        // Start the sequencer in a background task. This process will not complete, because it will
        // be waiting for the orchestrator, but it should at least start up the API server and
        // populate some metrics.
        tracing::info!(port = %port1, "starting sequencer");
        let public_node_config = PublicNodeConfig::new(&opt, &modules);
        let task = spawn(async move {
            if let Err(err) = init_with_storage(
                genesis,
                modules,
                opt,
                fs::Options::new(tmp.path().into()),
                public_node_config,
            )
            .await
            {
                tracing::error!("failed to start sequencer: {err:#}");
            }
        });

        // The healthcheck should eventually come up even though the node is waiting for the
        // orchestrator.
        tracing::info!("waiting for API to start");
        let url: Url = format!("http://localhost:{port1}").parse().unwrap();
        let client = Client::<ClientError, SequencerApiVersion>::new(url.clone());
        assert!(
            client.connect(Some(Duration::from_secs(60))).await,
            "API did not come up on port {port1} within 60s"
        );
        client.get::<()>("healthcheck").send().await.unwrap();

        // The metrics should include information about the node and software version. surf-disco
        // doesn't currently support fetching a plaintext file, so we use a raw reqwest client.
        let res = reqwest::get(url.join("/status/metrics").unwrap())
            .await
            .unwrap();
        assert!(res.status().is_success(), "{}", res.status());
        let metrics = res.text().await.unwrap();
        let lines = metrics.lines().collect::<Vec<_>>();
        assert!(
            lines.contains(&format!("consensus_node{{key=\"{pub_key}\"}} 1").as_str()),
            "{lines:#?}"
        );
        assert!(
            lines.contains(
                &format!(
                    "consensus_version{{desc=\"{}\",rev=\"{}\",timestamp=\"{}\"}} 1",
                    espresso_utils::build_info::GIT_DESCRIBE,
                    espresso_utils::build_info::GIT_SHA,
                    espresso_utils::build_info::GIT_COMMIT_TIMESTAMP,
                )
                .as_str()
            ),
            "{lines:#?}"
        );
        // Look the build-info line up once and fail with the full metrics dump if it is absent,
        // instead of asserting `is_some()` and unwrapping separately.
        let build_info_line = lines
            .iter()
            .find(|l| l.starts_with("consensus_build_info{"))
            .unwrap_or_else(|| panic!("missing consensus_build_info metric: {lines:#?}"));
        assert!(
            build_info_line.contains("modified="),
            "expected modified= in build_info: {lines:#?}"
        );
        assert!(
            build_info_line.contains("features="),
            "expected features= in build_info: {lines:#?}"
        );
        assert!(
            build_info_line.contains("testing"),
            "expected testing in features: {lines:#?}"
        );

        // The /config/runtime endpoint should be available and reflect CLI overrides. Use a raw
        // reqwest client to fetch JSON, since surf-disco defaults to bincode encoding which can't
        // round-trip arbitrary JSON via `serde_json::Value`.
        let res = reqwest::Client::new()
            .get(url.join("/config/runtime").unwrap())
            .header(reqwest::header::ACCEPT, "application/json")
            .send()
            .await
            .unwrap();
        assert!(
            res.status().is_success(),
            "config/runtime status: {}",
            res.status()
        );
        let node_cfg: serde_json::Value = res.json().await.expect("config/runtime returns JSON");
        assert_eq!(
            node_cfg["cliquenet_bind_address"],
            serde_json::Value::String(format!("127.0.0.1:{port2}")),
            "cliquenet_bind_address mismatch: {node_cfg}"
        );
        assert_eq!(
            node_cfg["is_da"],
            serde_json::Value::Bool(false),
            "is_da mismatch: {node_cfg}"
        );
        // No top-level key of the public runtime config may mention private material.
        let top_level = node_cfg
            .as_object()
            .expect("config/runtime returns a JSON object");
        for key in top_level.keys() {
            assert!(
                !key.to_lowercase().contains("private"),
                "top-level key '{key}' contains 'private': {node_cfg}"
            );
        }

        task.abort();
    }
}