1use std::sync::Arc;
4
5use anyhow::{Context, bail};
6use clap::Parser;
7use espresso_types::{
8 BlockMerkleTree, PubKey, SeqTypes,
9 v0::traits::{EventConsumer, NullEventConsumer, PersistenceOptions, SequencerPersistence},
10 v0_3::RewardMerkleTreeV1,
11 v0_4::RewardMerkleTreeV2,
12};
13use futures::{
14 channel::oneshot,
15 future::{BoxFuture, Future},
16};
17use hotshot_query_service::{
18 ApiState as AppState, Error,
19 data_source::{ExtensibleDataSource, MetricsDataSource},
20 fetching::provider::QueryServiceProvider,
21 status::{self, UpdateStatusData},
22};
23use hotshot_types::traits::{
24 metrics::{Metrics, NoMetrics},
25 network::ConnectedNetwork,
26};
27use jf_merkle_tree_compat::MerkleTreeScheme;
28use tide_disco::{Api, App, Url, listener::RateLimitListener, method::ReadState};
29use vbs::version::StaticVersionType;
30
31use super::{
32 ApiState, StorageState,
33 data_source::{
34 CatchupDataSource, HotShotConfigDataSource, NodeStateDataSource, Provider,
35 SequencerDataSource, StateSignatureDataSource, SubmitDataSource, provider,
36 },
37 endpoints, fs, light_client, sql,
38 update::ApiEventConsumer,
39};
40use crate::{
41 SequencerApiVersion,
42 api::endpoints::RewardMerkleTreeVersion,
43 catchup::CatchupStorage,
44 context::{SequencerContext, TaskList},
45 persistence,
46 request_response::data_source::Storage as RequestResponseStorage,
47 state::update_state_storage_loop,
48};
49
/// Configuration of the sequencer API server: the HTTP listener settings plus one optional
/// entry per API module and per storage backend.
///
/// A `None` entry means the corresponding module or backend has not been requested. Values are
/// normally assembled from an [`Http`] via `From<Http>` and the builder-style methods below.
#[derive(Clone, Debug)]
pub struct Options {
    /// HTTP listener settings (port and optional connection limit).
    pub http: Http,
    /// Query module options. `serve` fails if this is set without a storage backend.
    pub query: Option<Query>,
    /// When set, the `submit` API module is registered.
    pub submit: Option<Submit>,
    /// When set (and the query module is not), `serve` registers the `status` API on top of a
    /// `MetricsDataSource`.
    pub status: Option<Status>,
    /// When set, `init_hotshot_modules` registers the `catchup` API module.
    pub catchup: Option<Catchup>,
    /// When set, the `config` API module is registered.
    pub config: Option<Config>,
    /// When set, the `hotshot-events` API module is registered.
    pub hotshot_events: Option<HotshotEvents>,
    /// When set, the `explorer` API module is registered (SQL-backed query module only).
    pub explorer: Option<Explorer>,
    /// When set, the `light-client` API module is registered (SQL-backed query module only).
    pub light_client: Option<LightClient>,
    /// File-system storage options for the query module.
    pub storage_fs: Option<persistence::fs::Options>,
    /// SQL storage options for the query module; takes precedence over `storage_fs` in `serve`.
    pub storage_sql: Option<persistence::sql::Options>,
}
64
65impl From<Http> for Options {
66 fn from(http: Http) -> Self {
67 Self {
68 http,
69 query: None,
70 submit: None,
71 status: None,
72 catchup: None,
73 config: None,
74 hotshot_events: None,
75 explorer: None,
76 light_client: None,
77 storage_fs: None,
78 storage_sql: None,
79 }
80 }
81}
82
83impl Options {
84 pub fn with_port(port: u16) -> Self {
86 Http::with_port(port).into()
87 }
88
89 pub fn query_sql(mut self, query: Query, storage: persistence::sql::Options) -> Self {
91 self.query = Some(query);
92 self.storage_sql = Some(storage);
93 self
94 }
95
96 pub fn query_fs(mut self, query: Query, storage: persistence::fs::Options) -> Self {
98 self.query = Some(query);
99 self.storage_fs = Some(storage);
100 self
101 }
102
103 pub fn submit(mut self, opt: Submit) -> Self {
105 self.submit = Some(opt);
106 self
107 }
108
109 pub fn status(mut self, opt: Status) -> Self {
111 self.status = Some(opt);
112 self
113 }
114
115 pub fn catchup(mut self, opt: Catchup) -> Self {
117 self.catchup = Some(opt);
118 self
119 }
120
121 pub fn config(mut self, opt: Config) -> Self {
123 self.config = Some(opt);
124 self
125 }
126
127 pub fn hotshot_events(mut self, opt: HotshotEvents) -> Self {
129 self.hotshot_events = Some(opt);
130 self
131 }
132
133 pub fn explorer(mut self, opt: Explorer) -> Self {
135 self.explorer = Some(opt);
136 self
137 }
138
139 pub fn light_client(mut self, opt: LightClient) -> Self {
141 self.light_client = Some(opt);
142 self
143 }
144
145 pub fn has_query_module(&self) -> bool {
147 self.query.is_some() && (self.storage_fs.is_some() || self.storage_sql.is_some())
148 }
149
150 pub async fn serve<N, P, F>(mut self, init_context: F) -> anyhow::Result<SequencerContext<N, P>>
156 where
157 N: ConnectedNetwork<PubKey>,
158 P: SequencerPersistence,
159 F: FnOnce(
160 Box<dyn Metrics>,
161 Box<dyn EventConsumer>,
162 Option<RequestResponseStorage>,
163 ) -> BoxFuture<'static, anyhow::Result<SequencerContext<N, P>>>,
164 {
165 let (send_ctx, recv_ctx) = oneshot::channel();
169 let state = ApiState::new(async move {
170 recv_ctx
171 .await
172 .expect("context initialized and sent over channel")
173 });
174 let mut tasks = TaskList::default();
175
176 #[allow(clippy::type_complexity)]
179 let (metrics, consumer, storage): (
180 Box<dyn Metrics>,
181 Box<dyn EventConsumer>,
182 Option<RequestResponseStorage>,
183 ) = if let Some(query_opt) = self.query.take() {
184 if let Some(opt) = self.storage_sql.take() {
185 self.init_with_query_module_sql(
186 query_opt,
187 opt,
188 state,
189 &mut tasks,
190 SequencerApiVersion::instance(),
191 )
192 .await?
193 } else if let Some(opt) = self.storage_fs.take() {
194 self.init_with_query_module_fs(
195 query_opt,
196 opt,
197 state,
198 &mut tasks,
199 SequencerApiVersion::instance(),
200 )
201 .await?
202 } else {
203 bail!("query module requested but not storage provided");
204 }
205 } else if self.status.is_some() {
206 let ds = MetricsDataSource::default();
210 let metrics = ds.populate_metrics();
211 let mut app = App::<_, Error>::with_state(AppState::from(ExtensibleDataSource::new(
212 ds,
213 state.clone(),
214 )));
215
216 register_api("status", &mut app, move |ver| {
218 status::define_api(&Default::default(), SequencerApiVersion::instance(), ver)
219 .context("failed to define status api")
220 })?;
221
222 self.init_hotshot_modules(&mut app)?;
223
224 if self.hotshot_events.is_some() {
226 self.init_hotshot_events_module(&mut app)?;
227 }
228
229 tasks.spawn(
230 "API server",
231 self.listen(self.http.port, app, SequencerApiVersion::instance()),
232 );
233
234 (metrics, Box::new(NullEventConsumer), None)
235 } else {
236 let mut app = App::<_, Error>::with_state(AppState::from(state.clone()));
243
244 self.init_hotshot_modules(&mut app)?;
245
246 if self.hotshot_events.is_some() {
248 self.init_hotshot_events_module(&mut app)?;
249 }
250
251 tasks.spawn(
252 "API server",
253 self.listen(self.http.port, app, SequencerApiVersion::instance()),
254 );
255
256 (Box::new(NoMetrics), Box::new(NullEventConsumer), None)
257 };
258
259 let ctx = init_context(metrics, consumer, storage.clone()).await?;
260 send_ctx
261 .send(ctx.clone())
262 .ok()
263 .context("API server exited without receiving context")?;
264 Ok(ctx.with_task_list(tasks))
265 }
266
267 async fn init_app_modules<N, P, D>(
268 &self,
269 ds: D,
270 state: ApiState<N, P>,
271 bind_version: SequencerApiVersion,
272 ) -> anyhow::Result<(
273 Box<dyn Metrics>,
274 Arc<StorageState<N, P, D>>,
275 App<AppState<StorageState<N, P, D>>, Error>,
276 )>
277 where
278 N: ConnectedNetwork<PubKey>,
279 P: SequencerPersistence,
280 D: SequencerDataSource + CatchupStorage + Send + Sync + 'static,
281 {
282 let metrics = ds.populate_metrics();
283 let ds = Arc::new(ExtensibleDataSource::new(ds, state.clone()));
284 let api_state: endpoints::AvailState<N, P, D> = ds.clone().into();
285 let mut app = App::<_, Error>::with_state(api_state);
286
287 register_api("status", &mut app, move |ver| {
289 status::define_api(&Default::default(), SequencerApiVersion::instance(), ver)
290 .context("failed to define status api")
291 })?;
292
293 register_api("availability", &mut app, move |ver| {
300 endpoints::availability(ver).context("failed to define availability api")
301 })?;
302
303 register_api("node", &mut app, move |ver| {
304 endpoints::node(ver).context("failed to define node api")
305 })?;
306
307 register_api("token", &mut app, move |ver| {
308 endpoints::token(ver).context("failed to define token api")
309 })?;
310
311 if self.submit.is_some() {
313 register_api("submit", &mut app, move |ver| {
314 endpoints::submit::<_, _, _, SequencerApiVersion>(ver)
315 .context("failed to define submit api")
316 })?;
317 }
318
319 tracing::info!("initializing catchup API");
320
321 register_api("catchup", &mut app, move |ver| {
322 endpoints::catchup(bind_version, ver).context("failed to define catchup api")
323 })?;
324
325 register_api("state-signature", &mut app, move |ver| {
326 endpoints::state_signature(bind_version, ver)
327 .context("failed to define state signature api")
328 })?;
329
330 if self.config.is_some() {
331 register_api("config", &mut app, move |ver| {
332 endpoints::config(bind_version, ver).context("failed to define config api")
333 })?;
334 }
335 Ok((metrics, ds, app))
336 }
337
338 async fn init_with_query_module_fs<N, P>(
339 &self,
340 query_opt: Query,
341 mod_opt: persistence::fs::Options,
342 state: ApiState<N, P>,
343 tasks: &mut TaskList,
344 bind_version: SequencerApiVersion,
345 ) -> anyhow::Result<(
346 Box<dyn Metrics>,
347 Box<dyn EventConsumer>,
348 Option<RequestResponseStorage>,
349 )>
350 where
351 N: ConnectedNetwork<PubKey>,
352 P: SequencerPersistence,
353 {
354 let ds = <fs::DataSource as SequencerDataSource>::create(
355 mod_opt,
356 provider(query_opt.peers, bind_version),
357 false,
358 )
359 .await?;
360
361 let inner_storage = ds.inner();
363
364 let (metrics, ds, mut app) = self
365 .init_app_modules(ds, state.clone(), bind_version)
366 .await?;
367
368 if self.hotshot_events.is_some() {
370 self.init_hotshot_events_module(&mut app)?;
371 }
372
373 tasks.spawn("API server", self.listen(self.http.port, app, bind_version));
374 Ok((
375 metrics,
376 Box::new(ApiEventConsumer::from(ds)),
377 Some(RequestResponseStorage::Fs(inner_storage)),
378 ))
379 }
380
381 async fn init_with_query_module_sql<N, P>(
382 self,
383 query_opt: Query,
384 mod_opt: persistence::sql::Options,
385 state: ApiState<N, P>,
386 tasks: &mut TaskList,
387 bind_version: SequencerApiVersion,
388 ) -> anyhow::Result<(
389 Box<dyn Metrics>,
390 Box<dyn EventConsumer>,
391 Option<RequestResponseStorage>,
392 )>
393 where
394 N: ConnectedNetwork<PubKey>,
395 P: SequencerPersistence,
396 {
397 let mut provider = Provider::default();
398
399 let db_provider = mod_opt.clone().create().await?;
402 provider = provider
403 .with_leaf_provider(db_provider.clone())
404 .with_block_provider(db_provider.clone())
405 .with_vid_common_provider(db_provider);
406 for peer in query_opt.peers {
408 tracing::info!("will fetch missing data from {peer}");
409 provider = provider.with_provider(QueryServiceProvider::new(peer, bind_version));
410 }
411
412 let ds = sql::DataSource::create(mod_opt.clone(), provider, false).await?;
413 let inner_storage = ds.inner();
414 let (metrics, ds, mut app) = self
415 .init_app_modules(ds, state.clone(), bind_version)
416 .await?;
417
418 if self.explorer.is_some() {
419 register_api("explorer", &mut app, move |ver| {
420 endpoints::explorer(ver).context("failed to define explorer api")
421 })?;
422 }
423
424 register_api("database", &mut app, move |ver| {
426 endpoints::database::<_, SequencerApiVersion>(ver)
427 .context("failed to define database api")
428 })?;
429
430 register_api("block-state", &mut app, move |ver| {
433 endpoints::merklized_state::<N, P, _, BlockMerkleTree, 3>(ver)
434 .context("failed to define block-state api")
435 })?;
436
437 register_api("fee-state", &mut app, move |ver| {
440 endpoints::fee::<_, SequencerApiVersion>(ver).context("failed to define fee-state api")
441 })?;
442
443 register_api("reward-state", &mut app, move |ver| {
444 endpoints::reward::<
445 _,
446 SequencerApiVersion,
447 RewardMerkleTreeV1,
448 { RewardMerkleTreeV1::ARITY },
449 >(ver, RewardMerkleTreeVersion::V1)
450 .context("failed to define reward-state api")
451 })?;
452
453 register_api("reward-state-v2", &mut app, move |ver| {
455 endpoints::reward::<
456 _,
457 SequencerApiVersion,
458 RewardMerkleTreeV2,
459 { RewardMerkleTreeV2::ARITY },
460 >(ver, RewardMerkleTreeVersion::V2)
461 .context("failed to define reward-state api")
462 })?;
463
464 let get_node_state = {
465 let state = state.clone();
466 async move { state.node_state().await.clone() }
467 };
468 tasks.spawn(
469 "merklized state storage update loop",
470 update_state_storage_loop(ds.clone(), get_node_state),
471 );
472
473 if self.hotshot_events.is_some() {
475 self.init_hotshot_events_module(&mut app)?;
476 }
477
478 if self.light_client.is_some() {
480 register_api("light-client", &mut app, move |ver| {
481 light_client::define_api::<_, SequencerApiVersion>(Default::default(), ver)
482 .context("failed to define light client api")
483 })?;
484 }
485
486 tasks.spawn(
487 "API server",
488 self.listen(self.http.port, app, SequencerApiVersion::instance()),
489 );
490 Ok((
491 metrics,
492 Box::new(ApiEventConsumer::from(ds)),
493 Some(RequestResponseStorage::Sql(inner_storage)),
494 ))
495 }
496
497 fn init_hotshot_modules<N, P, S>(&self, app: &mut App<S, Error>) -> anyhow::Result<()>
503 where
504 S: 'static + Send + Sync + ReadState,
505 P: SequencerPersistence,
506 S::State: Send
507 + Sync
508 + SubmitDataSource<N, P>
509 + StateSignatureDataSource<N>
510 + NodeStateDataSource
511 + CatchupDataSource
512 + HotShotConfigDataSource,
513 N: ConnectedNetwork<PubKey>,
514 {
515 let bind_version = SequencerApiVersion::instance();
516 if self.submit.is_some() {
518 register_api("submit", app, move |ver| {
519 endpoints::submit::<_, _, _, SequencerApiVersion>(ver)
520 .context("failed to define submit api")
521 })?;
522 }
523
524 if self.catchup.is_some() {
526 tracing::info!("initializing state API");
527
528 register_api("catchup", app, move |ver| {
529 endpoints::catchup(bind_version, ver).context("failed to define catchup api")
530 })?;
531 }
532
533 register_api("state-signature", app, move |ver| {
534 endpoints::state_signature(bind_version, ver)
535 .context("failed to define state signature api")
536 })?;
537
538 if self.config.is_some() {
539 register_api("config", app, move |ver| {
540 endpoints::config(bind_version, ver).context("failed to define config api")
541 })?;
542 }
543
544 Ok(())
545 }
546
547 fn init_hotshot_events_module<S>(&self, app: &mut App<S, Error>) -> anyhow::Result<()>
552 where
553 S: 'static + Send + Sync + ReadState,
554 S::State: Send + Sync + hotshot_events_service::events_source::EventsSource<SeqTypes>,
555 {
556 tracing::info!("Initializing HotShot events API at /hotshot-events");
557 register_api("hotshot-events", app, move |ver| {
558 hotshot_events_service::events::define_api::<_, _, SequencerApiVersion>(
559 &hotshot_events_service::events::Options::default(),
560 ver,
561 )
562 .with_context(|| "failed to define the HotShot events API")
563 })?;
564
565 Ok(())
566 }
567
568 fn listen<S, E, ApiVer>(
569 &self,
570 port: u16,
571 app: App<S, E>,
572 bind_version: ApiVer,
573 ) -> impl Future<Output = anyhow::Result<()>> + use<S, E, ApiVer>
574 where
575 S: Send + Sync + 'static,
576 E: Send + Sync + tide_disco::Error,
577 ApiVer: StaticVersionType + 'static,
578 {
579 let max_connections = self.http.max_connections;
580
581 async move {
582 if let Some(limit) = max_connections {
583 app.serve(RateLimitListener::with_port(port, limit), bind_version)
584 .await?;
585 } else {
586 app.serve(format!("0.0.0.0:{port}"), bind_version).await?;
587 }
588 Ok(())
589 }
590 }
591}
592
// HTTP listener settings shared by every API configuration.
//
// NOTE: plain `//` comments are used on purpose. clap's derive turns `///` doc comments into
// CLI help text, which would change what the program prints.
#[derive(Parser, Clone, Copy, Debug)]
pub struct Http {
    // Port the API server listens on. Read from `--port` or `ESPRESSO_SEQUENCER_API_PORT`;
    // defaults to 8080.
    #[clap(long, env = "ESPRESSO_SEQUENCER_API_PORT", default_value = "8080")]
    pub port: u16,

    // Optional connection limit, read from `--max-connections` or
    // `ESPRESSO_SEQUENCER_MAX_CONNECTIONS`. When set, `Options::listen` serves the app through
    // a `RateLimitListener` built with this value.
    #[clap(long, env = "ESPRESSO_SEQUENCER_MAX_CONNECTIONS")]
    pub max_connections: Option<usize>,
}
611
612impl Http {
613 pub fn with_port(port: u16) -> Self {
615 Self {
616 port,
617 max_connections: None,
618 }
619 }
620}
621
// Marker options for the submit API module; stored in `Options::submit`, whose presence makes
// the `submit` module get registered.
// (`//` rather than `///`: clap's derive would turn doc comments into CLI help text.)
#[derive(Parser, Clone, Copy, Debug, Default)]
pub struct Submit;
625
// Marker options for the status API module; stored in `Options::status`, which `serve` checks
// when the query module is not configured.
// (`//` rather than `///`: clap's derive would turn doc comments into CLI help text.)
#[derive(Parser, Clone, Copy, Debug, Default)]
pub struct Status;
629
// Marker options for the catchup API module; stored in `Options::catchup`, whose presence makes
// `init_hotshot_modules` register the `catchup` module.
// (`//` rather than `///`: clap's derive would turn doc comments into CLI help text.)
#[derive(Parser, Clone, Copy, Debug, Default)]
pub struct Catchup;
633
// Marker options for the config API module; stored in `Options::config`, whose presence makes
// the `config` module get registered.
// (`//` rather than `///`: clap's derive would turn doc comments into CLI help text.)
#[derive(Parser, Clone, Copy, Debug, Default)]
pub struct Config;
637
// Options for the query API module; stored in `Options::query`.
// (`//` rather than `///`: clap's derive would turn doc comments into CLI help text.)
#[derive(Parser, Clone, Debug, Default)]
pub struct Query {
    // Peer query services that missing data is fetched from. Read from `--peers` or
    // `ESPRESSO_SEQUENCER_API_PEERS` as a comma-separated list of URLs.
    #[clap(long, env = "ESPRESSO_SEQUENCER_API_PEERS", value_delimiter = ',')]
    pub peers: Vec<Url>,
}
645
// Marker options type for a state module.
// NOTE(review): nothing in this part of the file references it (`Options` has no matching
// field); check for external users before removing it.
// (`//` rather than `///`: clap's derive would turn doc comments into CLI help text.)
#[derive(Parser, Clone, Copy, Debug, Default)]
pub struct State;
649
// Marker options for the HotShot events API module; stored in `Options::hotshot_events`, whose
// presence makes the `hotshot-events` module get registered.
// (`//` rather than `///`: clap's derive would turn doc comments into CLI help text.)
#[derive(Parser, Clone, Copy, Debug, Default)]
pub struct HotshotEvents;
653
// Marker options for the explorer API module; stored in `Options::explorer`, which is only
// consulted when the query module runs on SQL storage.
// (`//` rather than `///`: clap's derive would turn doc comments into CLI help text.)
#[derive(Parser, Clone, Copy, Debug, Default)]
pub struct Explorer;
657
// Marker options for the light client API module; stored in `Options::light_client`, which is
// only consulted when the query module runs on SQL storage.
// (`//` rather than `///`: clap's derive would turn doc comments into CLI help text.)
#[derive(Parser, Clone, Copy, Debug, Default)]
pub struct LightClient;
661
662fn register_api<E, S, F, ModuleError, ModuleVersion>(
664 path: &'static str,
665 app: &mut App<S, E>,
666 f: F,
667) -> anyhow::Result<()>
668where
669 S: 'static + Send + Sync,
670 E: Send + Sync + 'static + tide_disco::Error + From<ModuleError>,
671 ModuleError: Send + Sync + 'static,
672 ModuleVersion: StaticVersionType + 'static,
673 F: Fn(semver::Version) -> anyhow::Result<Api<S, ModuleError, ModuleVersion>>,
674{
675 let v0 = "0.0.1".parse().unwrap();
676 let v1 = "1.1.0".parse().unwrap();
677 let result1 = f(v0)?;
678 let result2 = f(v1)?;
679
680 app.register_module(path, result1)?;
681 app.register_module(path, result2)?;
682
683 Ok(())
684}