espresso_node/
external_event_handler.rs1use std::sync::Arc;
4
5use anyhow::{Context, Result, bail};
6use espresso_types::{PubKey, SeqTypes};
7use hotshot::types::Message;
8use hotshot_types::{
9 message::MessageKind,
10 traits::network::{BroadcastDelay, ConnectedNetwork, Topic, ViewMessage},
11};
12use request_response::network::Bytes;
13use serde::{Deserialize, Serialize};
14use tokio::sync::mpsc::{Receiver, Sender, error::TrySendError};
15use vbs::{BinarySerializer, bincode_serializer::BincodeSerializer, version::StaticVersion};
16
17use crate::context::TaskList;
18
19#[derive(Debug, Serialize, Deserialize, Clone)]
21pub enum ExternalMessage {
22 RequestResponse(Vec<u8>),
23}
24
25#[derive(Clone)]
27pub struct ExternalEventHandler {
28 request_response_sender: Sender<Bytes>,
30}
31
32#[derive(Debug)]
34#[allow(dead_code)]
35pub enum OutboundMessage {
36 Direct(MessageKind<SeqTypes>, PubKey),
37 Broadcast(MessageKind<SeqTypes>),
38}
39
40impl ExternalEventHandler {
41 pub async fn new<N: ConnectedNetwork<PubKey>>(
43 tasks: &mut TaskList,
44 request_response_sender: Sender<Bytes>,
45 outbound_message_receiver: Receiver<OutboundMessage>,
46 network: Arc<N>,
47 public_key: PubKey,
48 ) -> Result<Self> {
49 tasks.spawn(
51 "ExternalEventHandler",
52 Self::outbound_message_loop(outbound_message_receiver, network, public_key),
53 );
54
55 Ok(Self {
56 request_response_sender,
57 })
58 }
59
60 pub async fn handle_event(&self, external_message_bytes: &[u8]) -> Result<()> {
65 let external_message = bincode::deserialize(external_message_bytes)
67 .with_context(|| "Failed to deserialize external message")?;
68
69 match external_message {
71 ExternalMessage::RequestResponse(request_response) => {
72 match self
73 .request_response_sender
74 .try_send(request_response.into())
75 {
76 Ok(()) => Ok(()),
77 Err(TrySendError::Full(..)) => bail!("request-response channel full"),
78 Err(TrySendError::Closed(..)) => bail!("request-response channel closed"),
79 }
80 },
81 }
82 }
83
84 async fn outbound_message_loop<N: ConnectedNetwork<PubKey>>(
86 mut receiver: Receiver<OutboundMessage>,
87 network: Arc<N>,
88 public_key: PubKey,
89 ) {
90 while let Some(message) = receiver.recv().await {
91 match message {
93 OutboundMessage::Direct(message, recipient) => {
94 let view = message.view_number();
95 let message_inner = Message {
97 sender: public_key,
98 kind: message,
99 };
100
101 let message_bytes =
103 match BincodeSerializer::<StaticVersion<0, 0>>::serialize(&message_inner) {
104 Ok(message_bytes) => message_bytes,
105 Err(err) => {
106 tracing::warn!("Failed to serialize direct message: {}", err);
107 continue;
108 },
109 };
110
111 let network = Arc::clone(&network);
113 tokio::spawn(async move {
114 if let Err(err) =
115 network.direct_message(view, message_bytes, recipient).await
116 {
117 tracing::warn!("Failed to send message: {:?}", err);
118 }
119 });
120 },
121
122 OutboundMessage::Broadcast(message) => {
123 let view = message.view_number();
124 let message_inner = Message {
126 sender: public_key,
127 kind: message,
128 };
129
130 let message_bytes =
132 match BincodeSerializer::<StaticVersion<0, 0>>::serialize(&message_inner) {
133 Ok(message_bytes) => message_bytes,
134 Err(err) => {
135 tracing::warn!("Failed to serialize broadcast message: {}", err);
136 continue;
137 },
138 };
139
140 if let Err(err) = network
142 .broadcast_message(view, message_bytes, Topic::Global, BroadcastDelay::None)
143 .await
144 {
145 tracing::error!("Failed to broadcast message: {:?}", err);
146 };
147 },
148 }
149 }
150 }
151}