espresso_node/
run.rs

1use anyhow::Context;
2use clap::Parser;
3use espresso_types::traits::{NullEventConsumer, SequencerPersistence};
4use futures::future::FutureExt;
5use hotshot_types::traits::metrics::NoMetrics;
6
7use super::{
8    Genesis, L1Params, NetworkParams,
9    api::{self, data_source::DataSourceOptions},
10    context::SequencerContext,
11    init_node, network,
12    options::{Modules, Options},
13    persistence,
14};
15use crate::keyset::KeySet;
16
17pub async fn main() -> anyhow::Result<()> {
18    let opt = Options::parse();
19    opt.logging.init();
20
21    let mut modules = opt.modules();
22    tracing::warn!(?modules, "sequencer starting up");
23
24    let genesis = Genesis::from_file(&opt.genesis_file)?;
25    tracing::warn!(?genesis, "genesis");
26
27    if let Some(storage) = modules.storage_fs.take() {
28        run_with_storage(genesis, modules, opt, storage).await
29    } else if let Some(storage) = modules.storage_sql.take() {
30        run_with_storage(genesis, modules, opt, storage).await
31    } else {
32        // Persistence is required. If none is provided, just use the local file system.
33        run_with_storage(genesis, modules, opt, persistence::fs::Options::default()).await
34    }
35}
36
37async fn run_with_storage<S>(
38    genesis: Genesis,
39    modules: Modules,
40    opt: Options,
41    storage_opt: S,
42) -> anyhow::Result<()>
43where
44    S: DataSourceOptions,
45{
46    let ctx = init_with_storage(genesis, modules, opt, storage_opt).await?;
47
48    // Start doing consensus.
49    ctx.start_consensus().await;
50    ctx.join().await;
51
52    Ok(())
53}
54
55pub async fn init_with_storage<S>(
56    genesis: Genesis,
57    modules: Modules,
58    opt: Options,
59    mut storage_opt: S,
60) -> anyhow::Result<SequencerContext<network::Production, S::Persistence>>
61where
62    S: DataSourceOptions,
63{
64    let KeySet {
65        staking,
66        state,
67        x25519,
68    } = opt.key_set.try_into()?;
69    let l1_params = L1Params {
70        urls: opt.l1_provider_url,
71        options: opt.l1_options,
72    };
73
74    let network_params = NetworkParams {
75        cdn_endpoint: opt.cdn_endpoint,
76        cliquenet_bind_addr: opt.cliquenet_bind_address,
77        x25519_secret_key: x25519,
78        libp2p_advertise_address: opt.libp2p_advertise_address,
79        libp2p_bind_address: opt.libp2p_bind_address,
80        libp2p_bootstrap_nodes: opt.libp2p_bootstrap_nodes,
81        orchestrator_url: opt.orchestrator_url,
82        builder_urls: opt.builder_urls,
83        state_relay_server_url: opt.state_relay_server_url,
84        public_api_url: opt.public_api_url,
85        private_staking_key: staking,
86        private_state_key: state,
87        state_peers: opt.state_peers,
88        config_peers: opt.config_peers,
89        catchup_backoff: opt.catchup_backoff,
90        catchup_base_timeout: opt.catchup_base_timeout,
91        local_catchup_timeout: opt.local_catchup_timeout,
92        libp2p_history_gossip: opt.libp2p_history_gossip,
93        libp2p_history_length: opt.libp2p_history_length,
94        libp2p_max_ihave_length: opt.libp2p_max_ihave_length,
95        libp2p_max_ihave_messages: opt.libp2p_max_ihave_messages,
96        libp2p_max_gossip_transmit_size: opt.libp2p_max_gossip_transmit_size,
97        libp2p_max_direct_transmit_size: opt.libp2p_max_direct_transmit_size,
98        libp2p_mesh_outbound_min: opt.libp2p_mesh_outbound_min,
99        libp2p_mesh_n: opt.libp2p_mesh_n,
100        libp2p_mesh_n_high: opt.libp2p_mesh_n_high,
101        libp2p_heartbeat_interval: opt.libp2p_heartbeat_interval,
102        libp2p_mesh_n_low: opt.libp2p_mesh_n_low,
103        libp2p_published_message_ids_cache_time: opt.libp2p_published_message_ids_cache_time,
104        libp2p_iwant_followup_time: opt.libp2p_iwant_followup_time,
105        libp2p_max_messages_per_rpc: opt.libp2p_max_messages_per_rpc,
106        libp2p_gossip_retransmission: opt.libp2p_gossip_retransmission,
107        libp2p_flood_publish: opt.libp2p_flood_publish,
108        libp2p_duplicate_cache_time: opt.libp2p_duplicate_cache_time,
109        libp2p_fanout_ttl: opt.libp2p_fanout_ttl,
110        libp2p_heartbeat_initial_delay: opt.libp2p_heartbeat_initial_delay,
111        libp2p_gossip_factor: opt.libp2p_gossip_factor,
112        libp2p_gossip_lazy: opt.libp2p_gossip_lazy,
113    };
114
115    let proposal_fetcher_config = opt.proposal_fetcher_config;
116
117    let persistence = storage_opt.create().await?;
118    persistence
119        .migrate_storage()
120        .await
121        .context("failed to migrate consensus data")?;
122
123    // Initialize HotShot. If the user requested the HTTP module, we must initialize the handle in
124    // a special way, in order to populate the API with consensus metrics. Otherwise, we initialize
125    // the handle directly, with no metrics.
126    let ctx = match modules.http {
127        Some(http_opt) => {
128            // Add optional API modules as requested.
129            let mut http_opt = api::Options::from(http_opt);
130            if let Some(query) = modules.query {
131                http_opt = storage_opt.enable_query_module(http_opt, query);
132            }
133            if let Some(submit) = modules.submit {
134                http_opt = http_opt.submit(submit);
135            }
136            if let Some(status) = modules.status {
137                http_opt = http_opt.status(status);
138            }
139
140            if let Some(catchup) = modules.catchup {
141                http_opt = http_opt.catchup(catchup);
142            }
143            if let Some(hotshot_events) = modules.hotshot_events {
144                http_opt = http_opt.hotshot_events(hotshot_events);
145            }
146            if let Some(explorer) = modules.explorer {
147                http_opt = http_opt.explorer(explorer);
148            }
149            if let Some(light_client) = modules.light_client {
150                http_opt = http_opt.light_client(light_client);
151            }
152            if let Some(config) = modules.config {
153                http_opt = http_opt.config(config);
154            }
155
156            http_opt
157                .serve(move |metrics, consumer, storage| {
158                    async move {
159                        init_node(
160                            genesis,
161                            network_params,
162                            metrics,
163                            persistence,
164                            l1_params,
165                            storage,
166                            consumer,
167                            opt.is_da,
168                            opt.identity,
169                            proposal_fetcher_config,
170                        )
171                        .await
172                    }
173                    .boxed()
174                })
175                .await?
176        },
177        None => {
178            init_node(
179                genesis,
180                network_params,
181                Box::new(NoMetrics),
182                persistence,
183                l1_params,
184                None,
185                NullEventConsumer,
186                opt.is_da,
187                opt.identity,
188                proposal_fetcher_config,
189            )
190            .await?
191        },
192    };
193
194    Ok(ctx)
195}
196
#[cfg(test)]
mod test {
    use std::time::Duration;

    use espresso_types::PubKey;
    use hotshot_types::{light_client::StateKeyPair, traits::signature_key::SignatureKey, x25519};
    use surf_disco::{Client, Url, error::ClientError};
    use tagged_base64::TaggedBase64;
    use tempfile::TempDir;
    use test_utils::reserve_tcp_port;
    use tokio::spawn;
    use vbs::version::Version;

    use super::*;
    use crate::{
        SequencerApiVersion,
        api::options::Http,
        genesis::{L1Finalized, StakeTableConfig},
        persistence::fs,
    };

    /// Checks that the API server comes up, and exposes node and build metrics, even though no
    /// orchestrator is available and node initialization therefore never completes.
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_startup_before_orchestrator() {
        // Deterministic staking/state keys; the x25519 key pair is freshly generated.
        let (pub_key, priv_key) = PubKey::generated_from_seed_indexed([0; 32], 0);
        let state_key = StateKeyPair::generate_from_seed_indexed([0; 32], 0);
        let x25519_kp = x25519::Keypair::generate().unwrap();

        // `port1` serves the HTTP API, `port2` is the cliquenet bind port.
        let port1 = reserve_tcp_port().expect("OS should have ephemeral ports available");
        let port2 = reserve_tcp_port().expect("OS should have ephemeral ports available");
        let tmp = TempDir::new().unwrap();

        // Write a minimal genesis file into the temporary directory.
        let genesis_file = tmp.path().join("genesis.toml");
        let genesis = Genesis {
            chain_config: Default::default(),
            stake_table: StakeTableConfig { capacity: 10 },
            accounts: Default::default(),
            l1_finalized: L1Finalized::Number { number: 0 },
            header: Default::default(),
            upgrades: Default::default(),
            base_version: Version { major: 0, minor: 1 },
            upgrade_version: Version { major: 0, minor: 2 },
            epoch_height: None,
            drb_difficulty: None,
            drb_upgrade_difficulty: None,
            epoch_start_block: None,
            stake_table_capacity: None,
            genesis_version: Version { major: 0, minor: 1 },
            da_committees: None,
        };
        genesis.to_file(&genesis_file).unwrap();

        let modules = Modules {
            http: Some(Http::with_port(port1)),
            query: Some(Default::default()),
            storage_fs: Some(fs::Options::new(tmp.path().into())),
            ..Default::default()
        };

        // Render every command-line value up front so the argument list below stays readable.
        let staking_key_arg = priv_key.to_tagged_base64().expect("valid key").to_string();
        let state_key_arg = state_key
            .sign_key_ref()
            .to_tagged_base64()
            .expect("valid key")
            .to_string();
        let x25519_key_arg = TaggedBase64::try_from(x25519_kp.secret_key())
            .expect("valid key")
            .to_string();
        let bind_address_arg = format!("127.0.0.1:{port2}");
        let genesis_file_arg = genesis_file.display().to_string();
        let opt = Options::parse_from([
            "sequencer",
            "--private-staking-key",
            staking_key_arg.as_str(),
            "--private-state-key",
            state_key_arg.as_str(),
            "--private-x25519-key",
            x25519_key_arg.as_str(),
            "--cliquenet-bind-address",
            bind_address_arg.as_str(),
            "--genesis-file",
            genesis_file_arg.as_str(),
        ]);

        // Run node initialization in the background. It is not expected to finish, since it
        // will be waiting for the orchestrator, but it should get far enough to start the API
        // server and populate some metrics. The temporary directory is moved into the task so
        // that it stays alive for as long as the task does.
        tracing::info!(port = %port1, "starting sequencer");
        let task = spawn(async move {
            let storage = fs::Options::new(tmp.path().into());
            if let Err(err) = init_with_storage(genesis, modules, opt, storage).await {
                tracing::error!("failed to start sequencer: {err:#}");
            }
        });

        // The healthcheck should eventually respond even while the node waits for the
        // orchestrator.
        tracing::info!("waiting for API to start");
        let url: Url = format!("http://localhost:{port1}").parse().unwrap();
        let client = Client::<ClientError, SequencerApiVersion>::new(url.clone());
        assert!(client.connect(Some(Duration::from_secs(60))).await);
        client.get::<()>("healthcheck").send().await.unwrap();

        // The metrics endpoint returns plaintext, which surf-disco doesn't currently support
        // fetching, so a raw reqwest request is used instead.
        let res = reqwest::get(url.join("/status/metrics").unwrap())
            .await
            .unwrap();
        assert!(res.status().is_success(), "{}", res.status());
        let metrics = res.text().await.unwrap();
        let lines: Vec<&str> = metrics.lines().collect();

        // The node's public key must be reported.
        let node_metric = format!("consensus_node{{key=\"{pub_key}\"}} 1");
        assert!(lines.contains(&node_metric.as_str()), "{lines:#?}");

        // The software version must be reported, matching the build info compiled into the test.
        let version_metric = format!(
            "consensus_version{{desc=\"{}\",rev=\"{}\",timestamp=\"{}\"}} 1",
            espresso_utils::build_info::GIT_DESCRIBE,
            espresso_utils::build_info::GIT_SHA,
            espresso_utils::build_info::GIT_COMMIT_TIMESTAMP,
        );
        assert!(lines.contains(&version_metric.as_str()), "{lines:#?}");

        // The build info metric must exist and mention each expected piece of information.
        let build_info_line = lines
            .iter()
            .find(|l| l.starts_with("consensus_build_info{"))
            .unwrap_or_else(|| panic!("missing consensus_build_info metric: {lines:#?}"));
        for (needle, expectation) in [
            ("modified=", "modified= in build_info"),
            ("features=", "features= in build_info"),
            ("testing", "testing in features"),
        ] {
            assert!(
                build_info_line.contains(needle),
                "expected {expectation}: {lines:#?}"
            );
        }

        task.abort();
    }
}