use std::{
    fmt::{Debug, Display},
    marker::PhantomData,
    sync::Arc,
    time::Duration,
};

use anyhow::Context;
use async_lock::RwLock;
use derivative::Derivative;
use espresso_types::{
    NodeState, PubKey, Transaction, ValidatedState,
    v0::traits::{EventConsumer as PersistenceEventConsumer, SequencerPersistence},
};
use futures::{
    future::{Future, join_all},
    stream::{Stream, StreamExt},
};
use hotshot::{
    SystemContext,
    types::{Event, EventType, SystemContextHandle},
};
use hotshot_events_service::events_source::{EventConsumer, EventsStreamer};
use hotshot_orchestrator::client::OrchestratorClient;
use hotshot_types::{
    PeerConfig, ValidatorConfig,
    consensus::ConsensusMetricsValue,
    data::{Leaf2, ViewNumber},
    epoch_membership::EpochMembershipCoordinator,
    message::UpgradeLock,
    network::NetworkConfig,
    storage_metrics::StorageMetricsValue,
    traits::{metrics::Metrics, network::ConnectedNetwork},
};
use parking_lot::Mutex;
use request_response::RequestResponseConfig;
use tokio::{spawn, sync::mpsc::channel, task::JoinHandle};
use tracing::{Instrument, Level};
use url::Url;

use crate::{
    Node, SeqTypes, SequencerApiVersion,
    catchup::ParallelStateCatchup,
    external_event_handler::ExternalEventHandler,
    proposal_fetcher::ProposalFetcherConfig,
    request_response::{
        RequestResponseProtocol,
        data_source::{DataSource, Storage as RequestResponseStorage},
        network::Sender as RequestResponseSender,
        recipient_source::RecipientSource,
    },
    state_signature::{self, StateSigner},
};

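/// Alias for the HotShot [`SystemContextHandle`] used by the sequencer node.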
pub type Consensus<N, P> = SystemContextHandle<SeqTypes, Node<N, P>>;

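/// The sequencer context bundles the consensus handle together with the background tasks,
/// signers, event streams, and configuration a running sequencer node needs.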
#[derive(Derivative, Clone)]
#[derivative(Debug(bound = ""))]
pub struct SequencerContext<N: ConnectedNetwork<PubKey>, P: SequencerPersistence> {
    /// The consensus handle
    #[derivative(Debug = "ignore")]
    handle: Arc<RwLock<Consensus<N, P>>>,

    /// The request-response protocol; also registered as a state catchup provider in `init`
    #[derivative(Debug = "ignore")]
    #[allow(dead_code)]
    pub request_response_protocol: RequestResponseProtocol<Node<N, P>, N, P>,

    /// Signs consensus state updates and optionally relays them to a state relay server
    state_signer: Arc<RwLock<StateSigner<SequencerApiVersion>>>,

    /// An orchestrator to wait for before starting consensus, if one was configured
    #[derivative(Debug = "ignore")]
    wait_for_orchestrator: Option<Arc<OrchestratorClient>>,

    /// Background tasks owned by this context
    tasks: TaskList,

    /// Streams consensus events to external subscribers
    events_streamer: Arc<RwLock<EventsStreamer<SeqTypes>>>,

    /// Set once shutdown has run (or `detach` was called) so the drop handler does not shut
    /// down again
    detached: bool,

    node_state: NodeState,

    network_config: NetworkConfig<SeqTypes>,

    #[derivative(Debug = "ignore")]
    validator_config: ValidatorConfig<SeqTypes>,
}

impl<N: ConnectedNetwork<PubKey>, P: SequencerPersistence> SequencerContext<N, P> {
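    /// Load any saved consensus state from persistence, start HotShot, wire up the
    /// request-response protocol and event handling, and return the assembled context.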
    #[tracing::instrument(skip_all, fields(node_id = instance_state.node_id))]
    #[allow(clippy::too_many_arguments)]
    pub async fn init(
        network_config: NetworkConfig<SeqTypes>,
        upgrade: versions::Upgrade,
        validator_config: ValidatorConfig<SeqTypes>,
        coordinator: EpochMembershipCoordinator<SeqTypes>,
        instance_state: NodeState,
        storage: Option<RequestResponseStorage>,
        state_catchup: ParallelStateCatchup,
        persistence: Arc<P>,
        network: Arc<N>,
        state_relay_server: Option<Url>,
        metrics: &dyn Metrics,
        stake_table_capacity: usize,
        event_consumer: impl PersistenceEventConsumer + 'static,
        proposal_fetcher_cfg: ProposalFetcherConfig,
    ) -> anyhow::Result<Self> {
        let config = &network_config.config;
        let pub_key = validator_config.public_key;
        tracing::info!(%pub_key, "initializing consensus");

        metrics
            .create_gauge("node_index".into(), None)
            .set(instance_state.node_id as usize);

        // Start the L1 client background tasks before consensus needs them.
        instance_state.l1_client.spawn_tasks().await;

        // Load our saved consensus state (and the anchor view, if any) from persistence.
        let (initializer, anchor_view) = persistence
            .load_consensus_state(instance_state.clone(), upgrade)
            .await?;

        tracing::warn!(
            "Starting up sequencer context with initializer:\n\n{:?}",
            initializer
        );

        let stake_table = config.hotshot_stake_table();
        let stake_table_commit = stake_table.commitment(stake_table_capacity)?;
        let stake_table_epoch = None;
        let should_vote =
            state_signature::should_vote(&stake_table, &validator_config.state_public_key);

        let event_streamer = Arc::new(RwLock::new(EventsStreamer::<SeqTypes>::new(
            stake_table.0,
            0,
        )));

        let handle = SystemContext::init(
            validator_config.public_key,
            validator_config.private_key.clone(),
            validator_config.state_private_key.clone(),
            instance_state.node_id,
            config.clone(),
            upgrade,
            coordinator.clone(),
            network.clone(),
            initializer,
            ConsensusMetricsValue::new(metrics),
            Arc::clone(&persistence),
            StorageMetricsValue::new(metrics),
        )
        .await?
        .0;

        let mut state_signer = StateSigner::new(
            validator_config.state_private_key.clone(),
            validator_config.state_public_key.clone(),
            stake_table_commit,
            stake_table_epoch,
            stake_table_capacity,
            should_vote,
        );
        if let Some(url) = state_relay_server {
            state_signer = state_signer.with_relay_server(url);
        }

        // Channels connecting the request-response protocol with the external event handler.
        let (outbound_message_sender, outbound_message_receiver) = channel(20);
        let (request_response_sender, request_response_receiver) = channel(20);

        // Configuration for the request-response protocol (timeouts, batching, and rate limits).
        let request_response_config = RequestResponseConfig {
            incoming_request_ttl: Duration::from_secs(40),
            incoming_request_timeout: Duration::from_secs(5),
            incoming_response_timeout: Duration::from_secs(5),
            request_batch_size: 5,
            request_batch_interval: Duration::from_secs(2),
            max_incoming_requests: 10,
            max_incoming_requests_per_key: 1,
            max_incoming_responses: 200,
        };

        let request_response_protocol = RequestResponseProtocol::new(
            request_response_config,
            RequestResponseSender::new(outbound_message_sender),
            request_response_receiver,
            RecipientSource {
                memberships: coordinator,
                consensus: handle.hotshot.clone(),
                public_key: validator_config.public_key,
            },
            DataSource {
                node_state: instance_state.clone(),
                storage,
                persistence: persistence.clone(),
                consensus: handle.hotshot.clone(),
                phantom: PhantomData,
            },
            validator_config.public_key,
            validator_config.private_key.clone(),
        );

        // Register the request-response protocol as a state catchup provider.
        state_catchup.add_provider(Arc::new(request_response_protocol.clone()));

        let mut tasks = TaskList::default();

        // Create the handler for external messages received from other nodes over the network.
        let external_event_handler = ExternalEventHandler::new(
            &mut tasks,
            request_response_sender,
            outbound_message_receiver,
            network,
            pub_key,
        )
        .await
        .with_context(|| "Failed to create external event handler")?;

        Ok(Self::new(
            handle,
            persistence,
            state_signer,
            external_event_handler,
            request_response_protocol,
            event_streamer,
            instance_state,
            network_config,
            validator_config,
            event_consumer,
            anchor_view,
            proposal_fetcher_cfg,
            metrics,
        )
        .with_task_list(tasks))
    }

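    /// Construct the context: wrap the consensus handle, spawn the proposal fetcher and the
    /// event-handling task, and assemble the remaining fields.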
    #[allow(clippy::too_many_arguments)]
    fn new(
        handle: Consensus<N, P>,
        persistence: Arc<P>,
        state_signer: StateSigner<SequencerApiVersion>,
        external_event_handler: ExternalEventHandler,
        request_response_protocol: RequestResponseProtocol<Node<N, P>, N, P>,
        event_streamer: Arc<RwLock<EventsStreamer<SeqTypes>>>,
        node_state: NodeState,
        network_config: NetworkConfig<SeqTypes>,
        validator_config: ValidatorConfig<SeqTypes>,
        event_consumer: impl PersistenceEventConsumer + 'static,
        anchor_view: Option<ViewNumber>,
        proposal_fetcher_cfg: ProposalFetcherConfig,
        metrics: &dyn Metrics,
    ) -> Self {
        let events = handle.event_stream();

        let node_id = node_state.node_id;
        let mut ctx = Self {
            handle: Arc::new(RwLock::new(handle)),
            state_signer: Arc::new(RwLock::new(state_signer)),
            request_response_protocol,
            tasks: Default::default(),
            detached: false,
            wait_for_orchestrator: None,
            events_streamer: event_streamer.clone(),
            node_state,
            network_config,
            validator_config,
        };

        // Spawn the proposal fetcher.
        proposal_fetcher_cfg.spawn(
            &mut ctx.tasks,
            ctx.handle.clone(),
            persistence.clone(),
            metrics,
        );

        // Spawn the event-handling loop.
        ctx.spawn(
            "event handler",
            handle_events(
                ctx.handle.clone(),
                node_id,
                events,
                persistence,
                ctx.state_signer.clone(),
                external_event_handler,
                Some(event_streamer.clone()),
                event_consumer,
                anchor_view,
            ),
        );

        ctx
    }

    /// Wait for a signal from the orchestrator before starting consensus.
    pub fn wait_for_orchestrator(mut self, client: OrchestratorClient) -> Self {
        self.wait_for_orchestrator = Some(Arc::new(client));
        self
    }

    /// Add a list of tasks to this context.
    pub(crate) fn with_task_list(mut self, tasks: TaskList) -> Self {
        self.tasks.extend(tasks);
        self
    }

    /// Return a handle to the state signer.
    pub fn state_signer(&self) -> Arc<RwLock<StateSigner<SequencerApiVersion>>> {
        self.state_signer.clone()
    }

    /// Stream consensus events.
    pub async fn event_stream(&self) -> impl Stream<Item = Event<SeqTypes>> + use<N, P> {
        self.handle.read().await.event_stream()
    }

    /// Submit a transaction to consensus.
    pub async fn submit_transaction(&self, tx: Transaction) -> anyhow::Result<()> {
        self.handle.read().await.submit_transaction(tx).await?;
        Ok(())
    }

    /// Return a reference to the underlying event streamer.
    pub fn event_streamer(&self) -> Arc<RwLock<EventsStreamer<SeqTypes>>> {
        self.events_streamer.clone()
    }

    /// Return a reference to the underlying consensus handle.
    pub fn consensus(&self) -> Arc<RwLock<Consensus<N, P>>> {
        Arc::clone(&self.handle)
    }

    /// Return the consensus upgrade lock.
    pub async fn upgrade_lock(&self) -> UpgradeLock<SeqTypes> {
        self.handle.read().await.hotshot.upgrade_lock.clone()
    }

    /// Shut down the consensus handle without stopping the other background tasks.
    pub async fn shutdown_consensus(&self) {
        self.handle.write().await.shut_down().await
    }

    /// Get the most recently decided leaf.
    pub async fn decided_leaf(&self) -> Leaf2<SeqTypes> {
        self.handle.read().await.decided_leaf().await
    }

    /// Get the validated state for a given view, if it is available.
    pub async fn state(&self, view: ViewNumber) -> Option<Arc<ValidatedState>> {
        self.handle.read().await.state(view).await
    }

    /// Get the validated state of the most recently decided leaf.
    pub async fn decided_state(&self) -> Arc<ValidatedState> {
        self.handle.read().await.decided_state().await
    }

    /// The ID of this node.
    pub fn node_id(&self) -> u64 {
        self.node_state.node_id
    }

    /// The node state.
    pub fn node_state(&self) -> NodeState {
        self.node_state.clone()
    }

    /// Start participating in consensus, waiting for the orchestrator first if one was configured.
    pub async fn start_consensus(&self) {
        if let Some(orchestrator_client) = &self.wait_for_orchestrator {
            tracing::warn!("waiting for orchestrated start");
            let peer_config = PeerConfig::to_bytes(&self.validator_config.public_config()).clone();
            orchestrator_client
                .wait_for_all_nodes_ready(peer_config)
                .await;
        } else {
            tracing::error!("Cannot get info from orchestrator client");
        }
        tracing::warn!("starting consensus");
        self.handle.read().await.hotshot.start_consensus().await;
    }

    /// Spawn a background task attached to this context.
    ///
    /// The task will be aborted when this context is shut down or dropped (unless detached).
    pub fn spawn(&mut self, name: impl Display, task: impl Future<Output: Debug> + Send + 'static) {
        self.tasks.spawn(name, task);
    }

    /// Spawn a short-lived background task attached to this context.
    ///
    /// Like [`spawn`](Self::spawn), but the task's lifecycle is logged at DEBUG rather than INFO
    /// level, since short-lived tasks are expected to exit.
    pub fn spawn_short_lived(
        &mut self,
        name: impl Display,
        task: impl Future<Output: Debug> + Send + 'static,
    ) {
        self.tasks.spawn_short_lived(name, task);
    }

    /// Stop participating in consensus and shut down all background tasks.
    pub async fn shut_down(&mut self) {
        tracing::info!("shutting down SequencerContext");
        self.handle.write().await.shut_down().await;
        self.tasks.shut_down();
        self.node_state.l1_client.shut_down_tasks().await;

        // Since we've already shut down, set `detached` so the drop handler does not try to shut
        // down again.
        self.detached = true;
    }

    /// Wait for the background tasks to complete.
    pub async fn join(mut self) {
        self.tasks.join().await;
    }

    /// Skip the shutdown normally performed when this context is dropped.
    pub fn detach(&mut self) {
        // Set `detached` so the drop handler does not call `shut_down`.
        self.detached = true;
    }

    /// Return a copy of this node's network config.
    pub fn network_config(&self) -> NetworkConfig<SeqTypes> {
        self.network_config.clone()
    }
}

impl<N: ConnectedNetwork<PubKey>, P: SequencerPersistence> Drop for SequencerContext<N, P> {
    fn drop(&mut self) {
        if !self.detached {
            // Spawn a task to shut everything down, since drop cannot be async.
            let handle_clone = self.handle.clone();
            let tasks_clone = self.tasks.clone();
            let node_state_clone = self.node_state.clone();

            spawn(async move {
                tracing::info!("shutting down SequencerContext");
                handle_clone.write().await.shut_down().await;
                tasks_clone.shut_down();
                node_state_clone.l1_client.shut_down_tasks().await;
            });

            // Set `detached` so shutdown is not attempted again.
            self.detached = true;
        }
    }
}

#[tracing::instrument(skip_all, fields(node_id))]
#[allow(clippy::too_many_arguments)]
async fn handle_events<N, P>(
    consensus: Arc<RwLock<Consensus<N, P>>>,
    node_id: u64,
    mut events: impl Stream<Item = Event<SeqTypes>> + Unpin,
    persistence: Arc<P>,
    state_signer: Arc<RwLock<StateSigner<SequencerApiVersion>>>,
    external_event_handler: ExternalEventHandler,
    events_streamer: Option<Arc<RwLock<EventsStreamer<SeqTypes>>>>,
    event_consumer: impl PersistenceEventConsumer + 'static,
    anchor_view: Option<ViewNumber>,
) where
    N: ConnectedNetwork<PubKey>,
    P: SequencerPersistence,
{
    if let Some(view) = anchor_view {
        // Process any leaves already decided and persisted up to the anchor view, so downstream
        // consumers are up to date after a restart.
        if let Err(err) = persistence
            .append_decided_leaves(view, vec![], None, &event_consumer)
            .await
        {
            tracing::warn!(
                "failed to process decided leaves, chain may not be up to date: {err:#}"
            );
        }
    }

    while let Some(event) = events.next().await {
        tracing::debug!(node_id, ?event, "consensus event");

        // Store relevant information in persistent storage.
        persistence.handle_event(&event, &event_consumer).await;

        // Generate state signatures.
        state_signer
            .write()
            .await
            .handle_event(&event, consensus.clone())
            .await;

        // Handle any external messages received over the network.
        if let EventType::ExternalMessageReceived { data, .. } = &event.event
            && let Err(err) = external_event_handler.handle_event(data).await
        {
            tracing::warn!("Failed to handle external message: {:?}", err);
        };

        // Forward the event to subscribers of the events API.
        if let Some(events_streamer) = events_streamer.as_ref() {
            events_streamer.write().await.handle_event(event).await;
        }
    }
}

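/// A named collection of background task handles that can be cancelled or awaited together.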
#[derive(Debug, Default, Clone)]
#[allow(clippy::type_complexity)]
pub(crate) struct TaskList(Arc<Mutex<Vec<(String, JoinHandle<()>)>>>);

// Spawn a named task at the given tracing level and record its handle in the `TaskList`.
macro_rules! spawn_with_log_level {
    ($this:expr, $lvl:expr, $name:expr, $task: expr) => {
        let name = $name.to_string();
        let task = {
            let name = name.clone();
            let span = tracing::span!($lvl, "background task", name);
            spawn(
                async move {
                    tracing::event!($lvl, "spawning background task");
                    let res = $task.await;
                    tracing::event!($lvl, ?res, "background task exited");
                }
                .instrument(span),
            )
        };
        $this.0.lock().push((name, task));
    };
}

impl TaskList {
    /// Spawn a named background task, logging its start and exit at INFO level.
    ///
    /// The task will be aborted when the list is shut down or dropped.
    pub fn spawn(&mut self, name: impl Display, task: impl Future<Output: Debug> + Send + 'static) {
        spawn_with_log_level!(self, Level::INFO, name, task);
    }

    /// Spawn a named, short-lived background task.
    ///
    /// Like [`spawn`](Self::spawn), but start and exit are logged at DEBUG level, since the exit
    /// of a short-lived task is not unexpected.
    pub fn spawn_short_lived(
        &mut self,
        name: impl Display,
        task: impl Future<Output: Debug> + Send + 'static,
    ) {
        spawn_with_log_level!(self, Level::DEBUG, name, task);
    }

    /// Abort all tasks in the list, most recently spawned first.
    pub fn shut_down(&self) {
        let tasks: Vec<(String, JoinHandle<()>)> = self.0.lock().drain(..).collect();
        for (name, task) in tasks.into_iter().rev() {
            tracing::info!(name, "cancelling background task");
            task.abort();
        }
    }

    /// Wait for all tasks in the list to complete.
    pub async fn join(&mut self) {
        let tasks: Vec<(String, JoinHandle<()>)> = self.0.lock().drain(..).collect();
        join_all(tasks.into_iter().map(|(_, task)| task)).await;
    }

    /// Move all tasks out of `tasks` and into this list.
    pub fn extend(&mut self, tasks: TaskList) {
        self.0.lock().extend(
            tasks
                .0
                .lock()
                .drain(..)
                .collect::<Vec<(String, JoinHandle<()>)>>(),
        );
    }
}

impl Drop for TaskList {
    fn drop(&mut self) {
        self.shut_down()
    }
}