//! espresso_node/context.rs

1use std::{
2    fmt::{Debug, Display},
3    marker::PhantomData,
4    sync::Arc,
5    time::Duration,
6};
7
8use anyhow::Context;
9use async_lock::RwLock;
10use derivative::Derivative;
11use espresso_types::{
12    NodeState, PubKey, Transaction, ValidatedState,
13    v0::traits::{EventConsumer as PersistenceEventConsumer, SequencerPersistence},
14};
15use futures::{
16    future::{Future, join_all},
17    stream::{Stream, StreamExt},
18};
19use hotshot::{
20    SystemContext,
21    types::{Event, EventType, SystemContextHandle},
22};
23use hotshot_events_service::events_source::{EventConsumer, EventsStreamer};
24use hotshot_orchestrator::client::OrchestratorClient;
25use hotshot_types::{
26    PeerConfig, ValidatorConfig,
27    consensus::ConsensusMetricsValue,
28    data::{Leaf2, ViewNumber},
29    epoch_membership::EpochMembershipCoordinator,
30    message::UpgradeLock,
31    network::NetworkConfig,
32    storage_metrics::StorageMetricsValue,
33    traits::{metrics::Metrics, network::ConnectedNetwork},
34};
35use parking_lot::Mutex;
36use request_response::RequestResponseConfig;
37use tokio::{spawn, sync::mpsc::channel, task::JoinHandle};
38use tracing::{Instrument, Level};
39use url::Url;
40
41use crate::{
42    Node, SeqTypes, SequencerApiVersion,
43    catchup::ParallelStateCatchup,
44    external_event_handler::ExternalEventHandler,
45    proposal_fetcher::ProposalFetcherConfig,
46    request_response::{
47        RequestResponseProtocol,
48        data_source::{DataSource, Storage as RequestResponseStorage},
49        network::Sender as RequestResponseSender,
50        recipient_source::RecipientSource,
51    },
52    state_signature::{self, StateSigner},
53};
54
/// The consensus handle: a HotShot [`SystemContextHandle`] specialized to the
/// sequencer's types and this node's network (`N`) and persistence (`P`) layers.
pub type Consensus<N, P> = SystemContextHandle<SeqTypes, Node<N, P>>;
57
/// The sequencer context contains a consensus handle and other sequencer specific information.
#[derive(Derivative, Clone)]
#[derivative(Debug(bound = ""))]
pub struct SequencerContext<N: ConnectedNetwork<PubKey>, P: SequencerPersistence> {
    /// The consensus handle
    #[derivative(Debug = "ignore")]
    handle: Arc<RwLock<Consensus<N, P>>>,

    /// The request-response protocol
    #[derivative(Debug = "ignore")]
    #[allow(dead_code)]
    pub request_response_protocol: RequestResponseProtocol<Node<N, P>, N, P>,

    /// Context for generating state signatures.
    state_signer: Arc<RwLock<StateSigner<SequencerApiVersion>>>,

    /// An orchestrator to wait for before starting consensus.
    #[derivative(Debug = "ignore")]
    wait_for_orchestrator: Option<Arc<OrchestratorClient>>,

    /// Background tasks to shut down when the node is dropped.
    tasks: TaskList,

    /// events streamer to stream hotshot events to external clients
    events_streamer: Arc<RwLock<EventsStreamer<SeqTypes>>>,

    /// When true, the `Drop` impl skips the async shutdown — set either after
    /// an explicit `shut_down` or by `detach`.
    detached: bool,

    /// Per-node instance state (node id, L1 client, etc.), cloned out by
    /// `node_state()` accessors.
    node_state: NodeState,

    /// The network configuration this node was started with.
    network_config: NetworkConfig<SeqTypes>,

    /// This node's validator key material and public configuration.
    #[derivative(Debug = "ignore")]
    validator_config: ValidatorConfig<SeqTypes>,
}
93
impl<N: ConnectedNetwork<PubKey>, P: SequencerPersistence> SequencerContext<N, P> {
    /// Initialize a sequencer node.
    ///
    /// Loads persisted consensus state, starts the L1 client tasks, initializes
    /// the HotShot handle, wires up state signing, the request-response
    /// protocol (also registered as a state-catchup provider), and the external
    /// event handler, then hands everything to [`Self::new`] which spawns the
    /// background event loop.
    ///
    /// # Errors
    ///
    /// Returns an error if persisted consensus state cannot be loaded, the
    /// stake table commitment cannot be computed, HotShot initialization
    /// fails, or the external event handler cannot be created.
    #[tracing::instrument(skip_all, fields(node_id = instance_state.node_id))]
    #[allow(clippy::too_many_arguments)]
    pub async fn init(
        network_config: NetworkConfig<SeqTypes>,
        upgrade: versions::Upgrade,
        validator_config: ValidatorConfig<SeqTypes>,
        coordinator: EpochMembershipCoordinator<SeqTypes>,
        instance_state: NodeState,
        storage: Option<RequestResponseStorage>,
        state_catchup: ParallelStateCatchup,
        persistence: Arc<P>,
        network: Arc<N>,
        state_relay_server: Option<Url>,
        metrics: &dyn Metrics,
        stake_table_capacity: usize,
        event_consumer: impl PersistenceEventConsumer + 'static,
        proposal_fetcher_cfg: ProposalFetcherConfig,
    ) -> anyhow::Result<Self> {
        let config = &network_config.config;
        let pub_key = validator_config.public_key;
        tracing::info!(%pub_key, "initializing consensus");

        // Stick our node ID in `metrics` so it is easily accessible via the status API.
        metrics
            .create_gauge("node_index".into(), None)
            .set(instance_state.node_id as usize);

        // Start L1 client if it isn't already.
        instance_state.l1_client.spawn_tasks().await;

        // Load saved consensus state from storage.
        let (initializer, anchor_view) = persistence
            .load_consensus_state(instance_state.clone(), upgrade)
            .await?;

        tracing::warn!(
            "Starting up sequencer context with initializer:\n\n{:?}",
            initializer
        );

        let stake_table = config.hotshot_stake_table();
        let stake_table_commit = stake_table.commitment(stake_table_capacity)?;
        // NOTE(review): epoch is hard-coded to `None` here — presumably the
        // genesis/epochless stake table; confirm against the signer's contract.
        let stake_table_epoch = None;
        let should_vote =
            state_signature::should_vote(&stake_table, &validator_config.state_public_key);

        // NOTE(review): the second argument (`0`) to `EventsStreamer::new` is
        // presumably a starting view/offset — confirm against its definition.
        let event_streamer = Arc::new(RwLock::new(EventsStreamer::<SeqTypes>::new(
            stake_table.0,
            0,
        )));

        // Initialize the HotShot system context; we only keep the handle
        // (`.0`), discarding the rest of the returned tuple.
        let handle = SystemContext::init(
            validator_config.public_key,
            validator_config.private_key.clone(),
            validator_config.state_private_key.clone(),
            instance_state.node_id,
            config.clone(),
            upgrade,
            coordinator.clone(),
            network.clone(),
            initializer,
            ConsensusMetricsValue::new(metrics),
            Arc::clone(&persistence),
            StorageMetricsValue::new(metrics),
        )
        .await?
        .0;

        let mut state_signer = StateSigner::new(
            validator_config.state_private_key.clone(),
            validator_config.state_public_key.clone(),
            stake_table_commit,
            stake_table_epoch,
            stake_table_capacity,
            should_vote,
        );
        // Optionally forward signatures to an external state relay server.
        if let Some(url) = state_relay_server {
            state_signer = state_signer.with_relay_server(url);
        }

        // Create the channel for sending outbound messages from the external event handler
        // (both channels are bounded to 20 in-flight messages).
        let (outbound_message_sender, outbound_message_receiver) = channel(20);
        let (request_response_sender, request_response_receiver) = channel(20);

        // Configure the request-response protocol
        let request_response_config = RequestResponseConfig {
            incoming_request_ttl: Duration::from_secs(40),
            incoming_request_timeout: Duration::from_secs(5),
            incoming_response_timeout: Duration::from_secs(5),
            request_batch_size: 5,
            request_batch_interval: Duration::from_secs(2),
            max_incoming_requests: 10,
            max_incoming_requests_per_key: 1,
            max_incoming_responses: 200,
        };

        // Create the request-response protocol
        let request_response_protocol = RequestResponseProtocol::new(
            request_response_config,
            RequestResponseSender::new(outbound_message_sender),
            request_response_receiver,
            RecipientSource {
                memberships: coordinator,
                consensus: handle.hotshot.clone(),
                public_key: validator_config.public_key,
            },
            DataSource {
                node_state: instance_state.clone(),
                storage,
                persistence: persistence.clone(),
                consensus: handle.hotshot.clone(),
                phantom: PhantomData,
            },
            validator_config.public_key,
            validator_config.private_key.clone(),
        );

        // Add the request-response protocol to the list of providers for state catchup. Since the interior is mutable,
        // the request-response protocol will now retroactively be used anywhere we passed in the original struct (e.g. in consensus
        // itself)
        state_catchup.add_provider(Arc::new(request_response_protocol.clone()));

        // Create the external event handler
        let mut tasks = TaskList::default();
        let external_event_handler = ExternalEventHandler::new(
            &mut tasks,
            request_response_sender,
            outbound_message_receiver,
            network,
            pub_key,
        )
        .await
        .with_context(|| "Failed to create external event handler")?;

        // Attach the tasks spawned above (e.g. by the external event handler)
        // to the context so they are shut down with it.
        Ok(Self::new(
            handle,
            persistence,
            state_signer,
            external_event_handler,
            request_response_protocol,
            event_streamer,
            instance_state,
            network_config,
            validator_config,
            event_consumer,
            anchor_view,
            proposal_fetcher_cfg,
            metrics,
        )
        .with_task_list(tasks))
    }

    /// Constructor
    ///
    /// Wraps the raw pieces in shared locks, spawns the proposal-fetching
    /// tasks, and spawns the long-lived `handle_events` loop that persists,
    /// signs, and re-broadcasts consensus events.
    #[allow(clippy::too_many_arguments)]
    fn new(
        handle: Consensus<N, P>,
        persistence: Arc<P>,
        state_signer: StateSigner<SequencerApiVersion>,
        external_event_handler: ExternalEventHandler,
        request_response_protocol: RequestResponseProtocol<Node<N, P>, N, P>,
        event_streamer: Arc<RwLock<EventsStreamer<SeqTypes>>>,
        node_state: NodeState,
        network_config: NetworkConfig<SeqTypes>,
        validator_config: ValidatorConfig<SeqTypes>,
        event_consumer: impl PersistenceEventConsumer + 'static,
        anchor_view: Option<ViewNumber>,
        proposal_fetcher_cfg: ProposalFetcherConfig,
        metrics: &dyn Metrics,
    ) -> Self {
        // Subscribe to the event stream before the handle is moved into the lock.
        let events = handle.event_stream();

        let node_id = node_state.node_id;
        let mut ctx = Self {
            handle: Arc::new(RwLock::new(handle)),
            state_signer: Arc::new(RwLock::new(state_signer)),
            request_response_protocol,
            tasks: Default::default(),
            detached: false,
            wait_for_orchestrator: None,
            events_streamer: event_streamer.clone(),
            node_state,
            network_config,
            validator_config,
        };

        // Spawn proposal fetching tasks.
        proposal_fetcher_cfg.spawn(
            &mut ctx.tasks,
            ctx.handle.clone(),
            persistence.clone(),
            metrics,
        );

        // Spawn event handling loop.
        ctx.spawn(
            "event handler",
            handle_events(
                ctx.handle.clone(),
                node_id,
                events,
                persistence,
                ctx.state_signer.clone(),
                external_event_handler,
                Some(event_streamer.clone()),
                event_consumer,
                anchor_view,
            ),
        );

        ctx
    }

    /// Wait for a signal from the orchestrator before starting consensus.
    pub fn wait_for_orchestrator(mut self, client: OrchestratorClient) -> Self {
        self.wait_for_orchestrator = Some(Arc::new(client));
        self
    }

    /// Add a list of tasks to the given context.
    pub(crate) fn with_task_list(mut self, tasks: TaskList) -> Self {
        self.tasks.extend(tasks);
        self
    }

    /// Return a reference to the consensus state signer.
    pub fn state_signer(&self) -> Arc<RwLock<StateSigner<SequencerApiVersion>>> {
        self.state_signer.clone()
    }

    /// Stream consensus events.
    pub async fn event_stream(&self) -> impl Stream<Item = Event<SeqTypes>> + use<N, P> {
        self.handle.read().await.event_stream()
    }

    /// Submit a transaction to consensus via the underlying HotShot handle.
    pub async fn submit_transaction(&self, tx: Transaction) -> anyhow::Result<()> {
        self.handle.read().await.submit_transaction(tx).await?;
        Ok(())
    }

    /// get event streamer
    pub fn event_streamer(&self) -> Arc<RwLock<EventsStreamer<SeqTypes>>> {
        self.events_streamer.clone()
    }

    /// Return a reference to the underlying consensus handle.
    pub fn consensus(&self) -> Arc<RwLock<Consensus<N, P>>> {
        Arc::clone(&self.handle)
    }

    /// Clone out the HotShot upgrade lock.
    pub async fn upgrade_lock(&self) -> UpgradeLock<SeqTypes> {
        self.handle.read().await.hotshot.upgrade_lock.clone()
    }

    /// Shut down only the consensus handle (background tasks keep running;
    /// see `shut_down` for the full teardown).
    pub async fn shutdown_consensus(&self) {
        self.handle.write().await.shut_down().await
    }

    /// The most recently decided leaf.
    pub async fn decided_leaf(&self) -> Leaf2<SeqTypes> {
        self.handle.read().await.decided_leaf().await
    }

    /// The validated state as of `view`, if consensus still has it available.
    pub async fn state(&self, view: ViewNumber) -> Option<Arc<ValidatedState>> {
        self.handle.read().await.state(view).await
    }

    /// The validated state as of the latest decided view.
    pub async fn decided_state(&self) -> Arc<ValidatedState> {
        self.handle.read().await.decided_state().await
    }

    /// This node's numeric ID.
    pub fn node_id(&self) -> u64 {
        self.node_state.node_id
    }

    /// Clone of this node's instance state.
    pub fn node_state(&self) -> NodeState {
        self.node_state.clone()
    }

    /// Start participating in consensus.
    pub async fn start_consensus(&self) {
        if let Some(orchestrator_client) = &self.wait_for_orchestrator {
            tracing::warn!("waiting for orchestrated start");
            let peer_config = PeerConfig::to_bytes(&self.validator_config.public_config()).clone();
            orchestrator_client
                .wait_for_all_nodes_ready(peer_config)
                .await;
        } else {
            // NOTE(review): this branch is also reached when no orchestrator
            // was configured at all (the normal non-orchestrated path), so an
            // error-level log here looks misleading — confirm intent.
            tracing::error!("Cannot get info from orchestrator client");
        }
        tracing::warn!("starting consensus");
        self.handle.read().await.hotshot.start_consensus().await;
    }

    /// Spawn a background task attached to this context.
    ///
    /// When this context is dropped or [`shut_down`](Self::shut_down), background tasks will be
    /// cancelled in the reverse order that they were spawned.
    pub fn spawn(&mut self, name: impl Display, task: impl Future<Output: Debug> + Send + 'static) {
        self.tasks.spawn(name, task);
    }

    /// Spawn a short-lived background task attached to this context.
    ///
    /// When this context is dropped or [`shut_down`](Self::shut_down), background tasks will be
    /// cancelled in the reverse order that they were spawned.
    ///
    /// The only difference between a short-lived background task and a [long-lived](Self::spawn)
    /// one is how urgently logging related to the task is treated.
    pub fn spawn_short_lived(
        &mut self,
        name: impl Display,
        task: impl Future<Output: Debug> + Send + 'static,
    ) {
        self.tasks.spawn_short_lived(name, task);
    }

    /// Stop participating in consensus.
    ///
    /// Shuts down the consensus handle, aborts all background tasks, and stops
    /// the L1 client tasks, then marks the context detached.
    pub async fn shut_down(&mut self) {
        tracing::info!("shutting down SequencerContext");
        self.handle.write().await.shut_down().await;
        self.tasks.shut_down();
        self.node_state.l1_client.shut_down_tasks().await;

        // Since we've already shut down, we can set `detached` so the drop
        // handler doesn't call `shut_down` again.
        self.detached = true;
    }

    /// Wait for consensus to complete.
    ///
    /// Under normal conditions, this function will block forever, which is a convenient way of
    /// keeping the main thread from exiting as long as there are still active background tasks.
    pub async fn join(mut self) {
        self.tasks.join().await;
    }

    /// Allow this node to continue participating in consensus even after it is dropped.
    pub fn detach(&mut self) {
        // Set `detached` so the drop handler doesn't call `shut_down`.
        self.detached = true;
    }

    /// Get the network config
    pub fn network_config(&self) -> NetworkConfig<SeqTypes> {
        self.network_config.clone()
    }
}
441
impl<N: ConnectedNetwork<PubKey>, P: SequencerPersistence> Drop for SequencerContext<N, P> {
    // Best-effort async teardown for contexts dropped without an explicit
    // `shut_down()` or `detach()`.
    //
    // NOTE(review): `spawn` is tokio's; dropping a non-detached context
    // outside a tokio runtime will panic — confirm all drop sites run inside
    // the runtime. Also, `self.tasks` (which shares its Arc with
    // `tasks_clone`) is dropped right after this fn returns and
    // `TaskList::drop` aborts all tasks synchronously, so the spawned
    // `tasks_clone.shut_down()` will usually find an already-drained list —
    // presumably harmless since `shut_down` is idempotent, but worth confirming.
    fn drop(&mut self) {
        if !self.detached {
            // Spawn a task to shut down the context
            let handle_clone = self.handle.clone();
            let tasks_clone = self.tasks.clone();
            let node_state_clone = self.node_state.clone();

            spawn(async move {
                tracing::info!("shutting down SequencerContext");
                handle_clone.write().await.shut_down().await;
                tasks_clone.shut_down();
                node_state_clone.l1_client.shut_down_tasks().await;
            });

            // Set `detached` so the drop handler doesn't call `shut_down` again.
            self.detached = true;
        }
    }
}
462
463#[tracing::instrument(skip_all, fields(node_id))]
464#[allow(clippy::too_many_arguments)]
465async fn handle_events<N, P>(
466    consensus: Arc<RwLock<Consensus<N, P>>>,
467    node_id: u64,
468    mut events: impl Stream<Item = Event<SeqTypes>> + Unpin,
469    persistence: Arc<P>,
470    state_signer: Arc<RwLock<StateSigner<SequencerApiVersion>>>,
471    external_event_handler: ExternalEventHandler,
472    events_streamer: Option<Arc<RwLock<EventsStreamer<SeqTypes>>>>,
473    event_consumer: impl PersistenceEventConsumer + 'static,
474    anchor_view: Option<ViewNumber>,
475) where
476    N: ConnectedNetwork<PubKey>,
477    P: SequencerPersistence,
478{
479    if let Some(view) = anchor_view {
480        // Process and clean up any leaves that we may have persisted last time we were running but
481        // failed to handle due to a shutdown.
482        if let Err(err) = persistence
483            .append_decided_leaves(view, vec![], None, &event_consumer)
484            .await
485        {
486            tracing::warn!(
487                "failed to process decided leaves, chain may not be up to date: {err:#}"
488            );
489        }
490    }
491
492    while let Some(event) = events.next().await {
493        tracing::debug!(node_id, ?event, "consensus event");
494        // Store latest consensus state.
495        persistence.handle_event(&event, &event_consumer).await;
496
497        // Generate state signature.
498        state_signer
499            .write()
500            .await
501            .handle_event(&event, consensus.clone())
502            .await;
503
504        // Handle external messages
505        if let EventType::ExternalMessageReceived { data, .. } = &event.event
506            && let Err(err) = external_event_handler.handle_event(data).await
507        {
508            tracing::warn!("Failed to handle external message: {:?}", err);
509        };
510
511        // Send the event via the event streaming service
512        if let Some(events_streamer) = events_streamer.as_ref() {
513            events_streamer.write().await.handle_event(event).await;
514        }
515    }
516}
517
/// A named list of spawned background task handles.
///
/// `Clone` shares the same underlying `Arc`-wrapped list, so aborting tasks
/// through any clone affects every clone.
#[derive(Debug, Default, Clone)]
#[allow(clippy::type_complexity)]
pub(crate) struct TaskList(Arc<Mutex<Vec<(String, JoinHandle<()>)>>>);
521
/// Spawn `$task` on the tokio runtime inside a tracing span named after
/// `$name`, log its start and exit (with the task's `Debug` result) at level
/// `$lvl`, and register the join handle in `$this` (a [`TaskList`]) so it can
/// be aborted later. A macro is used because `tracing::span!`/`event!` require
/// a const level.
macro_rules! spawn_with_log_level {
    ($this:expr, $lvl:expr, $name:expr, $task: expr) => {
        let name = $name.to_string();
        let task = {
            let name = name.clone();
            let span = tracing::span!($lvl, "background task", name);
            spawn(
                async move {
                    tracing::event!($lvl, "spawning background task");
                    let res = $task.await;
                    tracing::event!($lvl, ?res, "background task exited");
                }
                .instrument(span),
            )
        };
        $this.0.lock().push((name, task));
    };
}
540
541impl TaskList {
542    /// Spawn a background task attached to this [`TaskList`].
543    ///
544    /// When this [`TaskList`] is dropped or [`shut_down`](Self::shut_down), background tasks will
545    /// be cancelled in the reverse order that they were spawned.
546    pub fn spawn(&mut self, name: impl Display, task: impl Future<Output: Debug> + Send + 'static) {
547        spawn_with_log_level!(self, Level::INFO, name, task);
548    }
549
550    /// Spawn a short-lived background task attached to this [`TaskList`].
551    ///
552    /// When this [`TaskList`] is dropped or [`shut_down`](Self::shut_down), background tasks will
553    /// be cancelled in the reverse order that they were spawned.
554    ///
555    /// The only difference between a short-lived background task and a [long-lived](Self::spawn)
556    /// one is how urgently logging related to the task is treated.
557    pub fn spawn_short_lived(
558        &mut self,
559        name: impl Display,
560        task: impl Future<Output: Debug> + Send + 'static,
561    ) {
562        spawn_with_log_level!(self, Level::DEBUG, name, task);
563    }
564
565    /// Stop all background tasks.
566    pub fn shut_down(&self) {
567        let tasks: Vec<(String, JoinHandle<()>)> = self.0.lock().drain(..).collect();
568        for (name, task) in tasks.into_iter().rev() {
569            tracing::info!(name, "cancelling background task");
570            task.abort();
571        }
572    }
573
574    /// Wait for all background tasks to complete.
575    pub async fn join(&mut self) {
576        let tasks: Vec<(String, JoinHandle<()>)> = self.0.lock().drain(..).collect();
577        join_all(tasks.into_iter().map(|(_, task)| task)).await;
578    }
579
580    pub fn extend(&mut self, tasks: TaskList) {
581        self.0.lock().extend(
582            tasks
583                .0
584                .lock()
585                .drain(..)
586                .collect::<Vec<(String, JoinHandle<()>)>>(),
587        );
588    }
589}
590
impl Drop for TaskList {
    // Abort any remaining tasks when the list is dropped. Because `TaskList`
    // clones share one Arc-wrapped list, dropping any clone aborts the tasks
    // registered through all clones.
    fn drop(&mut self) {
        self.shut_down()
    }
}