1use std::{
2 fmt::{Debug, Display},
3 future::Future,
4 marker::PhantomData,
5 num::NonZeroU64,
6 sync::Arc,
7 time::Duration,
8};
9
10use anyhow::Context;
11use async_lock::RwLock;
12use derivative::Derivative;
13use espresso_types::{
14 NodeState, PubKey, Transaction, ValidatedState,
15 v0::traits::{EventConsumer as PersistenceEventConsumer, SequencerPersistence},
16};
17use futures::{
18 future::join_all,
19 stream::{BoxStream, Stream, StreamExt},
20};
21use hotshot::SystemContext;
22use hotshot_events_service::events_source::{EventConsumer, EventsStreamer};
23use hotshot_new_protocol::{coordinator::Coordinator, network::Network};
24use hotshot_orchestrator::client::OrchestratorClient;
25use hotshot_types::{
26 PeerConfig, ValidatorConfig,
27 consensus::ConsensusMetricsValue,
28 constants::EXTERNAL_EVENT_CHANNEL_SIZE,
29 data::{Leaf2, ViewNumber},
30 epoch_membership::EpochMembershipCoordinator,
31 message::UpgradeLock,
32 network::NetworkConfig,
33 new_protocol::CoordinatorEvent,
34 storage_metrics::StorageMetricsValue,
35 traits::{metrics::Metrics, network::ConnectedNetwork},
36};
37use parking_lot::Mutex;
38use request_response::RequestResponseConfig;
39use tokio::{spawn, sync::mpsc::channel, task::JoinHandle};
40use tracing::{Instrument, Level};
41use url::Url;
42
43use crate::{
44 Node, SeqTypes, SequencerApiVersion,
45 catchup::ParallelStateCatchup,
46 consensus_handle::ConsensusHandle,
47 external_event_handler::ExternalEventHandler,
48 proposal_fetcher::ProposalFetcherConfig,
49 request_response::{
50 RequestResponseProtocol,
51 data_source::{DataSource, Storage as RequestResponseStorage},
52 network::Sender as RequestResponseSender,
53 recipient_source::RecipientSource,
54 },
55 startup_catchup::bootstrap_epoch_window,
56 state_signature::{self, StateSigner},
57};
/// The node implementation type used for consensus; currently just an alias for [`Node`].
pub(crate) type ConsensusNode<N, P> = Node<N, P>;
/// The HotShot system-context handle specialised to the sequencer types and [`ConsensusNode`].
pub type Consensus<N, P> = hotshot::types::SystemContextHandle<SeqTypes, ConsensusNode<N, P>>;
60
/// Runtime state of a sequencer node: the consensus handle, the background tasks that service
/// it, and the configuration the node was started with.
///
/// The type is `Clone`; clones share the `Arc`-wrapped members and the task list. Note that the
/// `detached` flag is copied by value, so every non-detached clone triggers a shutdown from its
/// own `Drop`.
#[derive(Derivative, Clone)]
#[derivative(Debug(bound = ""))]
pub struct SequencerContext<N: ConnectedNetwork<PubKey>, P: SequencerPersistence> {
    /// Handle used to start, query, and shut down consensus.
    #[derivative(Debug = "ignore")]
    consensus_handle: Arc<ConsensusHandle<SeqTypes, ConsensusNode<N, P>>>,

    /// Request/response protocol instance. It is not read through this field inside this file
    /// (hence `dead_code`); `init` registers a clone of it as a state-catchup provider.
    #[derivative(Debug = "ignore")]
    #[allow(dead_code)]
    pub request_response_protocol: RequestResponseProtocol<ConsensusNode<N, P>, N, P>,

    /// State signer, fed every consensus event by the event-handling task.
    state_signer: Arc<RwLock<StateSigner<SequencerApiVersion>>>,

    /// When set, `start_consensus` waits on this orchestrator client before starting.
    #[derivative(Debug = "ignore")]
    wait_for_orchestrator: Option<Arc<OrchestratorClient>>,

    /// Background tasks owned by this context.
    tasks: TaskList,

    /// Streamer that receives the legacy HotShot events seen by the event-handling task.
    events_streamer: Arc<RwLock<EventsStreamer<SeqTypes>>>,

    /// When `true`, dropping this context does not trigger a shutdown.
    detached: bool,

    /// Node state this context was initialised with.
    node_state: NodeState,

    /// Network configuration this context was initialised with.
    network_config: NetworkConfig<SeqTypes>,

    /// Validator configuration (keys) this context was initialised with.
    #[derivative(Debug = "ignore")]
    validator_config: ValidatorConfig<SeqTypes>,
}
96
97impl<N, P> SequencerContext<N, P>
98where
99 N: ConnectedNetwork<PubKey>,
100 P: SequencerPersistence,
101{
    /// Build a fully wired [`SequencerContext`].
    ///
    /// In order, this: loads the consensus state from `persistence`, initialises the HotShot
    /// `SystemContext`, runs the startup epoch/stake-table catchup, builds the new-protocol
    /// `Coordinator`, wraps both in a `ConsensusHandle`, creates the state signer and the
    /// request/response protocol, creates the external event handler, and finally hands
    /// everything to [`Self::new`], which spawns the background tasks.
    ///
    /// # Errors
    ///
    /// Returns an error if loading the consensus state, computing the stake-table commitment,
    /// initialising `SystemContext`, the startup catchup, or creating the external event handler
    /// fails. A failure of `apply_epoch` on the coordinator network is only logged.
    #[tracing::instrument(skip_all, fields(node_id = instance_state.node_id))]
    #[allow(clippy::too_many_arguments)]
    pub async fn init<T: Network<SeqTypes> + Send + 'static>(
        network_config: NetworkConfig<SeqTypes>,
        upgrade: versions::Upgrade,
        validator_config: ValidatorConfig<SeqTypes>,
        membership_coordinator: EpochMembershipCoordinator<SeqTypes>,
        instance_state: NodeState,
        storage: Option<RequestResponseStorage>,
        state_catchup: ParallelStateCatchup,
        persistence: Arc<P>,
        network: Arc<N>,
        coordinator_network: T,
        state_relay_server: Option<Url>,
        metrics: &dyn Metrics,
        stake_table_capacity: usize,
        event_consumer: impl PersistenceEventConsumer + 'static,
        proposal_fetcher_cfg: ProposalFetcherConfig,
        bootstrap_epoch_catchup_timeout: Duration,
        new_protocol_consensus_gc_interval: NonZeroU64,
    ) -> anyhow::Result<Self> {
        let config = &network_config.config;
        let pub_key = validator_config.public_key;
        tracing::info!(%pub_key, "initializing consensus");

        // Publish this node's id as the `node_index` gauge.
        metrics
            .create_gauge("node_index".into(), None)
            .set(instance_state.node_id as usize);

        // Start the L1 client's background tasks before anything else needs it.
        instance_state.l1_client.spawn_tasks().await;

        // Restore consensus state; `anchor_view` is forwarded to the event-handling task.
        let (initializer, anchor_view) = persistence
            .load_consensus_state(instance_state.clone(), upgrade)
            .await?;

        tracing::warn!(
            "Starting up sequencer context with initializer:\n\n{:?}",
            initializer
        );

        // Inputs for the state signer, derived from the configured stake table.
        let stake_table = config.hotshot_stake_table();
        let stake_table_commit = stake_table.commitment(stake_table_capacity)?;
        let stake_table_epoch = None;
        let should_vote =
            state_signature::should_vote(&stake_table, &validator_config.state_public_key);

        let epoch_height = initializer.epoch_height;

        // `SystemContext::init` takes `initializer` by value, but the coordinator builder below
        // still needs to borrow it, so keep a copy.
        let initializer_for_coordinator = initializer.clone();

        let event_streamer = Arc::new(RwLock::new(EventsStreamer::<SeqTypes>::new(
            stake_table.0,
            0,
        )));
        let handle = SystemContext::init(
            validator_config.public_key,
            validator_config.private_key.clone(),
            validator_config.state_private_key.clone(),
            instance_state.node_id,
            config.clone(),
            upgrade,
            membership_coordinator.clone(),
            network.clone(),
            initializer,
            ConsensusMetricsValue::new(metrics),
            Arc::clone(&persistence),
            StorageMetricsValue::new(metrics),
        )
        .await?
        .0;

        // Startup catchup, bounded by `bootstrap_epoch_catchup_timeout`; failure is fatal.
        let current_epoch = bootstrap_epoch_window(
            &membership_coordinator,
            epoch_height,
            bootstrap_epoch_catchup_timeout,
        )
        .await
        .context("startup stake-table catchup failed")?;
        tracing::info!(%current_epoch, "Startup catchup complete");

        // Best effort: a failure to apply the epoch to the coordinator network is logged only.
        let mut coordinator_network = coordinator_network;
        if let Err(err) = coordinator_network.apply_epoch(current_epoch, &membership_coordinator) {
            tracing::warn!(%current_epoch, %err, "coordinator network apply_epoch failed at startup");
        }

        // Build the new-protocol coordinator from the same keys, storage, and initializer.
        // The 10 second timeout is hard-coded here.
        let coordinator = Coordinator::maker()
            .membership_coordinator(membership_coordinator.clone())
            .network(coordinator_network)
            .initializer(&initializer_for_coordinator)
            .upgrade_lock({
                UpgradeLock::from_certificate(
                    upgrade,
                    &initializer_for_coordinator.decided_upgrade_certificate,
                )
            })
            .public_key(validator_config.public_key)
            .private_key(validator_config.private_key.clone())
            .state_private_key(validator_config.state_private_key.clone())
            .stake_table_capacity(stake_table_capacity)
            .timeout_duration(Duration::from_secs(10))
            .storage(Arc::clone(&persistence))
            .garbage_collection_interval(new_protocol_consensus_gc_interval.get())
            .make();

        // The legacy HotShot event receiver is handed to the `ConsensusHandle` together with
        // the HotShot handle and the coordinator.
        let legacy_event_rx = handle.event_stream_known_impl().deactivate();
        let hotshot_handle = Arc::new(RwLock::new(handle));
        let consensus_handle = Arc::new(ConsensusHandle::new(
            hotshot_handle.clone(),
            coordinator,
            epoch_height,
            legacy_event_rx,
            EXTERNAL_EVENT_CHANNEL_SIZE,
        ));

        let mut state_signer = StateSigner::new(
            validator_config.state_private_key.clone(),
            validator_config.state_public_key.clone(),
            stake_table_commit,
            stake_table_epoch,
            stake_table_capacity,
            should_vote,
        );
        // The relay server is optional; only attach it when a URL was supplied.
        if let Some(url) = state_relay_server {
            state_signer = state_signer.with_relay_server(url);
        }

        // Bounded channels (capacity 20) connecting the request/response protocol with the
        // external event handler in both directions.
        let (outbound_message_sender, outbound_message_receiver) = channel(20);
        let (request_response_sender, request_response_receiver) = channel(20);

        // Hard-coded request/response limits and timeouts.
        let request_response_config = RequestResponseConfig {
            incoming_request_ttl: Duration::from_secs(40),
            incoming_request_timeout: Duration::from_secs(5),
            incoming_response_timeout: Duration::from_secs(5),
            request_batch_size: 5,
            request_batch_interval: Duration::from_secs(2),
            max_incoming_requests: 10,
            max_incoming_requests_per_key: 1,
            max_incoming_responses: 200,
        };

        let request_response_protocol = RequestResponseProtocol::new(
            request_response_config,
            RequestResponseSender::new(outbound_message_sender),
            request_response_receiver,
            RecipientSource {
                memberships: membership_coordinator,
                consensus_handle: consensus_handle.clone(),
                public_key: validator_config.public_key,
            },
            DataSource {
                node_state: instance_state.clone(),
                storage,
                persistence: persistence.clone(),
                consensus_handle: consensus_handle.clone(),
                phantom: PhantomData,
            },
            validator_config.public_key,
            validator_config.private_key.clone(),
        );

        // Register the request/response protocol as an additional state-catchup provider.
        state_catchup.add_provider(Arc::new(request_response_protocol.clone()));

        // The external event handler receives the task list mutably; those tasks are merged
        // into the context's own list at the end via `with_task_list`.
        let mut tasks = TaskList::default();
        let external_event_handler = ExternalEventHandler::new(
            &mut tasks,
            request_response_sender,
            outbound_message_receiver,
            network,
            pub_key,
        )
        .await
        .with_context(|| "Failed to create external event handler")?;

        Ok(Self::new(
            consensus_handle,
            persistence,
            state_signer,
            external_event_handler,
            request_response_protocol,
            event_streamer,
            instance_state,
            network_config,
            validator_config,
            event_consumer,
            anchor_view,
            proposal_fetcher_cfg,
            metrics,
        )
        .with_task_list(tasks))
    }
312
313 #[allow(clippy::too_many_arguments)]
315 fn new(
316 consensus_handle: Arc<ConsensusHandle<SeqTypes, ConsensusNode<N, P>>>,
317 persistence: Arc<P>,
318 state_signer: StateSigner<SequencerApiVersion>,
319 external_event_handler: ExternalEventHandler,
320 request_response_protocol: RequestResponseProtocol<ConsensusNode<N, P>, N, P>,
321 event_streamer: Arc<RwLock<EventsStreamer<SeqTypes>>>,
322 node_state: NodeState,
323 network_config: NetworkConfig<SeqTypes>,
324 validator_config: ValidatorConfig<SeqTypes>,
325 event_consumer: impl PersistenceEventConsumer + 'static,
326 anchor_view: Option<ViewNumber>,
327 proposal_fetcher_cfg: ProposalFetcherConfig,
328 metrics: &dyn Metrics,
329 ) -> Self {
330 let events = consensus_handle.event_stream();
331
332 let node_id = node_state.node_id;
333 let mut ctx = Self {
334 consensus_handle,
335 state_signer: Arc::new(RwLock::new(state_signer)),
336 request_response_protocol,
337 tasks: Default::default(),
338 detached: false,
339 wait_for_orchestrator: None,
340 events_streamer: event_streamer.clone(),
341 node_state,
342 network_config,
343 validator_config,
344 };
345
346 proposal_fetcher_cfg.spawn(
348 &mut ctx.tasks,
349 ctx.consensus_handle.clone(),
350 persistence.clone(),
351 metrics,
352 );
353
354 ctx.spawn(
356 "event handler",
357 handle_events(
358 ctx.consensus_handle.clone(),
359 node_id,
360 events,
361 persistence,
362 ctx.state_signer.clone(),
363 external_event_handler,
364 Some(event_streamer.clone()),
365 event_consumer,
366 anchor_view,
367 ),
368 );
369
370 ctx
371 }
372
    /// Builder-style setter: make `start_consensus` wait on `client` before starting.
    pub fn wait_for_orchestrator(mut self, client: OrchestratorClient) -> Self {
        self.wait_for_orchestrator = Some(Arc::new(client));
        self
    }
378
    /// Builder-style helper: move every task from `tasks` into this context's task list.
    pub(crate) fn with_task_list(mut self, tasks: TaskList) -> Self {
        self.tasks.extend(tasks);
        self
    }
384
385 pub fn state_signer(&self) -> Arc<RwLock<StateSigner<SequencerApiVersion>>> {
387 self.state_signer.clone()
388 }
389
    /// Return a stream of coordinator events, as produced by the consensus handle.
    pub fn event_stream(&self) -> BoxStream<'static, CoordinatorEvent<SeqTypes>> {
        self.consensus_handle.event_stream()
    }
394
    /// Submit `tx` through the consensus handle, returning whatever error it reports.
    pub async fn submit_transaction(&self, tx: Transaction) -> anyhow::Result<()> {
        self.consensus_handle.submit_transaction(tx).await
    }
398
399 pub fn event_streamer(&self) -> Arc<RwLock<EventsStreamer<SeqTypes>>> {
401 self.events_streamer.clone()
402 }
403
404 pub fn consensus_handle(&self) -> Arc<ConsensusHandle<SeqTypes, ConsensusNode<N, P>>> {
406 self.consensus_handle.clone()
407 }
408
    /// Return the upgrade lock reported by the consensus handle.
    pub async fn upgrade_lock(&self) -> UpgradeLock<SeqTypes> {
        self.consensus_handle.upgrade_lock().await
    }
412
    /// Shut down consensus only; background tasks and the L1 client are left untouched
    /// (contrast with [`Self::shut_down`]).
    pub async fn shutdown_consensus(&self) {
        self.consensus_handle.shut_down().await
    }
416
    /// Return the decided leaf reported by the consensus handle.
    pub async fn decided_leaf(&self) -> Leaf2<SeqTypes> {
        self.consensus_handle.decided_leaf().await
    }
420
    /// Return the validated state for `view`, or `None` if the consensus handle has none.
    pub async fn state(&self, view: ViewNumber) -> Option<Arc<ValidatedState>> {
        self.consensus_handle.state(view).await
    }
424
    /// Return the decided validated state, or `None` if the consensus handle has none.
    pub async fn decided_state(&self) -> Option<Arc<ValidatedState>> {
        self.consensus_handle.decided_state().await
    }
428
    /// Return this node's id, as stored in its [`NodeState`].
    pub fn node_id(&self) -> u64 {
        self.node_state.node_id
    }
432
    /// Return a clone of the node state this context was initialised with.
    pub fn node_state(&self) -> NodeState {
        self.node_state.clone()
    }
436
437 pub async fn start_consensus(&self) {
439 if let Some(orchestrator_client) = &self.wait_for_orchestrator {
440 tracing::warn!("waiting for orchestrated start");
441 let peer_config = PeerConfig::to_bytes(&self.validator_config.public_config()).clone();
442 orchestrator_client
443 .wait_for_all_nodes_ready(peer_config)
444 .await;
445 } else {
446 tracing::error!("Cannot get info from orchestrator client");
447 }
448 tracing::warn!("starting consensus");
449 self.consensus_handle.start_consensus().await;
450 }
451
    /// Spawn a named background task on this context's task list.
    ///
    /// The task's start and exit are logged at `INFO` level (see `TaskList::spawn`).
    pub fn spawn(&mut self, name: impl Display, task: impl Future<Output: Debug> + Send + 'static) {
        self.tasks.spawn(name, task);
    }
459
    /// Spawn a named background task on this context's task list, logging its start and exit at
    /// `DEBUG` level instead of `INFO` (see `TaskList::spawn_short_lived`).
    pub fn spawn_short_lived(
        &mut self,
        name: impl Display,
        task: impl Future<Output: Debug> + Send + 'static,
    ) {
        self.tasks.spawn_short_lived(name, task);
    }
474
    /// Shut the node down: stop consensus, abort the background tasks, then stop the L1 client
    /// tasks, in that order.
    pub async fn shut_down(&mut self) {
        tracing::info!("shutting down SequencerContext");
        self.consensus_handle.shut_down().await;
        self.tasks.shut_down();
        self.node_state.l1_client.shut_down_tasks().await;

        // Everything is already stopped, so `Drop` must not start a second shutdown.
        self.detached = true;
    }
486
    /// Wait for every background task currently on the task list to finish.
    ///
    /// This consumes the context, so it is dropped when the call returns; unless it has been
    /// detached or shut down, its `Drop` impl then triggers the usual shutdown.
    pub async fn join(mut self) {
        self.tasks.join().await;
    }
494
    /// Mark this context as detached so that dropping it does not trigger a shutdown.
    pub fn detach(&mut self) {
        self.detached = true;
    }
500
    /// Return a clone of the network configuration this context was initialised with.
    pub fn network_config(&self) -> NetworkConfig<SeqTypes> {
        self.network_config.clone()
    }
505}
506
507impl<N: ConnectedNetwork<PubKey>, P: SequencerPersistence> Drop for SequencerContext<N, P> {
508 fn drop(&mut self) {
509 if !self.detached {
510 let consensus_handle = self.consensus_handle.clone();
512 let tasks_clone = self.tasks.clone();
513 let node_state_clone = self.node_state.clone();
514
515 spawn(async move {
516 tracing::info!("shutting down SequencerContext");
517 consensus_handle.shut_down().await;
518 tasks_clone.shut_down();
519 node_state_clone.l1_client.shut_down_tasks().await;
520 });
521
522 self.detached = true;
524 }
525 }
526}
527
/// Event loop run as the "event handler" background task.
///
/// For every event from `events`, external messages are first forwarded to
/// `external_event_handler`; then persistence, the state signer, and (for legacy events only)
/// the events streamer process the event concurrently. The loop ends when the stream ends.
///
/// Before the loop starts, if `anchor_view` is set, `append_decided_leaves` is called once with
/// an empty leaf list for that view; a failure there is logged and otherwise ignored.
#[tracing::instrument(skip_all, fields(node_id))]
#[allow(clippy::too_many_arguments)]
async fn handle_events<N, P>(
    consensus_handle: Arc<ConsensusHandle<SeqTypes, ConsensusNode<N, P>>>,
    node_id: u64,
    mut events: impl Stream<Item = CoordinatorEvent<SeqTypes>> + Unpin,
    persistence: Arc<P>,
    state_signer: Arc<RwLock<StateSigner<SequencerApiVersion>>>,
    external_event_handler: ExternalEventHandler,
    events_streamer: Option<Arc<RwLock<EventsStreamer<SeqTypes>>>>,
    event_consumer: impl PersistenceEventConsumer + 'static,
    anchor_view: Option<ViewNumber>,
) where
    N: ConnectedNetwork<PubKey>,
    P: SequencerPersistence,
{
    // NOTE(review): presumably this lets persistence hand already-decided leaves to the
    // consumer after a restart; confirm against `append_decided_leaves`.
    if let Some(view) = anchor_view {
        if let Err(err) = persistence
            .append_decided_leaves(view, vec![], None, &event_consumer)
            .await
        {
            tracing::warn!(
                "failed to process decided leaves, chain may not be up to date: {err:#}"
            );
        }
    }

    while let Some(event) = events.next().await {
        tracing::debug!(node_id, ?event, "consensus event");

        // External messages can arrive wrapped in a legacy HotShot event or as a coordinator
        // event; both are passed to the same handler, and failures are only logged.
        match &event {
            CoordinatorEvent::LegacyEvent(hotshot_event) => {
                if let hotshot_types::event::EventType::ExternalMessageReceived { ref data, .. } =
                    hotshot_event.event
                    && let Err(err) = external_event_handler.handle_event(data).await
                {
                    tracing::warn!("Failed to handle legacy external message: {:?}", err);
                }
                // Called on every legacy event with the result discarded.
                // NOTE(review): purpose of `cutover_active` is not visible here; confirm.
                consensus_handle.cutover_active().await;
            },
            CoordinatorEvent::ExternalMessageReceived { data, .. } => {
                if let Err(err) = external_event_handler.handle_event(data).await {
                    tracing::warn!("Failed to handle external message: {:?}", err);
                }
            },
            _ => {},
        }

        let persistence_fut = persistence.handle_event(&event, &event_consumer);

        // The signer's write lock is held for the duration of its `handle_event` call.
        let state_signer_fut = async {
            state_signer
                .write()
                .await
                .handle_event(&event, consensus_handle.as_ref())
                .await;
        };

        // Only legacy HotShot events are forwarded to the events streamer, if one is present.
        let events_streamer_fut = async {
            if let CoordinatorEvent::LegacyEvent(ref hotshot_event) = event
                && let Some(events_streamer) = events_streamer.as_ref()
            {
                events_streamer
                    .write()
                    .await
                    .handle_event(hotshot_event.clone())
                    .await;
            }
        };

        // All three consumers must finish before the next event is pulled from the stream.
        tokio::join!(persistence_fut, state_signer_fut, events_streamer_fut);
    }
}
604
/// A list of named background tasks.
///
/// The list lives behind an `Arc<Mutex<..>>`, so clones refer to the same underlying list.
#[derive(Debug, Default, Clone)]
#[allow(clippy::type_complexity)]
pub(crate) struct TaskList(Arc<Mutex<Vec<(String, JoinHandle<()>)>>>);
608
// Spawn `$task` on the Tokio runtime inside a "background task" span carrying the task name,
// log its start and its result at level `$lvl`, and push the `(name, JoinHandle)` pair onto
// the list held by `$this`.
//
// This is a macro rather than a function because `tracing::span!` / `tracing::event!` take
// the level as a macro argument.
macro_rules! spawn_with_log_level {
    ($this:expr, $lvl:expr, $name:expr, $task: expr) => {
        let name = $name.to_string();
        let task = {
            // The span records its own copy of the name; the outer `name` is stored in the list.
            let name = name.clone();
            let span = tracing::span!($lvl, "background task", name);
            spawn(
                async move {
                    tracing::event!($lvl, "spawning background task");
                    let res = $task.await;
                    tracing::event!($lvl, ?res, "background task exited");
                }
                .instrument(span),
            )
        };
        $this.0.lock().push((name, task));
    };
}
627
628impl TaskList {
629 pub fn spawn(&mut self, name: impl Display, task: impl Future<Output: Debug> + Send + 'static) {
634 spawn_with_log_level!(self, Level::INFO, name, task);
635 }
636
637 pub fn spawn_short_lived(
645 &mut self,
646 name: impl Display,
647 task: impl Future<Output: Debug> + Send + 'static,
648 ) {
649 spawn_with_log_level!(self, Level::DEBUG, name, task);
650 }
651
652 pub fn shut_down(&self) {
654 let tasks: Vec<(String, JoinHandle<()>)> = self.0.lock().drain(..).collect();
655 for (name, task) in tasks.into_iter().rev() {
656 tracing::info!(name, "cancelling background task");
657 task.abort();
658 }
659 }
660
661 pub async fn join(&mut self) {
663 let tasks: Vec<(String, JoinHandle<()>)> = self.0.lock().drain(..).collect();
664 join_all(tasks.into_iter().map(|(_, task)| task)).await;
665 }
666
667 pub fn extend(&mut self, tasks: TaskList) {
668 self.0.lock().extend(
669 tasks
670 .0
671 .lock()
672 .drain(..)
673 .collect::<Vec<(String, JoinHandle<()>)>>(),
674 );
675 }
676}
677
/// Dropping a `TaskList` aborts every task still in it.
///
/// Because clones share one underlying list, dropping any clone aborts the tasks for all of
/// them.
impl Drop for TaskList {
    fn drop(&mut self) {
        self.shut_down()
    }
}