espresso_node/
external_event_handler.rs1use std::sync::Arc;
4
5use anyhow::{Context, Result};
6use espresso_types::{PubKey, SeqTypes};
7use hotshot::types::Message;
8use hotshot_types::{
9 message::MessageKind,
10 traits::network::{BroadcastDelay, ConnectedNetwork, Topic, ViewMessage},
11};
12use request_response::network::Bytes;
13use serde::{Deserialize, Serialize};
14use tokio::sync::mpsc::{Receiver, Sender};
15use vbs::{BinarySerializer, bincode_serializer::BincodeSerializer, version::StaticVersion};
16
17use crate::context::TaskList;
18
19#[derive(Debug, Serialize, Deserialize, Clone)]
21pub enum ExternalMessage {
22 RequestResponse(Vec<u8>),
23}
24
25#[derive(Clone)]
27pub struct ExternalEventHandler {
28 request_response_sender: Sender<Bytes>,
30}
31
32#[derive(Debug)]
34#[allow(dead_code)]
35pub enum OutboundMessage {
36 Direct(MessageKind<SeqTypes>, PubKey),
37 Broadcast(MessageKind<SeqTypes>),
38}
39
40impl ExternalEventHandler {
41 pub async fn new<N: ConnectedNetwork<PubKey>>(
43 tasks: &mut TaskList,
44 request_response_sender: Sender<Bytes>,
45 outbound_message_receiver: Receiver<OutboundMessage>,
46 network: Arc<N>,
47 public_key: PubKey,
48 ) -> Result<Self> {
49 tasks.spawn(
51 "ExternalEventHandler",
52 Self::outbound_message_loop(outbound_message_receiver, network, public_key),
53 );
54
55 Ok(Self {
56 request_response_sender,
57 })
58 }
59
60 pub async fn handle_event(&self, external_message_bytes: &[u8]) -> Result<()> {
65 let external_message = bincode::deserialize(external_message_bytes)
67 .with_context(|| "Failed to deserialize external message")?;
68
69 match external_message {
71 ExternalMessage::RequestResponse(request_response) => {
72 self.request_response_sender
74 .send(request_response.into())
75 .await?;
76 },
77 }
78 Ok(())
79 }
80
81 async fn outbound_message_loop<N: ConnectedNetwork<PubKey>>(
83 mut receiver: Receiver<OutboundMessage>,
84 network: Arc<N>,
85 public_key: PubKey,
86 ) {
87 while let Some(message) = receiver.recv().await {
88 match message {
90 OutboundMessage::Direct(message, recipient) => {
91 let view = message.view_number();
92 let message_inner = Message {
94 sender: public_key,
95 kind: message,
96 };
97
98 let message_bytes =
100 match BincodeSerializer::<StaticVersion<0, 0>>::serialize(&message_inner) {
101 Ok(message_bytes) => message_bytes,
102 Err(err) => {
103 tracing::warn!("Failed to serialize direct message: {}", err);
104 continue;
105 },
106 };
107
108 let network = Arc::clone(&network);
110 tokio::spawn(async move {
111 if let Err(err) =
112 network.direct_message(view, message_bytes, recipient).await
113 {
114 tracing::warn!("Failed to send message: {:?}", err);
115 }
116 });
117 },
118
119 OutboundMessage::Broadcast(message) => {
120 let view = message.view_number();
121 let message_inner = Message {
123 sender: public_key,
124 kind: message,
125 };
126
127 let message_bytes =
129 match BincodeSerializer::<StaticVersion<0, 0>>::serialize(&message_inner) {
130 Ok(message_bytes) => message_bytes,
131 Err(err) => {
132 tracing::warn!("Failed to serialize broadcast message: {}", err);
133 continue;
134 },
135 };
136
137 if let Err(err) = network
139 .broadcast_message(view, message_bytes, Topic::Global, BroadcastDelay::None)
140 .await
141 {
142 tracing::error!("Failed to broadcast message: {:?}", err);
143 };
144 },
145 }
146 }
147 }
148}