Skip to main content

espresso_node/
context.rs

1use std::{
2    fmt::{Debug, Display},
3    future::Future,
4    marker::PhantomData,
5    num::NonZeroU64,
6    sync::Arc,
7    time::Duration,
8};
9
10use anyhow::Context;
11use async_lock::RwLock;
12use derivative::Derivative;
13use espresso_types::{
14    NodeState, PubKey, Transaction, ValidatedState,
15    v0::traits::{EventConsumer as PersistenceEventConsumer, SequencerPersistence},
16};
17use futures::{
18    future::join_all,
19    stream::{BoxStream, Stream, StreamExt},
20};
21use hotshot::SystemContext;
22use hotshot_events_service::events_source::{EventConsumer, EventsStreamer};
23use hotshot_new_protocol::{coordinator::Coordinator, network::Network};
24use hotshot_orchestrator::client::OrchestratorClient;
25use hotshot_types::{
26    PeerConfig, ValidatorConfig,
27    consensus::ConsensusMetricsValue,
28    constants::EXTERNAL_EVENT_CHANNEL_SIZE,
29    data::{Leaf2, ViewNumber},
30    epoch_membership::EpochMembershipCoordinator,
31    message::UpgradeLock,
32    network::NetworkConfig,
33    new_protocol::CoordinatorEvent,
34    storage_metrics::StorageMetricsValue,
35    traits::{metrics::Metrics, network::ConnectedNetwork},
36};
37use parking_lot::Mutex;
38use request_response::RequestResponseConfig;
39use tokio::{spawn, sync::mpsc::channel, task::JoinHandle};
40use tracing::{Instrument, Level};
41use url::Url;
42
43use crate::{
44    Node, SeqTypes, SequencerApiVersion,
45    catchup::ParallelStateCatchup,
46    consensus_handle::ConsensusHandle,
47    external_event_handler::ExternalEventHandler,
48    proposal_fetcher::ProposalFetcherConfig,
49    request_response::{
50        RequestResponseProtocol,
51        data_source::{DataSource, Storage as RequestResponseStorage},
52        network::Sender as RequestResponseSender,
53        recipient_source::RecipientSource,
54    },
55    startup_catchup::bootstrap_epoch_window,
56    state_signature::{self, StateSigner},
57};
58pub(crate) type ConsensusNode<N, P> = Node<N, P>;
59pub type Consensus<N, P> = hotshot::types::SystemContextHandle<SeqTypes, ConsensusNode<N, P>>;
60
/// The sequencer context contains a consensus handle and other sequencer specific information.
///
/// NOTE(review): this type derives `Clone` and also implements `Drop`. A clone copies the
/// `detached` flag and shares `tasks` (an `Arc`-backed list), so dropping any clone that is not
/// detached spawns a shutdown of the shared consensus handle, and dropping a clone's `tasks`
/// aborts the shared background tasks. Confirm that clones are meant to be used this way.
///
/// Field order matters: fields are dropped in declaration order, and dropping `tasks` aborts the
/// background tasks it still tracks.
#[derive(Derivative, Clone)]
#[derivative(Debug(bound = ""))]
pub struct SequencerContext<N: ConnectedNetwork<PubKey>, P: SequencerPersistence> {
    /// The consensus adapter that dispatches between old HotShot and new coordinator.
    #[derivative(Debug = "ignore")]
    consensus_handle: Arc<ConsensusHandle<SeqTypes, ConsensusNode<N, P>>>,

    /// The request-response protocol
    #[derivative(Debug = "ignore")]
    #[allow(dead_code)]
    pub request_response_protocol: RequestResponseProtocol<ConsensusNode<N, P>, N, P>,

    /// Context for generating state signatures.
    state_signer: Arc<RwLock<StateSigner<SequencerApiVersion>>>,

    /// An orchestrator to wait for before starting consensus.
    #[derivative(Debug = "ignore")]
    wait_for_orchestrator: Option<Arc<OrchestratorClient>>,

    /// Background tasks to shut down when the node is dropped.
    tasks: TaskList,

    /// events streamer to stream hotshot events to external clients
    events_streamer: Arc<RwLock<EventsStreamer<SeqTypes>>>,

    /// When `true`, the `Drop` impl does not shut anything down. Set by `detach` and by
    /// `shut_down` (so an explicit shutdown is not repeated on drop).
    detached: bool,

    /// Instance state of this node; supplies `node_id` and the L1 client whose tasks are
    /// stopped on shutdown.
    node_state: NodeState,

    /// Network configuration the context was initialized with (returned by `network_config`).
    network_config: NetworkConfig<SeqTypes>,

    /// Validator keys/config; its public config is sent to the orchestrator in `start_consensus`.
    #[derivative(Debug = "ignore")]
    validator_config: ValidatorConfig<SeqTypes>,
}
96
97impl<N, P> SequencerContext<N, P>
98where
99    N: ConnectedNetwork<PubKey>,
100    P: SequencerPersistence,
101{
102    #[tracing::instrument(skip_all, fields(node_id = instance_state.node_id))]
103    #[allow(clippy::too_many_arguments)]
104    pub async fn init<T: Network<SeqTypes> + Send + 'static>(
105        network_config: NetworkConfig<SeqTypes>,
106        upgrade: versions::Upgrade,
107        validator_config: ValidatorConfig<SeqTypes>,
108        membership_coordinator: EpochMembershipCoordinator<SeqTypes>,
109        instance_state: NodeState,
110        storage: Option<RequestResponseStorage>,
111        state_catchup: ParallelStateCatchup,
112        persistence: Arc<P>,
113        network: Arc<N>,
114        coordinator_network: T,
115        state_relay_server: Option<Url>,
116        metrics: &dyn Metrics,
117        stake_table_capacity: usize,
118        event_consumer: impl PersistenceEventConsumer + 'static,
119        proposal_fetcher_cfg: ProposalFetcherConfig,
120        bootstrap_epoch_catchup_timeout: Duration,
121        new_protocol_consensus_gc_interval: NonZeroU64,
122    ) -> anyhow::Result<Self> {
123        let config = &network_config.config;
124        let pub_key = validator_config.public_key;
125        tracing::info!(%pub_key, "initializing consensus");
126
127        // Stick our node ID in `metrics` so it is easily accessible via the status API.
128        metrics
129            .create_gauge("node_index".into(), None)
130            .set(instance_state.node_id as usize);
131
132        // Start L1 client if it isn't already.
133        instance_state.l1_client.spawn_tasks().await;
134
135        // Load saved consensus state from storage.
136        let (initializer, anchor_view) = persistence
137            .load_consensus_state(instance_state.clone(), upgrade)
138            .await?;
139
140        tracing::warn!(
141            "Starting up sequencer context with initializer:\n\n{:?}",
142            initializer
143        );
144
145        let stake_table = config.hotshot_stake_table();
146        let stake_table_commit = stake_table.commitment(stake_table_capacity)?;
147        let stake_table_epoch = None;
148        let should_vote =
149            state_signature::should_vote(&stake_table, &validator_config.state_public_key);
150
151        let epoch_height = initializer.epoch_height;
152
153        let initializer_for_coordinator = initializer.clone();
154
155        let event_streamer = Arc::new(RwLock::new(EventsStreamer::<SeqTypes>::new(
156            stake_table.0,
157            0,
158        )));
159        let handle = SystemContext::init(
160            validator_config.public_key,
161            validator_config.private_key.clone(),
162            validator_config.state_private_key.clone(),
163            instance_state.node_id,
164            config.clone(),
165            upgrade,
166            membership_coordinator.clone(),
167            network.clone(),
168            initializer,
169            ConsensusMetricsValue::new(metrics),
170            Arc::clone(&persistence),
171            StorageMetricsValue::new(metrics),
172        )
173        .await?
174        .0;
175
176        // `load_start_epoch_info` ran inside `SystemContext::init`, so
177        // `first_epoch` is now seeded on the shared membership. Walk the
178        // catchup chain forward to populate the stake-table window for the
179        // current epoch.
180        let current_epoch = bootstrap_epoch_window(
181            &membership_coordinator,
182            epoch_height,
183            bootstrap_epoch_catchup_timeout,
184        )
185        .await
186        .context("startup stake-table catchup failed")?;
187        tracing::info!(%current_epoch, "Startup catchup complete");
188
189        // Push the resolved peer window into the coordinator network. For
190        // cliquenet this dials the N-1/N/N+1 sliding window for the current
191        // epoch before consensus starts.
192        let mut coordinator_network = coordinator_network;
193        if let Err(err) = coordinator_network.apply_epoch(current_epoch, &membership_coordinator) {
194            tracing::warn!(%current_epoch, %err, "coordinator network apply_epoch failed at startup");
195        }
196
197        let coordinator = Coordinator::maker()
198            .membership_coordinator(membership_coordinator.clone())
199            .network(coordinator_network)
200            .initializer(&initializer_for_coordinator)
201            .upgrade_lock({
202                // TODO: The Coordinator and HotShot each create their own UpgradeLock
203                // from the same inputs. They need to share a single lock so that upgrade
204                // certificate updates are visible to both.
205                UpgradeLock::from_certificate(
206                    upgrade,
207                    &initializer_for_coordinator.decided_upgrade_certificate,
208                )
209            })
210            .public_key(validator_config.public_key)
211            .private_key(validator_config.private_key.clone())
212            .state_private_key(validator_config.state_private_key.clone())
213            .stake_table_capacity(stake_table_capacity)
214            .timeout_duration(Duration::from_secs(10))
215            .storage(Arc::clone(&persistence))
216            .garbage_collection_interval(new_protocol_consensus_gc_interval.get())
217            .make();
218
219        let legacy_event_rx = handle.event_stream_known_impl().deactivate();
220        let hotshot_handle = Arc::new(RwLock::new(handle));
221        let consensus_handle = Arc::new(ConsensusHandle::new(
222            hotshot_handle.clone(),
223            coordinator,
224            epoch_height,
225            legacy_event_rx,
226            EXTERNAL_EVENT_CHANNEL_SIZE,
227        ));
228
229        let mut state_signer = StateSigner::new(
230            validator_config.state_private_key.clone(),
231            validator_config.state_public_key.clone(),
232            stake_table_commit,
233            stake_table_epoch,
234            stake_table_capacity,
235            should_vote,
236        );
237        if let Some(url) = state_relay_server {
238            state_signer = state_signer.with_relay_server(url);
239        }
240
241        // Create the channel for sending outbound messages from the external event handler
242        let (outbound_message_sender, outbound_message_receiver) = channel(20);
243        let (request_response_sender, request_response_receiver) = channel(20);
244
245        // Configure the request-response protocol
246        let request_response_config = RequestResponseConfig {
247            incoming_request_ttl: Duration::from_secs(40),
248            incoming_request_timeout: Duration::from_secs(5),
249            incoming_response_timeout: Duration::from_secs(5),
250            request_batch_size: 5,
251            request_batch_interval: Duration::from_secs(2),
252            max_incoming_requests: 10,
253            max_incoming_requests_per_key: 1,
254            max_incoming_responses: 200,
255        };
256
257        // Create the request-response protocol
258        let request_response_protocol = RequestResponseProtocol::new(
259            request_response_config,
260            RequestResponseSender::new(outbound_message_sender),
261            request_response_receiver,
262            RecipientSource {
263                memberships: membership_coordinator,
264                consensus_handle: consensus_handle.clone(),
265                public_key: validator_config.public_key,
266            },
267            DataSource {
268                node_state: instance_state.clone(),
269                storage,
270                persistence: persistence.clone(),
271                consensus_handle: consensus_handle.clone(),
272                phantom: PhantomData,
273            },
274            validator_config.public_key,
275            validator_config.private_key.clone(),
276        );
277
278        // Add the request-response protocol to the list of providers for state catchup. Since the interior is mutable,
279        // the request-response protocol will now retroactively be used anywhere we passed in the original struct (e.g. in consensus
280        // itself)
281        state_catchup.add_provider(Arc::new(request_response_protocol.clone()));
282
283        // Create the external event handler
284        let mut tasks = TaskList::default();
285        let external_event_handler = ExternalEventHandler::new(
286            &mut tasks,
287            request_response_sender,
288            outbound_message_receiver,
289            network,
290            pub_key,
291        )
292        .await
293        .with_context(|| "Failed to create external event handler")?;
294
295        Ok(Self::new(
296            consensus_handle,
297            persistence,
298            state_signer,
299            external_event_handler,
300            request_response_protocol,
301            event_streamer,
302            instance_state,
303            network_config,
304            validator_config,
305            event_consumer,
306            anchor_view,
307            proposal_fetcher_cfg,
308            metrics,
309        )
310        .with_task_list(tasks))
311    }
312
313    /// Constructor
314    #[allow(clippy::too_many_arguments)]
315    fn new(
316        consensus_handle: Arc<ConsensusHandle<SeqTypes, ConsensusNode<N, P>>>,
317        persistence: Arc<P>,
318        state_signer: StateSigner<SequencerApiVersion>,
319        external_event_handler: ExternalEventHandler,
320        request_response_protocol: RequestResponseProtocol<ConsensusNode<N, P>, N, P>,
321        event_streamer: Arc<RwLock<EventsStreamer<SeqTypes>>>,
322        node_state: NodeState,
323        network_config: NetworkConfig<SeqTypes>,
324        validator_config: ValidatorConfig<SeqTypes>,
325        event_consumer: impl PersistenceEventConsumer + 'static,
326        anchor_view: Option<ViewNumber>,
327        proposal_fetcher_cfg: ProposalFetcherConfig,
328        metrics: &dyn Metrics,
329    ) -> Self {
330        let events = consensus_handle.event_stream();
331
332        let node_id = node_state.node_id;
333        let mut ctx = Self {
334            consensus_handle,
335            state_signer: Arc::new(RwLock::new(state_signer)),
336            request_response_protocol,
337            tasks: Default::default(),
338            detached: false,
339            wait_for_orchestrator: None,
340            events_streamer: event_streamer.clone(),
341            node_state,
342            network_config,
343            validator_config,
344        };
345
346        // Spawn proposal fetching tasks.
347        proposal_fetcher_cfg.spawn(
348            &mut ctx.tasks,
349            ctx.consensus_handle.clone(),
350            persistence.clone(),
351            metrics,
352        );
353
354        // Spawn event handling loop.
355        ctx.spawn(
356            "event handler",
357            handle_events(
358                ctx.consensus_handle.clone(),
359                node_id,
360                events,
361                persistence,
362                ctx.state_signer.clone(),
363                external_event_handler,
364                Some(event_streamer.clone()),
365                event_consumer,
366                anchor_view,
367            ),
368        );
369
370        ctx
371    }
372
373    /// Wait for a signal from the orchestrator before starting consensus.
374    pub fn wait_for_orchestrator(mut self, client: OrchestratorClient) -> Self {
375        self.wait_for_orchestrator = Some(Arc::new(client));
376        self
377    }
378
379    /// Add a list of tasks to the given context.
380    pub(crate) fn with_task_list(mut self, tasks: TaskList) -> Self {
381        self.tasks.extend(tasks);
382        self
383    }
384
385    /// Return a reference to the consensus state signer.
386    pub fn state_signer(&self) -> Arc<RwLock<StateSigner<SequencerApiVersion>>> {
387        self.state_signer.clone()
388    }
389
390    /// Stream consensus events.
391    pub fn event_stream(&self) -> BoxStream<'static, CoordinatorEvent<SeqTypes>> {
392        self.consensus_handle.event_stream()
393    }
394
395    pub async fn submit_transaction(&self, tx: Transaction) -> anyhow::Result<()> {
396        self.consensus_handle.submit_transaction(tx).await
397    }
398
399    /// get event streamer
400    pub fn event_streamer(&self) -> Arc<RwLock<EventsStreamer<SeqTypes>>> {
401        self.events_streamer.clone()
402    }
403
404    /// Return a reference to the consensus adapter.
405    pub fn consensus_handle(&self) -> Arc<ConsensusHandle<SeqTypes, ConsensusNode<N, P>>> {
406        self.consensus_handle.clone()
407    }
408
409    pub async fn upgrade_lock(&self) -> UpgradeLock<SeqTypes> {
410        self.consensus_handle.upgrade_lock().await
411    }
412
413    pub async fn shutdown_consensus(&self) {
414        self.consensus_handle.shut_down().await
415    }
416
417    pub async fn decided_leaf(&self) -> Leaf2<SeqTypes> {
418        self.consensus_handle.decided_leaf().await
419    }
420
421    pub async fn state(&self, view: ViewNumber) -> Option<Arc<ValidatedState>> {
422        self.consensus_handle.state(view).await
423    }
424
425    pub async fn decided_state(&self) -> Option<Arc<ValidatedState>> {
426        self.consensus_handle.decided_state().await
427    }
428
429    pub fn node_id(&self) -> u64 {
430        self.node_state.node_id
431    }
432
433    pub fn node_state(&self) -> NodeState {
434        self.node_state.clone()
435    }
436
437    /// Start participating in consensus.
438    pub async fn start_consensus(&self) {
439        if let Some(orchestrator_client) = &self.wait_for_orchestrator {
440            tracing::warn!("waiting for orchestrated start");
441            let peer_config = PeerConfig::to_bytes(&self.validator_config.public_config()).clone();
442            orchestrator_client
443                .wait_for_all_nodes_ready(peer_config)
444                .await;
445        } else {
446            tracing::error!("Cannot get info from orchestrator client");
447        }
448        tracing::warn!("starting consensus");
449        self.consensus_handle.start_consensus().await;
450    }
451
452    /// Spawn a background task attached to this context.
453    ///
454    /// When this context is dropped or [`shut_down`](Self::shut_down), background tasks will be
455    /// cancelled in the reverse order that they were spawned.
456    pub fn spawn(&mut self, name: impl Display, task: impl Future<Output: Debug> + Send + 'static) {
457        self.tasks.spawn(name, task);
458    }
459
460    /// Spawn a short-lived background task attached to this context.
461    ///
462    /// When this context is dropped or [`shut_down`](Self::shut_down), background tasks will be
463    /// cancelled in the reverse order that they were spawned.
464    ///
465    /// The only difference between a short-lived background task and a [long-lived](Self::spawn)
466    /// one is how urgently logging related to the task is treated.
467    pub fn spawn_short_lived(
468        &mut self,
469        name: impl Display,
470        task: impl Future<Output: Debug> + Send + 'static,
471    ) {
472        self.tasks.spawn_short_lived(name, task);
473    }
474
475    /// Stop participating in consensus.
476    pub async fn shut_down(&mut self) {
477        tracing::info!("shutting down SequencerContext");
478        self.consensus_handle.shut_down().await;
479        self.tasks.shut_down();
480        self.node_state.l1_client.shut_down_tasks().await;
481
482        // Since we've already shut down, we can set `detached` so the drop
483        // handler doesn't call `shut_down` again.
484        self.detached = true;
485    }
486
487    /// Wait for consensus to complete.
488    ///
489    /// Under normal conditions, this function will block forever, which is a convenient way of
490    /// keeping the main thread from exiting as long as there are still active background tasks.
491    pub async fn join(mut self) {
492        self.tasks.join().await;
493    }
494
495    /// Allow this node to continue participating in consensus even after it is dropped.
496    pub fn detach(&mut self) {
497        // Set `detached` so the drop handler doesn't call `shut_down`.
498        self.detached = true;
499    }
500
501    /// Get the network config
502    pub fn network_config(&self) -> NetworkConfig<SeqTypes> {
503        self.network_config.clone()
504    }
505}
506
507impl<N: ConnectedNetwork<PubKey>, P: SequencerPersistence> Drop for SequencerContext<N, P> {
508    fn drop(&mut self) {
509        if !self.detached {
510            // Spawn a task to shut down the context
511            let consensus_handle = self.consensus_handle.clone();
512            let tasks_clone = self.tasks.clone();
513            let node_state_clone = self.node_state.clone();
514
515            spawn(async move {
516                tracing::info!("shutting down SequencerContext");
517                consensus_handle.shut_down().await;
518                tasks_clone.shut_down();
519                node_state_clone.l1_client.shut_down_tasks().await;
520            });
521
522            // Set `detached` so the drop handler doesn't call `shut_down` again.
523            self.detached = true;
524        }
525    }
526}
527
528#[tracing::instrument(skip_all, fields(node_id))]
529#[allow(clippy::too_many_arguments)]
530async fn handle_events<N, P>(
531    consensus_handle: Arc<ConsensusHandle<SeqTypes, ConsensusNode<N, P>>>,
532    node_id: u64,
533    mut events: impl Stream<Item = CoordinatorEvent<SeqTypes>> + Unpin,
534    persistence: Arc<P>,
535    state_signer: Arc<RwLock<StateSigner<SequencerApiVersion>>>,
536    external_event_handler: ExternalEventHandler,
537    events_streamer: Option<Arc<RwLock<EventsStreamer<SeqTypes>>>>,
538    event_consumer: impl PersistenceEventConsumer + 'static,
539    anchor_view: Option<ViewNumber>,
540) where
541    N: ConnectedNetwork<PubKey>,
542    P: SequencerPersistence,
543{
544    if let Some(view) = anchor_view {
545        // Process and clean up any leaves that we may have persisted last time we were running but
546        // failed to handle due to a shutdown.
547        if let Err(err) = persistence
548            .append_decided_leaves(view, vec![], None, &event_consumer)
549            .await
550        {
551            tracing::warn!(
552                "failed to process decided leaves, chain may not be up to date: {err:#}"
553            );
554        }
555    }
556
557    while let Some(event) = events.next().await {
558        tracing::debug!(node_id, ?event, "consensus event");
559
560        match &event {
561            CoordinatorEvent::LegacyEvent(hotshot_event) => {
562                if let hotshot_types::event::EventType::ExternalMessageReceived { ref data, .. } =
563                    hotshot_event.event
564                    && let Err(err) = external_event_handler.handle_event(data).await
565                {
566                    tracing::warn!("Failed to handle legacy external message: {:?}", err);
567                }
568                // Check if we're ready to start the new protocol
569                consensus_handle.cutover_active().await;
570            },
571            CoordinatorEvent::ExternalMessageReceived { data, .. } => {
572                if let Err(err) = external_event_handler.handle_event(data).await {
573                    tracing::warn!("Failed to handle external message: {:?}", err);
574                }
575            },
576            _ => {},
577        }
578
579        let persistence_fut = persistence.handle_event(&event, &event_consumer);
580
581        let state_signer_fut = async {
582            state_signer
583                .write()
584                .await
585                .handle_event(&event, consensus_handle.as_ref())
586                .await;
587        };
588
589        let events_streamer_fut = async {
590            if let CoordinatorEvent::LegacyEvent(ref hotshot_event) = event
591                && let Some(events_streamer) = events_streamer.as_ref()
592            {
593                events_streamer
594                    .write()
595                    .await
596                    .handle_event(hotshot_event.clone())
597                    .await;
598            }
599        };
600
601        tokio::join!(persistence_fut, state_signer_fut, events_streamer_fut);
602    }
603}
604
605#[derive(Debug, Default, Clone)]
606#[allow(clippy::type_complexity)]
607pub(crate) struct TaskList(Arc<Mutex<Vec<(String, JoinHandle<()>)>>>);
608
/// Spawn `$task` on the Tokio runtime inside a `"background task"` span at level `$lvl`, and
/// push `(name, JoinHandle)` onto the task list `$this.0`.
///
/// The wrapper logs at `$lvl` when the task starts and again when it exits, including the task's
/// output (so the output must implement `Debug`). `$name` is converted with `to_string()`.
macro_rules! spawn_with_log_level {
    ($this:expr, $lvl:expr, $name:expr, $task: expr) => {
        let name = $name.to_string();
        let task = {
            // A separate copy is captured by the span; the original is stored with the handle.
            let name = name.clone();
            // The span field is called `name` because the local variable is called `name`.
            let span = tracing::span!($lvl, "background task", name);
            spawn(
                async move {
                    tracing::event!($lvl, "spawning background task");
                    let res = $task.await;
                    tracing::event!($lvl, ?res, "background task exited");
                }
                .instrument(span),
            )
        };
        $this.0.lock().push((name, task));
    };
}
627
628impl TaskList {
629    /// Spawn a background task attached to this [`TaskList`].
630    ///
631    /// When this [`TaskList`] is dropped or [`shut_down`](Self::shut_down), background tasks will
632    /// be cancelled in the reverse order that they were spawned.
633    pub fn spawn(&mut self, name: impl Display, task: impl Future<Output: Debug> + Send + 'static) {
634        spawn_with_log_level!(self, Level::INFO, name, task);
635    }
636
637    /// Spawn a short-lived background task attached to this [`TaskList`].
638    ///
639    /// When this [`TaskList`] is dropped or [`shut_down`](Self::shut_down), background tasks will
640    /// be cancelled in the reverse order that they were spawned.
641    ///
642    /// The only difference between a short-lived background task and a [long-lived](Self::spawn)
643    /// one is how urgently logging related to the task is treated.
644    pub fn spawn_short_lived(
645        &mut self,
646        name: impl Display,
647        task: impl Future<Output: Debug> + Send + 'static,
648    ) {
649        spawn_with_log_level!(self, Level::DEBUG, name, task);
650    }
651
652    /// Stop all background tasks.
653    pub fn shut_down(&self) {
654        let tasks: Vec<(String, JoinHandle<()>)> = self.0.lock().drain(..).collect();
655        for (name, task) in tasks.into_iter().rev() {
656            tracing::info!(name, "cancelling background task");
657            task.abort();
658        }
659    }
660
661    /// Wait for all background tasks to complete.
662    pub async fn join(&mut self) {
663        let tasks: Vec<(String, JoinHandle<()>)> = self.0.lock().drain(..).collect();
664        join_all(tasks.into_iter().map(|(_, task)| task)).await;
665    }
666
667    pub fn extend(&mut self, tasks: TaskList) {
668        self.0.lock().extend(
669            tasks
670                .0
671                .lock()
672                .drain(..)
673                .collect::<Vec<(String, JoinHandle<()>)>>(),
674        );
675    }
676}
677
678impl Drop for TaskList {
679    fn drop(&mut self) {
680        self.shut_down()
681    }
682}