Skip to main content

hotshot_libp2p_networking/network/
node.rs

1// Copyright (c) 2021-2024 Espresso Systems (espressosys.com)
2// This file is part of the HotShot repository.
3
4// You should have received a copy of the MIT License
5// along with the HotShot repository. If not, see <https://mit-license.org/>.
6
7/// configuration for the libp2p network (e.g. how it should be built)
8mod config;
9
10/// libp2p network handle
11/// allows for control over the libp2p network
12mod handle;
13
14use std::{
15    collections::{HashMap, HashSet},
16    iter,
17    num::{NonZeroU32, NonZeroUsize},
18    sync::Arc,
19    time::{Duration, Instant},
20};
21
22use bimap::BiMap;
23use futures::{SinkExt, StreamExt, channel::mpsc};
24use hotshot_types::{
25    constants::KAD_DEFAULT_REPUB_INTERVAL_SEC, traits::node_implementation::NodeType,
26};
27use libp2p::{
28    Multiaddr, StreamProtocol, Swarm, SwarmBuilder, autonat,
29    core::{transport::ListenerId, upgrade::Version::V1Lazy},
30    gossipsub::{
31        Behaviour as Gossipsub, ConfigBuilder as GossipsubConfigBuilder, Event as GossipEvent,
32        Message as GossipsubMessage, MessageAuthenticity, MessageId, Topic, ValidationMode,
33    },
34    identify::{
35        Behaviour as IdentifyBehaviour, Config as IdentifyConfig, Event as IdentifyEvent,
36        Info as IdentifyInfo,
37    },
38    identity::Keypair,
39    kad::{Behaviour, Config, Mode, Record, store::MemoryStore},
40    request_response::{
41        Behaviour as RequestResponse, Config as Libp2pRequestResponseConfig, ProtocolSupport,
42    },
43    swarm::SwarmEvent,
44};
45use libp2p_identity::PeerId;
46use parking_lot::Mutex;
47use rand::{prelude::SliceRandom, thread_rng};
48use tokio::{
49    select, spawn,
50    sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel},
51};
52use tracing::{Instrument, debug, error, info, info_span, instrument, warn};
53
54pub use self::{
55    config::{
56        DEFAULT_REPLICATION_FACTOR, GossipConfig, NetworkNodeConfig, NetworkNodeConfigBuilder,
57        NetworkNodeConfigBuilderError, RequestResponseConfig,
58    },
59    handle::{NetworkNodeHandle, NetworkNodeReceiver, spawn_network_node},
60};
61use super::{
62    BoxedTransport, ClientRequest, NetworkDef, NetworkError, NetworkEvent, NetworkEventInternal,
63    behaviours::dht::{
64        bootstrap::{DHTBootstrapTask, InputEvent},
65        store::{
66            persistent::{DhtPersistentStorage, PersistentStore},
67            validated::ValidatedStore,
68        },
69    },
70    cbor::Cbor,
71    gen_transport,
72};
73use crate::network::{
74    behaviours::{
75        dht::{DHTBehaviour, DHTProgress, KadPutQuery},
76        direct_message::{DMBehaviour, DMRequest},
77        exponential_backoff::ExponentialBackoff,
78    },
79    log_summary::LogEvent,
80};
81
/// Maximum size of a message.
///
/// NOTE(review): presumably a byte count for gossip messages; it is not
/// referenced in the visible part of this file, so confirm where it is applied.
pub const MAX_GOSSIP_MSG_SIZE: usize = 2_000_000_000;

/// [`ESTABLISHED_LIMIT_UNWR`] wrapped in a [`NonZeroU32`], so it can be compared
/// directly against the `num_established` count carried by
/// `SwarmEvent::ConnectionEstablished`. The `unwrap` is evaluated in a const
/// context on a non-zero literal.
pub const ESTABLISHED_LIMIT: NonZeroU32 = NonZeroU32::new(ESTABLISHED_LIMIT_UNWR).unwrap();
/// Number of simultaneous connections to a single peer above which an error
/// is logged (the connection itself is not rejected in this file).
pub const ESTABLISHED_LIMIT_UNWR: u32 = 10;

/// AutoNAT confidence at which we treat a Private status as definitive enough
/// to escalate from a transient warning to a loud operator-facing error.
/// Matches libp2p-autonat's default `confidence_max`.
const AUTONAT_CONFIDENCE_MAX: usize = 3;
94
/// A libp2p network node: owns the swarm plus the per-protocol state needed to
/// drive it (DHT queries, direct messages, AutoNAT logging state).
///
/// `T` supplies the consensus signature key type; `D` is the persistent storage
/// backend handed to the DHT record store.
#[derive(derive_more::Debug)]
pub struct NetworkNode<T: NodeType, D: DhtPersistentStorage> {
    /// Peer ID of this node, derived from the node's keypair in `new`
    peer_id: PeerId,
    /// The swarm combining all network behaviours (excluded from `Debug` output)
    #[debug(skip)]
    swarm: Swarm<NetworkDef<T::SignatureKey, D>>,
    /// The Kademlia record TTL; `put_record` uses it to set each record's expiry
    kademlia_record_ttl: Duration,
    /// The map from consensus keys to peer IDs. Shared with the transport
    /// (a clone of the `Arc` is passed to `gen_transport`); an entry is removed
    /// here once the last connection to its peer closes.
    consensus_key_to_pid_map: Arc<Mutex<BiMap<T::SignatureKey, PeerId>>>,
    /// The listener id we are listening on, if it exists. Set by `start_listen`
    /// and used to remove the listener on shutdown.
    listener_id: Option<ListenerId>,
    /// Handler for direct messages (tracks in-flight direct requests)
    direct_message_state: DMBehaviour,
    /// Handler for DHT events (tracks in-flight put/get/closest-peer queries)
    dht_handler: DHTBehaviour<T::SignatureKey, D>,
    /// Channel to resend requests, set to Some when we call `spawn_listeners`
    resend_tx: Option<UnboundedSender<ClientRequest>>,
    /// Whether we've already emitted the loud "not publicly reachable" error for the
    /// current Private episode. Reset whenever AutoNAT leaves Private status.
    autonat_private_logged: bool,
}
119
120impl<T: NodeType, D: DhtPersistentStorage> NetworkNode<T, D> {
121    /// Returns number of peers this node is connected to
122    pub fn num_connected(&self) -> usize {
123        self.swarm.connected_peers().count()
124    }
125
126    /// return hashset of PIDs this node is connected to
127    pub fn connected_pids(&self) -> HashSet<PeerId> {
128        self.swarm.connected_peers().copied().collect()
129    }
130
131    /// starts the swarm listening on `listen_addr`
132    /// and optionally dials into peer `known_peer`
133    /// returns the address the swarm is listening upon
134    #[instrument(skip(self))]
135    pub async fn start_listen(
136        &mut self,
137        listen_addr: Multiaddr,
138    ) -> Result<Multiaddr, NetworkError> {
139        self.listener_id = Some(self.swarm.listen_on(listen_addr).map_err(|err| {
140            NetworkError::ListenError(format!("failed to listen for Libp2p: {err}"))
141        })?);
142        let addr = loop {
143            if let Some(SwarmEvent::NewListenAddr { address, .. }) = self.swarm.next().await {
144                break address;
145            }
146        };
147        info!("Libp2p listening on {addr:?}");
148        Ok(addr)
149    }
150
151    /// initialize the DHT with known peers
152    /// add the peers to kademlia and then
153    /// the `spawn_listeners` function
154    /// will start connecting to peers
155    #[instrument(skip(self))]
156    pub fn add_known_peers(&mut self, known_peers: &[(PeerId, Multiaddr)]) {
157        debug!("Adding {} known peers", known_peers.len());
158        let behaviour = self.swarm.behaviour_mut();
159        let mut bs_nodes = HashMap::<PeerId, HashSet<Multiaddr>>::new();
160        let mut shuffled = known_peers.iter().collect::<Vec<_>>();
161        shuffled.shuffle(&mut thread_rng());
162        for (peer_id, addr) in shuffled {
163            if *peer_id != self.peer_id {
164                behaviour.dht.add_address(peer_id, addr.clone());
165                behaviour.autonat.add_server(*peer_id, Some(addr.clone()));
166                bs_nodes.insert(*peer_id, iter::once(addr.clone()).collect());
167            }
168        }
169    }
170
171    /// Creates a new `Network` with the given settings.
172    ///
173    /// Currently:
174    ///   * Generates a random key pair and associated [`PeerId`]
175    ///   * Launches a hopefully production ready transport: QUIC v1 (RFC 9000) + DNS
176    ///   * Generates a connection to the "broadcast" topic
177    ///   * Creates a swarm to manage peers and events
178    ///
179    /// # Errors
180    /// - If we fail to generate the transport or any of the behaviours
181    ///
182    /// # Panics
183    /// If 5 < 0
184    #[allow(clippy::too_many_lines)]
185    pub async fn new(
186        config: NetworkNodeConfig,
187        dht_persistent_storage: D,
188        consensus_key_to_pid_map: Arc<Mutex<BiMap<T::SignatureKey, PeerId>>>,
189    ) -> Result<Self, NetworkError> {
190        // Generate a random `KeyPair` if one is not specified
191        let keypair = config
192            .keypair
193            .clone()
194            .unwrap_or_else(Keypair::generate_ed25519);
195
196        // Get the `PeerId` from the `KeyPair`
197        let peer_id = PeerId::from(keypair.public());
198
199        // Generate the transport from the keypair and auth message
200        let transport: BoxedTransport = gen_transport::<T>(
201            keypair.clone(),
202            config.auth_message.clone(),
203            Arc::clone(&consensus_key_to_pid_map),
204        )
205        .await?;
206
207        // Calculate the record republication interval
208        let kademlia_record_republication_interval = config
209            .republication_interval
210            .unwrap_or(Duration::from_secs(KAD_DEFAULT_REPUB_INTERVAL_SEC));
211
212        // Calculate the Kademlia record TTL
213        let kademlia_ttl = config
214            .ttl
215            .unwrap_or(16 * kademlia_record_republication_interval);
216
217        // Generate the swarm
218        let mut swarm: Swarm<NetworkDef<T::SignatureKey, D>> = {
219            // Use the `Blake3` hash of the message's contents as the ID
220            let message_id_fn = |message: &GossipsubMessage| {
221                let hash = blake3::hash(&message.data);
222                MessageId::from(hash.as_bytes().to_vec())
223            };
224
225            // Derive a `Gossipsub` config from our gossip config
226            let gossipsub_config = GossipsubConfigBuilder::default()
227                .message_id_fn(message_id_fn) // Use the (blake3) hash of a message as its ID
228                .validation_mode(ValidationMode::Strict) // Force all messages to have valid signatures
229                .heartbeat_interval(config.gossip_config.heartbeat_interval) // Time between gossip heartbeats
230                .history_gossip(config.gossip_config.history_gossip) // Number of heartbeats to gossip about
231                .history_length(config.gossip_config.history_length) // Number of heartbeats to remember the full message for
232                .mesh_n(config.gossip_config.mesh_n) // Target number of mesh peers
233                .mesh_n_high(config.gossip_config.mesh_n_high) // Upper limit of mesh peers
234                .mesh_n_low(config.gossip_config.mesh_n_low) // Lower limit of mesh peers
235                .mesh_outbound_min(config.gossip_config.mesh_outbound_min) // Minimum number of outbound peers in mesh
236                .max_transmit_size(config.gossip_config.max_transmit_size) // Maximum size of a message
237                .max_ihave_length(config.gossip_config.max_ihave_length) // Maximum number of messages to include in an IHAVE message
238                .max_ihave_messages(config.gossip_config.max_ihave_messages) // Maximum number of IHAVE messages to accept from a peer within a heartbeat
239                .published_message_ids_cache_time(
240                    config.gossip_config.published_message_ids_cache_time,
241                ) // Cache duration for published message IDs
242                .iwant_followup_time(config.gossip_config.iwant_followup_time) // Time to wait for a message requested through IWANT following an IHAVE advertisement
243                .max_messages_per_rpc(config.gossip_config.max_messages_per_rpc) // The maximum number of messages we will process in a given RPC
244                .gossip_retransimission(config.gossip_config.gossip_retransmission) // Controls how many times we will allow a peer to request the same message id through IWANT gossip before we start ignoring them.
245                .flood_publish(config.gossip_config.flood_publish) // If enabled newly created messages will always be sent to all peers that are subscribed to the topic and have a good enough score.
246                .duplicate_cache_time(config.gossip_config.duplicate_cache_time) // The time period that messages are stored in the cache
247                .fanout_ttl(config.gossip_config.fanout_ttl) // Time to live for fanout peers
248                .heartbeat_initial_delay(config.gossip_config.heartbeat_initial_delay) // Initial delay in each heartbeat
249                .gossip_factor(config.gossip_config.gossip_factor) // Affects how many peers we will emit gossip to at each heartbeat
250                .gossip_lazy(config.gossip_config.gossip_lazy) // Minimum number of peers to emit gossip to during a heartbeat
251                .build()
252                .map_err(|err| {
253                    NetworkError::ConfigError(format!("error building gossipsub config: {err:?}"))
254                })?;
255
256            // - Build a gossipsub network behavior
257            let gossipsub: Gossipsub = Gossipsub::new(
258                MessageAuthenticity::Signed(keypair.clone()),
259                gossipsub_config,
260            )
261            .map_err(|err| {
262                NetworkError::ConfigError(format!("error building gossipsub behaviour: {err:?}"))
263            })?;
264
265            //   Build a identify network behavior needed for own
266            //   node connection information
267            //   E.g. this will answer the question: how are other nodes
268            //   seeing the peer from behind a NAT
269            let identify_cfg =
270                IdentifyConfig::new("HotShot/identify/1.0".to_string(), keypair.public());
271            let identify = IdentifyBehaviour::new(identify_cfg);
272
273            // - Build DHT needed for peer discovery
274            let mut kconfig = Config::new(StreamProtocol::new("/ipfs/kad/1.0.0"));
275            kconfig
276                .set_parallelism(NonZeroUsize::new(5).unwrap())
277                .set_provider_publication_interval(Some(kademlia_record_republication_interval))
278                .set_publication_interval(Some(kademlia_record_republication_interval))
279                .set_record_ttl(Some(kademlia_ttl));
280
281            // allowing panic here because something is very wrong if this fails
282            #[allow(clippy::panic)]
283            if let Some(factor) = config.replication_factor {
284                kconfig.set_replication_factor(factor);
285            } else {
286                panic!("Replication factor not set");
287            }
288
289            // Create the DHT behaviour with the given persistent storage
290            let mut kadem = Behaviour::with_config(
291                peer_id,
292                PersistentStore::new(
293                    ValidatedStore::new(MemoryStore::new(peer_id)),
294                    dht_persistent_storage,
295                    5,
296                )
297                .await,
298                kconfig,
299            );
300            kadem.set_mode(Some(Mode::Server));
301
302            let rrconfig = Libp2pRequestResponseConfig::default();
303
304            // Create a new `cbor` codec with the given request and response sizes
305            let cbor = Cbor::new(
306                config.request_response_config.request_size_maximum,
307                config.request_response_config.response_size_maximum,
308            );
309
310            let direct_message: super::cbor::Behaviour<Vec<u8>, Vec<u8>> =
311                RequestResponse::with_codec(
312                    cbor,
313                    [(
314                        StreamProtocol::new("/HotShot/direct_message/1.0"),
315                        ProtocolSupport::Full,
316                    )],
317                    rrconfig.clone(),
318                );
319
320            let autonat_config = autonat::Config {
321                only_global_ips: false,
322                ..Default::default()
323            };
324
325            let network = NetworkDef::new(
326                gossipsub,
327                kadem,
328                identify,
329                direct_message,
330                autonat::Behaviour::new(peer_id, autonat_config),
331            );
332
333            // build swarm
334            let swarm = SwarmBuilder::with_existing_identity(keypair.clone());
335            let swarm = swarm.with_tokio();
336
337            swarm
338                .with_other_transport(|_| transport)
339                .unwrap()
340                .with_behaviour(|_| network)
341                .unwrap()
342                .with_swarm_config(|cfg| {
343                    cfg.with_idle_connection_timeout(Duration::from_secs(10))
344                        .with_substream_upgrade_protocol_override(V1Lazy)
345                })
346                .build()
347        };
348        for (peer, addr) in &config.to_connect_addrs {
349            if peer != swarm.local_peer_id() {
350                swarm.behaviour_mut().add_address(peer, addr.clone());
351                swarm.add_peer_address(*peer, addr.clone());
352            }
353        }
354
355        for addr in &config.announce_addresses {
356            info!("Adding announce address {addr}");
357            swarm.add_external_address(addr.clone());
358        }
359
360        Ok(Self {
361            peer_id,
362            swarm,
363            kademlia_record_ttl: kademlia_ttl,
364            consensus_key_to_pid_map,
365            listener_id: None,
366            direct_message_state: DMBehaviour::default(),
367            dht_handler: DHTBehaviour::new(
368                peer_id,
369                config
370                    .replication_factor
371                    .unwrap_or(NonZeroUsize::new(4).unwrap()),
372            ),
373            resend_tx: None,
374            autonat_private_logged: false,
375        })
376    }
377
378    /// Publish a key/value to the record store.
379    ///
380    /// # Panics
381    /// If the default replication factor is `None`
382    pub fn put_record(&mut self, mut query: KadPutQuery) {
383        // Create the new record
384        let mut record = Record::new(query.key.clone(), query.value.clone());
385
386        // Set the record's expiration time to the proper time
387        record.expires = Some(Instant::now() + self.kademlia_record_ttl);
388
389        match self.swarm.behaviour_mut().dht.put_record(
390            record,
391            libp2p::kad::Quorum::N(
392                NonZeroUsize::try_from(self.dht_handler.replication_factor().get() / 2)
393                    .expect("replication factor should be bigger than 0"),
394            ),
395        ) {
396            Err(e) => {
397                // failed try again later
398                query.progress = DHTProgress::NotStarted;
399                query.backoff.start_next(false);
400                error!("Error publishing to DHT: {e:?} for peer {:?}", self.peer_id);
401            },
402            Ok(qid) => {
403                debug!("Published record to DHT with qid {qid:?}");
404                let query = KadPutQuery {
405                    progress: DHTProgress::InProgress(qid),
406                    ..query
407                };
408                self.dht_handler.put_record(qid, query);
409            },
410        }
411    }
412
413    /// event handler for client events
414    /// currently supported actions include
415    /// - shutting down the swarm
416    /// - gossipping a message to known peers on the `global` topic
417    /// - returning the id of the current peer
418    /// - subscribing to a topic
419    /// - unsubscribing from a toipc
420    /// - direct messaging a peer
421    #[instrument(skip(self))]
422    async fn handle_client_requests(
423        &mut self,
424        msg: Option<ClientRequest>,
425    ) -> Result<bool, NetworkError> {
426        let behaviour = self.swarm.behaviour_mut();
427        match msg {
428            Some(msg) => {
429                match msg {
430                    ClientRequest::BeginBootstrap => {
431                        debug!("Beginning Libp2p bootstrap");
432                        let _ = self.swarm.behaviour_mut().dht.bootstrap();
433                    },
434                    ClientRequest::LookupPeer(pid, chan) => {
435                        let id = self.swarm.behaviour_mut().dht.get_closest_peers(pid);
436                        self.dht_handler
437                            .in_progress_get_closest_peers
438                            .insert(id, chan);
439                    },
440                    ClientRequest::GetRoutingTable(chan) => {
441                        self.dht_handler
442                            .print_routing_table(&mut self.swarm.behaviour_mut().dht);
443                        if chan.send(()).is_err() {
444                            warn!("Tried to notify client but client not tracking anymore");
445                        }
446                    },
447                    ClientRequest::PutDHT { key, value, notify } => {
448                        let query = KadPutQuery {
449                            progress: DHTProgress::NotStarted,
450                            notify,
451                            key,
452                            value,
453                            backoff: ExponentialBackoff::default(),
454                        };
455                        self.put_record(query);
456                    },
457                    ClientRequest::GetConnectedPeerNum(s) => {
458                        if s.send(self.num_connected()).is_err() {
459                            error!("error sending peer number to client");
460                        }
461                    },
462                    ClientRequest::GetConnectedPeers(s) => {
463                        if s.send(self.connected_pids()).is_err() {
464                            error!("error sending peer set to client");
465                        }
466                    },
467                    ClientRequest::GetDHT {
468                        key,
469                        notify,
470                        retry_count,
471                    } => {
472                        self.dht_handler.get_record(
473                            key,
474                            notify,
475                            ExponentialBackoff::default(),
476                            retry_count,
477                            &mut self.swarm.behaviour_mut().dht,
478                        );
479                    },
480                    ClientRequest::IgnorePeers(_peers) => {
481                        // NOTE used by test with conductor only
482                    },
483                    ClientRequest::Shutdown => {
484                        if let Some(listener_id) = self.listener_id {
485                            self.swarm.remove_listener(listener_id);
486                        }
487
488                        return Ok(true);
489                    },
490                    ClientRequest::GossipMsg(topic, contents) => {
491                        behaviour.publish_gossip(Topic::new(topic.clone()), contents.clone());
492                    },
493                    ClientRequest::Subscribe(t, chan) => {
494                        behaviour.subscribe_gossip(&t);
495                        if let Some(chan) = chan
496                            && chan.send(()).is_err()
497                        {
498                            error!("finished subscribing but response channel dropped");
499                        }
500                    },
501                    ClientRequest::Unsubscribe(t, chan) => {
502                        behaviour.unsubscribe_gossip(&t);
503                        if let Some(chan) = chan
504                            && chan.send(()).is_err()
505                        {
506                            error!("finished unsubscribing but response channel dropped");
507                        }
508                    },
509                    ClientRequest::DirectRequest {
510                        pid,
511                        contents,
512                        retry_count,
513                    } => {
514                        debug!("Sending direct request to {pid:?}");
515                        let id = behaviour.add_direct_request(pid, contents.clone());
516                        let req = DMRequest {
517                            peer_id: pid,
518                            data: contents,
519                            backoff: ExponentialBackoff::default(),
520                            retry_count,
521                        };
522                        self.direct_message_state.add_direct_request(req, id);
523                    },
524                    ClientRequest::DirectResponse(chan, msg) => {
525                        behaviour.add_direct_response(chan, msg);
526                    },
527                    ClientRequest::AddKnownPeers(peers) => {
528                        self.add_known_peers(&peers);
529                    },
530                    ClientRequest::Prune(pid) => {
531                        if self.swarm.disconnect_peer_id(pid).is_err() {
532                            warn!("Could not disconnect from {pid:?}");
533                        }
534                    },
535                }
536            },
537            None => {
538                error!("Error receiving msg in main behaviour loop: channel closed");
539            },
540        }
541        Ok(false)
542    }
543
544    /// event handler for events emitted from the swarm
545    #[allow(clippy::type_complexity)]
546    #[instrument(skip(self))]
547    async fn handle_swarm_events(
548        &mut self,
549        event: SwarmEvent<NetworkEventInternal>,
550        send_to_client: &UnboundedSender<NetworkEvent>,
551    ) -> Result<(), NetworkError> {
552        // Make the match cleaner
553        debug!("Swarm event observed {:?}", event);
554
555        #[allow(deprecated)]
556        match event {
557            SwarmEvent::ConnectionEstablished {
558                connection_id: _,
559                peer_id,
560                endpoint,
561                num_established,
562                concurrent_dial_errors,
563                established_in: _established_in,
564            } => {
565                if num_established > ESTABLISHED_LIMIT {
566                    error!(
567                        "Num concurrent connections to a single peer exceeding \
568                         {ESTABLISHED_LIMIT:?} at {num_established:?}!"
569                    );
570                } else {
571                    debug!(
572                        "Connection established with {peer_id:?} at {endpoint:?} with \
573                         {concurrent_dial_errors:?} concurrent dial errors"
574                    );
575                }
576
577                // Send the number of connected peers to the client
578                send_to_client
579                    .send(NetworkEvent::ConnectedPeersUpdate(self.num_connected()))
580                    .map_err(|err| NetworkError::ChannelSendError(err.to_string()))?;
581            },
582            SwarmEvent::ConnectionClosed {
583                connection_id: _,
584                peer_id,
585                endpoint,
586                num_established,
587                cause,
588            } => {
589                if num_established > ESTABLISHED_LIMIT_UNWR {
590                    error!(
591                        "Num concurrent connections to a single peer exceeding \
592                         {ESTABLISHED_LIMIT:?} at {num_established:?}!"
593                    );
594                } else {
595                    debug!("Connection closed with {peer_id:?} at {endpoint:?} due to {cause:?}");
596                }
597
598                // If we are no longer connected to the peer, remove the consensus key from the map
599                if num_established == 0 {
600                    self.consensus_key_to_pid_map
601                        .lock()
602                        .remove_by_right(&peer_id);
603                }
604
605                // Send the number of connected peers to the client
606                send_to_client
607                    .send(NetworkEvent::ConnectedPeersUpdate(self.num_connected()))
608                    .map_err(|err| NetworkError::ChannelSendError(err.to_string()))?;
609            },
610            SwarmEvent::Dialing {
611                peer_id,
612                connection_id: _,
613            } => {
614                debug!("Attempting to dial {peer_id:?}");
615            },
616            SwarmEvent::ListenerClosed {
617                listener_id: _,
618                addresses: _,
619                reason: _,
620            }
621            | SwarmEvent::NewListenAddr {
622                listener_id: _,
623                address: _,
624            }
625            | SwarmEvent::ExpiredListenAddr {
626                listener_id: _,
627                address: _,
628            }
629            | SwarmEvent::NewExternalAddrCandidate { .. }
630            | SwarmEvent::ExternalAddrExpired { .. }
631            | SwarmEvent::IncomingConnection {
632                connection_id: _,
633                local_addr: _,
634                send_back_addr: _,
635            } => {},
636            SwarmEvent::Behaviour(b) => {
637                let maybe_event = match b {
638                    NetworkEventInternal::DHTEvent(e) => self
639                        .dht_handler
640                        .dht_handle_event(e, self.swarm.behaviour_mut().dht.store_mut()),
641                    NetworkEventInternal::IdentifyEvent(e) => {
642                        // NOTE feed identified peers into kademlia's routing table for peer discovery.
643                        if let IdentifyEvent::Received {
644                            peer_id,
645                            info:
646                                IdentifyInfo {
647                                    listen_addrs,
648                                    protocols: _,
649                                    public_key: _,
650                                    protocol_version: _,
651                                    agent_version: _,
652                                    observed_addr: _,
653                                    signed_peer_record: _,
654                                },
655                            connection_id: _,
656                        } = *e
657                        {
658                            let behaviour = self.swarm.behaviour_mut();
659
660                            // into hashset to delete duplicates (I checked: there are duplicates)
661                            for addr in listen_addrs.iter().collect::<HashSet<_>>() {
662                                behaviour.dht.add_address(&peer_id, addr.clone());
663                            }
664                        }
665                        None
666                    },
667                    NetworkEventInternal::GossipEvent(e) => match *e {
668                        GossipEvent::Message {
669                            propagation_source: _peer_id,
670                            message_id: _id,
671                            message,
672                        } => Some(NetworkEvent::GossipMsg(message.data)),
673                        GossipEvent::Subscribed { peer_id, topic } => {
674                            debug!("Peer {peer_id:?} subscribed to topic {topic:?}");
675                            None
676                        },
677                        GossipEvent::Unsubscribed { peer_id, topic } => {
678                            debug!("Peer {peer_id:?} unsubscribed from topic {topic:?}");
679                            None
680                        },
681                        GossipEvent::GossipsubNotSupported { peer_id } => {
682                            LogEvent::GossipsubNotSupported.record();
683                            debug!("Peer {peer_id:?} does not support gossipsub");
684                            None
685                        },
686                        GossipEvent::SlowPeer {
687                            peer_id,
688                            failed_messages: _,
689                        } => {
690                            LogEvent::GossipsubSlowPeer.record();
691                            debug!("Peer {peer_id:?} is slow");
692                            None
693                        },
694                    },
695                    NetworkEventInternal::DMEvent(e) => self
696                        .direct_message_state
697                        .handle_dm_event(e, self.resend_tx.clone()),
698                    NetworkEventInternal::AutonatEvent(e) => {
699                        match e {
700                            autonat::Event::InboundProbe(_) => {},
701                            autonat::Event::OutboundProbe(e) => match e {
702                                autonat::OutboundProbeEvent::Request { .. }
703                                | autonat::OutboundProbeEvent::Response { .. } => {},
704                                autonat::OutboundProbeEvent::Error {
705                                    probe_id: _,
706                                    peer,
707                                    error,
708                                } => {
709                                    debug!("AutoNAT outbound probe to {peer:?} failed: {error:?}");
710                                },
711                            },
712                            autonat::Event::StatusChanged { old, new } => match &new {
713                                autonat::NatStatus::Public(addr) => {
714                                    info!(
715                                        "AutoNAT: this node is publicly reachable at {addr} (was \
716                                         {old:?})"
717                                    );
718                                    self.autonat_private_logged = false;
719                                },
720                                autonat::NatStatus::Private => {
721                                    warn!(
722                                        "AutoNAT: probe reports this node may not be publicly \
723                                         reachable (was {old:?}). Treating as transient until \
724                                         confirmed by repeated probes."
725                                    );
726                                },
727                                autonat::NatStatus::Unknown => {
728                                    debug!("AutoNAT status: {old:?} -> Unknown");
729                                    self.autonat_private_logged = false;
730                                },
731                            },
732                        };
733                        let autonat = &self.swarm.behaviour().autonat;
734                        if matches!(autonat.nat_status(), autonat::NatStatus::Private)
735                            && autonat.confidence() >= AUTONAT_CONFIDENCE_MAX
736                            && !self.autonat_private_logged
737                        {
738                            error!(
739                                "AutoNAT: this node is NOT publicly reachable (confirmed by \
740                                 repeated probes). Peers cannot direct-message us, so leader \
741                                 views may fail and we may accumulate missed slots. Verify \
742                                 --libp2p-advertise-address (env \
743                                 ESPRESSO_NODE_LIBP2P_ADVERTISE_ADDRESS) is set a publicly \
744                                 reachable host:port, and ensure inbound UDP at that port is open \
745                                 from the public internet (firewall/NAT/security group)."
746                            );
747                            self.autonat_private_logged = true;
748                        }
749                        None
750                    },
751                };
752
753                if let Some(event) = maybe_event {
754                    // forward messages directly to Client
755                    send_to_client
756                        .send(event)
757                        .map_err(|err| NetworkError::ChannelSendError(err.to_string()))?;
758                }
759            },
760            SwarmEvent::OutgoingConnectionError {
761                connection_id: _,
762                peer_id,
763                error,
764            } => {
765                LogEvent::DialFailure.record();
766                debug!("Outgoing connection error to {peer_id:?}: {error:?}");
767            },
768            SwarmEvent::IncomingConnectionError {
769                connection_id: _,
770                local_addr: _,
771                send_back_addr: _,
772                error,
773                peer_id: _,
774            } => {
775                LogEvent::IncomingConnError.record();
776                debug!("Incoming connection error: {error:?}");
777            },
778            SwarmEvent::ListenerError {
779                listener_id: _,
780                error,
781            } => {
782                LogEvent::ListenerError.record();
783                debug!("Listener error: {error:?}");
784            },
785            SwarmEvent::ExternalAddrConfirmed { address } => {
786                let my_id = *self.swarm.local_peer_id();
787                self.swarm
788                    .behaviour_mut()
789                    .dht
790                    .add_address(&my_id, address.clone());
791            },
792            SwarmEvent::NewExternalAddrOfPeer { peer_id, address } => {
793                self.swarm
794                    .behaviour_mut()
795                    .dht
796                    .add_address(&peer_id, address.clone());
797            },
798            _ => {
799                debug!("Unhandled swarm event {event:?}");
800            },
801        }
802        Ok(())
803    }
804
805    /// Spawn a task to listen for requests on the returned channel
806    /// as well as any events produced by libp2p
807    ///
808    /// # Errors
809    /// - If we fail to create the channels or the bootstrap channel
810    pub fn spawn_listeners(
811        mut self,
812    ) -> Result<
813        (
814            UnboundedSender<ClientRequest>,
815            UnboundedReceiver<NetworkEvent>,
816        ),
817        NetworkError,
818    > {
819        let (s_input, mut s_output) = unbounded_channel::<ClientRequest>();
820        let (r_input, r_output) = unbounded_channel::<NetworkEvent>();
821        let (mut bootstrap_tx, bootstrap_rx) = mpsc::channel(100);
822        self.resend_tx = Some(s_input.clone());
823        self.dht_handler.set_bootstrap_sender(bootstrap_tx.clone());
824
825        DHTBootstrapTask::run(bootstrap_rx, s_input.clone());
826        spawn(
827            async move {
828                loop {
829                    select! {
830                        event = self.swarm.next() => {
831                            debug!("peerid {:?}\t\thandling maybe event {:?}", self.peer_id, event);
832                            if let Some(event) = event {
833                                debug!("peerid {:?}\t\thandling event {:?}", self.peer_id, event);
834                                self.handle_swarm_events(event, &r_input).await?;
835                            }
836                        },
837                        msg = s_output.recv() => {
838                            debug!("peerid {:?}\t\thandling msg {:?}", self.peer_id, msg);
839                            let shutdown = self.handle_client_requests(msg).await?;
840                            if shutdown {
841                                let _ = bootstrap_tx.send(InputEvent::ShutdownBootstrap).await;
842                                break
843                            }
844                        }
845                    }
846                }
847                Ok::<(), NetworkError>(())
848            }
849            .instrument(info_span!("Libp2p NetworkBehaviour Handler")),
850        );
851        Ok((s_input, r_output))
852    }
853
854    /// Returns this network node's peer id by value (the `peer_id` field is copied out, not borrowed).
855    pub fn peer_id(&self) -> PeerId {
856        self.peer_id
857    }
858}