Skip to main content

espresso_node/
external_event_handler.rs

//! External event handling: decodes inbound external messages and sends outbound ones.
//!
//! TODO: consider renaming this module to `external`.
2
3use std::sync::Arc;
4
5use anyhow::{Context, Result, bail};
6use espresso_types::{PubKey, SeqTypes};
7use hotshot::types::Message;
8use hotshot_types::{
9    message::MessageKind,
10    traits::network::{BroadcastDelay, ConnectedNetwork, Topic, ViewMessage},
11};
12use request_response::network::Bytes;
13use serde::{Deserialize, Serialize};
14use tokio::sync::mpsc::{Receiver, Sender, error::TrySendError};
15use vbs::{BinarySerializer, bincode_serializer::BincodeSerializer, version::StaticVersion};
16
17use crate::context::TaskList;
18
19/// An external message that can be sent to or received from a node
20#[derive(Debug, Serialize, Deserialize, Clone)]
21pub enum ExternalMessage {
22    RequestResponse(Vec<u8>),
23}
24
25/// The external event handler
26#[derive(Clone)]
27pub struct ExternalEventHandler {
28    /// The sender to the request-response protocol
29    request_response_sender: Sender<Bytes>,
30}
31
32// The different types of outbound messages (broadcast or direct)
33#[derive(Debug)]
34#[allow(dead_code)]
35pub enum OutboundMessage {
36    Direct(MessageKind<SeqTypes>, PubKey),
37    Broadcast(MessageKind<SeqTypes>),
38}
39
40impl ExternalEventHandler {
41    /// Creates a new `ExternalEventHandler` with the given network
42    pub async fn new<N: ConnectedNetwork<PubKey>>(
43        tasks: &mut TaskList,
44        request_response_sender: Sender<Bytes>,
45        outbound_message_receiver: Receiver<OutboundMessage>,
46        network: Arc<N>,
47        public_key: PubKey,
48    ) -> Result<Self> {
49        // Spawn the outbound message handling loop
50        tasks.spawn(
51            "ExternalEventHandler",
52            Self::outbound_message_loop(outbound_message_receiver, network, public_key),
53        );
54
55        Ok(Self {
56            request_response_sender,
57        })
58    }
59
60    /// Handles an event
61    ///
62    /// # Errors
63    /// If the message type is unknown or if there is an error serializing or deserializing the message
64    pub async fn handle_event(&self, external_message_bytes: &[u8]) -> Result<()> {
65        // Deserialize the external message
66        let external_message = bincode::deserialize(external_message_bytes)
67            .with_context(|| "Failed to deserialize external message")?;
68
69        // Match the type
70        match external_message {
71            ExternalMessage::RequestResponse(request_response) => {
72                match self
73                    .request_response_sender
74                    .try_send(request_response.into())
75                {
76                    Ok(()) => Ok(()),
77                    Err(TrySendError::Full(..)) => bail!("request-response channel full"),
78                    Err(TrySendError::Closed(..)) => bail!("request-response channel closed"),
79                }
80            },
81        }
82    }
83
84    /// The main loop for sending outbound messages.
85    async fn outbound_message_loop<N: ConnectedNetwork<PubKey>>(
86        mut receiver: Receiver<OutboundMessage>,
87        network: Arc<N>,
88        public_key: PubKey,
89    ) {
90        while let Some(message) = receiver.recv().await {
91            // Match the message type
92            match message {
93                OutboundMessage::Direct(message, recipient) => {
94                    let view = message.view_number();
95                    // Wrap it in the real message type
96                    let message_inner = Message {
97                        sender: public_key,
98                        kind: message,
99                    };
100
101                    // Serialize it
102                    let message_bytes =
103                        match BincodeSerializer::<StaticVersion<0, 0>>::serialize(&message_inner) {
104                            Ok(message_bytes) => message_bytes,
105                            Err(err) => {
106                                tracing::warn!("Failed to serialize direct message: {}", err);
107                                continue;
108                            },
109                        };
110
111                    // Send the message to the recipient
112                    let network = Arc::clone(&network);
113                    tokio::spawn(async move {
114                        if let Err(err) =
115                            network.direct_message(view, message_bytes, recipient).await
116                        {
117                            tracing::warn!("Failed to send message: {:?}", err);
118                        }
119                    });
120                },
121
122                OutboundMessage::Broadcast(message) => {
123                    let view = message.view_number();
124                    // Wrap it in the real message type
125                    let message_inner = Message {
126                        sender: public_key,
127                        kind: message,
128                    };
129
130                    // Serialize it
131                    let message_bytes =
132                        match BincodeSerializer::<StaticVersion<0, 0>>::serialize(&message_inner) {
133                            Ok(message_bytes) => message_bytes,
134                            Err(err) => {
135                                tracing::warn!("Failed to serialize broadcast message: {}", err);
136                                continue;
137                            },
138                        };
139
140                    // Broadcast the message to the global topic
141                    if let Err(err) = network
142                        .broadcast_message(view, message_bytes, Topic::Global, BroadcastDelay::None)
143                        .await
144                    {
145                        tracing::error!("Failed to broadcast message: {:?}", err);
146                    };
147                },
148            }
149        }
150    }
151}