hotshot_libp2p_networking/network/behaviours/
direct_message.rs1use std::collections::HashMap;
8
9use libp2p::request_response::{Event, Message, OutboundRequestId, ResponseChannel};
10use libp2p_identity::PeerId;
11use tokio::{spawn, sync::mpsc::UnboundedSender, time::sleep};
12use tracing::debug;
13
14use super::exponential_backoff::ExponentialBackoff;
15use crate::network::{ClientRequest, NetworkEvent, log_summary::LogEvent};
16
17#[derive(Debug)]
19pub struct DMRequest {
20 pub peer_id: PeerId,
22 pub data: Vec<u8>,
24 pub backoff: ExponentialBackoff,
26 pub(crate) retry_count: u8,
28}
29
30#[derive(Debug, Default)]
33pub struct DMBehaviour {
34 in_progress_rr: HashMap<OutboundRequestId, DMRequest>,
36}
37
38#[derive(Debug)]
40pub enum DMEvent {
41 DirectRequest(Vec<u8>, PeerId, ResponseChannel<Vec<u8>>),
43 DirectResponse(Vec<u8>, PeerId),
45}
46
47impl DMBehaviour {
48 pub(crate) fn handle_dm_event(
50 &mut self,
51 event: Event<Vec<u8>, Vec<u8>>,
52 retry_tx: Option<UnboundedSender<ClientRequest>>,
53 ) -> Option<NetworkEvent> {
54 match event {
55 Event::InboundFailure {
56 peer,
57 request_id: _,
58 error,
59 connection_id: _,
60 } => {
61 LogEvent::DirectMessageInboundFailure.record();
62 debug!("Inbound message failure from {:?}: {:?}", peer, error);
63 None
64 },
65 Event::OutboundFailure {
66 peer,
67 request_id,
68 error,
69 connection_id: _,
70 } => {
71 LogEvent::DirectMessageOutboundFailure.record();
72 debug!("Outbound message failure to {:?}: {:?}", peer, error);
73 if let Some(mut req) = self.in_progress_rr.remove(&request_id) {
74 if req.retry_count == 0 {
75 return None;
76 }
77 req.retry_count -= 1;
78 if let Some(retry_tx) = retry_tx {
79 spawn(async move {
80 sleep(req.backoff.next_timeout(false)).await;
81 let _ = retry_tx.send(ClientRequest::DirectRequest {
82 pid: peer,
83 contents: req.data,
84 retry_count: req.retry_count,
85 });
86 });
87 }
88 }
89 None
90 },
91 Event::Message { message, peer, .. } => match message {
92 Message::Request {
93 request: msg,
94 channel,
95 ..
96 } => {
97 debug!("Received direct request {:?}", msg);
98 Some(NetworkEvent::DirectRequest(msg, peer, channel))
101 },
102 Message::Response {
103 request_id,
104 response: msg,
105 } => {
106 if let Some(req) = self.in_progress_rr.remove(&request_id) {
108 debug!("Received direct response {:?}", msg);
109 Some(NetworkEvent::DirectResponse(msg, req.peer_id))
110 } else {
111 debug!("Received response for unknown request id {:?}", request_id);
112 None
113 }
114 },
115 },
116 e @ Event::ResponseSent { .. } => {
117 debug!("Response sent {:?}", e);
118 None
119 },
120 }
121 }
122}
123
124impl DMBehaviour {
125 pub fn add_direct_request(&mut self, mut req: DMRequest, request_id: OutboundRequestId) {
127 if req.retry_count == 0 {
128 return;
129 }
130
131 req.retry_count -= 1;
132
133 debug!("Adding direct request {:?}", req);
134
135 self.in_progress_rr.insert(request_id, req);
136 }
137}