espresso_node/
external_event_handler.rs

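//! Handling of external messages: inbound events are deserialized and forwarded to the
//! request-response protocol, and outbound messages are serialized and sent over the network.
//!
//! TODO: consider renaming this module to "external" or something similar.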
2
3use std::sync::Arc;
4
5use anyhow::{Context, Result};
6use espresso_types::{PubKey, SeqTypes};
7use hotshot::types::Message;
8use hotshot_types::{
9    message::MessageKind,
10    traits::network::{BroadcastDelay, ConnectedNetwork, Topic, ViewMessage},
11};
12use request_response::network::Bytes;
13use serde::{Deserialize, Serialize};
14use tokio::sync::mpsc::{Receiver, Sender};
15use vbs::{BinarySerializer, bincode_serializer::BincodeSerializer, version::StaticVersion};
16
17use crate::context::TaskList;
18
19/// An external message that can be sent to or received from a node
20#[derive(Debug, Serialize, Deserialize, Clone)]
21pub enum ExternalMessage {
22    RequestResponse(Vec<u8>),
23}
24
25/// The external event handler
26#[derive(Clone)]
27pub struct ExternalEventHandler {
28    /// The sender to the request-response protocol
29    request_response_sender: Sender<Bytes>,
30}
31
32// The different types of outbound messages (broadcast or direct)
33#[derive(Debug)]
34#[allow(dead_code)]
35pub enum OutboundMessage {
36    Direct(MessageKind<SeqTypes>, PubKey),
37    Broadcast(MessageKind<SeqTypes>),
38}
39
40impl ExternalEventHandler {
41    /// Creates a new `ExternalEventHandler` with the given network
42    pub async fn new<N: ConnectedNetwork<PubKey>>(
43        tasks: &mut TaskList,
44        request_response_sender: Sender<Bytes>,
45        outbound_message_receiver: Receiver<OutboundMessage>,
46        network: Arc<N>,
47        public_key: PubKey,
48    ) -> Result<Self> {
49        // Spawn the outbound message handling loop
50        tasks.spawn(
51            "ExternalEventHandler",
52            Self::outbound_message_loop(outbound_message_receiver, network, public_key),
53        );
54
55        Ok(Self {
56            request_response_sender,
57        })
58    }
59
60    /// Handles an event
61    ///
62    /// # Errors
63    /// If the message type is unknown or if there is an error serializing or deserializing the message
64    pub async fn handle_event(&self, external_message_bytes: &[u8]) -> Result<()> {
65        // Deserialize the external message
66        let external_message = bincode::deserialize(external_message_bytes)
67            .with_context(|| "Failed to deserialize external message")?;
68
69        // Match the type
70        match external_message {
71            ExternalMessage::RequestResponse(request_response) => {
72                // Send the inner message to the request-response protocol
73                self.request_response_sender
74                    .send(request_response.into())
75                    .await?;
76            },
77        }
78        Ok(())
79    }
80
81    /// The main loop for sending outbound messages.
82    async fn outbound_message_loop<N: ConnectedNetwork<PubKey>>(
83        mut receiver: Receiver<OutboundMessage>,
84        network: Arc<N>,
85        public_key: PubKey,
86    ) {
87        while let Some(message) = receiver.recv().await {
88            // Match the message type
89            match message {
90                OutboundMessage::Direct(message, recipient) => {
91                    let view = message.view_number();
92                    // Wrap it in the real message type
93                    let message_inner = Message {
94                        sender: public_key,
95                        kind: message,
96                    };
97
98                    // Serialize it
99                    let message_bytes =
100                        match BincodeSerializer::<StaticVersion<0, 0>>::serialize(&message_inner) {
101                            Ok(message_bytes) => message_bytes,
102                            Err(err) => {
103                                tracing::warn!("Failed to serialize direct message: {}", err);
104                                continue;
105                            },
106                        };
107
108                    // Send the message to the recipient
109                    let network = Arc::clone(&network);
110                    tokio::spawn(async move {
111                        if let Err(err) =
112                            network.direct_message(view, message_bytes, recipient).await
113                        {
114                            tracing::warn!("Failed to send message: {:?}", err);
115                        }
116                    });
117                },
118
119                OutboundMessage::Broadcast(message) => {
120                    let view = message.view_number();
121                    // Wrap it in the real message type
122                    let message_inner = Message {
123                        sender: public_key,
124                        kind: message,
125                    };
126
127                    // Serialize it
128                    let message_bytes =
129                        match BincodeSerializer::<StaticVersion<0, 0>>::serialize(&message_inner) {
130                            Ok(message_bytes) => message_bytes,
131                            Err(err) => {
132                                tracing::warn!("Failed to serialize broadcast message: {}", err);
133                                continue;
134                            },
135                        };
136
137                    // Broadcast the message to the global topic
138                    if let Err(err) = network
139                        .broadcast_message(view, message_bytes, Topic::Global, BroadcastDelay::None)
140                        .await
141                    {
142                        tracing::error!("Failed to broadcast message: {:?}", err);
143                    };
144                },
145            }
146        }
147    }
148}