1use std::sync::Arc;
4
5use ::light_client::{state::LightClientOptions, storage::LightClientSqliteOptions};
6use anyhow::{Context, bail};
7use clap::Parser;
8use espresso_types::{
9 BlockMerkleTree, PubKey, SeqTypes,
10 v0::traits::{EventConsumer, NullEventConsumer, PersistenceOptions, SequencerPersistence},
11 v0_3::RewardMerkleTreeV1,
12 v0_4::RewardMerkleTreeV2,
13};
14use futures::{
15 channel::oneshot,
16 future::{BoxFuture, Future},
17};
18use hotshot_query_service::{
19 ApiState as AppState, Error,
20 data_source::{ExtensibleDataSource, MetricsDataSource},
21 status::{self, UpdateStatusData},
22};
23use hotshot_types::traits::{
24 metrics::{Metrics, NoMetrics},
25 network::ConnectedNetwork,
26};
27use jf_merkle_tree_compat::MerkleTreeScheme;
28use tide_disco::{Api, App, Url, listener::RateLimitListener, method::ReadState};
29use vbs::version::StaticVersionType;
30
31use super::{
32 ApiState, StorageState,
33 data_source::{
34 CatchupDataSource, HotShotConfigDataSource, NodeStateDataSource, Provider,
35 PruningDataSource, SequencerDataSource, StateSignatureDataSource, SubmitDataSource,
36 provider,
37 },
38 endpoints, fs, light_client, sql,
39 state::NodeApiStateImpl,
40 update::ApiEventConsumer,
41};
42use crate::{
43 SequencerApiVersion,
44 api::{LightClientProvider, endpoints::RewardMerkleTreeVersion},
45 catchup::CatchupStorage,
46 context::{SequencerContext, TaskList},
47 options::PublicNodeConfig,
48 persistence,
49 request_response::data_source::Storage as RequestResponseStorage,
50 state::update_state_storage_loop,
51};
52
/// Configuration for the node's API server.
///
/// `http` is always present; every other field is optional and switches a module or a storage
/// backend on when set. The builder-style methods on this type populate the optional fields.
#[derive(Clone, Debug)]
pub struct Options {
    /// Listener settings (main port, connection limit, optional Axum and Tonic ports).
    pub http: Http,
    /// Query module options. `serve` only starts the query module when one of `storage_sql` or
    /// `storage_fs` is also set, and fails otherwise.
    pub query: Option<Query>,
    /// When set, the "submit" API module is registered.
    pub submit: Option<Submit>,
    /// When set and no query module is requested, `serve` exposes the "status" API backed by a
    /// `MetricsDataSource`.
    pub status: Option<Status>,
    /// When set, the "catchup" API module is registered by `init_hotshot_modules`.
    pub catchup: Option<Catchup>,
    /// When set, the "config" API module is registered.
    pub config: Option<Config>,
    /// When set, the "hotshot-events" API module is registered.
    pub hotshot_events: Option<HotshotEvents>,
    /// When set, the "explorer" API module is registered (SQL-backed query module only).
    pub explorer: Option<Explorer>,
    /// When set, the "light-client" API module is registered (SQL-backed query module only).
    pub light_client: Option<LightClient>,
    /// File system storage options for the query module; used only if `storage_sql` is unset.
    pub storage_fs: Option<persistence::fs::Options>,
    /// SQL storage options for the query module; takes precedence over `storage_fs`.
    pub storage_sql: Option<persistence::sql::Options>,
    /// Optional node configuration handed to the "config" API module.
    pub public_node_config: Option<Box<PublicNodeConfig>>,
}
68
69impl From<Http> for Options {
70 fn from(http: Http) -> Self {
71 Self {
72 http,
73 query: None,
74 submit: None,
75 status: None,
76 catchup: None,
77 config: None,
78 hotshot_events: None,
79 explorer: None,
80 light_client: None,
81 storage_fs: None,
82 storage_sql: None,
83 public_node_config: None,
84 }
85 }
86}
87
88impl Options {
89 pub fn with_port(port: u16) -> Self {
91 Http::with_port(port).into()
92 }
93
94 pub fn query_sql(mut self, query: Query, storage: persistence::sql::Options) -> Self {
96 self.query = Some(query);
97 self.storage_sql = Some(storage);
98 self
99 }
100
101 pub fn query_fs(mut self, query: Query, storage: persistence::fs::Options) -> Self {
103 self.query = Some(query);
104 self.storage_fs = Some(storage);
105 self
106 }
107
108 pub fn submit(mut self, opt: Submit) -> Self {
110 self.submit = Some(opt);
111 self
112 }
113
114 pub fn status(mut self, opt: Status) -> Self {
116 self.status = Some(opt);
117 self
118 }
119
120 pub fn catchup(mut self, opt: Catchup) -> Self {
122 self.catchup = Some(opt);
123 self
124 }
125
126 pub fn config(mut self, opt: Config) -> Self {
128 self.config = Some(opt);
129 self
130 }
131
132 pub fn public_node_config(mut self, c: PublicNodeConfig) -> Self {
136 self.public_node_config = Some(Box::new(c));
137 self
138 }
139
140 pub fn hotshot_events(mut self, opt: HotshotEvents) -> Self {
142 self.hotshot_events = Some(opt);
143 self
144 }
145
146 pub fn explorer(mut self, opt: Explorer) -> Self {
148 self.explorer = Some(opt);
149 self
150 }
151
152 pub fn light_client(mut self, opt: LightClient) -> Self {
154 self.light_client = Some(opt);
155 self
156 }
157
158 pub fn has_query_module(&self) -> bool {
160 self.query.is_some() && (self.storage_fs.is_some() || self.storage_sql.is_some())
161 }
162
163 pub async fn serve<N, P, F>(mut self, init_context: F) -> anyhow::Result<SequencerContext<N, P>>
169 where
170 N: ConnectedNetwork<PubKey>,
171 P: SequencerPersistence,
172 F: FnOnce(
173 Box<dyn Metrics>,
174 Box<dyn EventConsumer>,
175 Option<RequestResponseStorage>,
176 ) -> BoxFuture<'static, anyhow::Result<SequencerContext<N, P>>>,
177 {
178 let (send_ctx, recv_ctx) = oneshot::channel();
182 let state = ApiState::new(async move {
183 recv_ctx
184 .await
185 .expect("context initialized and sent over channel")
186 });
187 let mut tasks = TaskList::default();
188
189 #[allow(clippy::type_complexity)]
192 let (metrics, consumer, storage): (
193 Box<dyn Metrics>,
194 Box<dyn EventConsumer>,
195 Option<RequestResponseStorage>,
196 ) = if let Some(query_opt) = self.query.take() {
197 if let Some(opt) = self.storage_sql.take() {
198 self.init_with_query_module_sql(
199 query_opt,
200 opt,
201 state,
202 &mut tasks,
203 SequencerApiVersion::instance(),
204 )
205 .await?
206 } else if let Some(opt) = self.storage_fs.take() {
207 self.init_with_query_module_fs(
208 query_opt,
209 opt,
210 state,
211 &mut tasks,
212 SequencerApiVersion::instance(),
213 )
214 .await?
215 } else {
216 bail!("query module requested but not storage provided");
217 }
218 } else if self.status.is_some() {
219 let ds = MetricsDataSource::default();
223 let metrics = ds.populate_metrics();
224 let mut app = App::<_, Error>::with_state(AppState::from(ExtensibleDataSource::new(
225 ds,
226 state.clone(),
227 )));
228
229 register_api("status", &mut app, move |ver| {
231 status::define_api(&Default::default(), SequencerApiVersion::instance(), ver)
232 .context("failed to define status api")
233 })?;
234
235 self.init_hotshot_modules(&mut app)?;
236
237 if self.hotshot_events.is_some() {
239 self.init_hotshot_events_module(&mut app)?;
240 }
241
242 tasks.spawn(
243 "API server",
244 self.listen(self.http.port, app, SequencerApiVersion::instance()),
245 );
246
247 if self.http.axum_port.is_some() {
250 tracing::warn!("Axum reward API not available in status-only mode");
251 }
252
253 if self.http.tonic_port.is_some() {
254 tracing::warn!("gRPC reward API not available in status-only mode");
255 }
256
257 (metrics, Box::new(NullEventConsumer), None)
258 } else {
259 let mut app = App::<_, Error>::with_state(AppState::from(state.clone()));
266
267 self.init_hotshot_modules(&mut app)?;
268
269 if self.hotshot_events.is_some() {
271 self.init_hotshot_events_module(&mut app)?;
272 }
273
274 tasks.spawn(
275 "API server",
276 self.listen(self.http.port, app, SequencerApiVersion::instance()),
277 );
278
279 (Box::new(NoMetrics), Box::new(NullEventConsumer), None)
280 };
281
282 let ctx = init_context(metrics, consumer, storage.clone()).await?;
283 send_ctx
284 .send(ctx.clone())
285 .ok()
286 .context("API server exited without receiving context")?;
287 Ok(ctx.with_task_list(tasks))
288 }
289
290 async fn init_app_modules<N, P, D>(
291 &self,
292 ds: D,
293 state: ApiState<N, P>,
294 bind_version: SequencerApiVersion,
295 ) -> anyhow::Result<(
296 Box<dyn Metrics>,
297 Arc<StorageState<N, P, D>>,
298 App<AppState<StorageState<N, P, D>>, Error>,
299 )>
300 where
301 N: ConnectedNetwork<PubKey>,
302 P: SequencerPersistence,
303 D: SequencerDataSource + CatchupStorage + PruningDataSource + Send + Sync + 'static,
304 {
305 let metrics = ds.populate_metrics();
306 let ds = Arc::new(ExtensibleDataSource::new(ds, state.clone()));
307 let api_state: endpoints::AvailState<N, P, D> = ds.clone().into();
308 let mut app = App::<_, Error>::with_state(api_state);
309
310 register_api("status", &mut app, move |ver| {
312 status::define_api(&Default::default(), SequencerApiVersion::instance(), ver)
313 .context("failed to define status api")
314 })?;
315
316 register_api("availability", &mut app, move |ver| {
323 endpoints::availability(ver).context("failed to define availability api")
324 })?;
325
326 register_api("node", &mut app, move |ver| {
327 endpoints::node(ver).context("failed to define node api")
328 })?;
329
330 register_api("token", &mut app, move |ver| {
331 endpoints::token(ver).context("failed to define token api")
332 })?;
333
334 if self.submit.is_some() {
336 register_api("submit", &mut app, move |ver| {
337 endpoints::submit::<_, _, _, SequencerApiVersion>(ver)
338 .context("failed to define submit api")
339 })?;
340 }
341
342 tracing::info!("initializing catchup API");
343
344 register_api("catchup", &mut app, move |ver| {
345 endpoints::catchup(bind_version, ver).context("failed to define catchup api")
346 })?;
347
348 register_api("state-signature", &mut app, move |ver| {
349 endpoints::state_signature(bind_version, ver)
350 .context("failed to define state signature api")
351 })?;
352
353 if self.config.is_some() {
354 let node_cfg = self.public_node_config.as_deref().cloned();
355 register_api("config", &mut app, move |ver| {
356 endpoints::config(bind_version, ver, node_cfg.clone())
357 .context("failed to define config api")
358 })?;
359 }
360 Ok((metrics, ds, app))
361 }
362
363 async fn init_with_query_module_fs<N, P>(
364 &self,
365 query_opt: Query,
366 mod_opt: persistence::fs::Options,
367 state: ApiState<N, P>,
368 tasks: &mut TaskList,
369 bind_version: SequencerApiVersion,
370 ) -> anyhow::Result<(
371 Box<dyn Metrics>,
372 Box<dyn EventConsumer>,
373 Option<RequestResponseStorage>,
374 )>
375 where
376 N: ConnectedNetwork<PubKey>,
377 P: SequencerPersistence,
378 {
379 let ds = <fs::DataSource as SequencerDataSource>::create(
380 mod_opt,
381 provider(
382 query_opt.peers,
383 &state,
384 query_opt.light_client,
385 query_opt.light_client_db,
386 )
387 .await?,
388 false,
389 )
390 .await?;
391
392 let inner_storage = ds.inner();
394
395 let (metrics, ds, mut app) = self
396 .init_app_modules(ds, state.clone(), bind_version)
397 .await?;
398
399 if self.hotshot_events.is_some() {
401 self.init_hotshot_events_module(&mut app)?;
402 }
403
404 tasks.spawn("API server", self.listen(self.http.port, app, bind_version));
405
406 if self.http.axum_port.is_some() {
409 tracing::warn!("Axum reward API not available with filesystem storage");
410 }
411
412 if self.http.tonic_port.is_some() {
413 tracing::warn!("gRPC reward API not available with filesystem storage");
414 }
415
416 Ok((
417 metrics,
418 Box::new(ApiEventConsumer::from(ds)),
419 Some(RequestResponseStorage::Fs(inner_storage)),
420 ))
421 }
422
423 async fn init_with_query_module_sql<N, P>(
424 self,
425 query_opt: Query,
426 mod_opt: persistence::sql::Options,
427 state: ApiState<N, P>,
428 tasks: &mut TaskList,
429 bind_version: SequencerApiVersion,
430 ) -> anyhow::Result<(
431 Box<dyn Metrics>,
432 Box<dyn EventConsumer>,
433 Option<RequestResponseStorage>,
434 )>
435 where
436 N: ConnectedNetwork<PubKey>,
437 P: SequencerPersistence,
438 {
439 let mut provider = Provider::default();
440
441 let db_provider = mod_opt.clone().create().await?;
444 provider = provider
445 .with_block_provider(db_provider.clone())
446 .with_vid_common_provider(db_provider);
447 provider = provider.with_provider(
449 LightClientProvider::new(
450 query_opt.peers,
451 state.clone(),
452 query_opt.light_client,
453 query_opt.light_client_db,
454 )
455 .await?,
456 );
457
458 let ds = sql::DataSource::create(mod_opt.clone(), provider, false).await?;
459 let inner_storage = ds.inner();
460 let (metrics, ds, mut app) = self
461 .init_app_modules(ds, state.clone(), bind_version)
462 .await?;
463
464 if self.explorer.is_some() {
465 register_api("explorer", &mut app, move |ver| {
466 endpoints::explorer(ver).context("failed to define explorer api")
467 })?;
468 }
469
470 register_api("database", &mut app, move |ver| {
472 endpoints::database::<_, SequencerApiVersion>(ver)
473 .context("failed to define database api")
474 })?;
475
476 register_api("block-state", &mut app, move |ver| {
479 endpoints::merklized_state::<N, P, _, BlockMerkleTree, 3>(ver)
480 .context("failed to define block-state api")
481 })?;
482
483 register_api("fee-state", &mut app, move |ver| {
486 endpoints::fee::<_, SequencerApiVersion>(ver).context("failed to define fee-state api")
487 })?;
488
489 register_api("reward-state", &mut app, move |ver| {
490 endpoints::reward::<
491 _,
492 SequencerApiVersion,
493 RewardMerkleTreeV1,
494 { RewardMerkleTreeV1::ARITY },
495 >(ver, RewardMerkleTreeVersion::V1)
496 .context("failed to define reward-state api")
497 })?;
498
499 register_api("reward-state-v2", &mut app, move |ver| {
501 endpoints::reward::<
502 _,
503 SequencerApiVersion,
504 RewardMerkleTreeV2,
505 { RewardMerkleTreeV2::ARITY },
506 >(ver, RewardMerkleTreeVersion::V2)
507 .context("failed to define reward-state api")
508 })?;
509
510 let get_node_state = {
511 let state = state.clone();
512 async move { state.node_state().await.clone() }
513 };
514 tasks.spawn(
515 "merklized state storage update loop",
516 update_state_storage_loop(ds.clone(), get_node_state),
517 );
518
519 if self.hotshot_events.is_some() {
521 self.init_hotshot_events_module(&mut app)?;
522 }
523
524 if self.light_client.is_some() {
526 register_api("light-client", &mut app, move |ver| {
527 light_client::define_api::<_, SequencerApiVersion>(Default::default(), ver)
528 .context("failed to define light client api")
529 })?;
530 }
531
532 tasks.spawn(
533 "API server",
534 self.listen(self.http.port, app, SequencerApiVersion::instance()),
535 );
536
537 if let Some(axum_port) = self.http.axum_port {
539 let ds_for_axum = ds.clone();
540 tasks.spawn("Axum API server", async move {
541 let state = NodeApiStateImpl::new(ds_for_axum);
542 if let Err(e) = espresso_api::serve_axum(axum_port, state).await {
543 tracing::error!("Axum server error: {}", e);
544 }
545 });
546 }
547
548 if let Some(tonic_port) = self.http.tonic_port {
549 let ds_for_tonic = ds.clone();
550 tasks.spawn("Tonic gRPC server", async move {
551 let state = NodeApiStateImpl::new(ds_for_tonic);
552 if let Err(e) = espresso_api::serve_tonic(tonic_port, state).await {
553 tracing::error!("Tonic gRPC server error: {}", e);
554 }
555 });
556 }
557
558 Ok((
559 metrics,
560 Box::new(ApiEventConsumer::from(ds)),
561 Some(RequestResponseStorage::Sql(inner_storage)),
562 ))
563 }
564
565 fn init_hotshot_modules<N, P, S>(&self, app: &mut App<S, Error>) -> anyhow::Result<()>
571 where
572 S: 'static + Send + Sync + ReadState,
573 P: SequencerPersistence,
574 S::State: Send
575 + Sync
576 + SubmitDataSource<N, P>
577 + StateSignatureDataSource<N>
578 + NodeStateDataSource
579 + CatchupDataSource
580 + HotShotConfigDataSource,
581 N: ConnectedNetwork<PubKey>,
582 {
583 let bind_version = SequencerApiVersion::instance();
584 if self.submit.is_some() {
586 register_api("submit", app, move |ver| {
587 endpoints::submit::<_, _, _, SequencerApiVersion>(ver)
588 .context("failed to define submit api")
589 })?;
590 }
591
592 if self.catchup.is_some() {
594 tracing::info!("initializing state API");
595
596 register_api("catchup", app, move |ver| {
597 endpoints::catchup(bind_version, ver).context("failed to define catchup api")
598 })?;
599 }
600
601 register_api("state-signature", app, move |ver| {
602 endpoints::state_signature(bind_version, ver)
603 .context("failed to define state signature api")
604 })?;
605
606 if self.config.is_some() {
607 let node_cfg = self.public_node_config.as_deref().cloned();
608 register_api("config", app, move |ver| {
609 endpoints::config(bind_version, ver, node_cfg.clone())
610 .context("failed to define config api")
611 })?;
612 }
613
614 Ok(())
615 }
616
617 fn init_hotshot_events_module<S>(&self, app: &mut App<S, Error>) -> anyhow::Result<()>
622 where
623 S: 'static + Send + Sync + ReadState,
624 S::State: Send + Sync + hotshot_events_service::events_source::EventsSource<SeqTypes>,
625 {
626 tracing::info!("Initializing HotShot events API at /hotshot-events");
627 register_api("hotshot-events", app, move |ver| {
628 hotshot_events_service::events::define_api::<_, _, SequencerApiVersion>(
629 &hotshot_events_service::events::Options::default(),
630 ver,
631 )
632 .with_context(|| "failed to define the HotShot events API")
633 })?;
634
635 Ok(())
636 }
637
638 fn listen<S, E, ApiVer>(
639 &self,
640 port: u16,
641 app: App<S, E>,
642 bind_version: ApiVer,
643 ) -> impl Future<Output = anyhow::Result<()>> + use<S, E, ApiVer>
644 where
645 S: Send + Sync + 'static,
646 E: Send + Sync + tide_disco::Error,
647 ApiVer: StaticVersionType + 'static,
648 {
649 let max_connections = self.http.max_connections;
650
651 async move {
652 if let Some(limit) = max_connections {
653 app.serve(RateLimitListener::with_port(port, limit), bind_version)
654 .await?;
655 } else {
656 app.serve(format!("0.0.0.0:{port}"), bind_version).await?;
657 }
658 Ok(())
659 }
660 }
661}
662
// Listener settings for the API server.
//
// Plain `//` comments are used on this type on purpose: clap's derive turns `///` doc comments
// into help text, which would change the generated CLI help.
#[derive(Parser, Clone, Copy, Debug)]
pub struct Http {
    // Port the main API server listens on (`ESPRESSO_NODE_API_PORT`, default 8080).
    #[clap(long, env = "ESPRESSO_NODE_API_PORT", default_value = "8080")]
    pub port: u16,

    // Optional connection limit (`ESPRESSO_NODE_API_MAX_CONNECTIONS`). When set, the app is
    // served through a `RateLimitListener` with this limit.
    #[clap(long, env = "ESPRESSO_NODE_API_MAX_CONNECTIONS")]
    pub max_connections: Option<usize>,

    // Optional port for the Axum API server (`ESPRESSO_NODE_AXUM_PORT`). It is only started
    // with the SQL-backed query module; other modes log a warning when it is set.
    #[clap(long, env = "ESPRESSO_NODE_AXUM_PORT")]
    pub axum_port: Option<u16>,

    // Optional port for the Tonic gRPC server (`ESPRESSO_NODE_TONIC_PORT`). It is only started
    // with the SQL-backed query module; other modes log a warning when it is set.
    #[clap(long, env = "ESPRESSO_NODE_TONIC_PORT")]
    pub tonic_port: Option<u16>,
}
689
690impl Http {
691 pub fn with_port(port: u16) -> Self {
693 Self {
694 port,
695 max_connections: None,
696 axum_port: None,
697 tonic_port: None,
698 }
699 }
700}
701
// Marker options for the submit API: when `Options::submit` is set, the "submit" module is
// registered on the app.
#[derive(Parser, Clone, Copy, Debug, Default)]
pub struct Submit;
705
// Marker options for the status API: when `Options::status` is set and no query module is
// requested, `Options::serve` exposes the "status" module backed by a `MetricsDataSource`.
#[derive(Parser, Clone, Copy, Debug, Default)]
pub struct Status;
709
// Marker options for the catchup API: when `Options::catchup` is set, `init_hotshot_modules`
// registers the "catchup" module.
#[derive(Parser, Clone, Copy, Debug, Default)]
pub struct Catchup;
713
// Marker options for the config API: when `Options::config` is set, the "config" module is
// registered on the app.
#[derive(Parser, Clone, Copy, Debug, Default)]
pub struct Config;
717
// Options for the query module.
//
// Plain `//` comments are used on this type on purpose: clap's derive turns `///` doc comments
// into help text, which would change the generated CLI help.
#[derive(Parser, Clone, Debug, Default)]
pub struct Query {
    // Peer URLs, given as a comma-separated list (`ESPRESSO_NODE_API_PEERS`). They are passed
    // to the provider that is built for the query data source.
    #[clap(long, env = "ESPRESSO_NODE_API_PEERS", value_delimiter = ',')]
    pub peers: Vec<Url>,

    // Light client options, flattened into this command's arguments and passed to the provider.
    #[clap(flatten)]
    pub light_client: LightClientOptions,

    // Light client SQLite options, flattened into this command's arguments and passed to the
    // provider.
    #[clap(flatten)]
    pub light_client_db: LightClientSqliteOptions,
}
733
#[cfg(test)]
impl Query {
    /// Query options for tests: all defaults, except that the light client's `decaf` flag is
    /// turned on.
    pub fn test() -> Self {
        let light_client = LightClientOptions {
            decaf: true,
            ..Default::default()
        };
        Self {
            light_client,
            ..Default::default()
        }
    }
}
746
// Marker options type with no fields.
// NOTE(review): not referenced anywhere in the visible part of this file; confirm whether it is
// still used elsewhere.
#[derive(Parser, Clone, Copy, Debug, Default)]
pub struct State;
750
// Marker options for the HotShot events API: when `Options::hotshot_events` is set, the
// "hotshot-events" module is registered on the app.
#[derive(Parser, Clone, Copy, Debug, Default)]
pub struct HotshotEvents;
754
// Marker options for the explorer API: when `Options::explorer` is set, the SQL-backed query
// module registers the "explorer" module.
#[derive(Parser, Clone, Copy, Debug, Default)]
pub struct Explorer;
758
// Marker options for the light client API: when `Options::light_client` is set, the SQL-backed
// query module registers the "light-client" module.
#[derive(Parser, Clone, Copy, Debug, Default)]
pub struct LightClient;
762
763fn register_api<E, S, F, ModuleError, ModuleVersion>(
765 path: &'static str,
766 app: &mut App<S, E>,
767 f: F,
768) -> anyhow::Result<()>
769where
770 S: 'static + Send + Sync,
771 E: Send + Sync + 'static + tide_disco::Error + From<ModuleError>,
772 ModuleError: Send + Sync + 'static,
773 ModuleVersion: StaticVersionType + 'static,
774 F: Fn(semver::Version) -> anyhow::Result<Api<S, ModuleError, ModuleVersion>>,
775{
776 let v0 = "0.0.1".parse().unwrap();
777 let v1 = "1.1.0".parse().unwrap();
778 let result1 = f(v0)?;
779 let result2 = f(v1)?;
780
781 app.register_module(path, result1)?;
782 app.register_module(path, result2)?;
783
784 Ok(())
785}