Skip to main content

hotshot_libp2p_networking/network/behaviours/
direct_message.rs

1// Copyright (c) 2021-2024 Espresso Systems (espressosys.com)
2// This file is part of the HotShot repository.
3
4// You should have received a copy of the MIT License
5// along with the HotShot repository. If not, see <https://mit-license.org/>.
6
7use std::collections::HashMap;
8
9use libp2p::request_response::{Event, Message, OutboundRequestId, ResponseChannel};
10use libp2p_identity::PeerId;
11use tokio::{spawn, sync::mpsc::UnboundedSender, time::sleep};
12use tracing::debug;
13
14use super::exponential_backoff::ExponentialBackoff;
15use crate::network::{ClientRequest, NetworkEvent, log_summary::LogEvent};
16
17/// Request to direct message a peert
18#[derive(Debug)]
19pub struct DMRequest {
20    /// the recv-ers peer id
21    pub peer_id: PeerId,
22    /// the data
23    pub data: Vec<u8>,
24    /// backoff since last attempted request
25    pub backoff: ExponentialBackoff,
26    /// the number of remaining retries before giving up
27    pub(crate) retry_count: u8,
28}
29
30/// Wrapper metadata around libp2p's request response
31/// usage: direct message peer
32#[derive(Debug, Default)]
33pub struct DMBehaviour {
34    /// In progress queries
35    in_progress_rr: HashMap<OutboundRequestId, DMRequest>,
36}
37
38/// Lilst of direct message output events
39#[derive(Debug)]
40pub enum DMEvent {
41    /// We received as Direct Request
42    DirectRequest(Vec<u8>, PeerId, ResponseChannel<Vec<u8>>),
43    /// We received a Direct Response
44    DirectResponse(Vec<u8>, PeerId),
45}
46
47impl DMBehaviour {
48    /// handle a direct message event
49    pub(crate) fn handle_dm_event(
50        &mut self,
51        event: Event<Vec<u8>, Vec<u8>>,
52        retry_tx: Option<UnboundedSender<ClientRequest>>,
53    ) -> Option<NetworkEvent> {
54        match event {
55            Event::InboundFailure {
56                peer,
57                request_id: _,
58                error,
59                connection_id: _,
60            } => {
61                LogEvent::DirectMessageInboundFailure.record();
62                debug!("Inbound message failure from {:?}: {:?}", peer, error);
63                None
64            },
65            Event::OutboundFailure {
66                peer,
67                request_id,
68                error,
69                connection_id: _,
70            } => {
71                LogEvent::DirectMessageOutboundFailure.record();
72                debug!("Outbound message failure to {:?}: {:?}", peer, error);
73                if let Some(mut req) = self.in_progress_rr.remove(&request_id) {
74                    if req.retry_count == 0 {
75                        return None;
76                    }
77                    req.retry_count -= 1;
78                    if let Some(retry_tx) = retry_tx {
79                        spawn(async move {
80                            sleep(req.backoff.next_timeout(false)).await;
81                            let _ = retry_tx.send(ClientRequest::DirectRequest {
82                                pid: peer,
83                                contents: req.data,
84                                retry_count: req.retry_count,
85                            });
86                        });
87                    }
88                }
89                None
90            },
91            Event::Message { message, peer, .. } => match message {
92                Message::Request {
93                    request: msg,
94                    channel,
95                    ..
96                } => {
97                    debug!("Received direct request {:?}", msg);
98                    // receiver, not initiator.
99                    // don't track. If we are disconnected, sender will reinitiate
100                    Some(NetworkEvent::DirectRequest(msg, peer, channel))
101                },
102                Message::Response {
103                    request_id,
104                    response: msg,
105                } => {
106                    // success, finished.
107                    if let Some(req) = self.in_progress_rr.remove(&request_id) {
108                        debug!("Received direct response {:?}", msg);
109                        Some(NetworkEvent::DirectResponse(msg, req.peer_id))
110                    } else {
111                        debug!("Received response for unknown request id {:?}", request_id);
112                        None
113                    }
114                },
115            },
116            e @ Event::ResponseSent { .. } => {
117                debug!("Response sent {:?}", e);
118                None
119            },
120        }
121    }
122}
123
124impl DMBehaviour {
125    /// Add a direct request for a given peer
126    pub fn add_direct_request(&mut self, mut req: DMRequest, request_id: OutboundRequestId) {
127        if req.retry_count == 0 {
128            return;
129        }
130
131        req.retry_count -= 1;
132
133        debug!("Adding direct request {:?}", req);
134
135        self.in_progress_rr.insert(request_id, req);
136    }
137}