espresso_node/api/
options.rs

1//! Sequencer-specific API options and initialization.
2
3use std::sync::Arc;
4
5use anyhow::{Context, bail};
6use clap::Parser;
7use espresso_types::{
8    BlockMerkleTree, PubKey, SeqTypes,
9    v0::traits::{EventConsumer, NullEventConsumer, PersistenceOptions, SequencerPersistence},
10    v0_3::RewardMerkleTreeV1,
11    v0_4::RewardMerkleTreeV2,
12};
13use futures::{
14    channel::oneshot,
15    future::{BoxFuture, Future},
16};
17use hotshot_query_service::{
18    ApiState as AppState, Error,
19    data_source::{ExtensibleDataSource, MetricsDataSource},
20    fetching::provider::QueryServiceProvider,
21    status::{self, UpdateStatusData},
22};
23use hotshot_types::traits::{
24    metrics::{Metrics, NoMetrics},
25    network::ConnectedNetwork,
26};
27use jf_merkle_tree_compat::MerkleTreeScheme;
28use tide_disco::{Api, App, Url, listener::RateLimitListener, method::ReadState};
29use vbs::version::StaticVersionType;
30
31use super::{
32    ApiState, StorageState,
33    data_source::{
34        CatchupDataSource, HotShotConfigDataSource, NodeStateDataSource, Provider,
35        SequencerDataSource, StateSignatureDataSource, SubmitDataSource, provider,
36    },
37    endpoints, fs, light_client, sql,
38    update::ApiEventConsumer,
39};
40use crate::{
41    SequencerApiVersion,
42    api::endpoints::RewardMerkleTreeVersion,
43    catchup::CatchupStorage,
44    context::{SequencerContext, TaskList},
45    persistence,
46    request_response::data_source::Storage as RequestResponseStorage,
47    state::update_state_storage_loop,
48};
49
/// Configuration for the node's HTTP API server.
///
/// Only `http` is mandatory. Every other field is optional and is normally filled in through the
/// builder-style methods on this type; a field left as `None` means the corresponding module (or
/// storage backend) is not configured.
#[derive(Clone, Debug)]
pub struct Options {
    /// Port and connection-limit settings for the HTTP server.
    pub http: Http,
    /// Query API module options. Set together with a storage backend by `query_sql` or
    /// `query_fs`.
    pub query: Option<Query>,
    /// When set, the `submit` API module is registered.
    pub submit: Option<Submit>,
    /// When set (and `query` is not), `serve` runs the status API on top of a
    /// `MetricsDataSource`, which needs no persistent storage.
    pub status: Option<Status>,
    /// When set, the `catchup` API module is registered on servers without a query module.
    pub catchup: Option<Catchup>,
    /// When set, the `config` API module is registered.
    pub config: Option<Config>,
    /// When set, the `hotshot-events` streaming API module is registered.
    pub hotshot_events: Option<HotshotEvents>,
    /// When set, the `explorer` API module is registered (SQL-backed query module only).
    pub explorer: Option<Explorer>,
    /// When set, the `light-client` API module is registered (SQL-backed query module only).
    pub light_client: Option<LightClient>,
    /// File-system storage for the query module. Only consulted by `serve` when `storage_sql`
    /// is `None`.
    pub storage_fs: Option<persistence::fs::Options>,
    /// SQL storage for the query module. Takes precedence over `storage_fs` in `serve`.
    pub storage_sql: Option<persistence::sql::Options>,
}
64
65impl From<Http> for Options {
66    fn from(http: Http) -> Self {
67        Self {
68            http,
69            query: None,
70            submit: None,
71            status: None,
72            catchup: None,
73            config: None,
74            hotshot_events: None,
75            explorer: None,
76            light_client: None,
77            storage_fs: None,
78            storage_sql: None,
79        }
80    }
81}
82
83impl Options {
84    /// Default options for running a web server on the given port.
85    pub fn with_port(port: u16) -> Self {
86        Http::with_port(port).into()
87    }
88
89    /// Add a query API module backed by a Postgres database.
90    pub fn query_sql(mut self, query: Query, storage: persistence::sql::Options) -> Self {
91        self.query = Some(query);
92        self.storage_sql = Some(storage);
93        self
94    }
95
96    /// Add a query API module backed by the file system.
97    pub fn query_fs(mut self, query: Query, storage: persistence::fs::Options) -> Self {
98        self.query = Some(query);
99        self.storage_fs = Some(storage);
100        self
101    }
102
103    /// Add a submit API module.
104    pub fn submit(mut self, opt: Submit) -> Self {
105        self.submit = Some(opt);
106        self
107    }
108
109    /// Add a status API module.
110    pub fn status(mut self, opt: Status) -> Self {
111        self.status = Some(opt);
112        self
113    }
114
115    /// Add a catchup API module.
116    pub fn catchup(mut self, opt: Catchup) -> Self {
117        self.catchup = Some(opt);
118        self
119    }
120
121    /// Add a config API module.
122    pub fn config(mut self, opt: Config) -> Self {
123        self.config = Some(opt);
124        self
125    }
126
127    /// Add a Hotshot events streaming API module.
128    pub fn hotshot_events(mut self, opt: HotshotEvents) -> Self {
129        self.hotshot_events = Some(opt);
130        self
131    }
132
133    /// Add an explorer API module.
134    pub fn explorer(mut self, opt: Explorer) -> Self {
135        self.explorer = Some(opt);
136        self
137    }
138
139    /// Add a light client API module.
140    pub fn light_client(mut self, opt: LightClient) -> Self {
141        self.light_client = Some(opt);
142        self
143    }
144
145    /// Whether these options will run the query API.
146    pub fn has_query_module(&self) -> bool {
147        self.query.is_some() && (self.storage_fs.is_some() || self.storage_sql.is_some())
148    }
149
150    /// Start the server.
151    ///
152    /// The function `init_context` is used to create a sequencer context from a metrics object and
153    /// optional saved consensus state. The metrics object is created from the API data source, so
154    /// that consensus will populuate metrics that can then be read and served by the API.
155    pub async fn serve<N, P, F>(mut self, init_context: F) -> anyhow::Result<SequencerContext<N, P>>
156    where
157        N: ConnectedNetwork<PubKey>,
158        P: SequencerPersistence,
159        F: FnOnce(
160            Box<dyn Metrics>,
161            Box<dyn EventConsumer>,
162            Option<RequestResponseStorage>,
163        ) -> BoxFuture<'static, anyhow::Result<SequencerContext<N, P>>>,
164    {
165        // Create a channel to send the context to the web server after it is initialized. This
166        // allows the web server to start before initialization can complete, since initialization
167        // can take a long time (and is dependent on other nodes).
168        let (send_ctx, recv_ctx) = oneshot::channel();
169        let state = ApiState::new(async move {
170            recv_ctx
171                .await
172                .expect("context initialized and sent over channel")
173        });
174        let mut tasks = TaskList::default();
175
176        // The server state type depends on whether we are running a query or status API or not, so
177        // we handle the two cases differently.
178        #[allow(clippy::type_complexity)]
179        let (metrics, consumer, storage): (
180            Box<dyn Metrics>,
181            Box<dyn EventConsumer>,
182            Option<RequestResponseStorage>,
183        ) = if let Some(query_opt) = self.query.take() {
184            if let Some(opt) = self.storage_sql.take() {
185                self.init_with_query_module_sql(
186                    query_opt,
187                    opt,
188                    state,
189                    &mut tasks,
190                    SequencerApiVersion::instance(),
191                )
192                .await?
193            } else if let Some(opt) = self.storage_fs.take() {
194                self.init_with_query_module_fs(
195                    query_opt,
196                    opt,
197                    state,
198                    &mut tasks,
199                    SequencerApiVersion::instance(),
200                )
201                .await?
202            } else {
203                bail!("query module requested but not storage provided");
204            }
205        } else if self.status.is_some() {
206            // If a status API is requested but no availability API, we use the
207            // `MetricsDataSource`, which allows us to run the status API with no persistent
208            // storage.
209            let ds = MetricsDataSource::default();
210            let metrics = ds.populate_metrics();
211            let mut app = App::<_, Error>::with_state(AppState::from(ExtensibleDataSource::new(
212                ds,
213                state.clone(),
214            )));
215
216            // Initialize v0 and v1 status API.
217            register_api("status", &mut app, move |ver| {
218                status::define_api(&Default::default(), SequencerApiVersion::instance(), ver)
219                    .context("failed to define status api")
220            })?;
221
222            self.init_hotshot_modules(&mut app)?;
223
224            // Initialize hotshot events API if enabled
225            if self.hotshot_events.is_some() {
226                self.init_hotshot_events_module(&mut app)?;
227            }
228
229            tasks.spawn(
230                "API server",
231                self.listen(self.http.port, app, SequencerApiVersion::instance()),
232            );
233
234            (metrics, Box::new(NullEventConsumer), None)
235        } else {
236            // If no status or availability API is requested, we don't need metrics or a query
237            // service data source. The only app state is the HotShot handle, which we use to
238            // submit transactions.
239            //
240            // If we have no availability API, we cannot load a saved leaf from local storage,
241            // so we better have been provided the leaf ahead of time if we want it at all.
242            let mut app = App::<_, Error>::with_state(AppState::from(state.clone()));
243
244            self.init_hotshot_modules(&mut app)?;
245
246            // Initialize hotshot events API if enabled
247            if self.hotshot_events.is_some() {
248                self.init_hotshot_events_module(&mut app)?;
249            }
250
251            tasks.spawn(
252                "API server",
253                self.listen(self.http.port, app, SequencerApiVersion::instance()),
254            );
255
256            (Box::new(NoMetrics), Box::new(NullEventConsumer), None)
257        };
258
259        let ctx = init_context(metrics, consumer, storage.clone()).await?;
260        send_ctx
261            .send(ctx.clone())
262            .ok()
263            .context("API server exited without receiving context")?;
264        Ok(ctx.with_task_list(tasks))
265    }
266
267    async fn init_app_modules<N, P, D>(
268        &self,
269        ds: D,
270        state: ApiState<N, P>,
271        bind_version: SequencerApiVersion,
272    ) -> anyhow::Result<(
273        Box<dyn Metrics>,
274        Arc<StorageState<N, P, D>>,
275        App<AppState<StorageState<N, P, D>>, Error>,
276    )>
277    where
278        N: ConnectedNetwork<PubKey>,
279        P: SequencerPersistence,
280        D: SequencerDataSource + CatchupStorage + Send + Sync + 'static,
281    {
282        let metrics = ds.populate_metrics();
283        let ds = Arc::new(ExtensibleDataSource::new(ds, state.clone()));
284        let api_state: endpoints::AvailState<N, P, D> = ds.clone().into();
285        let mut app = App::<_, Error>::with_state(api_state);
286
287        // Initialize v0 and v1 status API.
288        register_api("status", &mut app, move |ver| {
289            status::define_api(&Default::default(), SequencerApiVersion::instance(), ver)
290                .context("failed to define status api")
291        })?;
292
293        // Initialize availability and node APIs (these both use the same data source).
294
295        // Note: We initialize two versions of the availability module: `availability/v0` and `availability/v1`.
296        // - `availability/v0/leaf/0` returns the old `Leaf1` type for backward compatibility.
297        // - `availability/v1/leaf/0` returns the new `Leaf2` type
298
299        register_api("availability", &mut app, move |ver| {
300            endpoints::availability(ver).context("failed to define availability api")
301        })?;
302
303        register_api("node", &mut app, move |ver| {
304            endpoints::node(ver).context("failed to define node api")
305        })?;
306
307        register_api("token", &mut app, move |ver| {
308            endpoints::token(ver).context("failed to define token api")
309        })?;
310
311        // Initialize submit API
312        if self.submit.is_some() {
313            register_api("submit", &mut app, move |ver| {
314                endpoints::submit::<_, _, _, SequencerApiVersion>(ver)
315                    .context("failed to define submit api")
316            })?;
317        }
318
319        tracing::info!("initializing catchup API");
320
321        register_api("catchup", &mut app, move |ver| {
322            endpoints::catchup(bind_version, ver).context("failed to define catchup api")
323        })?;
324
325        register_api("state-signature", &mut app, move |ver| {
326            endpoints::state_signature(bind_version, ver)
327                .context("failed to define state signature api")
328        })?;
329
330        if self.config.is_some() {
331            register_api("config", &mut app, move |ver| {
332                endpoints::config(bind_version, ver).context("failed to define config api")
333            })?;
334        }
335        Ok((metrics, ds, app))
336    }
337
338    async fn init_with_query_module_fs<N, P>(
339        &self,
340        query_opt: Query,
341        mod_opt: persistence::fs::Options,
342        state: ApiState<N, P>,
343        tasks: &mut TaskList,
344        bind_version: SequencerApiVersion,
345    ) -> anyhow::Result<(
346        Box<dyn Metrics>,
347        Box<dyn EventConsumer>,
348        Option<RequestResponseStorage>,
349    )>
350    where
351        N: ConnectedNetwork<PubKey>,
352        P: SequencerPersistence,
353    {
354        let ds = <fs::DataSource as SequencerDataSource>::create(
355            mod_opt,
356            provider(query_opt.peers, bind_version),
357            false,
358        )
359        .await?;
360
361        // Get the inner storage from the data source
362        let inner_storage = ds.inner();
363
364        let (metrics, ds, mut app) = self
365            .init_app_modules(ds, state.clone(), bind_version)
366            .await?;
367
368        // Initialize hotshot events API if enabled
369        if self.hotshot_events.is_some() {
370            self.init_hotshot_events_module(&mut app)?;
371        }
372
373        tasks.spawn("API server", self.listen(self.http.port, app, bind_version));
374        Ok((
375            metrics,
376            Box::new(ApiEventConsumer::from(ds)),
377            Some(RequestResponseStorage::Fs(inner_storage)),
378        ))
379    }
380
381    async fn init_with_query_module_sql<N, P>(
382        self,
383        query_opt: Query,
384        mod_opt: persistence::sql::Options,
385        state: ApiState<N, P>,
386        tasks: &mut TaskList,
387        bind_version: SequencerApiVersion,
388    ) -> anyhow::Result<(
389        Box<dyn Metrics>,
390        Box<dyn EventConsumer>,
391        Option<RequestResponseStorage>,
392    )>
393    where
394        N: ConnectedNetwork<PubKey>,
395        P: SequencerPersistence,
396    {
397        let mut provider = Provider::default();
398
399        // Use the database itself as a fetching provider: sometimes we can fetch data that is
400        // missing from the query service from ephemeral consensus storage.
401        let db_provider = mod_opt.clone().create().await?;
402        provider = provider
403            .with_leaf_provider(db_provider.clone())
404            .with_block_provider(db_provider.clone())
405            .with_vid_common_provider(db_provider);
406        // If that fails, fetch missing data from peers.
407        for peer in query_opt.peers {
408            tracing::info!("will fetch missing data from {peer}");
409            provider = provider.with_provider(QueryServiceProvider::new(peer, bind_version));
410        }
411
412        let ds = sql::DataSource::create(mod_opt.clone(), provider, false).await?;
413        let inner_storage = ds.inner();
414        let (metrics, ds, mut app) = self
415            .init_app_modules(ds, state.clone(), bind_version)
416            .await?;
417
418        if self.explorer.is_some() {
419            register_api("explorer", &mut app, move |ver| {
420                endpoints::explorer(ver).context("failed to define explorer api")
421            })?;
422        }
423
424        // Initialize database metadata API (SQL-only)
425        register_api("database", &mut app, move |ver| {
426            endpoints::database::<_, SequencerApiVersion>(ver)
427                .context("failed to define database api")
428        })?;
429
430        // Initialize merklized state module for block merkle tree
431
432        register_api("block-state", &mut app, move |ver| {
433            endpoints::merklized_state::<N, P, _, BlockMerkleTree, 3>(ver)
434                .context("failed to define block-state api")
435        })?;
436
437        // Initialize merklized state module for fee merkle tree
438
439        register_api("fee-state", &mut app, move |ver| {
440            endpoints::fee::<_, SequencerApiVersion>(ver).context("failed to define fee-state api")
441        })?;
442
443        register_api("reward-state", &mut app, move |ver| {
444            endpoints::reward::<
445                _,
446                SequencerApiVersion,
447                RewardMerkleTreeV1,
448                { RewardMerkleTreeV1::ARITY },
449            >(ver, RewardMerkleTreeVersion::V1)
450            .context("failed to define reward-state api")
451        })?;
452
453        // register new api for new reward merkle tree
454        register_api("reward-state-v2", &mut app, move |ver| {
455            endpoints::reward::<
456                _,
457                SequencerApiVersion,
458                RewardMerkleTreeV2,
459                { RewardMerkleTreeV2::ARITY },
460            >(ver, RewardMerkleTreeVersion::V2)
461            .context("failed to define reward-state api")
462        })?;
463
464        let get_node_state = {
465            let state = state.clone();
466            async move { state.node_state().await.clone() }
467        };
468        tasks.spawn(
469            "merklized state storage update loop",
470            update_state_storage_loop(ds.clone(), get_node_state),
471        );
472
473        // Initialize hotshot events API if enabled
474        if self.hotshot_events.is_some() {
475            self.init_hotshot_events_module(&mut app)?;
476        }
477
478        // Initialize light client API if enabled.
479        if self.light_client.is_some() {
480            register_api("light-client", &mut app, move |ver| {
481                light_client::define_api::<_, SequencerApiVersion>(Default::default(), ver)
482                    .context("failed to define light client api")
483            })?;
484        }
485
486        tasks.spawn(
487            "API server",
488            self.listen(self.http.port, app, SequencerApiVersion::instance()),
489        );
490        Ok((
491            metrics,
492            Box::new(ApiEventConsumer::from(ds)),
493            Some(RequestResponseStorage::Sql(inner_storage)),
494        ))
495    }
496
497    /// Initialize the modules for interacting with HotShot.
498    ///
499    /// This function adds the `submit`, `state`, and `state_signature` API modules to the given
500    /// app. These modules only require a HotShot handle as state, and thus they work with any data
501    /// source, so initialization is the same no matter what mode the service is running in.
502    fn init_hotshot_modules<N, P, S>(&self, app: &mut App<S, Error>) -> anyhow::Result<()>
503    where
504        S: 'static + Send + Sync + ReadState,
505        P: SequencerPersistence,
506        S::State: Send
507            + Sync
508            + SubmitDataSource<N, P>
509            + StateSignatureDataSource<N>
510            + NodeStateDataSource
511            + CatchupDataSource
512            + HotShotConfigDataSource,
513        N: ConnectedNetwork<PubKey>,
514    {
515        let bind_version = SequencerApiVersion::instance();
516        // Initialize submit API
517        if self.submit.is_some() {
518            register_api("submit", app, move |ver| {
519                endpoints::submit::<_, _, _, SequencerApiVersion>(ver)
520                    .context("failed to define submit api")
521            })?;
522        }
523
524        // Initialize state API.
525        if self.catchup.is_some() {
526            tracing::info!("initializing state API");
527
528            register_api("catchup", app, move |ver| {
529                endpoints::catchup(bind_version, ver).context("failed to define catchup api")
530            })?;
531        }
532
533        register_api("state-signature", app, move |ver| {
534            endpoints::state_signature(bind_version, ver)
535                .context("failed to define state signature api")
536        })?;
537
538        if self.config.is_some() {
539            register_api("config", app, move |ver| {
540                endpoints::config(bind_version, ver).context("failed to define config api")
541            })?;
542        }
543
544        Ok(())
545    }
546
547    /// Initialize the hotshot events API module if enabled.
548    ///
549    /// This function adds the hotshot events API module to the given app if the hotshot_events
550    /// option is enabled. This module requires the app state to implement EventsSource.
551    fn init_hotshot_events_module<S>(&self, app: &mut App<S, Error>) -> anyhow::Result<()>
552    where
553        S: 'static + Send + Sync + ReadState,
554        S::State: Send + Sync + hotshot_events_service::events_source::EventsSource<SeqTypes>,
555    {
556        tracing::info!("Initializing HotShot events API at /hotshot-events");
557        register_api("hotshot-events", app, move |ver| {
558            hotshot_events_service::events::define_api::<_, _, SequencerApiVersion>(
559                &hotshot_events_service::events::Options::default(),
560                ver,
561            )
562            .with_context(|| "failed to define the HotShot events API")
563        })?;
564
565        Ok(())
566    }
567
568    fn listen<S, E, ApiVer>(
569        &self,
570        port: u16,
571        app: App<S, E>,
572        bind_version: ApiVer,
573    ) -> impl Future<Output = anyhow::Result<()>> + use<S, E, ApiVer>
574    where
575        S: Send + Sync + 'static,
576        E: Send + Sync + tide_disco::Error,
577        ApiVer: StaticVersionType + 'static,
578    {
579        let max_connections = self.http.max_connections;
580
581        async move {
582            if let Some(limit) = max_connections {
583                app.serve(RateLimitListener::with_port(port, limit), bind_version)
584                    .await?;
585            } else {
586                app.serve(format!("0.0.0.0:{port}"), bind_version).await?;
587            }
588            Ok(())
589        }
590    }
591}
592
/// The minimal HTTP API.
///
/// The API automatically includes health and version endpoints. Additional API modules can be
/// added by including the query-api or submit-api modules.
#[derive(Parser, Clone, Copy, Debug)]
pub struct Http {
    /// Port that the HTTP API will use.
    #[clap(long, env = "ESPRESSO_SEQUENCER_API_PORT", default_value = "8080")]
    pub port: u16,

    /// Maximum number of concurrent HTTP connections the server will allow.
    ///
    /// Connections exceeding this will receive an immediate 429 response and be closed.
    ///
    /// Leave unset for no connection limit.
    #[clap(long, env = "ESPRESSO_SEQUENCER_MAX_CONNECTIONS")]
    pub max_connections: Option<usize>,
}
611
612impl Http {
613    /// Default options for running a web server on the given port.
614    pub fn with_port(port: u16) -> Self {
615        Self {
616            port,
617            max_connections: None,
618        }
619    }
620}
621
// The field-less structs below act as on/off markers: `Options` stores each as an `Option<_>`
// and the server setup code only checks whether the value is present.

/// Options for the submission API module.
#[derive(Parser, Clone, Copy, Debug, Default)]
pub struct Submit;

/// Options for the status API module.
#[derive(Parser, Clone, Copy, Debug, Default)]
pub struct Status;

/// Options for the catchup API module.
#[derive(Parser, Clone, Copy, Debug, Default)]
pub struct Catchup;

/// Options for the config API module.
#[derive(Parser, Clone, Copy, Debug, Default)]
pub struct Config;

/// Options for the query API module.
#[derive(Parser, Clone, Debug, Default)]
pub struct Query {
    /// Peers for fetching missing data for the query service.
    // Accepts a comma-separated list. Each URL becomes a fetching provider when the query
    // module is initialized.
    #[clap(long, env = "ESPRESSO_SEQUENCER_API_PEERS", value_delimiter = ',')]
    pub peers: Vec<Url>,
}

/// Options for the state API module.
// NOTE(review): no field of `Options` in this file has this type — confirm whether it is still
// used elsewhere.
#[derive(Parser, Clone, Copy, Debug, Default)]
pub struct State;

/// Options for the Hotshot events streaming API module.
#[derive(Parser, Clone, Copy, Debug, Default)]
pub struct HotshotEvents;

/// Options for the explorer API module.
#[derive(Parser, Clone, Copy, Debug, Default)]
pub struct Explorer;

/// Options for the light client API module.
#[derive(Parser, Clone, Copy, Debug, Default)]
pub struct LightClient;
661
662/// Registers two versions (v0 and v1) of the same API module under the given path.
663fn register_api<E, S, F, ModuleError, ModuleVersion>(
664    path: &'static str,
665    app: &mut App<S, E>,
666    f: F,
667) -> anyhow::Result<()>
668where
669    S: 'static + Send + Sync,
670    E: Send + Sync + 'static + tide_disco::Error + From<ModuleError>,
671    ModuleError: Send + Sync + 'static,
672    ModuleVersion: StaticVersionType + 'static,
673    F: Fn(semver::Version) -> anyhow::Result<Api<S, ModuleError, ModuleVersion>>,
674{
675    let v0 = "0.0.1".parse().unwrap();
676    let v1 = "1.1.0".parse().unwrap();
677    let result1 = f(v0)?;
678    let result2 = f(v1)?;
679
680    app.register_module(path, result1)?;
681    app.register_module(path, result2)?;
682
683    Ok(())
684}